package net.sf.eos.hadoop.mapred.decompose;


import net.sf.eos.EosException;
import net.sf.eos.analyzer.ResettableTokenizer;
import net.sf.eos.analyzer.SentenceTokenizer;
import net.sf.eos.analyzer.TextBuilder;
import net.sf.eos.analyzer.TokenizerSupplier;
import net.sf.eos.analyzer.TextBuilder.SpaceBuilder;
import net.sf.eos.config.Configuration;
import net.sf.eos.config.HadoopConfigurationAdapter;
import net.sf.eos.config.Service;
import net.sf.eos.config.Services;
import net.sf.eos.document.EosDocument;
import net.sf.eos.hadoop.mapred.AbstractKeyGenerator;
import static net.sf.eos.hadoop.mapred.AbstractKeyGenerator.ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME;
import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
import net.sf.eos.hadoop.mapred.Index;
import net.sf.eos.hadoop.mapred.KeyGenerator;
import net.sf.eos.sentence.Sentencer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;

@Services(
    services={
        @Service(
            factory=AbstractKeyGenerator.class,
            implementation=TextMetaKeyGenerator.class,
            description="Key generator for cooccurrence analysis."
        ),
        @Service(
            factory=TextBuilder.class,
            implementation=SpaceBuilder.class
        ),
        @Service(
            factory=TokenizerSupplier.class
        ),
        @Service(
            factory=Sentencer.class
        )
    }
)
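/**
 * Mapper that decomposes serialized {@link EosDocument}s into sentence
 * documents. Each sentence document is emitted once per key produced by the
 * configured {@link KeyGenerator}; the output key is the sentence key from
 * the {@link Sentencer} joined with the generator key by a {@code '+'}.
 */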
public class SentencerMapper extends EosDocumentSupportMapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private static final Log LOG =
        LogFactory.getLog(SentencerMapper.class.getName());

    private JobConf conf;

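    /**
     * Deserializes the incoming {@link EosDocument}, splits it into sentence
     * documents and collects one record per generated key. Checked failures
     * are counted via the {@link Index} counters and rethrown as
     * {@link IOException}s.
     */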
    public void map(final LongWritable positionInFile,
                    final Text eosDoc,
                    final OutputCollector<Text, Text> outputCollector,
                    final Reporter reporter) throws IOException {

        final Configuration config = new HadoopConfigurationAdapter(this.conf);

        try {
            final EosDocument doc = textToEosDocument(eosDoc);
            final TokenizerSupplier tokenizerSupplier =
                TokenizerSupplier.newInstance(config);
            if (LOG.isDebugEnabled()) {
                LOG.debug("TokenizerSupplier instance is "
                          + tokenizerSupplier.getClass());
            }
            final TextBuilder textBuilder = TextBuilder.newInstance(config);
            if (LOG.isDebugEnabled()) {
                LOG.debug("TextBuilder instance is " + textBuilder.getClass());
            }
            final Sentencer sentencer = Sentencer.newInstance(config);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sentencer instance is " + sentencer.getClass());
            }

            final ResettableTokenizer tokenizer = tokenizerSupplier.get();
            final SentenceTokenizer sentenceTokenizer =
                new SentenceTokenizer();

            // Decompose the document into one EosDocument per sentence,
            // keyed by the sentence key created by the Sentencer.
            final Map<String, EosDocument> docs =
                sentencer.toSentenceDocuments(doc,
                                              sentenceTokenizer,
                                              tokenizer,
                                              textBuilder);

            final KeyGenerator<Text> generator = newGenerator();
            for (final Entry<String, EosDocument> entry : docs.entrySet()) {
                final String key = entry.getKey();
                final EosDocument toWrite = entry.getValue();
                final Map<Text, EosDocument> toStore =
                    generator.createKeysForDocument(toWrite);

                // Emit one record per generated key; the output key is the
                // sentence key joined with the generator key.
                for (final Entry<Text, EosDocument> e : toStore.entrySet()) {
                    final Text textKey = e.getKey();
                    final String asString = textKey.toString();
                    final String addKey = key + "+" + asString;
                    final Text keyAsText = new Text(addKey);
                    final Text docAsText = eosDocumentToText(toWrite);
                    outputCollector.collect(keyAsText, docAsText);
                    reporter.incrCounter(Index.MAP, 1);
                }
            }

        } catch (final EosException e) {
            reporter.incrCounter(Index.EOS_EXCEPTION, 1);
            // Preserve the original exception as the cause instead of
            // discarding its stack trace.
            final IOException ioe = new IOException(e.getMessage());
            ioe.initCause(e);
            throw ioe;
        } catch (final Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            reporter.incrCounter(Index.IO_EXCEPTION, 1);
            final IOException ioe =
                new IOException(e.getClass() + " - " + e.getMessage());
            ioe.initCause(e);
            throw ioe;
        }
    }

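    /**
     * Creates the {@link KeyGenerator} configured under
     * {@code ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME}, falling back to
     * {@link TextMetaKeyGenerator} if no implementation is configured.
     *
     * @return a new key generator instance
     * @throws EosException if the generator cannot be created
     */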
    protected KeyGenerator<Text> newGenerator() throws EosException {
        final Configuration lconf = new Configuration();
        HadoopConfigurationAdapter.addHadoopConfigToEosConfig(this.conf, lconf);

        final String implName =
            lconf.get(ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME);
        if (implName == null || implName.length() == 0) {
            lconf.set(ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME,
                      TextMetaKeyGenerator.class.getName());
        }
        @SuppressWarnings("unchecked")
        final KeyGenerator<Text> newInstance =
            (KeyGenerator<Text>) AbstractKeyGenerator.newInstance(lconf);

        if (LOG.isDebugEnabled()) {
            LOG.debug("KeyGenerator<Text> instance is "
                      + newInstance.getClass());
        }
        return newInstance;
    }

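    /**
     * Stores the job configuration for use when creating the analyzer and
     * key generator instances during mapping.
     */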
    @Override
    public void configure(final JobConf conf) {
        super.configure(conf);
        this.conf = conf;
    }

    @Override
    public void close() throws IOException {
        super.close();
    }
}