1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred;
17
18 import net.sf.eos.EosException;
19 import net.sf.eos.config.Configuration;
20 import net.sf.eos.config.HadoopConfigurationAdapter;
21 import net.sf.eos.config.Service;
22 import net.sf.eos.document.EosDocument;
23 import net.sf.eos.document.Serializer;
24 import net.sf.eos.document.XmlSerializer;
25
26 import org.apache.commons.io.input.CharSequenceReader;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.io.Text;
30 import org.apache.hadoop.mapred.JobConf;
31 import org.apache.hadoop.mapred.MapReduceBase;
32
33 import java.io.IOException;
34 import java.io.Reader;
35 import java.io.StringWriter;
36 import java.io.Writer;
37
38
39
40
41
42 @Service(
43 factory=Serializer.class,
44 implementation=XmlSerializer.class,
45 description="Implementations support the serialization and deserialization "
46 + "of EosDocuments."
47 )
48 public abstract class EosDocumentSupportMapReduceBase extends MapReduceBase {
49
50
51 private static final Log LOG =
52 LogFactory.getLog(EosDocumentSupportMapReduceBase.class.getName());
53
54 private JobConf conf;
55
56
57
58
59
60
61
62
63 @SuppressWarnings("nls")
64 protected Serializer getSerializer() throws EosException {
65 assert this.conf != null;
66
67 final Configuration config = new HadoopConfigurationAdapter(this.conf);
68 final Serializer serializer = Serializer.newInstance(config);
69
70 if (LOG.isDebugEnabled()) {
71 LOG.debug("Serializer instanceof " + serializer.getClass());
72 }
73
74 return serializer;
75 }
76
77
78
79
80
81
82
83
84 @SuppressWarnings("nls")
85 protected Text eosDocumentToText(final EosDocument doc)
86 throws IOException, Exception {
87 final Serializer serializer = getSerializer();
88 final Writer writer = new StringWriter();
89 serializer.serialize(doc, writer);
90 final String docAsString = writer.toString();
91 final Text docAsText = new Text(docAsString);
92 if (LOG.isDebugEnabled()) {
93 LOG.debug("seralized EosDocument: " + docAsText);
94 }
95
96 return docAsText;
97 }
98
99
100
101
102
103
104
105
106 @SuppressWarnings("nls")
107 protected EosDocument textToEosDocument(final Text eosDoc)
108 throws Exception, IOException {
109 final Serializer serializer = getSerializer();
110 final CharSequence text = eosDoc.toString();
111 final Reader reader = new CharSequenceReader(text);
112 final EosDocument doc = serializer.deserialize(reader);
113 if (LOG.isDebugEnabled()) {
114 LOG.debug("doc: " + doc);
115 }
116
117 return doc;
118 }
119
120 @Override
121 public void configure(final JobConf conf) {
122 super.configure(conf);
123 this.conf = conf;
124 }
125
126 @Override
127 public void close() throws IOException {
128 super.close();
129 }
130 }