1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.sf.eos.hadoop.mapred;
17
18 import static net.sf.eos.util.Conditions.checkState;
19
20 import org.apache.commons.cli.CommandLine;
21 import org.apache.commons.cli.GnuParser;
22 import org.apache.commons.cli.Options;
23 import org.apache.commons.cli.Parser;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.conf.Configured;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.mapred.JobClient;
30 import org.apache.hadoop.mapred.JobConf;
31 import org.apache.hadoop.mapred.RunningJob;
32 import org.apache.hadoop.util.Tool;
33
34
35
36
37
38
39
40
41 public abstract class AbstractEosDriver extends Configured implements Tool {
42
43
44 private static final Log LOG =
45 LogFactory.getLog(AbstractEosDriver.class.getName());
46
47
48
49 @SuppressWarnings("nls")
50 public static final String SOURCE_SHORT_CMD_ARG = "s";
51
52
53 @SuppressWarnings("nls")
54 public static final String SOURCE_LONG_CMD_ARG = "source";
55
56
57
58 @SuppressWarnings("nls")
59 public static final String DESTINATION_SHORT_CMD_ARG = "d";
60
61
62 @SuppressWarnings("nls")
63 public static final String DESTINATION_LONG_CMD_ARG = "dest";
64
65 @SuppressWarnings("nls")
66
67 private JobConf jobConf = null;
68
69
70
71
72
73
74
75
76 @SuppressWarnings("nls")
77 public int run(final String[] args) throws Exception {
78 Configuration conf = getConf();
79 if (conf == null) {
80 conf = new Configuration();
81 }
82 this.jobConf = new JobConf(conf, this.getClass());
83 final Parser parser = new GnuParser();
84 final Options options = createOptions();
85 final CommandLine cmdLine = parser.parse(options, args);
86
87 final String source = cmdLine.getOptionValue(SOURCE_SHORT_CMD_ARG);
88 final String dest = cmdLine.getOptionValue(DESTINATION_SHORT_CMD_ARG);
89
90 if (source == null || source.length() == 0) {
91 LOG.warn("Arguments contains no source path.");
92 } else {
93 final Path in = new Path(source);
94 this.jobConf.setInputPath(in);
95 }
96 if (dest == null || dest.length() == 0) {
97 LOG.warn("Arguments contains no destination path.");
98 } else {
99 final Path out = new Path(dest);
100 this.jobConf.setOutputPath(out);
101 }
102
103 return 0;
104 }
105
106
107
108
109
110
111
112 @SuppressWarnings("nls")
113 protected final JobConf getJobConf() {
114 checkState(this.jobConf != null, "Called before run(String[]) thru super.");
115
116 return this.jobConf;
117 }
118
119 protected Options createOptions() {
120 return new Options()
121 .addOption(SOURCE_SHORT_CMD_ARG,
122 SOURCE_LONG_CMD_ARG,
123 true,
124 "Path to the source folder of the input data")
125 .addOption(DESTINATION_SHORT_CMD_ARG,
126 DESTINATION_LONG_CMD_ARG,
127 true,
128 "Path to the destination folder of the output data");
129 }
130
131
132
133
134
135
136
137 protected int doJob(final JobConf conf) throws Exception {
138 final RunningJob job = JobClient.runJob(conf);
139 job.waitForCompletion();
140 return job.isSuccessful() ? 0 : 1;
141 }
142 }