1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package net.sf.eos.hadoop.mapred.index;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.conf.Configured;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.util.StringUtils;
32 import org.apache.hadoop.util.Tool;
33 import org.apache.hadoop.util.ToolRunner;
34 import org.apache.lucene.index.IndexWriter;
35 import org.apache.lucene.store.Directory;
36
37
38
39
40
41
42
43
44 public class IndexMerger extends Configured implements Tool {
45
46 public static final Log LOG = LogFactory.getLog(IndexMerger.class);
47
48 public static final String DONE_NAME = "merge.done";
49
50 public IndexMerger() {
51
52 }
53
54 public IndexMerger(final Configuration conf) {
55 setConf(conf);
56 }
57
58
59
60
61 public void merge(Path[] indexes, Path outputIndex, Path localWorkingDir)
62 throws IOException {
63
64 if (LOG.isInfoEnabled()) {
65 LOG.info("merging indexes to: " + outputIndex);
66 }
67 final FileSystem localFs = FileSystem.getLocal(getConf());
68
69 if (localWorkingDir == null) {
70 localWorkingDir = new Path("indexmerger-" + System.currentTimeMillis());
71 }
72
73 if (localFs.exists(localWorkingDir)) {
74 localFs.delete(localWorkingDir);
75 }
76
77 localFs.mkdirs(localWorkingDir);
78
79
80 final FileSystem fs = FileSystem.get(getConf());
81 final Path tmpLocalOutput = new Path(localWorkingDir, "merge-output");
82 final Path localOutput = fs.startLocalOutput(outputIndex, tmpLocalOutput);
83
84 final Directory[] dirs = new Directory[indexes.length];
85 for (int i = 0; i < indexes.length; i++) {
86 if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
87 dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
88 }
89
90
91 IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
92 writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
93 writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
94 writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
95 writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
96 writer.setUseCompoundFile(false);
97 writer.addIndexes(dirs);
98 writer.close();
99
100
101 LOG.info("completeLocalOutput: from '" + tmpLocalOutput.toString() + "' to '" + outputIndex.toString() + "'");
102 fs.completeLocalOutput(outputIndex, tmpLocalOutput);
103 FileSystem.getLocal(getConf()).delete(localWorkingDir);
104 if (LOG.isInfoEnabled()) {
105 LOG.info("done merging");
106 }
107
108 }
109
110 public int run(String[] args) throws Exception {
111 String usage = "IndexMerger [-workingdir <workingdir>] outputIndex indexesDir...";
112 if (args.length < 2) {
113 System.err.println("Usage: " + usage);
114 return -1;
115 }
116
117
118
119
120 final FileSystem fs = FileSystem.get(getConf());
121 List<Path> indexDirs = new ArrayList<Path>();
122
123 Path workDir = null;
124 int i = 0;
125 if ("-workingdir".equals(args[i])) {
126 i++;
127 workDir = new Path(args[i++], "indexmerger-" + System.currentTimeMillis());
128 }
129
130 Path outputIndex = new Path(args[i++]);
131
132 for (; i < args.length; i++) {
133 indexDirs.addAll(Arrays.asList(fs.listPaths(new Path[] {new Path(args[i])})));
134 }
135
136
137
138
139
140 Path[] indexFiles = (Path[])indexDirs.toArray(new Path[indexDirs.size()]);
141
142 try {
143 merge(indexFiles, outputIndex, workDir);
144 return 0;
145 } catch (Exception e) {
146 LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
147 return -1;
148 }
149 }
150
151
152
153
154 public static void main(final String[] args) throws Exception {
155 final int res = ToolRunner.run(new Configuration(),
156 new IndexMerger(),
157 args);
158 System.exit(res);
159 }
160
161 }