MultipleInputs is a MapReduce feature that lets a single job read inputs with different formats.
For example, suppose we have two files with different formats. Each line is one record with tab-separated columns, and the first column is the key (this matches what the RecordReaders below expect):
(1) First file format:
KEY \t VALUE
(2) Second file format:
KEY \t VALUE \t ADDITIONAL
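To make this concrete, here are hypothetical sample inputs (my own illustration, not from the program itself; "\t" marks a tab character):

first.txt:
k1\tapple
k2\tbanana

second.txt:
k1\tcherry\t3
k2\tdurian\t7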
To read a custom format, we need to write a Record class, a RecordReader, and an InputFormat for each format. The InputFormat is what MultipleInputs requires; the InputFormat uses the RecordReader to read the file and return values, and each value is an instance of the Record class.
Here is the implementation:
(If you write them all in one big file, you need to add the "static" modifier to each class; I wrote them in one file to make testing easier.)
The single file needs these imports (the mapper-side classes come from the new org.apache.hadoop.mapreduce API):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
My Hadoop version is 0.20.2; older versions may have bugs.
1. First file format:
(1) Record class (must implement Writable)
public static class FirstClass implements Writable {
    private String value;

    public FirstClass() {
        this.value = "TEST";
    }

    public FirstClass(String val) {
        this.value = val;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        if (null == in) {
            throw new IllegalArgumentException("in cannot be null");
        }
        // Read the single UTF field written by write().
        this.value = in.readUTF().trim();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        if (null == out) {
            throw new IllegalArgumentException("out cannot be null");
        }
        out.writeUTF(this.value);
    }

    @Override
    public String toString() {
        return "FirstClass\t" + value;
    }
}
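If you want to sanity-check the Writable plumbing without running a job, a minimal round-trip through Java's stream classes looks like this (my own test sketch, not part of the original program; it needs java.io.ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, and DataOutputStream):

// Hypothetical helper: serialize with write(), then rebuild with readFields().
public static void roundTripTest() throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new FirstClass("hello").write(new DataOutputStream(bytes));

    FirstClass copy = new FirstClass();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy); // prints: FirstClass\thello
}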
(2) RecordReader
public static class FirstClassReader extends RecordReader<Text, FirstClass> {
    // Wraps a LineRecordReader and parses each line into a key and a FirstClass value.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private FirstClass valueFirstClass = null;

    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        valueFirstClass = null;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public FirstClass getCurrentValue() throws IOException, InterruptedException {
        return valueFirstClass;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            valueFirstClass = null;
            return false;
        }
        // Otherwise, take the line and parse it: KEY \t VALUE
        Text line = lineRecordReader.getCurrentValue();
        String str = line.toString();
        System.out.println("FirstClass:" + str);
        String[] arr = str.split("\t", -1);
        key = new Text(arr[0].trim());
        valueFirstClass = new FirstClass(arr[1].trim());
        return true;
    }
}
(3) InputFormat
public static class FirstInputFormat extends FileInputFormat<Text, FirstClass> {
    @Override
    public RecordReader<Text, FirstClass> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new FirstClassReader();
    }
}
2. Second file format:
(1) Record class (must implement Writable)
public static class SecondClass implements Writable {
    private String value;
    private int additional;

    public SecondClass() {
        this.value = "TEST";
        this.additional = 0;
    }

    public SecondClass(String val, int addi) {
        this.value = val;
        this.additional = addi;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        if (null == in) {
            throw new IllegalArgumentException("in cannot be null");
        }
        // Read the fields in the same order write() emits them.
        this.value = in.readUTF().trim();
        this.additional = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        if (null == out) {
            throw new IllegalArgumentException("out cannot be null");
        }
        out.writeUTF(this.value);
        out.writeInt(this.additional);
    }

    @Override
    public String toString() {
        return "SecondClass\t" + value + "\t" + additional;
    }
}
(2) RecordReader
public static class SecondClassReader extends RecordReader<Text, SecondClass> {
    // Same pattern as FirstClassReader, with one extra integer column.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private SecondClass valueSecondClass = null;

    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        valueSecondClass = null;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public SecondClass getCurrentValue() throws IOException, InterruptedException {
        return valueSecondClass;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            valueSecondClass = null;
            return false;
        }
        // Otherwise, take the line and parse it: KEY \t VALUE \t ADDITIONAL
        Text line = lineRecordReader.getCurrentValue();
        String str = line.toString();
        System.out.println("SecondClass:" + str);
        String[] arr = str.split("\t", -1);
        int addi = Integer.parseInt(arr[2].trim());
        key = new Text(arr[0].trim());
        valueSecondClass = new SecondClass(arr[1].trim(), addi);
        return true;
    }
}
(3) InputFormat
public static class SecondInputFormat extends FileInputFormat<Text, SecondClass> {
    @Override
    public RecordReader<Text, SecondClass> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new SecondClassReader();
    }
}
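One thing to watch: both nextKeyValue() implementations index arr[1] (and arr[2]) without checking how many columns the line actually contains, so an empty or malformed line throws an ArrayIndexOutOfBoundsException and fails the task. If your inputs may be dirty, a more defensive variant for the second reader could skip bad lines instead (a sketch, using the same fields as SecondClassReader above):

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    // Keep reading until we find a well-formed line or run out of input.
    while (lineRecordReader.nextKeyValue()) {
        String[] arr = lineRecordReader.getCurrentValue().toString().split("\t", -1);
        if (arr.length >= 3) {
            try {
                key = new Text(arr[0].trim());
                valueSecondClass = new SecondClass(arr[1].trim(), Integer.parseInt(arr[2].trim()));
                return true;
            } catch (NumberFormatException e) {
                // Fall through and skip this line: the third column is not an int.
            }
        }
    }
    key = null;
    valueSecondClass = null;
    return false;
}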
3. Now we write a Mapper for each file format
(1) FirstMap: the output key type is Text and the output value type is Text (these outputs become the reducer's inputs; note that value.toString() prefixes each record with its class name, so the reducer can tell which file a record came from)
public static class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
    public void map(Text key, FirstClass value, Context context)
            throws IOException, InterruptedException {
        System.out.println("FirstMap:" + key.toString() + " " + value.toString());
        context.write(key, new Text(value.toString()));
    }
}
(2) SecondMap: again, the output key and value types are both Text (these outputs also become the reducer's inputs)
public static class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
    public void map(Text key, SecondClass value, Context context)
            throws IOException, InterruptedException {
        System.out.println("SecondMap:" + key.toString() + " " + value.toString());
        context.write(key, new Text(value.toString()));
    }
}
4. Write the Reducer. IMPORTANT: a job can only use one reducer class, so if your mappers need to emit different key/value types, you must wrap them with GenericWritable (see the sketch after the reducer code).
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            System.out.println("Reduce:" + key.toString() + " " + value.toString());
            context.write(key, value);
        }
    }
}
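If your mappers really did need to emit FirstClass and SecondClass instances directly instead of Text, a minimal wrapper would look like this (org.apache.hadoop.io.GenericWritable is a standard Hadoop class; the name WrapperWritable is my own):

public static class WrapperWritable extends GenericWritable {
    // GenericWritable serializes a type index plus the wrapped object,
    // so both mappers can share one declared map output value class.
    @SuppressWarnings("unchecked")
    private static Class<? extends Writable>[] TYPES =
            (Class<? extends Writable>[]) new Class[] { FirstClass.class, SecondClass.class };

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return TYPES;
    }
}

A mapper would then call wrapper.set(value) before context.write(key, wrapper), the job would declare WrapperWritable as the map output value class, and the reducer would recover the original object with wrapper.get().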
5. In the driver, we use MultipleInputs to register each input path with its InputFormat and Mapper
public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
    Path firstPath = new Path(args[0]);
    Path secondPath = new Path(args[1]);
    Path outputPath = new Path(args[2]);

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    job.setJarByClass(MultipleInputsTest.class);
    job.setJobName("MultipleInputs Test");

    job.setReducerClass(MyReducer.class);

    // Output types for the mappers.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    // Output types for the reducer.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Use MultipleInputs to bind each input path to its InputFormat and Mapper.
    MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);
    MultipleInputs.addInputPath(job, secondPath, SecondInputFormat.class, SecondMap.class);

    FileOutputFormat.setOutputPath(job, outputPath);
    job.waitForCompletion(true);
}
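Assuming the classes are packaged into a jar called mitest.jar with main class MultipleInputsTest (both names hypothetical), a run over the sample inputs above would look roughly like:

hadoop jar mitest.jar MultipleInputsTest /input/first /input/second /output

With the default TextOutputFormat separating key and value with a tab, the reducer output (part-r-00000) should then contain lines such as ("\t" marks tabs, as before):

k1\tFirstClass\tapple
k1\tSecondClass\tcherry\t3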
The explanation is detailed and goes along with the program. Thanks a lot!
It would be great if you could provide sample input files so we can run and check the program.
Can I use a DBInputFormat and a TextInputFormat at the same time, to work on database data and file data simultaneously?
I have never tried this before... my guess is yes.
Can I read the file contents and then use another mapper and reducer to count occurrences of particular words with this?