Creating an Amazon EMR job using the Java API

There is little documentation on using the Amazon EMR Java API, so if you are looking for code that configures and starts an EMR job from Java, here is the code I use:

(A brief official sample is available in the AWS documentation.)

The code defines two steps: one that enables debugging and one that runs my own job. You can remove the debugging step if you don't need it. (The code is adapted from a sample I borrowed online; I forget where I found it…thanks to the original author.)

import java.io.File;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

public class EMRStarter {

	private static final String HADOOP_VERSION = "0.20";
	private static final int INSTANCE_COUNT = 2;
	private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
	private static final String FLOW_NAME = "EMR Test";
	private static final String BUCKET_NAME = "my-bucket";
	private static final String S3N_HADOOP_JAR = "s3://" + BUCKET_NAME + "/jar/InventoryStorageCost.jar"; // the job jar uploaded to S3
	private static final String OUTPUT_DIR = "test";
	private static final String S3N_LOG_URI = "s3://" + BUCKET_NAME + "/log/" + OUTPUT_DIR;
	private static final String[] JOB_ARGS = new String[] { // args for the jar's main class (paths are placeholders)
			"s3://path",
			"s3://path",
			"s3://path",
			"arg"
			};

	private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
	private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(
			JobFlowExecutionState.COMPLETED, JobFlowExecutionState.FAILED, JobFlowExecutionState.TERMINATED);

	static AmazonElasticMapReduce emr;

	/**
	 * The only information needed to create a client are security credentials,
	 * consisting of the AWS Access Key ID and Secret Access Key. All other
	 * configuration, such as the service endpoints, is performed automatically.
	 * Client parameters, such as proxies, can be specified in an optional
	 * ClientConfiguration object when constructing a client.
	 *
	 * @see com.amazonaws.auth.BasicAWSCredentials
	 * @see com.amazonaws.auth.PropertiesCredentials
	 * @see com.amazonaws.ClientConfiguration
	 */

	private static void init() throws Exception {
		// PropertiesCredentials loads the access key and secret key from a
		// properties file (see the note after the listing for the expected format).
		File file = new File("AwsCredentials.properties");
		AWSCredentials credentials = new PropertiesCredentials(file);
		emr = new AmazonElasticMapReduceClient(credentials);
	}

	public static void main(String[] args) throws Exception {
		System.out.println("===========================================");
		System.out.println("Welcome to the Elastic Map Reduce!");
		System.out.println("===========================================");

		init();

		StepFactory stepFactory = new StepFactory();
		// create the debugging step
		StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
				.withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());

		try {
			// Configure instances to use
			JobFlowInstancesConfig instances = new JobFlowInstancesConfig();

			System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
			instances.setHadoopVersion(HADOOP_VERSION);

			System.out.println("Using instance count: " + INSTANCE_COUNT);
			instances.setInstanceCount(INSTANCE_COUNT);

			System.out.println("Using master instance type: " + INSTANCE_TYPE);
			instances.setMasterInstanceType(INSTANCE_TYPE);

			System.out.println("Using slave instance type: " + INSTANCE_TYPE);
			instances.setSlaveInstanceType(INSTANCE_TYPE);

			// Configure the job flow
			System.out.println("Configuring flow: " + FLOW_NAME);
			RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);

			System.out.println("tusing log URI: " + S3N_LOG_URI);
			request.setLogUri(S3N_LOG_URI);

			// Configure the Hadoop jar to use
			System.out.println("tusing jar URI: " + S3N_HADOOP_JAR);
			HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
			System.out.println("tusing args: " + ARGS_AS_LIST);
			jarConfig.setArgs(ARGS_AS_LIST);

			// main step; name it after the jar file (the part after the last '/')
			StepConfig stepConfig = new StepConfig(
					S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.lastIndexOf('/') + 1), jarConfig);
			request.setSteps(Arrays.asList(new StepConfig[] { enableDebugging, stepConfig }));

			// Run the job flow
			RunJobFlowResult result = emr.runJobFlow(request);
			// Check the status of the running job
			String lastState = "";
			STATUS_LOOP: while (true) {
				DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(
						Arrays.asList(result.getJobFlowId()));

				DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);

				for (JobFlowDetail detail : descResult.getJobFlows()) {
					String state = detail.getExecutionStatusDetail().getState();
					if (isDone(state)) {
						System.out.println("Job " + state + ": " + detail.toString());
						break STATUS_LOOP;
					} else if (!lastState.equals(state)) {
						lastState = state;
						System.out.println("Job " + state + " at " + new Date().toString());
					}
				}

				Thread.sleep(10000); // poll the job flow state every ten seconds

			}

		} catch (AmazonServiceException ase) {
			System.out.println("Caught Exception: " + ase.getMessage());
			System.out.println("Reponse Status Code: " + ase.getStatusCode());
			System.out.println("Error Code: " + ase.getErrorCode());
			System.out.println("Request ID: " + ase.getRequestId());
		}

	}

	/**
	 * @param value a job flow execution state name, as returned by the service
	 * @return true if the state is terminal (COMPLETED, FAILED, or TERMINATED)
	 */

	public static boolean isDone(String value) {
		JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
		return DONE_STATES.contains(state);
	}

}
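
A note on credentials: with the AWS SDK for Java (v1), PropertiesCredentials expects a plain Java properties file containing accessKey and secretKey entries, so AwsCredentials.properties is just those two lines with your own values filled in. If you would rather not keep a credentials file in the working directory, a minimal alternative sketch is to build the client from BasicAWSCredentials instead; the key strings below are placeholders:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;

public class EMRClientFactory {

	// Placeholder values -- substitute your own AWS access key ID and secret access key.
	public static AmazonElasticMapReduce createClient() {
		AWSCredentials credentials = new BasicAWSCredentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY");
		return new AmazonElasticMapReduceClient(credentials);
	}
}

Either way, the rest of the flow is unchanged: once you have an AmazonElasticMapReduce client, runJobFlow submits the job flow and describeJobFlows is polled until one of the DONE_STATES is reached.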
