View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred.decompose;
17  
18  
19  import net.sf.eos.EosException;
20  import net.sf.eos.document.EosDocument;
21  import net.sf.eos.hadoop.mapred.EosDocumentSupportMapReduceBase;
22  import net.sf.eos.hadoop.mapred.Index;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.io.Text;
27  import org.apache.hadoop.mapred.JobConf;
28  import org.apache.hadoop.mapred.OutputCollector;
29  import org.apache.hadoop.mapred.Reducer;
30  import org.apache.hadoop.mapred.Reporter;
31  
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.Collection;
35  import java.util.HashMap;
36  import java.util.HashSet;
37  import java.util.Iterator;
38  import java.util.List;
39  import java.util.Map;
40  import java.util.Set;
41  import java.util.Map.Entry;
42  
43  public class SentencerReducer extends EosDocumentSupportMapReduceBase
44                                implements Reducer<Text, Text, Text, Text> {
45  
46      /** For logging. */
47      private static final Log LOG =
48          LogFactory.getLog(SentencerReducer.class.getName());
49  
50      private static final Text EMPTY = new Text();
51  
52      private JobConf conf;
53  
54      /*
55       * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
56       */
57      @SuppressWarnings("nls")
58      public void reduce(final Text key,
59                         final Iterator<Text> valuesIterator,
60                         final OutputCollector<Text, Text> outputCollector,
61                         final Reporter reporter) throws IOException {
62  
63          try {
64              final EosDocument doc =
65                  createEosDocumentFromIterator(valuesIterator);
66              final Text docAsText = eosDocumentToText(doc);
67  
68              outputCollector.collect(EMPTY, docAsText);
69              reporter.incrCounter(Index.REDUCE, 1);
70  
71          } catch (final EosException e) {
72              reporter.incrCounter(Index.EOS_EXCEPTION, 1);
73              throw new IOException(e.getMessage());
74          } catch (final Exception e) {
75              reporter.incrCounter(Index.IO_EXCEPTION, 1);
76              throw new IOException(e.getMessage());
77          }
78      }
79  
80      final EosDocument createEosDocumentFromIterator(
81              final Iterator<Text> valuesIterator) throws Exception, IOException {
82  
83          final Map<String, Set<String>> metaData =
84              new HashMap<String, Set<String>>();
85          EosDocument doc = null;
86  
87          while (valuesIterator.hasNext()) {
88              final Text eosDoc = valuesIterator.next();
89              doc = textToEosDocument(eosDoc);
90              assert doc != null;
91  
92              final Map<String, List<String>> meta = doc.getMeta();
93              if (meta != null) {
94                  for (final Entry<String, List<String>> entry 
95                          : meta.entrySet()) {
96                      final String metaKey = entry.getKey();
97                      Set<String> collector = metaData.get(metaKey);
98                      if (collector == null) {
99                          collector = new HashSet<String>();
100                         metaData.put(metaKey, collector);
101                     }
102                     final List<String> value = entry.getValue();
103                     collector.addAll(value);
104                 }
105             }
106         }
107 
108         final Map<String, List<String>> newMeta = transformToList(metaData);
109         assert doc != null;
110         doc.setMeta(newMeta);
111 
112         return doc;
113     }
114 
115     /**
116      * Transforms a {@code Map} of metadata {@code Set}s into a
117      * {@code Map} of metadata {@code List}s.
118      * @param metaData the metadata to transform
119      * @return the transformed map of lists.
120      */
121     final Map<String, List<String>> transformToList(
122             final Map<String, Set<String>> metaData) {
123 
124         final Map<String, List<String>> newMeta =
125             new HashMap<String, List<String>>();
126         for (final Entry<String, Set<String>> entry : metaData.entrySet()) {
127             final String metaKey = entry.getKey();
128             final List<String> metaValue = new ArrayList<String>();
129             final Collection<String> metaCol = entry.getValue();
130             metaValue.addAll(metaCol);
131             newMeta.put(metaKey, metaValue);
132         }
133         return newMeta;
134     }
135 
136     @Override
137     public void configure(final JobConf conf) {
138         super.configure(conf);
139         this.conf = conf;
140     }
141 
142     @Override
143     public void close() throws IOException {
144         super.close();
145     }
146 }