View Javadoc

1   /* Copyright (c) 2008 Sascha Kohlmann
2    *
3    * This program is free software: you can redistribute it and/or modify
4    * it under the terms of the GNU Affero General Public License as published by
5    * the Free Software Foundation, either version 3 of the License, or
6    * (at your option) any later version.
7    *
8    * This program is distributed in the hope that it will be useful,
9    * but WITHOUT ANY WARRANTY; without even the implied warranty of
10   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   * GNU Affero General Public License for more details.
12   *
13   * You should have received a copy of the GNU Affero General Public License
14   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15   */
16  package net.sf.eos.hadoop.mapred;
17  
18  import static net.sf.eos.util.Conditions.checkState;
19  
20  import org.apache.commons.cli.CommandLine;
21  import org.apache.commons.cli.GnuParser;
22  import org.apache.commons.cli.Options;
23  import org.apache.commons.cli.Parser;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.conf.Configured;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.mapred.JobClient;
30  import org.apache.hadoop.mapred.JobConf;
31  import org.apache.hadoop.mapred.RunningJob;
32  import org.apache.hadoop.util.Tool;
33  
34  /**
35   * Support for some generic configuration data. The {@link #run(String[])}
36   * analyze the parameter "<tt>-s</tt>" or "<tt>--source</tt>" for the source
37   * (<em>input</em>-Path) parameter and "<tt>-d</tt>" or "<tt>--dest</tt>" for
38   * the destination (<em>output</em>-Path).
39   * @author Sascha Kohlmann
40   */
41  public abstract class AbstractEosDriver extends Configured implements Tool {
42  
43      /** For logging. */
44      private static final Log LOG =
45          LogFactory.getLog(AbstractEosDriver.class.getName());
46  
47      /** Short commandline parameter name for the <em>source</em> or
48       * <em>input</em> path. */ 
49      @SuppressWarnings("nls")
50      public static final String SOURCE_SHORT_CMD_ARG = "s";
51      /** Long commandline parameter name for the <em>source</em> or
52       * <em>input</em> path. */ 
53      @SuppressWarnings("nls")
54      public static final String SOURCE_LONG_CMD_ARG = "source";
55  
56      /** Short commandline parameter name for the <em>destination</em> or
57       * <em>output</em> path. */ 
58      @SuppressWarnings("nls")
59      public static final String DESTINATION_SHORT_CMD_ARG = "d";
60      /** Long commandline parameter name for the <em>destination</em> or
61       * <em>output</em> path. */ 
62      @SuppressWarnings("nls")
63      public static final String DESTINATION_LONG_CMD_ARG = "dest";
64  
65      @SuppressWarnings("nls")
66  
67      private JobConf jobConf = null;
68  
69      /**
70       * Implementations of {@code AbstractEosDriver} must call
71       * {@code super.run(String[])}. After the call {@link #getJobConf()}
72       * returns a value != {@code null}.
73       * <p>Never starts a job. This is part of the implementing driver.</p>
74       * @return always zero.
75       */
76      @SuppressWarnings("nls")
77      public int run(final String[] args) throws Exception {
78          Configuration conf = getConf();
79          if (conf == null) {
80              conf = new Configuration();
81          }
82          this.jobConf = new JobConf(conf, this.getClass());
83          final Parser parser = new GnuParser();
84          final Options options = createOptions();
85          final CommandLine cmdLine = parser.parse(options, args);
86  
87          final String source = cmdLine.getOptionValue(SOURCE_SHORT_CMD_ARG);
88          final String dest = cmdLine.getOptionValue(DESTINATION_SHORT_CMD_ARG);
89  
90          if (source == null || source.length() == 0) {
91              LOG.warn("Arguments contains no source path.");
92          } else {
93              final Path in = new Path(source);
94              this.jobConf.setInputPath(in);
95          }
96          if (dest == null || dest.length() == 0) {
97              LOG.warn("Arguments contains no destination path.");
98          } else {
99              final Path out = new Path(dest);
100             this.jobConf.setOutputPath(out);
101         }
102 
103         return 0;
104     }
105 
106     /**
107      * Returns the job configuration. Throws an exception if method was called
108      * before an implementation calls {@link #run(String[])}.
109      * @return the job configuration
110      * @throws IllegalStateException if called before {@link #run(String[])}
111      */
112     @SuppressWarnings("nls")
113     protected final JobConf getJobConf() {
114         checkState(this.jobConf != null, "Called before run(String[]) thru super.");
115 
116         return this.jobConf;
117     }
118 
119     protected Options createOptions() {
120         return new Options()
121             .addOption(SOURCE_SHORT_CMD_ARG,
122                        SOURCE_LONG_CMD_ARG,
123                        true,
124                        "Path to the source folder of the input data")
125             .addOption(DESTINATION_SHORT_CMD_ARG,
126                        DESTINATION_LONG_CMD_ARG,
127                        true,
128                        "Path to the destination folder of the output data");
129     }
130 
131     /**
132      * Runs the job for the given configuration.
133      * @param conf the job configuration
134      * @return the job status. 0 if the job success. 1 if the job fails.
135      * @throws Exception if an error occurs
136      */
137     protected int doJob(final JobConf conf) throws Exception {
138         final RunningJob job = JobClient.runJob(conf);
139         job.waitForCompletion();
140         return job.isSuccessful() ? 0 : 1;
141     }
142 }