Implementing Your Own MapReduce Input Format

An input format has two responsibilities: getSplits and createRecordReader. getSplits divides the original input data into splits, one split per mapper. Typically, if a big file is fed into MapReduce, it is divided into splits of block size (e.g., 64 MB or 128 MB). However, if small files are provided, MapReduce treats each file as a split. The RecordReader returned by createRecordReader turns each split into key-value pairs, which are later consumed by the mapper.
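
To make the split arithmetic concrete, here is a minimal sketch in plain Java with no Hadoop dependency. The names SplitSketch and computeSplits are hypothetical, and the logic is a simplification: the real FileInputFormat.getSplits also takes the configured minimum and maximum split sizes and the block locations into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: cuts a file into fixed-size splits.
class SplitSketch {
    /** Returns {offset, length} pairs that cover a file of the given length. */
    static List<long[]> computeSplits(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            // The last split may be shorter than splitSize.
            long length = Math.min(splitSize, fileLength - offset);
            splits.add(new long[] {offset, length});
            offset += length;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        for (long[] s : computeSplits(300 * mb, 128 * mb)) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

Under these assumptions, a 300 MB file with a 128 MB split size yields three splits (two of 128 MB and one of 44 MB), while a file smaller than the split size yields a single split.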

In this article, I use several examples to explain how an input format can be implemented. I’m greatly influenced by a few great references, such as this and this.

The first example is from the book “Hadoop: The Definitive Guide”. WholeFileInputFormat treats each individual input file as a single value. We do not need to implement our own getSplits function: because isSplitable returns false, no file is divided, and each file becomes one split for a mapper.

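import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    // Never split a file: each input file becomes exactly one split.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(inputSplit, taskAttemptContext);
        return reader;
    }
}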

Since each file is a split, the inputSplit passed to the record reader is actually a FileSplit. In the example code below, the single returned value is the whole content of the file. The key is null in this example; it could instead be the file name obtained from the fileSplit.

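import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        this.conf = taskAttemptContext.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Emit exactly one record per split: the entire file content.
        if (!processed) {
            // The cast to int assumes the file is smaller than 2 GB.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do: the stream is already closed in nextKeyValue().
    }
}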
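
The read-once protocol of this record reader can be tried without a cluster. The sketch below is a plain Java stand-in that reads a local file through java.nio instead of HDFS. The class WholeFileReaderSketch is hypothetical and only mimics the nextKeyValue/getCurrentKey/getCurrentValue calls, using the file name as the key as suggested above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-file analogue of WholeFileRecordReader (no Hadoop classes).
class WholeFileReaderSketch {
    private final Path file;
    private byte[] value;
    private boolean processed = false;

    WholeFileReaderSketch(Path file) {
        this.file = file;
    }

    /** Returns true exactly once, after loading the whole file as the value. */
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        value = Files.readAllBytes(file);
        processed = true;
        return true;
    }

    /** Uses the file name as the key instead of a null key. */
    String getCurrentKey() {
        return file.getFileName().toString();
    }

    byte[] getCurrentValue() {
        return value;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("whole-file", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        WholeFileReaderSketch reader = new WholeFileReaderSketch(tmp);
        // This loop plays the role of the framework driving a mapper.
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> "
                    + reader.getCurrentValue().length + " bytes");
        }
        Files.delete(tmp);
    }
}
```

The loop body runs once per file, which is the behavior a mapper sees with WholeFileInputFormat: one record per split.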

I also think TeraInputFormat is a good example. The key difference from the previous example lies in the input file format of TeraSort. According to the documentation, each input record is a 100-byte array, with the first 10 bytes as the key and the remaining 90 bytes as the value. This raises a challenge: split boundaries generally do not fall on exact multiples of 100 bytes. In the code below, getSplits simply invokes super.getSplits, ignoring the format issue. Then, in the record reader, the starting offset is advanced to the next multiple of 100.

import java.io.EOFException;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// TeraScheduler is referenced without an import, so it is expected to live in the same package.
public class TeraInputFormat extends FileInputFormat<Text, Text> {
    static final String PARTITION_FILENAME = "_partition.lst";
    private static final String NUM_PARTITIONS = "mapreduce.terasort.num.partitions";
    private static final String SAMPLE_SIZE = "mapreduce.terasort.partitions.sample";
    static final int KEY_LENGTH = 10;
    static final int VALUE_LENGTH = 90;
    static final int RECORD_LENGTH = 100;
    private static MRJobConfig lastContext = null;
    private static List<InputSplit> lastResult = null;

    public TeraInputFormat() {
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new TeraRecordReader();
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // Reuse the cached splits when called again for the same job.
        if (job == lastContext) {
            return lastResult;
        }
        long t1 = System.currentTimeMillis();
        lastContext = job;
        // The base splits ignore the 100-byte record boundaries.
        lastResult = super.getSplits(job);
        long t2 = System.currentTimeMillis();
        System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
        if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
            TeraScheduler scheduler = new TeraScheduler(lastResult.toArray(new FileSplit[0]), job.getConfiguration());
            lastResult = scheduler.getNewFileSplits();
            long t3 = System.currentTimeMillis();
            System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
        }
        return lastResult;
    }

    static class TeraRecordReader extends RecordReader<Text, Text> {
        private static final int RECORD_LENGTH = 100;
        private FSDataInputStream in;
        private long offset;
        private long length;
        private byte[] buffer = new byte[RECORD_LENGTH];
        private Text key;
        private Text value;

        public TeraRecordReader() throws IOException {
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            Path p = ((FileSplit) split).getPath();
            FileSystem fs = p.getFileSystem(context.getConfiguration());
            this.in = fs.open(p);
            long start = ((FileSplit) split).getStart();
            // Number of bytes to skip so that reading starts on a record boundary.
            this.offset = (RECORD_LENGTH - start % RECORD_LENGTH) % RECORD_LENGTH;
            this.in.seek(start + this.offset);
            this.length = ((FileSplit) split).getLength();
        }

        @Override
        public void close() throws IOException {
            this.in.close();
        }

        @Override
        public Text getCurrentKey() {
            return this.key;
        }

        @Override
        public Text getCurrentValue() {
            return this.value;
        }

        @Override
        public float getProgress() throws IOException {
            return (float) this.offset / (float) this.length;
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            if (this.offset >= this.length) {
                return false;
            }
            // Keep reading until one full record is buffered.
            int read = 0;
            while (read < RECORD_LENGTH) {
                long newRead = this.in.read(this.buffer, read, RECORD_LENGTH - read);
                if (newRead == -1) {
                    if (read == 0) {
                        return false;
                    }
                    throw new EOFException("read past eof");
                }
                read += newRead;
            }
            if (this.key == null) {
                this.key = new Text();
            }
            if (this.value == null) {
                this.value = new Text();
            }
            this.key.set(this.buffer, 0, KEY_LENGTH);
            this.value.set(this.buffer, KEY_LENGTH, VALUE_LENGTH);
            this.offset += RECORD_LENGTH;
            return true;
        }
    }
}
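
The offset adjustment is easier to follow with numbers. The following plain Java sketch reuses the expression from initialize and the loop condition from nextKeyValue to count how many records each split would read. The class RecordAlignment and its methods are hypothetical, and the sketch assumes the file length is an exact multiple of 100 bytes, so end-of-file handling is left out.

```java
// Hypothetical illustration of how TeraRecordReader aligns splits to records.
class RecordAlignment {
    static final long RECORD_LENGTH = 100L;

    /** Bytes to skip from a split's start to reach the next record boundary. */
    static long skipToNextRecord(long start) {
        return (RECORD_LENGTH - start % RECORD_LENGTH) % RECORD_LENGTH;
    }

    /** Counts the records whose first byte lies in [start, start + length). */
    static long recordsInSplit(long start, long length) {
        long offset = skipToNextRecord(start);
        long count = 0;
        // Same condition as nextKeyValue(): stop once offset reaches the split length.
        while (offset < length) {
            count++;
            offset += RECORD_LENGTH;
        }
        return count;
    }

    public static void main(String[] args) {
        long fileLength = 1000L; // 10 records
        long splitSize = 256L;   // deliberately not a multiple of 100
        long total = 0;
        for (long start = 0; start < fileLength; start += splitSize) {
            long length = Math.min(splitSize, fileLength - start);
            long n = recordsInSplit(start, length);
            System.out.println("split@" + start + " skip=" + skipToNextRecord(start)
                    + " records=" + n);
            total += n;
        }
        System.out.println("total=" + total);
    }
}
```

For a 1000-byte file cut into 256-byte splits, the four splits read 3, 3, 2 and 2 records, 10 in total. A record that straddles a split boundary is read in full by the split in which it starts, and the next split skips over its tail, so no record is lost or read twice.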

One of the most common reasons for writing a custom input format is to combine many small files into fewer, larger splits. For this purpose, we still need to disable the splitting of files. Note that CombineWholeFileInputFormat extends CombineFileInputFormat, which turns the input into combine file splits (the size of each split can be tuned), each of which may contain multiple small files. Each individual file can be retrieved by its index, as shown in the record reader below.

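import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// The two public classes below belong in separate source files.
public class CombineWholeFileInputFormat extends CombineFileInputFormat<Text, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
        // CombineFileRecordReader creates one CombineWholeFileRecordReader per file in the split.
        return new CombineFileRecordReader<Text, BytesWritable>(
                (CombineFileSplit) inputSplit,
                taskAttemptContext,
                CombineWholeFileRecordReader.class
        );
    }
}

public class CombineWholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private WholeFileRecordReader reader;
    private String fileName;

    // CombineFileRecordReader requires this exact constructor signature; index selects the file.
    public CombineWholeFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
        FileSplit fileSplit = new FileSplit(split.getPath(index),
                split.getOffset(index), split.getLength(index),
                split.getLocations());

        fileName = fileSplit.getPath().toString();

        reader = new WholeFileRecordReader();
        reader.initialize(fileSplit, context);
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Nothing to do: the wrapped reader is initialized in the constructor.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return new Text(fileName);
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}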
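
To give an intuition for how small files end up sharing a split, here is a rough sketch in plain Java. It is not the algorithm used by CombineFileInputFormat, which also takes node and rack locality into account. It is a simple greedy packing by size, and the names CombineSketch and pack are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical greedy packing of unsplittable files into size-bounded groups.
class CombineSketch {
    /** Groups file sizes in order so that each group stays within maxSplitSize when possible. */
    static List<List<Long>> pack(List<Long> fileSizes, long maxSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            // Start a new group when adding this file would exceed the limit.
            if (!current.isEmpty() && currentSize + size > maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            // A file larger than the limit still gets a group of its own.
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Long> sizes = Arrays.asList(40L, 50L, 30L, 90L, 10L);
        System.out.println(pack(sizes, 100L));
    }
}
```

With five files of sizes 40, 50, 30, 90 and 10 and a limit of 100, the sketch produces three groups, [40, 50], [30] and [90, 10], so three mappers would run instead of five. Within each group, the index passed to the record reader's constructor identifies the individual file.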

Summary

This article demonstrated a few sample implementations of input formats for a MapReduce job. The basic idea is to first divide the whole input data into splits and then read each split with a record reader. Usually, there are already plenty of base input formats that can be extended and customized into more sophisticated ones.