View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred.cooccurrence;
17  
18  import static net.sf.eos.config.ConfigurationKey.Type.CLASSNAME;
19  import net.sf.eos.EosException;
20  import net.sf.eos.analyzer.TextBuilder;
21  import net.sf.eos.config.Configuration;
22  import net.sf.eos.config.ConfigurationKey;
23  import net.sf.eos.config.HadoopConfigurationAdapter;
24  import net.sf.eos.document.EosDocument;
25  import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
26  import net.sf.eos.hadoop.mapred.Index;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.io.Text;
31  import org.apache.hadoop.mapred.JobConf;
32  import org.apache.hadoop.mapred.OutputCollector;
33  import org.apache.hadoop.mapred.Reducer;
34  import org.apache.hadoop.mapred.Reporter;
35  
36  import java.io.IOException;
37  import java.util.ArrayList;
38  import java.util.Collections;
39  import java.util.HashMap;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Set;
44  import java.util.StringTokenizer;
45  import java.util.Map.Entry;
46  
47  public class DictionaryBasedEntityRecognizerReducer
48          extends EosDocumentSupportMapReduceBase
49          implements Reducer<Text, Text, Text, Text> {
50  
51      /** For logging. */
52      private static final Log LOG =
53          LogFactory.getLog(DictionaryBasedEntityRecognizerReducer.class.getName());
54  
55      private static final Text EMPTY = new Text();
56  
57      private final static String NULL = "<null>";
58      private final static String UNDERLINE = "_";
59  
60      /** The meta field for separation.
61       * Default value is {@link EosDocument#ID_META_KEY}. */
62      @SuppressWarnings("nls")
63      @ConfigurationKey(description="Metakeys, comma delimited, for separation.")
64      public static final String META_FIELD_FOR_SEPARATION_CONFIG_NAME =
65          "net.sf.eos.hadoop.mapred.cooccurrence.DictionaryBasedEntityRecognizerReducer.metaKeys";
66  
67      private JobConf conf;
68  
69      public void reduce(final Text key,
70                         final Iterator<Text> valuesIterator,
71                         final OutputCollector<Text, Text> outputCollector,
72                         final Reporter reporter) throws IOException {
73          try {
74              final Map<String, EosDocument> docs =
75                  createCombinedEosDocumentsFromIterator(valuesIterator);
76  
77              for (final Entry<String, EosDocument> entry : docs.entrySet()) {
78  
79                  final EosDocument doc = entry.getValue();
80                  final Text textValue = this.eosDocumentToText(doc);
81                  outputCollector.collect(EMPTY, textValue);
82  
83                  reporter.incrCounter(Index.REDUCE, 1);
84              }
85  
86          } catch (final EosException e) {
87              reporter.incrCounter(Index.EOS_EXCEPTION, 1);
88              throw new IOException(e.getMessage());
89          } catch (final Exception e) {
90              reporter.incrCounter(Index.IO_EXCEPTION, 1);
91              throw new IOException(e.getMessage());
92          }
93      }
94  
95      final Map<String, EosDocument> createCombinedEosDocumentsFromIterator(
96              final Iterator<Text> valuesIterator) throws Exception, IOException {
97  
98          final List<String> keys = getMetaKeys();
99  
100         // For meta collecting
101         final Map<String, EosDocument> retval = 
102             new HashMap<String, EosDocument>();
103 
104         while (valuesIterator.hasNext()) {
105             final Text eosDoc = valuesIterator.next();
106             if (LOG.isTraceEnabled()) {
107                 LOG.trace("EosDocument to handle: " + eosDoc.toString());
108             }
109             final EosDocument doc = textToEosDocument(eosDoc);
110             assert doc != null;
111 
112             final StringBuilder newKey = new StringBuilder();
113             final Map<String, List<String>> meta = doc.getMeta();
114 
115             if (meta != null) {
116                 for (final String key : keys) {
117                     final List<String> values = meta.get(key);
118 
119                     if (values != null) {
120 
121                         final List<String> sortedValues =
122                             new ArrayList<String>();
123                         sortedValues.addAll(values);
124                         Collections.sort(sortedValues);
125 
126                         for (final String value : sortedValues) {
127                             if (value != null) {
128                                 newKey.append(value);
129                             } else {
130                                 newKey.append(NULL);
131                             }
132                         }
133                     } else {
134                         newKey.append(NULL);
135                     }
136                     newKey.append(UNDERLINE);
137                 }
138             }
139             if (newKey.length() == 0) {
140                 newKey.append(NULL);
141             }
142 
143             final String keyRaw = newKey.toString();
144             final String key = replaceWhitespaceWithUnderline(keyRaw);
145             final EosDocument valueDoc = retval.get(key);
146 
147             if (valueDoc == null) {
148                 retval.put(key, doc);
149             } else {
150                 combineDocuments(doc, valueDoc);
151             }
152         }
153 
154         return retval;
155     }
156 
157     final String replaceWhitespaceWithUnderline(final String toReplace) {
158         final StringBuilder sb = new StringBuilder();
159         final int length = toReplace.length();
160         for (int i = 0; i < length; i++) {
161             final char c = toReplace.charAt(i);
162             if (Character.isWhitespace(c)) {
163                 sb.append(UNDERLINE);
164             } else {
165                 sb.append(c);
166             }
167         }
168         return sb.toString();
169     }
170 
171     final Map<String, List<String>> 
172             metadataSetToList(final Map<String, Set<String>> meta) {
173 
174         assert meta != null;
175         final Map<String, List<String>> newMetaData =
176             new HashMap<String, List<String>>();
177 
178         for (final Entry<String, Set<String>> entry : meta.entrySet()) {
179             final String key = entry.getKey();
180             final Set<String> oldValue = entry.getValue();
181             final List<String> newValue = new ArrayList<String>();
182             newValue.addAll(oldValue);
183             newMetaData.put(key, newValue);
184         }
185 
186         return newMetaData;
187     }
188 
189     @Override
190     public void configure(final JobConf conf) {
191         super.configure(conf);
192         this.conf = conf;
193     }
194 
195     @Override
196     public void close() throws IOException {
197         super.close();
198     }
199 
200     final void combineDocuments(final EosDocument from, final EosDocument to)
201             throws EosException {
202 
203         if (from != null) {
204             assert this.conf != null;
205             final Configuration lconf =
206                 new HadoopConfigurationAdapter(this.conf);
207             final TextBuilder builder = TextBuilder.newInstance(lconf);
208 
209             final CharSequence fromTitle = from.getTitle();
210             if (fromTitle != null) {
211                 final CharSequence toTitle = to.getTitle();
212                 if (toTitle == null) {
213                     to.setTitle(fromTitle);
214                 } else {
215                     final CharSequence combined =
216                         builder.buildText(toTitle, fromTitle);
217                     to.setTitle(combined);
218                 }
219             }
220 
221             final CharSequence fromText = from.getText();
222             if (fromText != null) {
223                 final CharSequence toText = to.getText();
224                 if (toText == null) {
225                     to.setText(fromText);
226                 } else {
227                     final CharSequence combined =
228                         builder.buildText(toText, fromText);
229                     to.setText(combined);
230                 }
231             }
232 
233             final Map<String, List<String>> fromMeta = from.getMeta();
234             final Map<String, List<String>> toMeta = to.getMeta();
235 
236             if (toMeta == null || toMeta.size() == 0) {
237                 to.setMeta(fromMeta);
238             } else if (fromMeta != null && fromMeta.size() != 0) {
239 
240                 for (final Entry<String, List<String>> entry
241                         : fromMeta.entrySet()) {
242 
243                     final String key = entry.getKey();
244                     final List<String> values = entry.getValue();
245                     final List<String> toValues = toMeta.get(key);
246 
247                     if (toValues == null || toValues.size() == 0) {
248                         toMeta.put(key, values);
249                     } else {
250                         for (final String fromValue : values) {
251                             if (! toValues.contains(fromValue)) {
252                                 toValues.add(fromValue);
253                             }
254                         }
255                     }
256                 }
257             }
258         }
259     }
260 
261     final List<String> getMetaKeys() {
262         assert this.conf != null;
263         final Configuration lconf =
264             new HadoopConfigurationAdapter(this.conf);
265         final List<String> keys = new ArrayList<String>();
266         final String value = lconf.get(META_FIELD_FOR_SEPARATION_CONFIG_NAME,
267                                        EosDocument.ID_META_KEY);
268         for (final StringTokenizer st = new StringTokenizer(value, ", ");
269                 st.hasMoreTokens(); ) {
270             final String key = st.nextToken();
271             keys.add(key);
272         }
273         if (keys.size() == 0) {
274             keys.add(EosDocument.ID_META_KEY);
275         }
276         return keys;
277     }
278 }