View Javadoc

1   /* Licensed to the Apache Software Foundation (ASF) under one or more
2    * contributor license agreements.  See the NOTICE file distributed with
3    * this work for additional information regarding copyright ownership.
4    * The ASF licenses this file to You under the Apache License, Version 2.0
5    * (the "License"); you may not use this file except in compliance with
6    * the License.  You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.sf.eos.hadoop.mapred.index;
18  
19  
20  import static net.sf.eos.config.ConfigurationKey.Type.INTEGER;
21  import net.sf.eos.EosException;
22  import net.sf.eos.config.Configuration;
23  import net.sf.eos.config.ConfigurationKey;
24  import net.sf.eos.config.HadoopConfigurationAdapter;
25  import net.sf.eos.config.Service;
26  import net.sf.eos.lucene.AnalyzerSupplier;
27  import net.sf.eos.lucene.SimilaritySupplier;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.io.ObjectWritable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.mapred.JobConf;
36  import org.apache.hadoop.mapred.OutputFormatBase;
37  import org.apache.hadoop.mapred.RecordWriter;
38  import org.apache.hadoop.mapred.Reporter;
39  import org.apache.hadoop.util.Progressable;
40  import org.apache.lucene.analysis.Analyzer;
41  import org.apache.lucene.document.Document;
42  import org.apache.lucene.index.IndexWriter;
43  import org.apache.lucene.search.Similarity;
44  
45  import java.io.IOException;
46  import java.util.Random;
47  
48  /**
49   * Support to write a Lucene index in a Hadoop filesystem.
50   * <p>Parts are copied from Nutch source code.</p>
51   * @author Nutch Team
52   * @author Sascha Kohlmann
53   */
54  @Service(
55      factory=AnalyzerSupplier.class
56  )
57  public class LuceneOutputFormat<K extends WritableComparable,
58                                  V extends ObjectWritable>
59          extends OutputFormatBase<K, V> {
60  
61      /** The name of the merge factory value. Default value is 10. */
62      @ConfigurationKey(type=INTEGER,
63                              defaultValue="10",
64                              description="The merge factory value.")
65      public static final String MERGE_FACTOR_CONFIG_NAME =
66          "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.mergeFactor";
67  
68      /** The name of the max buffered docs value. Default value is 10. */
69      @ConfigurationKey(type=INTEGER,
70                              defaultValue="10",
71                              description="The max buffered docs value.")
72      public static final String MAX_BUFFERED_DOCS_CONFIG_NAME =
73          "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxBufferedDocs";
74  
75      /** The name of the max merge docs value. Default value is
76       * {@link Integer#MAX_VALUE}. */
77      @ConfigurationKey(type=INTEGER,
78                              defaultValue="" + Integer.MAX_VALUE,
79                              description="The max merge docs value.")
80      public static final String MAX_MERGE_DOCS_CONFIG_NAME =
81          "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxMergeDocs";
82  
83      /** The RAM buffer size in MB. Default value is 200. */
84      @ConfigurationKey(type=INTEGER,
85                              defaultValue="200",
86                              description="The RAM buffer size in MB.")
87      public static final String RAM_BUFFER_SIZE_MB_CONFIG_NAME =
88          "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.RAMBufferSizeMB";
89  
90      /** The maximum field length. Default value is 100000. */
91      @ConfigurationKey(type=INTEGER,
92                              defaultValue="100000",
93                              description="The maximum field length.")
94      public static final String MAX_FIELD_LENGTH_CONFIG_NAME =
95          "net.sf.eos.hadoop.lucene.LuceneOutputFormat.writer.maxFieldLength";
96  
97      public static final String DONE_NAME = "index.done";
98  
99      /** For logging. */
100     private static final Log LOG =
101         LogFactory.getLog(LuceneOutputFormat.class.getName());
102 
103     /**
104      * To configure see <code><em>XXX</em>_CONFIG_NAME</code> keys. Uses
105      * internally the instances of {@link AnalyzerSupplier} and
106      * {@link SimilaritySupplier}.
107      */
108     @SuppressWarnings("cast")
109     @Override
110     public RecordWriter<K, V> getRecordWriter(final FileSystem fileSystem,
111                                               final JobConf job,
112                                               final String name,
113                                               final Progressable progress)
114             throws IOException {
115 
116         final Path perm = new Path(job.getOutputPath(), name);
117         final Path temp = job.getLocalPath(
118                 "index/_"+Integer.toString(new Random().nextInt()));
119 
120         if (LOG.isInfoEnabled()) {
121             LOG.info("path (perm): " + perm.getName());
122             LOG.info("path (temp): " + temp.getName());
123         }
124 
125         fileSystem.delete(perm);    // delete old, if any
126 
127         final Configuration config = new Configuration();
128         HadoopConfigurationAdapter.addHadoopConfigToEosConfig(job, config);
129 
130         try {
131             AnalyzerSupplier analyzerProvider =
132             	AnalyzerSupplier.newInstance(config);
133             final Analyzer analyzer = analyzerProvider.get();
134 
135             final IndexWriter writer =  // build locally first
136                 new IndexWriter(
137                         fileSystem.startLocalOutput(perm, temp).toString(),
138                         analyzer,
139                         true);
140 
141             final int mergeFactor = job.getInt(MERGE_FACTOR_CONFIG_NAME, 10);
142             writer.setMergeFactor(mergeFactor);
143             final int maxBufferedDocs = 
144                 job.getInt(MAX_BUFFERED_DOCS_CONFIG_NAME, 10);
145             writer.setMaxBufferedDocs(maxBufferedDocs);
146             final int maxMergeDocs = 
147                 job.getInt(MAX_MERGE_DOCS_CONFIG_NAME, Integer.MAX_VALUE);
148             writer.setMaxMergeDocs(maxMergeDocs);
149             final double ramBufferSize = (double)
150                 job.getFloat(RAM_BUFFER_SIZE_MB_CONFIG_NAME, 200.0f);
151             writer.setRAMBufferSizeMB(ramBufferSize);
152             final int maxFieldLength = 
153                 job.getInt(MAX_FIELD_LENGTH_CONFIG_NAME, 100000);
154             writer.setMaxFieldLength(maxFieldLength);
155 
156             final SimilaritySupplier similarityFactory =
157                 SimilaritySupplier.newInstance(config);
158             final Similarity similarity = similarityFactory.get();
159 
160             writer.setSimilarity(similarity);
161 
162             if (LOG.isDebugEnabled()) {
163                 LOG.debug("mergeFactor: " + mergeFactor);
164                 LOG.debug("maxBufferedDocs: " + maxBufferedDocs);
165                 LOG.debug("maxMergeDocs: " + maxMergeDocs);
166                 LOG.debug("RAMBufferSizeDB: " + ramBufferSize);
167                 LOG.debug("maxFieldLength: " + maxFieldLength);
168                 LOG.debug("Lucene Analyzer instance: "
169                           + analyzer.getClass().getName());
170                 LOG.debug("Lucene Similarity instance: "
171                           + similarity.getClass().getName());
172             }
173 
174             final RecordWriterImpl<K, V> recordWriter
175                 = new RecordWriterImpl<K, V>(fileSystem, writer, analyzer);
176             recordWriter.setPermanentPath(perm);
177             recordWriter.setTemporaryPath(temp);
178 
179             return recordWriter;
180 
181         } catch (final EosException e) {
182             final IOException ioe = new IOException();
183             ioe.initCause(e);
184             throw ioe;
185         }
186     }
187 
188     static class RecordWriterImpl<K extends WritableComparable,
189                                   V extends ObjectWritable>
190             implements RecordWriter<K, V> {
191 
192         private boolean closed;
193 
194         /** The logging of this class. */
195         private static final Log LOG =
196             LogFactory.getLog(RecordWriterImpl.class.getName());
197 
198         private IndexWriter writer;
199         private Analyzer luceneAnalyzer;
200         private FileSystem fs;
201         private Path permanentPath;
202         private Path temporaryPath;
203 
204         public RecordWriterImpl(final FileSystem fileSystem,
205                                 final IndexWriter indexWriter,
206                                 final Analyzer analyzer) {
207             this.writer = indexWriter;
208             this.luceneAnalyzer = analyzer;
209             this.fs = fileSystem;
210         }
211 
212         public void setTemporaryPath(final Path temp) {
213             this.temporaryPath = temp;
214         }
215 
216         public void setPermanentPath(final Path perm) {
217             this.permanentPath = perm;
218         }
219 
220         /** {@inheritDocs} */
221         public void write(final K key, final V value)
222                 throws IOException {
223 
224             final Document doc = (Document) value.get();
225             LOG.trace("Indexing document ....");
226             this.writer.addDocument(doc, this.luceneAnalyzer);
227         }
228 
229         /** {@inheritDocs} */
230         public void close(final Reporter reporter) throws IOException {
231             // spawn a thread to give progress heartbeats
232             final Thread prog = new Thread() {
233                 @Override
234                 public void run() {
235                     while (! RecordWriterImpl.this.closed) {
236                         try {
237                             reporter.setStatus("closing");
238                             Thread.sleep(1000);
239                         } catch (final InterruptedException e) {
240                             continue; 
241                         } catch (Throwable e) {
242                             return;
243                         }
244                     }
245                 }
246             };
247 
248             try {
249                 prog.start();
250                 LOG.info("Optimizing index --> " + this.writer.docCount()
251                         + " documents.");
252                 // optimize & close index
253                 this.writer.optimize();
254                 this.writer.close();
255                 //copy to dfs
256                 this.fs.completeLocalOutput(this.permanentPath,
257                                             this.temporaryPath);
258 
259                 this.fs.createNewFile(new Path(this.permanentPath, DONE_NAME));
260             } finally {
261                 this.closed = true;
262             }
263         }
264     };
265 }