1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package net.sf.eos.hadoop.mapred;
17  
18  import static net.sf.eos.util.Conditions.checkState;
19  
20  import org.apache.commons.cli.CommandLine;
21  import org.apache.commons.cli.GnuParser;
22  import org.apache.commons.cli.Options;
23  import org.apache.commons.cli.Parser;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.conf.Configured;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.mapred.JobClient;
30  import org.apache.hadoop.mapred.JobConf;
31  import org.apache.hadoop.mapred.RunningJob;
32  import org.apache.hadoop.util.Tool;
33  
34  
35  
36  
37  
38  
39  
40  
41  public abstract class AbstractEosDriver extends Configured implements Tool {
42  
43      
44      private static final Log LOG =
45          LogFactory.getLog(AbstractEosDriver.class.getName());
46  
47      
48   
49      @SuppressWarnings("nls")
50      public static final String SOURCE_SHORT_CMD_ARG = "s";
51      
52   
53      @SuppressWarnings("nls")
54      public static final String SOURCE_LONG_CMD_ARG = "source";
55  
56      
57   
58      @SuppressWarnings("nls")
59      public static final String DESTINATION_SHORT_CMD_ARG = "d";
60      
61   
62      @SuppressWarnings("nls")
63      public static final String DESTINATION_LONG_CMD_ARG = "dest";
64  
65      @SuppressWarnings("nls")
66  
67      private JobConf jobConf = null;
68  
69      
70  
71  
72  
73  
74  
75  
76      @SuppressWarnings("nls")
77      public int run(final String[] args) throws Exception {
78          Configuration conf = getConf();
79          if (conf == null) {
80              conf = new Configuration();
81          }
82          this.jobConf = new JobConf(conf, this.getClass());
83          final Parser parser = new GnuParser();
84          final Options options = createOptions();
85          final CommandLine cmdLine = parser.parse(options, args);
86  
87          final String source = cmdLine.getOptionValue(SOURCE_SHORT_CMD_ARG);
88          final String dest = cmdLine.getOptionValue(DESTINATION_SHORT_CMD_ARG);
89  
90          if (source == null || source.length() == 0) {
91              LOG.warn("Arguments contains no source path.");
92          } else {
93              final Path in = new Path(source);
94              this.jobConf.setInputPath(in);
95          }
96          if (dest == null || dest.length() == 0) {
97              LOG.warn("Arguments contains no destination path.");
98          } else {
99              final Path out = new Path(dest);
100             this.jobConf.setOutputPath(out);
101         }
102 
103         return 0;
104     }
105 
106     
107 
108 
109 
110 
111 
112     @SuppressWarnings("nls")
113     protected final JobConf getJobConf() {
114         checkState(this.jobConf != null, "Called before run(String[]) thru super.");
115 
116         return this.jobConf;
117     }
118 
119     protected Options createOptions() {
120         return new Options()
121             .addOption(SOURCE_SHORT_CMD_ARG,
122                        SOURCE_LONG_CMD_ARG,
123                        true,
124                        "Path to the source folder of the input data")
125             .addOption(DESTINATION_SHORT_CMD_ARG,
126                        DESTINATION_LONG_CMD_ARG,
127                        true,
128                        "Path to the destination folder of the output data");
129     }
130 
131     
132 
133 
134 
135 
136 
137     protected int doJob(final JobConf conf) throws Exception {
138         final RunningJob job = JobClient.runJob(conf);
139         job.waitForCompletion();
140         return job.isSuccessful() ? 0 : 1;
141     }
142 }