View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred.cooccurrence;
17  
18  import net.sf.eos.EosException;
19  import net.sf.eos.analyzer.ResettableTokenizer;
20  import net.sf.eos.analyzer.TokenizerSupplier;
21  import net.sf.eos.analyzer.TokenizerException;
22  import net.sf.eos.config.Configuration;
23  import net.sf.eos.config.HadoopConfigurationAdapter;
24  import net.sf.eos.config.Service;
25  import net.sf.eos.config.Services;
26  import net.sf.eos.document.EosDocument;
27  import net.sf.eos.hadoop.DistributedCacheStrategy;
28  import net.sf.eos.hadoop.FullyDistributedCacheStrategy;
29  import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
30  import net.sf.eos.hadoop.mapred.Index;
31  import net.sf.eos.trie.AbstractTrieLoader;
32  import net.sf.eos.trie.CharSequenceKeyAnalyzer;
33  import net.sf.eos.trie.PatriciaTrie;
34  import net.sf.eos.trie.Trie;
35  import net.sf.eos.trie.TrieLoader;
36  import net.sf.eos.trie.PatriciaTrie.KeyAnalyzer;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.io.LongWritable;
42  import org.apache.hadoop.io.Text;
43  import org.apache.hadoop.mapred.JobConf;
44  import org.apache.hadoop.mapred.Mapper;
45  import org.apache.hadoop.mapred.OutputCollector;
46  import org.apache.hadoop.mapred.Reporter;
47  
48  import java.io.IOException;
49  import java.io.InputStream;
50  import java.util.Map;
51  import java.util.Set;
52  import java.util.Map.Entry;
53  
54  @Services(
55      services={
56          @Service(
57              factory=TokenizerSupplier.class,
58              description="Tokenizer for coocurence analyzing."
59          ),
60          @Service(
61              factory=AbstractTrieLoader.class,
62              description="Trie contains the look up data for cooccurance analyzis."
63          )
64      }
65  )
66  public class DictionaryBasedEntityRecognizerMapper
67          extends EosDocumentSupportMapReduceBase
68          implements Mapper<LongWritable, Text, Text, Text> {
69  
70      /** For logging. */
71      private static final Log LOG =
72          LogFactory.getLog(DictionaryBasedEntityRecognizerMapper.class.getName());
73  
74      private JobConf conf;
75  
76      private Trie<CharSequence, Set<CharSequence>> entities = null;
77      private DistributedCacheStrategy strategy =
78          new FullyDistributedCacheStrategy();
79  
80  
81      public void map(final LongWritable positionInFile,
82                      final Text eosDoc,
83                      final OutputCollector<Text, Text> outputCollector,
84                      final Reporter reporter) throws IOException {
85  
86          try {
87              final EosDocument doc = textToEosDocument(eosDoc);
88              final DictionaryBasedEntityIdKeyGenerator generator =
89                  new DictionaryBasedEntityIdKeyGenerator();
90              final Configuration lConf = new Configuration();
91              HadoopConfigurationAdapter.addHadoopConfigToEosConfig(conf, lConf);
92              generator.configure(lConf);
93  
94              final Trie<CharSequence, Set<CharSequence>> lTrie = getTrie();
95              generator.setTrie(lTrie);
96  
97              final Map<Text, EosDocument> idMap =
98                  generator.createKeysForDocument(doc); 
99  
100             for (final Entry<Text, EosDocument> entry : idMap.entrySet()) {
101                 final Text key = entry.getKey();
102                 final EosDocument newdoc = entry.getValue();
103                 final Text newTextDoc = this.eosDocumentToText(newdoc);
104                 outputCollector.collect(key, newTextDoc);
105                 reporter.incrCounter(Index.MAP, 1);
106             }
107 
108         } catch (final EosException e) {
109             reporter.incrCounter(Index.EOS_EXCEPTION, 1);
110             final IOException te = new IOException(e.getMessage());
111             te.initCause(e);
112             throw te;
113         } catch (final IOException e) {
114             reporter.incrCounter(Index.IO_EXCEPTION, 1);
115             throw e;
116         } catch (final Exception e) {
117             reporter.incrCounter(Index.OTHER_EXCEPTION, 1);
118             final IOException te = new IOException(e.getMessage());
119             te.initCause(e);
120             throw te;
121         }
122     }
123 
124     /**
125      * Configures the trie. After finishing the method {@link #getTrie()}.
126      * Uses the value of {@link DistributedCacheStrategy#STRATEGY_IMPL_CONFIG_NAME}
127      * if setted to get the distributed cache strategy.
128      */
129     protected void configureTrie() {
130         synchronized(DictionaryBasedEntityRecognizerMapper.class) {
131             try {
132                 assert this.conf != null;
133 
134                 final String strageyClassName =
135                     this.conf.get(DistributedCacheStrategy.STRATEGY_IMPL_CONFIG_NAME);
136                 if (strageyClassName != null) {
137                     try {
138                         this.strategy = (DistributedCacheStrategy) 
139                             Class.forName(strageyClassName).newInstance();
140                     } catch (final InstantiationException e) {
141                         throw new RuntimeException(e);
142                     } catch (final IllegalAccessException e) {
143                         throw new RuntimeException(e);
144                     } catch (final ClassNotFoundException e) {
145                         throw new RuntimeException(e);
146                     }
147                 }
148 
149                 final Path[] recognizerDataFile =
150                     this.strategy.distributedCachePathes((this.conf));
151                 LOG.info("strategy: "
152                          + this.strategy.getClass().getCanonicalName());
153                 LOG.info("path: " + recognizerDataFile[0]);
154                 final InputStream in =
155                     recognizerDataFile[0].toUri().toURL().openStream();
156 
157                 final Configuration lconf =
158                     new HadoopConfigurationAdapter(this.conf);
159                 final TrieLoader newInstance =
160                     AbstractTrieLoader.newInstance(lconf);
161                 final TrieLoader<CharSequence, Set<CharSequence>> loader = 
162                     newInstance;
163 
164                 final KeyAnalyzer<CharSequence> analyzer =
165                     new CharSequenceKeyAnalyzer();
166                 this.entities = 
167                     new PatriciaTrie<CharSequence, Set<CharSequence>>(analyzer);
168                 loader.loadTrie(in, this.entities);
169             } catch (final Exception e) {
170                 throw new IllegalStateException(e);
171             }
172         }
173     }
174 
175     /**
176      * Returns a {@code Tokenizer} as <em>source</em> for the
177      * recognizer.
178      * @return the <em>source</em> for the recognizer
179      * @throws TokenizerException if an error occurs
180      */
181     protected ResettableTokenizer getTokenizer() throws TokenizerException {
182 
183         assert this.conf != null;
184 
185         try {
186             final Configuration lconf = new HadoopConfigurationAdapter(this.conf);
187             final TokenizerSupplier tokenBuilder =
188                 TokenizerSupplier.newInstance(lconf);
189             final ResettableTokenizer tokenizer = tokenBuilder.get();
190 
191             return tokenizer;
192         } catch (final Exception e) {
193             throw new TokenizerException(e);
194         }
195     }
196 
197     /**
198      * Returns a {@code Trie} instance. See contract in
199      * {@link #configureTrie()}
200      * @return a {@code Trie} instance
201      */
202     protected Trie<CharSequence, Set<CharSequence>> getTrie() {
203         assert this.entities != null;
204         return this.entities;
205     }
206 
207     /**
208      * Sets the configuration and calls {@link #configureTrie()}
209      * @param conf the configuration
210      */
211     @Override
212     public void configure(@SuppressWarnings("hiding") final JobConf conf) {
213         super.configure(conf);
214         this.conf = conf;
215         configureTrie();
216     }
217 
218     @Override
219     public void close() throws IOException {
220         super.close();
221     }
222 }