There is little documentation on the Amazon EMR Java API, so in case you are looking for code that configures and starts an EMR job flow from Java, here is the code I use:
(A brief official sample can be found here; the file containing the following code is here.)
The code includes two steps: one for debugging and one for my own job. You can delete the debugging step if you want, as shown after the listing. (The code is adapted from something I found online; I forget where, so thanks to the original author.)
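The code reads your AWS credentials from an AwsCredentials.properties file in the working directory. A minimal sketch of that file, assuming the accessKey/secretKey property names expected by the SDK's PropertiesCredentials class (the values shown are placeholders):

accessKey=YOUR_ACCESS_KEY_ID
secretKey=YOUR_SECRET_ACCESS_KEY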
import java.io.File;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

public class EMRStarter {

    private static final String HADOOP_VERSION = "0.20";
    private static final int INSTANCE_COUNT = 2;
    private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
    private static final String FLOW_NAME = "EMR Test";
    private static final String BUCKET_NAME = "my-bucket";
    private static final String S3N_HADOOP_JAR = "s3://" + BUCKET_NAME + "/jar/InventoryStorageCost.jar"; // job jar
    private static final String outputDir = "test";
    private static final String S3N_LOG_URI = "s3://" + BUCKET_NAME + "/log/" + outputDir;
    private static final String[] JOB_ARGS = new String[] { "s3://path", "s3://path", "s3://path", "arg" };
    private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);

    private static final List<JobFlowExecutionState> DONE_STATES = Arrays
            .asList(new JobFlowExecutionState[] { JobFlowExecutionState.COMPLETED,
                    JobFlowExecutionState.FAILED, JobFlowExecutionState.TERMINATED });

    static AmazonElasticMapReduce emr;

    /**
     * The only information needed to create a client is security credentials
     * consisting of the AWS Access Key ID and Secret Access Key. All other
     * configuration, such as the service end points, is performed
     * automatically. Client parameters, such as proxies, can be specified in
     * an optional ClientConfiguration object when constructing a client.
     *
     * @see com.amazonaws.auth.BasicAWSCredentials
     * @see com.amazonaws.auth.PropertiesCredentials
     * @see com.amazonaws.ClientConfiguration
     */
    private static void init() throws Exception {
        File file = new File("AwsCredentials.properties");
        AWSCredentials credentials = new PropertiesCredentials(file);
        emr = new AmazonElasticMapReduceClient(credentials);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("===========================================");
        System.out.println("Welcome to the Elastic Map Reduce!");
        System.out.println("===========================================");

        init();

        StepFactory stepFactory = new StepFactory();

        // Create the debugging step
        StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
                .withActionOnFailure("TERMINATE_JOB_FLOW")
                .withHadoopJarStep(stepFactory.newEnableDebuggingStep());

        try {
            // Configure the instances to use
            JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
            System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
            instances.setHadoopVersion(HADOOP_VERSION);
            System.out.println("Using instance count: " + INSTANCE_COUNT);
            instances.setInstanceCount(INSTANCE_COUNT);
            System.out.println("Using master instance type: " + INSTANCE_TYPE);
            instances.setMasterInstanceType(INSTANCE_TYPE);
            System.out.println("Using slave instance type: " + INSTANCE_TYPE);
            instances.setSlaveInstanceType(INSTANCE_TYPE);

            // Configure the job flow
            System.out.println("Configuring flow: " + FLOW_NAME);
            RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
            System.out.println("\tusing log URI: " + S3N_LOG_URI);
            request.setLogUri(S3N_LOG_URI);

            // Configure the Hadoop jar to use
            System.out.println("\tusing jar URI: " + S3N_HADOOP_JAR);
            HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
            System.out.println("\tusing args: " + ARGS_AS_LIST);
            jarConfig.setArgs(ARGS_AS_LIST);

            // The main step, named after the jar URI
            StepConfig stepConfig = new StepConfig(
                    S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1), jarConfig);

            request.setSteps(Arrays.asList(new StepConfig[] { enableDebugging, stepConfig }));

            // Run the job flow
            RunJobFlowResult result = emr.runJobFlow(request);

            // Poll the status of the job flow until it reaches a done state
            String lastState = "";
            STATUS_LOOP: while (true) {
                DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(
                        Arrays.asList(new String[] { result.getJobFlowId() }));
                DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
                for (JobFlowDetail detail : descResult.getJobFlows()) {
                    String state = detail.getExecutionStatusDetail().getState();
                    if (isDone(state)) {
                        System.out.println("Job " + state + ": " + detail.toString());
                        break STATUS_LOOP;
                    } else if (!lastState.equals(state)) {
                        lastState = state;
                        System.out.println("Job " + state + " at " + new Date().toString());
                    }
                }
                Thread.sleep(10000);
            }
        } catch (AmazonServiceException ase) {
            System.out.println("Caught Exception: " + ase.getMessage());
            System.out.println("Response Status Code: " + ase.getStatusCode());
            System.out.println("Error Code: " + ase.getErrorCode());
            System.out.println("Request ID: " + ase.getRequestId());
        }
    }

    /**
     * @param value a job flow state string as returned by the EMR API
     * @return true if the state is one of the terminal states
     */
    public static boolean isDone(String value) {
        JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
        return DONE_STATES.contains(state);
    }
}
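As mentioned above, the debugging step is optional. To run only your own job, set a single step on the request instead:

request.setSteps(Arrays.asList(stepConfig));

Keeping the debugging step lets you inspect the steps of the job flow through the debugging view in the AWS console.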