package net.sf.eos.hadoop.mapred.index;

import static net.sf.eos.config.ConfigurationKey.Type.INTEGER;

import net.sf.eos.EosException;
import net.sf.eos.config.Configuration;
import net.sf.eos.config.ConfigurationKey;
import net.sf.eos.config.HadoopConfigurationAdapter;
import net.sf.eos.config.Service;
import net.sf.eos.lucene.AnalyzerSupplier;
import net.sf.eos.lucene.SimilaritySupplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormatBase;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Similarity;

import java.io.IOException;
import java.util.Random;

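/**
 * An output format that writes the reduce output as a Lucene index. Each
 * incoming value is expected to wrap a Lucene {@link Document}; the documents
 * are added to an {@link IndexWriter} working on a local temporary directory,
 * and the finished index is copied to the permanent output path when the
 * record writer is closed.
 *
 * <p>The {@code IndexWriter} can be tuned through the {@code *_CONFIG_NAME}
 * keys declared on this class. A minimal usage sketch (the job setup shown
 * here is illustrative, not prescribed by this class):</p>
 *
 * <pre>
 * final JobConf job = new JobConf();
 * job.setOutputFormat(LuceneOutputFormat.class);
 * job.setInt(LuceneOutputFormat.MERGE_FACTOR_CONFIG_NAME, 20);
 * job.setInt(LuceneOutputFormat.MAX_BUFFERED_DOCS_CONFIG_NAME, 1000);
 * </pre>
 */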
@Service(
    factory=AnalyzerSupplier.class
)
public class LuceneOutputFormat<K extends WritableComparable,
                                V extends ObjectWritable>
        extends OutputFormatBase<K, V> {

    @ConfigurationKey(type=INTEGER,
                      defaultValue="10",
                      description="The merge factor value.")
    public static final String MERGE_FACTOR_CONFIG_NAME =
        "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.mergeFactor";

    @ConfigurationKey(type=INTEGER,
                      defaultValue="10",
                      description="The max buffered docs value.")
    public static final String MAX_BUFFERED_DOCS_CONFIG_NAME =
        "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxBufferedDocs";

    @ConfigurationKey(type=INTEGER,
                      defaultValue="" + Integer.MAX_VALUE,
                      description="The max merge docs value.")
    public static final String MAX_MERGE_DOCS_CONFIG_NAME =
        "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxMergeDocs";

    @ConfigurationKey(type=INTEGER,
                      defaultValue="200",
                      description="The RAM buffer size in MB.")
    public static final String RAM_BUFFER_SIZE_MB_CONFIG_NAME =
        "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.RAMBufferSizeMB";

    @ConfigurationKey(type=INTEGER,
                      defaultValue="100000",
                      description="The maximum field length.")
    public static final String MAX_FIELD_LENGTH_CONFIG_NAME =
        "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxFieldLength";

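    /**
     * Name of the marker file created in the permanent output directory
     * once the index has been written completely.
     */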
    public static final String DONE_NAME = "index.done";

    private static final Log LOG =
        LogFactory.getLog(LuceneOutputFormat.class.getName());

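    /**
     * Creates a record writer that builds the Lucene index in a local
     * temporary directory and promotes it to the permanent output path on
     * close. The {@link Analyzer} and {@link Similarity} instances are
     * obtained from the configured suppliers.
     */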
    @SuppressWarnings("cast")
    @Override
    public RecordWriter<K, V> getRecordWriter(final FileSystem fileSystem,
                                              final JobConf job,
                                              final String name,
                                              final Progressable progress)
            throws IOException {

        final Path perm = new Path(job.getOutputPath(), name);
        final Path temp = job.getLocalPath(
            "index/_" + Integer.toString(new Random().nextInt()));

        if (LOG.isInfoEnabled()) {
            LOG.info("path (perm): " + perm);
            LOG.info("path (temp): " + temp);
        }

        fileSystem.delete(perm);

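        // Bridge the Hadoop job configuration into the eos configuration so
        // that the supplier factories can read their settings from it.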
        final Configuration config = new Configuration();
        HadoopConfigurationAdapter.addHadoopConfigToEosConfig(job, config);

        try {
            final AnalyzerSupplier analyzerProvider =
                AnalyzerSupplier.newInstance(config);
            final Analyzer analyzer = analyzerProvider.get();

            // Build the index in the local temporary directory; the boolean
            // argument makes the writer create a new index there.
            final IndexWriter writer =
                new IndexWriter(
                    fileSystem.startLocalOutput(perm, temp).toString(),
                    analyzer,
                    true);

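            // Apply the IndexWriter tuning parameters from the job
            // configuration, falling back to the documented defaults.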
            final int mergeFactor = job.getInt(MERGE_FACTOR_CONFIG_NAME, 10);
            writer.setMergeFactor(mergeFactor);
            final int maxBufferedDocs =
                job.getInt(MAX_BUFFERED_DOCS_CONFIG_NAME, 10);
            writer.setMaxBufferedDocs(maxBufferedDocs);
            final int maxMergeDocs =
                job.getInt(MAX_MERGE_DOCS_CONFIG_NAME, Integer.MAX_VALUE);
            writer.setMaxMergeDocs(maxMergeDocs);
            final double ramBufferSize = (double)
                job.getFloat(RAM_BUFFER_SIZE_MB_CONFIG_NAME, 200.0f);
            writer.setRAMBufferSizeMB(ramBufferSize);
            final int maxFieldLength =
                job.getInt(MAX_FIELD_LENGTH_CONFIG_NAME, 100000);
            writer.setMaxFieldLength(maxFieldLength);

            final SimilaritySupplier similarityFactory =
                SimilaritySupplier.newInstance(config);
            final Similarity similarity = similarityFactory.get();

            writer.setSimilarity(similarity);

            if (LOG.isDebugEnabled()) {
                LOG.debug("mergeFactor: " + mergeFactor);
                LOG.debug("maxBufferedDocs: " + maxBufferedDocs);
                LOG.debug("maxMergeDocs: " + maxMergeDocs);
                LOG.debug("RAMBufferSizeMB: " + ramBufferSize);
                LOG.debug("maxFieldLength: " + maxFieldLength);
                LOG.debug("Lucene Analyzer instance: "
                          + analyzer.getClass().getName());
                LOG.debug("Lucene Similarity instance: "
                          + similarity.getClass().getName());
            }

            final RecordWriterImpl<K, V> recordWriter
                = new RecordWriterImpl<K, V>(fileSystem, writer, analyzer);
            recordWriter.setPermanentPath(perm);
            recordWriter.setTemporaryPath(temp);

            return recordWriter;

        } catch (final EosException e) {
            final IOException ioe = new IOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

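    /**
     * Record writer that feeds the incoming documents to a single
     * {@link IndexWriter} and finalizes the index when the task completes.
     */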
    static class RecordWriterImpl<K extends WritableComparable,
                                  V extends ObjectWritable>
            implements RecordWriter<K, V> {

        // volatile: read by the progress thread while close() runs.
        private volatile boolean closed;

        private static final Log LOG =
            LogFactory.getLog(RecordWriterImpl.class.getName());

        private final IndexWriter writer;
        private final Analyzer luceneAnalyzer;
        private final FileSystem fs;
        private Path permanentPath;
        private Path temporaryPath;

        public RecordWriterImpl(final FileSystem fileSystem,
                                final IndexWriter indexWriter,
                                final Analyzer analyzer) {
            this.writer = indexWriter;
            this.luceneAnalyzer = analyzer;
            this.fs = fileSystem;
        }

        /** Sets the local path the index is built in. */
        public void setTemporaryPath(final Path temp) {
            this.temporaryPath = temp;
        }

        /** Sets the final output path the index is copied to on close. */
        public void setPermanentPath(final Path perm) {
            this.permanentPath = perm;
        }

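        /**
         * Adds the Lucene {@link Document} wrapped by {@code value} to the
         * index. The key is ignored.
         */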
        public void write(final K key, final V value)
                throws IOException {

            final Document doc = (Document) value.get();
            LOG.trace("Indexing document ...");
            this.writer.addDocument(doc, this.luceneAnalyzer);
        }

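        /**
         * Optimizes and closes the index, copies it from the local temporary
         * directory to the permanent output path and creates the
         * {@code index.done} marker file. A helper thread keeps reporting
         * progress while the potentially long-running optimize is in flight
         * so the framework does not kill the task as unresponsive.
         */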
        public void close(final Reporter reporter) throws IOException {

            // Heartbeat thread: reports status until the index is closed so
            // the task is not timed out during optimize().
            final Thread prog = new Thread() {
                @Override
                public void run() {
                    while (! RecordWriterImpl.this.closed) {
                        try {
                            reporter.setStatus("closing");
                            Thread.sleep(1000);
                        } catch (final InterruptedException e) {
                            continue;
                        } catch (final Throwable e) {
                            return;
                        }
                    }
                }
            };

            try {
                prog.start();
                LOG.info("Optimizing index --> " + this.writer.docCount()
                         + " documents.");

                this.writer.optimize();
                this.writer.close();

                this.fs.completeLocalOutput(this.permanentPath,
                                            this.temporaryPath);

                this.fs.createNewFile(new Path(this.permanentPath, DONE_NAME));
            } finally {
                this.closed = true;
            }
        }
    }
}