1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred.cooccurrence;
17
18 import static net.sf.eos.config.ConfigurationKey.Type.CLASSNAME;
19 import net.sf.eos.EosException;
20 import net.sf.eos.analyzer.TextBuilder;
21 import net.sf.eos.config.Configuration;
22 import net.sf.eos.config.ConfigurationKey;
23 import net.sf.eos.config.HadoopConfigurationAdapter;
24 import net.sf.eos.document.EosDocument;
25 import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
26 import net.sf.eos.hadoop.mapred.Index;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.io.Text;
31 import org.apache.hadoop.mapred.JobConf;
32 import org.apache.hadoop.mapred.OutputCollector;
33 import org.apache.hadoop.mapred.Reducer;
34 import org.apache.hadoop.mapred.Reporter;
35
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.StringTokenizer;
45 import java.util.Map.Entry;
46
47 public class DictionaryBasedEntityRecognizerReducer
48 extends EosDocumentSupportMapReduceBase
49 implements Reducer<Text, Text, Text, Text> {
50
51
52 private static final Log LOG =
53 LogFactory.getLog(DictionaryBasedEntityRecognizerReducer.class.getName());
54
55 private static final Text EMPTY = new Text();
56
57 private final static String NULL = "<null>";
58 private final static String UNDERLINE = "_";
59
60
61
62 @SuppressWarnings("nls")
63 @ConfigurationKey(description="Metakeys, comma delimited, for separation.")
64 public static final String META_FIELD_FOR_SEPARATION_CONFIG_NAME =
65 "net.sf.eos.hadoop.mapred.cooccurrence.DictionaryBasedEntityRecognizerReducer.metaKeys";
66
67 private JobConf conf;
68
69 public void reduce(final Text key,
70 final Iterator<Text> valuesIterator,
71 final OutputCollector<Text, Text> outputCollector,
72 final Reporter reporter) throws IOException {
73 try {
74 final Map<String, EosDocument> docs =
75 createCombinedEosDocumentsFromIterator(valuesIterator);
76
77 for (final Entry<String, EosDocument> entry : docs.entrySet()) {
78
79 final EosDocument doc = entry.getValue();
80 final Text textValue = this.eosDocumentToText(doc);
81 outputCollector.collect(EMPTY, textValue);
82
83 reporter.incrCounter(Index.REDUCE, 1);
84 }
85
86 } catch (final EosException e) {
87 reporter.incrCounter(Index.EOS_EXCEPTION, 1);
88 throw new IOException(e.getMessage());
89 } catch (final Exception e) {
90 reporter.incrCounter(Index.IO_EXCEPTION, 1);
91 throw new IOException(e.getMessage());
92 }
93 }
94
95 final Map<String, EosDocument> createCombinedEosDocumentsFromIterator(
96 final Iterator<Text> valuesIterator) throws Exception, IOException {
97
98 final List<String> keys = getMetaKeys();
99
100
101 final Map<String, EosDocument> retval =
102 new HashMap<String, EosDocument>();
103
104 while (valuesIterator.hasNext()) {
105 final Text eosDoc = valuesIterator.next();
106 if (LOG.isTraceEnabled()) {
107 LOG.trace("EosDocument to handle: " + eosDoc.toString());
108 }
109 final EosDocument doc = textToEosDocument(eosDoc);
110 assert doc != null;
111
112 final StringBuilder newKey = new StringBuilder();
113 final Map<String, List<String>> meta = doc.getMeta();
114
115 if (meta != null) {
116 for (final String key : keys) {
117 final List<String> values = meta.get(key);
118
119 if (values != null) {
120
121 final List<String> sortedValues =
122 new ArrayList<String>();
123 sortedValues.addAll(values);
124 Collections.sort(sortedValues);
125
126 for (final String value : sortedValues) {
127 if (value != null) {
128 newKey.append(value);
129 } else {
130 newKey.append(NULL);
131 }
132 }
133 } else {
134 newKey.append(NULL);
135 }
136 newKey.append(UNDERLINE);
137 }
138 }
139 if (newKey.length() == 0) {
140 newKey.append(NULL);
141 }
142
143 final String keyRaw = newKey.toString();
144 final String key = replaceWhitespaceWithUnderline(keyRaw);
145 final EosDocument valueDoc = retval.get(key);
146
147 if (valueDoc == null) {
148 retval.put(key, doc);
149 } else {
150 combineDocuments(doc, valueDoc);
151 }
152 }
153
154 return retval;
155 }
156
157 final String replaceWhitespaceWithUnderline(final String toReplace) {
158 final StringBuilder sb = new StringBuilder();
159 final int length = toReplace.length();
160 for (int i = 0; i < length; i++) {
161 final char c = toReplace.charAt(i);
162 if (Character.isWhitespace(c)) {
163 sb.append(UNDERLINE);
164 } else {
165 sb.append(c);
166 }
167 }
168 return sb.toString();
169 }
170
171 final Map<String, List<String>>
172 metadataSetToList(final Map<String, Set<String>> meta) {
173
174 assert meta != null;
175 final Map<String, List<String>> newMetaData =
176 new HashMap<String, List<String>>();
177
178 for (final Entry<String, Set<String>> entry : meta.entrySet()) {
179 final String key = entry.getKey();
180 final Set<String> oldValue = entry.getValue();
181 final List<String> newValue = new ArrayList<String>();
182 newValue.addAll(oldValue);
183 newMetaData.put(key, newValue);
184 }
185
186 return newMetaData;
187 }
188
189 @Override
190 public void configure(final JobConf conf) {
191 super.configure(conf);
192 this.conf = conf;
193 }
194
195 @Override
196 public void close() throws IOException {
197 super.close();
198 }
199
200 final void combineDocuments(final EosDocument from, final EosDocument to)
201 throws EosException {
202
203 if (from != null) {
204 assert this.conf != null;
205 final Configuration lconf =
206 new HadoopConfigurationAdapter(this.conf);
207 final TextBuilder builder = TextBuilder.newInstance(lconf);
208
209 final CharSequence fromTitle = from.getTitle();
210 if (fromTitle != null) {
211 final CharSequence toTitle = to.getTitle();
212 if (toTitle == null) {
213 to.setTitle(fromTitle);
214 } else {
215 final CharSequence combined =
216 builder.buildText(toTitle, fromTitle);
217 to.setTitle(combined);
218 }
219 }
220
221 final CharSequence fromText = from.getText();
222 if (fromText != null) {
223 final CharSequence toText = to.getText();
224 if (toText == null) {
225 to.setText(fromText);
226 } else {
227 final CharSequence combined =
228 builder.buildText(toText, fromText);
229 to.setText(combined);
230 }
231 }
232
233 final Map<String, List<String>> fromMeta = from.getMeta();
234 final Map<String, List<String>> toMeta = to.getMeta();
235
236 if (toMeta == null || toMeta.size() == 0) {
237 to.setMeta(fromMeta);
238 } else if (fromMeta != null && fromMeta.size() != 0) {
239
240 for (final Entry<String, List<String>> entry
241 : fromMeta.entrySet()) {
242
243 final String key = entry.getKey();
244 final List<String> values = entry.getValue();
245 final List<String> toValues = toMeta.get(key);
246
247 if (toValues == null || toValues.size() == 0) {
248 toMeta.put(key, values);
249 } else {
250 for (final String fromValue : values) {
251 if (! toValues.contains(fromValue)) {
252 toValues.add(fromValue);
253 }
254 }
255 }
256 }
257 }
258 }
259 }
260
261 final List<String> getMetaKeys() {
262 assert this.conf != null;
263 final Configuration lconf =
264 new HadoopConfigurationAdapter(this.conf);
265 final List<String> keys = new ArrayList<String>();
266 final String value = lconf.get(META_FIELD_FOR_SEPARATION_CONFIG_NAME,
267 EosDocument.ID_META_KEY);
268 for (final StringTokenizer st = new StringTokenizer(value, ", ");
269 st.hasMoreTokens(); ) {
270 final String key = st.nextToken();
271 keys.add(key);
272 }
273 if (keys.size() == 0) {
274 keys.add(EosDocument.ID_META_KEY);
275 }
276 return keys;
277 }
278 }