View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package net.sf.eos.hadoop.mapred.index;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.conf.Configured;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.util.StringUtils;
32  import org.apache.hadoop.util.Tool;
33  import org.apache.hadoop.util.ToolRunner;
34  import org.apache.lucene.index.IndexWriter;
35  import org.apache.lucene.store.Directory;
36  
37  /**
38   * IndexMerger creates an index for the output corresponding to a
39   * single fetcher run.
40   *
41   * @author Doug Cutting
42   * @author Mike Cafarella
43   */
44  public class IndexMerger extends Configured implements Tool {
45  
46      public static final Log LOG = LogFactory.getLog(IndexMerger.class);
47  
48    public static final String DONE_NAME = "merge.done";
49  
50    public IndexMerger() {
51  
52    }
53  
54    public IndexMerger(final Configuration conf) {
55      setConf(conf);
56    }
57  
58    /**
59     * Merge all input indexes to the single output index
60     */
61    public void merge(Path[] indexes, Path outputIndex, Path localWorkingDir)
62            throws IOException {
63  
64      if (LOG.isInfoEnabled()) {
65        LOG.info("merging indexes to: " + outputIndex);
66      }
67      final FileSystem localFs = FileSystem.getLocal(getConf());
68  
69      if (localWorkingDir == null) {
70        localWorkingDir = new Path("indexmerger-" + System.currentTimeMillis());
71      }
72  
73      if (localFs.exists(localWorkingDir)) {
74        localFs.delete(localWorkingDir);
75      }
76  
77      localFs.mkdirs(localWorkingDir);
78  
79      // Get local output target //
80      final FileSystem fs = FileSystem.get(getConf());
81      final Path tmpLocalOutput = new Path(localWorkingDir, "merge-output");
82      final Path localOutput = fs.startLocalOutput(outputIndex, tmpLocalOutput);
83  
84      final Directory[] dirs = new Directory[indexes.length];
85      for (int i = 0; i < indexes.length; i++) {
86        if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
87        dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
88      }
89  
90      // Merge indices //
91      IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
92      writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
93      writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
94      writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
95      writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
96      writer.setUseCompoundFile(false);
97      writer.addIndexes(dirs);
98      writer.close();
99  
100     // Put target back //
101     LOG.info("completeLocalOutput: from '" + tmpLocalOutput.toString() + "' to '" + outputIndex.toString() + "'");
102     fs.completeLocalOutput(outputIndex, tmpLocalOutput);
103     FileSystem.getLocal(getConf()).delete(localWorkingDir);
104     if (LOG.isInfoEnabled()) {  
105         LOG.info("done merging");
106     }
107 
108   }
109 
110   public int run(String[] args) throws Exception {
111     String usage = "IndexMerger [-workingdir <workingdir>] outputIndex indexesDir...";
112     if (args.length < 2) {
113       System.err.println("Usage: " + usage);
114       return -1;
115     }
116 
117     //
118     // Parse args, read all index directories to be processed
119     //
120     final FileSystem fs = FileSystem.get(getConf());
121     List<Path> indexDirs = new ArrayList<Path>();
122 
123     Path workDir = null;
124     int i = 0;
125     if ("-workingdir".equals(args[i])) {
126       i++;
127       workDir = new Path(args[i++], "indexmerger-" + System.currentTimeMillis());
128     }
129 
130     Path outputIndex = new Path(args[i++]);
131 
132     for (; i < args.length; i++) {
133       indexDirs.addAll(Arrays.asList(fs.listPaths(new Path[] {new Path(args[i])})));
134     }
135 
136     //
137     // Merge the indices
138     //
139 
140     Path[] indexFiles = (Path[])indexDirs.toArray(new Path[indexDirs.size()]);
141 
142     try {
143       merge(indexFiles, outputIndex, workDir);
144       return 0;
145     } catch (Exception e) {
146       LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
147       return -1;
148     }
149   }
150   
151   /**
152    * @param args
153    */
154   public static void main(final String[] args) throws Exception {
155       final int res = ToolRunner.run(new Configuration(),
156                                      new IndexMerger(),
157                                      args);
158       System.exit(res);
159   }
160 
161 }