1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred.cooccurrence;
17
18 import net.sf.eos.hadoop.DistributedCacheStrategy;
19 import net.sf.eos.hadoop.FullyDistributedCacheStrategy;
20 import net.sf.eos.hadoop.mapred.AbstractEosDriver;
21 import net.sf.eos.trie.TrieLoader;
22
23 import org.apache.commons.cli.CommandLine;
24 import org.apache.commons.cli.GnuParser;
25 import org.apache.commons.cli.Option;
26 import org.apache.commons.cli.Options;
27 import org.apache.commons.cli.Parser;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.filecache.DistributedCache;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.io.Text;
34 import org.apache.hadoop.mapred.JobConf;
35 import org.apache.hadoop.util.GenericOptionsParser;
36 import org.apache.hadoop.util.ToolRunner;
37
38
39
40
41
42
43
44
45
46 public class DictionaryBasedEntityRecognizerMapReduceDriver
47 extends AbstractEosDriver {
48
49
50
51
52
53 @SuppressWarnings("nls")
54 public static final String TRIE_SHORT_CMD_ARG = "t";
55
56
57
58
59
60 @SuppressWarnings("nls")
61 public static final String TRIE_LONG_CMD_ARG = "trie";
62
63
64 private static final Log LOG =
65 LogFactory.getLog(DictionaryBasedEntityRecognizerMapReduceDriver.class.getName());
66
67
68
69
70
71
72
73
74
75
76 public static void main(final String[] args) throws Exception {
77 final int res = ToolRunner.run(
78 new Configuration(),
79 new DictionaryBasedEntityRecognizerMapReduceDriver(),
80 args);
81 System.exit(res);
82 }
83
84
85 @Override
86 public int run(final String[] args) throws Exception {
87 super.run(args);
88 final JobConf conf = getJobConf();
89
90 final Parser parser = new GnuParser();
91 final Options options = createOptions();
92 final CommandLine cmdLine = parser.parse(options, args);
93
94 final String triePath = cmdLine.getOptionValue(TRIE_LONG_CMD_ARG);
95 if (triePath == null || triePath.length() == 0) {
96 LOG.fatal("No Trie data path given - exiting");
97 return 1;
98 }
99 LOG.info("Trie path: " + triePath);
100 DistributedCache.addCacheFile(new Path(triePath).toUri(), conf);
101
102 if (conf.get(DistributedCacheStrategy.STRATEGY_IMPL_CONFIG_NAME) == null) {
103 conf.set(DistributedCacheStrategy.STRATEGY_IMPL_CONFIG_NAME,
104 FullyDistributedCacheStrategy.class.getName());
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("No CacheStrategy given. Use '"
107 + FullyDistributedCacheStrategy.class.getName()
108 + "'");
109 }
110 }
111
112 conf.setJobName("\u03b5\u00b7\u03bf\u00b7s\u00b7\u00b7\u00b7 Entity");
113
114 conf.setOutputKeyClass(Text.class);
115 conf.setOutputValueClass(Text.class);
116 conf.setMapOutputKeyClass(Text.class);
117 conf.setMapOutputValueClass(Text.class);
118
119 conf.setMapperClass(DictionaryBasedEntityRecognizerMapper.class);
120 conf.setReducerClass(DictionaryBasedEntityRecognizerReducer.class);
121
122 return doJob(conf);
123 }
124
125 @Override
126 protected Options createOptions() {
127 final Options options = super.createOptions();
128 final Option option =
129 new Option(TRIE_SHORT_CMD_ARG,
130 TRIE_LONG_CMD_ARG,
131 true,
132 "Path to trie data");
133 option.setRequired(true);
134
135 return options.addOption(option);
136 }
137 }