View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred;
17  
18  import net.sf.eos.EosException;
19  import net.sf.eos.config.Configuration;
20  import net.sf.eos.config.HadoopConfigurationAdapter;
21  import net.sf.eos.config.Service;
22  import net.sf.eos.document.EosDocument;
23  import net.sf.eos.document.Serializer;
24  import net.sf.eos.document.XmlSerializer;
25  
26  import org.apache.commons.io.input.CharSequenceReader;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.io.Text;
30  import org.apache.hadoop.mapred.JobConf;
31  import org.apache.hadoop.mapred.MapReduceBase;
32  
33  import java.io.IOException;
34  import java.io.Reader;
35  import java.io.StringWriter;
36  import java.io.Writer;
37  
38  /**
39   * Support for handling Map/Reduce jobs with {@link EosDocument}.
40   * @author Sascha Kohlmann
41   */
42  @Service(
43      factory=Serializer.class,
44      implementation=XmlSerializer.class,
45      description="Implementations support the serialization and deserialization "
46                  + "of EosDocuments."
47  )
48  public abstract class EosDocumentSupportMapReduceBase extends MapReduceBase {
49  
50      /** For logging. */
51      private static final Log LOG =
52          LogFactory.getLog(EosDocumentSupportMapReduceBase.class.getName());
53  
54      private JobConf conf;
55  
56      /**
57       * Returns a {@code Serializer} instance. Uses the instance defined in
58       * {@link Serializer#SERIALIZER_IMPL_CONFIG_NAME}. If no configuration
59       * is defined, the default implementation is used.
60       * @return a {@code Serializer} instance
61       * @throws EosException if an error occurs
62       */
63      @SuppressWarnings("nls")
64      protected Serializer getSerializer() throws EosException {
65          assert this.conf != null;
66  
67          final Configuration config = new HadoopConfigurationAdapter(this.conf);
68          final Serializer serializer = Serializer.newInstance(config);
69  
70          if (LOG.isDebugEnabled()) {
71              LOG.debug("Serializer instanceof " + serializer.getClass());
72          }
73  
74          return serializer;
75      }
76  
77      /**
78       * Transforms a {@code EosDocument} to an Hadoop {@code Text}.
79       * @param doc the {@code EosDocument} to transform
80       * @return a serialized document
81       * @throws Exception if an error occurs
82       * @throws IOException if an I/O error occurs
83       */
84      @SuppressWarnings("nls")
85      protected Text eosDocumentToText(final EosDocument doc)
86              throws IOException, Exception {
87          final Serializer serializer = getSerializer();
88          final Writer writer = new StringWriter();
89          serializer.serialize(doc, writer);
90          final String docAsString = writer.toString();
91          final Text docAsText = new Text(docAsString);
92          if (LOG.isDebugEnabled()) {
93              LOG.debug("seralized EosDocument: " + docAsText);
94          }
95  
96          return docAsText;
97      }
98  
99      /**
100      * Transforms a Hadoop {@code Text} to an {@code EosDocument}.
101      * @param eosDoc the document as Hadoop {@code Text}.
102      * @return a deserialized document
103      * @throws Exception if an error occurs
104      * @throws IOException if an I/O error occurs
105      */
106     @SuppressWarnings("nls")
107     protected EosDocument textToEosDocument(final Text eosDoc)
108             throws Exception, IOException {
109         final Serializer serializer = getSerializer();
110         final CharSequence text = eosDoc.toString();
111         final Reader reader = new CharSequenceReader(text);
112         final EosDocument doc = serializer.deserialize(reader);
113         if (LOG.isDebugEnabled()) {
114             LOG.debug("doc: " + doc);
115         }
116 
117         return doc;
118     }
119 
120     @Override
121     public void configure(final JobConf conf) {
122         super.configure(conf);
123         this.conf = conf;
124     }
125 
126     @Override
127     public void close() throws IOException {
128         super.close();
129     }
130 }