1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred.decompose;
17
18
19 import net.sf.eos.EosException;
20 import net.sf.eos.document.EosDocument;
21 import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
22 import net.sf.eos.hadoop.mapred.Index;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapred.JobConf;
28 import org.apache.hadoop.mapred.OutputCollector;
29 import org.apache.hadoop.mapred.Reducer;
30 import org.apache.hadoop.mapred.Reporter;
31
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.Map.Entry;
42
43 public class SentencerReducer extends EosDocumentSupportMapReduceBase
44 implements Reducer<Text, Text, Text, Text> {
45
46
47 private static final Log LOG =
48 LogFactory.getLog(SentencerReducer.class.getName());
49
50 private static final Text EMPTY = new Text();
51
52 private JobConf conf;
53
54
55
56
57 @SuppressWarnings("nls")
58 public void reduce(final Text key,
59 final Iterator<Text> valuesIterator,
60 final OutputCollector<Text, Text> outputCollector,
61 final Reporter reporter) throws IOException {
62
63 try {
64 final EosDocument doc =
65 createEosDocumentFromIterator(valuesIterator);
66 final Text docAsText = eosDocumentToText(doc);
67
68 outputCollector.collect(EMPTY, docAsText);
69 reporter.incrCounter(Index.REDUCE, 1);
70
71 } catch (final EosException e) {
72 reporter.incrCounter(Index.EOS_EXCEPTION, 1);
73 throw new IOException(e.getMessage());
74 } catch (final Exception e) {
75 reporter.incrCounter(Index.IO_EXCEPTION, 1);
76 throw new IOException(e.getMessage());
77 }
78 }
79
80 final EosDocument createEosDocumentFromIterator(
81 final Iterator<Text> valuesIterator) throws Exception, IOException {
82
83 final Map<String, Set<String>> metaData =
84 new HashMap<String, Set<String>>();
85 EosDocument doc = null;
86
87 while (valuesIterator.hasNext()) {
88 final Text eosDoc = valuesIterator.next();
89 doc = textToEosDocument(eosDoc);
90 assert doc != null;
91
92 final Map<String, List<String>> meta = doc.getMeta();
93 if (meta != null) {
94 for (final Entry<String, List<String>> entry
95 : meta.entrySet()) {
96 final String metaKey = entry.getKey();
97 Set<String> collector = metaData.get(metaKey);
98 if (collector == null) {
99 collector = new HashSet<String>();
100 metaData.put(metaKey, collector);
101 }
102 final List<String> value = entry.getValue();
103 collector.addAll(value);
104 }
105 }
106 }
107
108 final Map<String, List<String>> newMeta = transformToList(metaData);
109 assert doc != null;
110 doc.setMeta(newMeta);
111
112 return doc;
113 }
114
115
116
117
118
119
120
121 final Map<String, List<String>> transformToList(
122 final Map<String, Set<String>> metaData) {
123
124 final Map<String, List<String>> newMeta =
125 new HashMap<String, List<String>>();
126 for (final Entry<String, Set<String>> entry : metaData.entrySet()) {
127 final String metaKey = entry.getKey();
128 final List<String> metaValue = new ArrayList<String>();
129 final Collection<String> metaCol = entry.getValue();
130 metaValue.addAll(metaCol);
131 newMeta.put(metaKey, metaValue);
132 }
133 return newMeta;
134 }
135
136 @Override
137 public void configure(final JobConf conf) {
138 super.configure(conf);
139 this.conf = conf;
140 }
141
142 @Override
143 public void close() throws IOException {
144 super.close();
145 }
146 }