View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred.index;
17  
18  import net.sf.eos.EosException;
19  import net.sf.eos.config.Configuration;
20  import net.sf.eos.config.HadoopConfigurationAdapter;
21  import net.sf.eos.document.EosDocument;
22  import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
23  import net.sf.eos.hadoop.mapred.Index;
24  import net.sf.eos.lucene.LuceneDocumentCreator;
25  
26  import org.apache.hadoop.io.ObjectWritable;
27  import org.apache.hadoop.io.Text;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapred.JobConf;
30  import org.apache.hadoop.mapred.OutputCollector;
31  import org.apache.hadoop.mapred.Reducer;
32  import org.apache.hadoop.mapred.Reporter;
33  import org.apache.lucene.document.Document;
34  
35  import java.io.IOException;
36  import java.util.Iterator;
37  
38  public class IndexReducer<K extends WritableComparable>
39          extends EosDocumentSupportMapReduceBase
40          implements Reducer<K, Text, K, ObjectWritable> {
41  
42      private JobConf conf;
43  
44      public void reduce(final K key,
45                         final Iterator<Text> lineIterator,
46                         final OutputCollector<K, ObjectWritable> output,
47                         final Reporter reporter) throws IOException {
48  
49          assert this.conf != null;
50          final Configuration lconf =
51              new HadoopConfigurationAdapter(this.conf);
52          try {
53              final LuceneDocumentCreator creator =
54                  LuceneDocumentCreator.newInstance(lconf);
55  
56              while (lineIterator.hasNext()) {
57                  final Text text = lineIterator.next();
58                  final EosDocument doc = textToEosDocument(text);
59                  final Document lDoc =
60                      creator.createLuceneForEosDocument(doc);
61  
62                  if (lDoc != null) {
63                      output.collect(key, new ObjectWritable(lDoc));
64                      reporter.incrCounter(Index.REDUCE, 1);
65                  }
66              }
67          } catch (final EosException e) {
68              final IOException ioe = new IOException();
69              ioe.initCause(e);
70              reporter.incrCounter(Index.EOS_EXCEPTION, 1);
71              throw ioe;
72          } catch (final IOException e) {
73              reporter.incrCounter(Index.IO_EXCEPTION, 1);
74              throw e;
75          } catch (final Exception e) {
76              final IOException ioe = new IOException();
77              reporter.incrCounter(Index.OTHER_EXCEPTION, 1);
78              ioe.initCause(e);
79              throw ioe;
80          }
81      }
82  
83      /**
84       * @param conf the configuration
85       */
86      @Override
87      public void configure(@SuppressWarnings("hiding") final JobConf conf) {
88          super.configure(conf);
89          this.conf = conf;
90      }
91  
92      @Override
93      public void close() throws IOException {
94          super.close();
95      }
96  }