There are two steps in every input format: getSplits and the record reader. getSplits transforms the original input data into splits, one split per mapper. Typically, if a big file is fed into MapReduce, it is divided into splits of the block size (e.g., 64 MB or 128 MB); if many small files are provided, MapReduce treats each file as its own split. The record reader then turns each split into key-value pairs, which are consumed by the mapper.
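These two steps map directly onto the two methods of Hadoop's org.apache.hadoop.mapreduce.InputFormat base class; every example in this article overrides one or both of them:

public abstract class InputFormat<K, V> {

  // Step 1: carve the input data into splits, one per mapper.
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  // Step 2: build a reader that turns one split into key/value pairs.
  public abstract RecordReader<K, V> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;
}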
In this article, I use several examples to explain how an input format can be implemented. I am greatly influenced by a few great references.
The first example is from the book "Hadoop: The Definitive Guide". WholeFileInputFormat treats each individual input file as a single value. We do not need to implement our own getSplits: once isSplitable returns false, no file will be divided, and each file becomes one split for a mapper.
public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;  // never split: each whole file becomes one split
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
  }
}
Since each file is a split, the InputSplit handed to the record reader is actually a FileSplit. In the reader below, each returned value is the whole content of the file. The key is null in this example, but it could just as well be the name of the file, obtained from the FileSplit.
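The body of the reader is omitted in the original listing; here is a sketch following the book's WholeFileRecordReader. The reader emits exactly one record per split, reading the entire file into a BytesWritable on the first call to nextKeyValue:

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;  // safe: isSplitable returned false
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      // read the whole file into the value in one shot
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;  // only one record per split
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }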
  @Override
  public void close() throws IOException {
    // do nothing; the stream is opened and closed inside nextKeyValue()
  }
}
I also think TeraInputFormat is a good example. The key difference from the previous example lies in the input file format of terasort: according to the documentation, each input record is a 100-byte array, with the first 10 bytes as the key and the remaining 90 bytes as the value. This raises a challenge, because splits should start and end at exact multiples of 100 bytes. In the code below, getSplits simply invokes super.getSplits, ignoring the alignment issue; the record reader then adjusts the offset forward to the next multiple of 100.
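The pass-through getSplits is just the following (a sketch; the real TeraInputFormat in the Hadoop examples adds sampling and caching machinery that I omit here):

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    // no attempt to align splits with 100-byte records here;
    // the record reader fixes up the boundary instead
    return super.getSplits(job);
  }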
static class TeraRecordReader extends RecordReader<Text, Text> {

  private FSDataInputStream in;
  private long offset;
  private long length;
  private static final int RECORD_LENGTH = 100;
  private byte[] buffer = new byte[RECORD_LENGTH];
  private Text key;
  private Text value;
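The boundary fix-up happens in initialize: the reader seeks forward from the raw split start to the next multiple of 100, then reads whole records until it runs past the split length. A sketch, loosely following the Hadoop examples source (the 10/90-byte key/value widths come from the terasort spec):

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException {
    FileSplit fileSplit = (FileSplit) split;
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    in = fs.open(path);
    long start = fileSplit.getStart();
    // advance to the next record boundary (multiple of RECORD_LENGTH)
    offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
    in.seek(start + offset);
    length = fileSplit.getLength();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (offset >= length) {
      return false;  // the next record belongs to the following split
    }
    int read = 0;
    while (read < RECORD_LENGTH) {
      int more = in.read(buffer, read, RECORD_LENGTH - read);
      if (more == -1) {
        if (read == 0) {
          return false;  // clean end of file
        }
        throw new EOFException("read past eof");
      }
      read += more;
    }
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    key.set(buffer, 0, 10);     // first 10 bytes: key
    value.set(buffer, 10, 90);  // remaining 90 bytes: value
    offset += RECORD_LENGTH;
    return true;
  }

  // getCurrentKey/getCurrentValue return the Text fields set above;
  // close() closes the input stream
}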
One of the most common reasons to customize your own input format is to combine many small files into fewer, bigger splits. For this purpose we still need to disable the splitting of individual files. CombineWholeFileInputFormat extends CombineFileInputFormat, which packs multiple small files into each CombineFileSplit (the size of each split can be tuned). Each individual file can then be retrieved by its index, as shown in the record reader after the sketch below.
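The input format itself is short. A sketch of what it might look like: CombineFileRecordReader is the stock Hadoop dispatcher that instantiates one reader per file in the combined split, and setMaxSplitSize is one way to tune the split size (the 128 MB value here is illustrative):

public class CombineWholeFileInputFormat
    extends CombineFileInputFormat<Text, BytesWritable> {

  public CombineWholeFileInputFormat() {
    // tune how many small files get packed into one split
    setMaxSplitSize(134217728);  // 128 MB, an illustrative value
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;  // keep every small file intact
  }

  @Override
  public RecordReader<Text, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader constructs one CombineWholeFileRecordReader
    // per file in the split, passing the file's index
    return new CombineFileRecordReader<Text, BytesWritable>(
        (CombineFileSplit) split, context, CombineWholeFileRecordReader.class);
  }
}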
public class CombineWholeFileRecordReader extends RecordReader<Text, BytesWritable> {

  private WholeFileRecordReader reader;
  private String fileName;

  public CombineWholeFileRecordReader(CombineFileSplit split,
      TaskAttemptContext context, Integer index)
      throws IOException, InterruptedException {
    // carve the index-th file out of the combined split
    FileSplit fileSplit = new FileSplit(split.getPath(index),
        split.getOffset(index), split.getLength(index), split.getLocations());

    fileName = fileSplit.getPath().toString();

    // delegate the actual reading to the whole-file reader from before
    reader = new WholeFileRecordReader();
    reader.initialize(fileSplit, context);
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    // nothing to do; all setup happens in the constructor,
    // which CombineFileRecordReader invokes with the file index
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return new Text(fileName);  // key is the file name, value is its content
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return reader.getProgress();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}
Summary
This article demonstrated a few sample implementations of input formats for a MapReduce job. The basic idea is to first divide the whole input into splits and then read each split with a record reader. There are already plenty of base input formats that can be leveraged and customized into more sophisticated ones.
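For completeness, wiring a custom format into a job driver takes a single line. A minimal sketch, where the driver, mapper, and paths are illustrative names, not from the examples above:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "combine small files");
job.setJarByClass(MyDriver.class);  // hypothetical driver class
job.setInputFormatClass(CombineWholeFileInputFormat.class);
job.setMapperClass(MyMapper.class);  // hypothetical mapper
FileInputFormat.addInputPath(job, new Path("/input/small-files"));  // illustrative path
FileOutputFormat.setOutputPath(job, new Path("/output"));           // illustrative path
System.exit(job.waitForCompletion(true) ? 0 : 1);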