1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred.cooccurrence;
17
18 import net.sf.eos.EosException;
19 import net.sf.eos.analyzer.ResettableTokenizer;
20 import net.sf.eos.analyzer.TokenizerSupplier;
21 import net.sf.eos.analyzer.TokenizerException;
22 import net.sf.eos.config.Configuration;
23 import net.sf.eos.config.HadoopConfigurationAdapter;
24 import net.sf.eos.config.Service;
25 import net.sf.eos.config.Services;
26 import net.sf.eos.document.EosDocument;
27 import net.sf.eos.hadoop.DistributedCacheStrategy;
28 import net.sf.eos.hadoop.FullyDistributedCacheStrategy;
29 import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
30 import net.sf.eos.hadoop.mapred.Index;
31 import net.sf.eos.trie.AbstractTrieLoader;
32 import net.sf.eos.trie.CharSequenceKeyAnalyzer;
33 import net.sf.eos.trie.PatriciaTrie;
34 import net.sf.eos.trie.Trie;
35 import net.sf.eos.trie.TrieLoader;
36 import net.sf.eos.trie.PatriciaTrie.KeyAnalyzer;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.io.LongWritable;
42 import org.apache.hadoop.io.Text;
43 import org.apache.hadoop.mapred.JobConf;
44 import org.apache.hadoop.mapred.Mapper;
45 import org.apache.hadoop.mapred.OutputCollector;
46 import org.apache.hadoop.mapred.Reporter;
47
48 import java.io.IOException;
49 import java.io.InputStream;
50 import java.util.Map;
51 import java.util.Set;
52 import java.util.Map.Entry;
53
54 @Services(
55 services={
56 @Service(
57 factory=TokenizerSupplier.class,
58 description="Tokenizer for coocurence analyzing."
59 ),
60 @Service(
61 factory=AbstractTrieLoader.class,
62 description="Trie contains the look up data for cooccurance analyzis."
63 )
64 }
65 )
66 public class DictionaryBasedEntityRecognizerMapper
67 extends EosDocumentSupportMapReduceBase
68 implements Mapper<LongWritable, Text, Text, Text> {
69
70
71 private static final Log LOG =
72 LogFactory.getLog(DictionaryBasedEntityRecognizerMapper.class.getName());
73
74 private JobConf conf;
75
76 private Trie<CharSequence, Set<CharSequence>> entities = null;
77 private DistributedCacheStrategy strategy =
78 new FullyDistributedCacheStrategy();
79
80
81 public void map(final LongWritable positionInFile,
82 final Text eosDoc,
83 final OutputCollector<Text, Text> outputCollector,
84 final Reporter reporter) throws IOException {
85
86 try {
87 final EosDocument doc = textToEosDocument(eosDoc);
88 final DictionaryBasedEntityIdKeyGenerator generator =
89 new DictionaryBasedEntityIdKeyGenerator();
90 final Configuration lConf = new Configuration();
91 HadoopConfigurationAdapter.addHadoopConfigToEosConfig(conf, lConf);
92 generator.configure(lConf);
93
94 final Trie<CharSequence, Set<CharSequence>> lTrie = getTrie();
95 generator.setTrie(lTrie);
96
97 final Map<Text, EosDocument> idMap =
98 generator.createKeysForDocument(doc);
99
100 for (final Entry<Text, EosDocument> entry : idMap.entrySet()) {
101 final Text key = entry.getKey();
102 final EosDocument newdoc = entry.getValue();
103 final Text newTextDoc = this.eosDocumentToText(newdoc);
104 outputCollector.collect(key, newTextDoc);
105 reporter.incrCounter(Index.MAP, 1);
106 }
107
108 } catch (final EosException e) {
109 reporter.incrCounter(Index.EOS_EXCEPTION, 1);
110 final IOException te = new IOException(e.getMessage());
111 te.initCause(e);
112 throw te;
113 } catch (final IOException e) {
114 reporter.incrCounter(Index.IO_EXCEPTION, 1);
115 throw e;
116 } catch (final Exception e) {
117 reporter.incrCounter(Index.OTHER_EXCEPTION, 1);
118 final IOException te = new IOException(e.getMessage());
119 te.initCause(e);
120 throw te;
121 }
122 }
123
124
125
126
127
128
129 protected void configureTrie() {
130 synchronized(DictionaryBasedEntityRecognizerMapper.class) {
131 try {
132 assert this.conf != null;
133
134 final String strageyClassName =
135 this.conf.get(DistributedCacheStrategy.STRATEGY_IMPL_CONFIG_NAME);
136 if (strageyClassName != null) {
137 try {
138 this.strategy = (DistributedCacheStrategy)
139 Class.forName(strageyClassName).newInstance();
140 } catch (final InstantiationException e) {
141 throw new RuntimeException(e);
142 } catch (final IllegalAccessException e) {
143 throw new RuntimeException(e);
144 } catch (final ClassNotFoundException e) {
145 throw new RuntimeException(e);
146 }
147 }
148
149 final Path[] recognizerDataFile =
150 this.strategy.distributedCachePathes((this.conf));
151 LOG.info("strategy: "
152 + this.strategy.getClass().getCanonicalName());
153 LOG.info("path: " + recognizerDataFile[0]);
154 final InputStream in =
155 recognizerDataFile[0].toUri().toURL().openStream();
156
157 final Configuration lconf =
158 new HadoopConfigurationAdapter(this.conf);
159 final TrieLoader newInstance =
160 AbstractTrieLoader.newInstance(lconf);
161 final TrieLoader<CharSequence, Set<CharSequence>> loader =
162 newInstance;
163
164 final KeyAnalyzer<CharSequence> analyzer =
165 new CharSequenceKeyAnalyzer();
166 this.entities =
167 new PatriciaTrie<CharSequence, Set<CharSequence>>(analyzer);
168 loader.loadTrie(in, this.entities);
169 } catch (final Exception e) {
170 throw new IllegalStateException(e);
171 }
172 }
173 }
174
175
176
177
178
179
180
181 protected ResettableTokenizer getTokenizer() throws TokenizerException {
182
183 assert this.conf != null;
184
185 try {
186 final Configuration lconf = new HadoopConfigurationAdapter(this.conf);
187 final TokenizerSupplier tokenBuilder =
188 TokenizerSupplier.newInstance(lconf);
189 final ResettableTokenizer tokenizer = tokenBuilder.get();
190
191 return tokenizer;
192 } catch (final Exception e) {
193 throw new TokenizerException(e);
194 }
195 }
196
197
198
199
200
201
202 protected Trie<CharSequence, Set<CharSequence>> getTrie() {
203 assert this.entities != null;
204 return this.entities;
205 }
206
207
208
209
210
211 @Override
212 public void configure(@SuppressWarnings("hiding") final JobConf conf) {
213 super.configure(conf);
214 this.conf = conf;
215 configureTrie();
216 }
217
218 @Override
219 public void close() throws IOException {
220 super.close();
221 }
222 }