1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred.index;
17
18 import net.sf.eos.EosException;
19 import net.sf.eos.config.Configuration;
20 import net.sf.eos.config.HadoopConfigurationAdapter;
21 import net.sf.eos.document.EosDocument;
22 import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
23 import net.sf.eos.hadoop.mapred.Index;
24 import net.sf.eos.lucene.LuceneDocumentCreator;
25
26 import org.apache.hadoop.io.ObjectWritable;
27 import org.apache.hadoop.io.Text;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapred.JobConf;
30 import org.apache.hadoop.mapred.OutputCollector;
31 import org.apache.hadoop.mapred.Reducer;
32 import org.apache.hadoop.mapred.Reporter;
33 import org.apache.lucene.document.Document;
34
35 import java.io.IOException;
36 import java.util.Iterator;
37
38 public class IndexReducer<K extends WritableComparable>
39 extends EosDocumentSupportMapReduceBase
40 implements Reducer<K, Text, K, ObjectWritable> {
41
42 private JobConf conf;
43
44 public void reduce(final K key,
45 final Iterator<Text> lineIterator,
46 final OutputCollector<K, ObjectWritable> output,
47 final Reporter reporter) throws IOException {
48
49 assert this.conf != null;
50 final Configuration lconf =
51 new HadoopConfigurationAdapter(this.conf);
52 try {
53 final LuceneDocumentCreator creator =
54 LuceneDocumentCreator.newInstance(lconf);
55
56 while (lineIterator.hasNext()) {
57 final Text text = lineIterator.next();
58 final EosDocument doc = textToEosDocument(text);
59 final Document lDoc =
60 creator.createLuceneForEosDocument(doc);
61
62 if (lDoc != null) {
63 output.collect(key, new ObjectWritable(lDoc));
64 reporter.incrCounter(Index.REDUCE, 1);
65 }
66 }
67 } catch (final EosException e) {
68 final IOException ioe = new IOException();
69 ioe.initCause(e);
70 reporter.incrCounter(Index.EOS_EXCEPTION, 1);
71 throw ioe;
72 } catch (final IOException e) {
73 reporter.incrCounter(Index.IO_EXCEPTION, 1);
74 throw e;
75 } catch (final Exception e) {
76 final IOException ioe = new IOException();
77 reporter.incrCounter(Index.OTHER_EXCEPTION, 1);
78 ioe.initCause(e);
79 throw ioe;
80 }
81 }
82
83
84
85
86 @Override
87 public void configure(@SuppressWarnings("hiding") final JobConf conf) {
88 super.configure(conf);
89 this.conf = conf;
90 }
91
92 @Override
93 public void close() throws IOException {
94 super.close();
95 }
96 }