View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred.decompose;
17  
18  
19  import net.sf.eos.EosException;
20  import net.sf.eos.analyzer.ResettableTokenizer;
21  import net.sf.eos.analyzer.SentenceTokenizer;
22  import net.sf.eos.analyzer.TextBuilder;
23  import net.sf.eos.analyzer.TokenizerSupplier;
24  import net.sf.eos.analyzer.TextBuilder.SpaceBuilder;
25  import net.sf.eos.config.Configuration;
26  import net.sf.eos.config.HadoopConfigurationAdapter;
27  import net.sf.eos.config.Service;
28  import net.sf.eos.config.Services;
29  import net.sf.eos.document.EosDocument;
30  import net.sf.eos.hadoop.mapred.AbstractKeyGenerator;
31  import static net.sf.eos.hadoop.mapred.AbstractKeyGenerator.ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME;
32  import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
33  import net.sf.eos.hadoop.mapred.Index;
34  import net.sf.eos.hadoop.mapred.KeyGenerator;
35  import net.sf.eos.sentence.Sentencer;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.io.LongWritable;
40  import org.apache.hadoop.io.Text;
41  import org.apache.hadoop.mapred.JobConf;
42  import org.apache.hadoop.mapred.Mapper;
43  import org.apache.hadoop.mapred.OutputCollector;
44  import org.apache.hadoop.mapred.Reporter;
45  
46  import java.io.IOException;
47  import java.util.Map;
48  import java.util.Map.Entry;
49  
50  @Services(
51      services={
52          @Service(
53              factory=AbstractKeyGenerator.class,
54              implementation=TextMetaKeyGenerator.class,
55              description="Tokenizer for coocurence analyzing."
56          ),
57          @Service(
58              factory=TextBuilder.class,
59              implementation=SpaceBuilder.class
60          ),
61          @Service(
62              factory=TokenizerSupplier.class
63          ),
64          @Service(
65              factory=Sentencer.class
66          )
67      }
68  )
69  public class SentencerMapper extends EosDocumentSupportMapReduceBase
70          implements Mapper<LongWritable, Text, Text, Text> {
71  
72      /** For logging. */
73      private static final Log LOG =
74          LogFactory.getLog(SentencerMapper.class.getName());
75  
76      private JobConf conf;
77  
78      public void map(final LongWritable positionInFile,
79                      final Text eosDoc,
80                      final OutputCollector<Text, Text> outputCollector,
81                      final Reporter reporter) throws IOException {
82  
83          final Configuration config = new HadoopConfigurationAdapter(this.conf);
84  
85          try {
86              final EosDocument doc = textToEosDocument(eosDoc);
87              final TokenizerSupplier tokenBuilder =
88                  TokenizerSupplier.newInstance(config);
89              if (LOG.isDebugEnabled()) {
90                  LOG.debug("TokenizerBuilder instanceof "
91                            + tokenBuilder.getClass());
92              }
93              final TextBuilder textBuilder =
94                  TextBuilder.newInstance(config);
95              if (LOG.isDebugEnabled()) {
96                  LOG.debug("TextBuilder instanceof " + textBuilder.getClass());
97              }
98              final Sentencer sentencer = Sentencer.newInstance(config);
99              if (LOG.isDebugEnabled()) {
100                 LOG.debug("Sentencer instanceof " + sentencer.getClass());
101             }
102 
103             final ResettableTokenizer tokenizer =
104                 tokenBuilder.get();
105             final SentenceTokenizer sentenceTokenizer =
106                 new SentenceTokenizer();
107 
108             final Map<String, EosDocument> docs = 
109                 sentencer.toSentenceDocuments(doc,
110                                               sentenceTokenizer,
111                                               tokenizer,
112                                               textBuilder);
113 
114             final KeyGenerator<Text> generator = newGenerator();
115             for (final Entry<String, EosDocument> entry : docs.entrySet()) {
116                 final String key = entry.getKey();
117                 final EosDocument toWrite = entry.getValue();
118                 final Map<Text, EosDocument> toStore =
119                     generator.createKeysForDocument(toWrite);
120 
121                 for (final Entry<Text, EosDocument> e : toStore.entrySet()) {
122                     final Text textKey = e.getKey();
123                     final String asString = textKey.toString();
124                     final String addKey = key + "+" + asString;
125                     final Text keyAsText = new Text(addKey);
126                     final Text docAsText = eosDocumentToText(toWrite);
127                     outputCollector.collect(keyAsText, docAsText);
128                     reporter.incrCounter(Index.MAP, 1);
129                 }
130             }
131 
132         } catch (final EosException e) {
133             reporter.incrCounter(Index.EOS_EXCEPTION, 1);
134             throw new IOException(e.getMessage());
135         } catch (final Exception e) {
136             if (e instanceof RuntimeException) {
137                 throw (RuntimeException) e;
138             }
139             reporter.incrCounter(Index.IO_EXCEPTION, 1);
140             throw new IOException("" + e.getClass() + " - " + e.getMessage());
141         }
142     }
143 
144     protected KeyGenerator<Text> newGenerator() throws EosException {
145         final Configuration lconf = new Configuration(); 
146         HadoopConfigurationAdapter.addHadoopConfigToEosConfig(this.conf, lconf);
147         
148         final String implName =
149             lconf.get(ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME);
150         if (implName == null || implName.length() == 0) {
151             lconf.set(ABSTRACT_KEY_GENERATOR_IMPL_CONFIG_NAME,
152                     TextMetaKeyGenerator.class.getName());
153         }
154         final KeyGenerator<Text> newInstance =
155             (KeyGenerator<Text>) AbstractKeyGenerator.newInstance(lconf);
156 
157         if (LOG.isDebugEnabled()) {
158             LOG.debug("KeyGenerator<Text> instance is "
159                       + newInstance.getClass());
160         }
161         return newInstance;
162     }
163 
164     @Override
165     public void configure(final JobConf conf) {
166         super.configure(conf);
167         this.conf = conf;
168     }
169 
170     @Override
171     public void close() throws IOException {
172         super.close();
173     }
174 }