Updated on 2018-02-05
I recently encountered several OOMs from mapper tasks reading parquet files. The YARN container is killed for running beyond its physical memory limit. Since I had already set the JVM heap to 0.8 of the container size, I'm fairly sure this is due to off-heap memory allocation. I found two JIRA issues here and here, pointing me to the Snappy codec that parquet uses for decompression. There isn't much I can do except allocate more memory beside the JVM heap.
===
I recently experienced two OOM problems while running a MapReduce application. The application reads from a group of parquet files, shuffles the input rows, and writes the output to parquet files as well.
The first OOM is thrown by the mapper, with error logs like the following:
    2017-06-22 09:59:10.978 STDIO [ERROR] [WORKER] [129] Container [pid=14638,containerID=container_e26_1495868456939_0784_01_000066] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB of 2.1 GB virtual memory used. Killing container.
After some investigation, I realized this was due to a misconfiguration of the mapper container memory limit (mapreduce.map.memory.mb) and the mapper JVM heap limit (mapreduce.map.java.opts). The latter should be smaller than the former, because the container needs some memory beyond the JVM heap. After setting mapreduce.map.java.opts = mapreduce.map.memory.mb * 0.8, the OOM problem was gone. Note that the same applies to the reducer, which has two corresponding parameters (mapreduce.reduce.java.opts and mapreduce.reduce.memory.mb). This article explains it nicely.
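To make the ratio concrete, here is a minimal sketch using the Hadoop Configuration/Job API; the container sizes (2048 MB and 4096 MB) are made-up examples, and the 0.8 ratio is just the rule of thumb above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MemoryConfigExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Hypothetical container sizes (MB); use whatever your cluster allows.
        int mapContainerMb = 2048;
        int reduceContainerMb = 4096;

        // Keep the JVM heap at ~80% of the container, leaving the rest for
        // off-heap allocations and other per-container overhead.
        conf.setInt("mapreduce.map.memory.mb", mapContainerMb);
        conf.set("mapreduce.map.java.opts", "-Xmx" + (int) (mapContainerMb * 0.8) + "m");
        conf.setInt("mapreduce.reduce.memory.mb", reduceContainerMb);
        conf.set("mapreduce.reduce.java.opts", "-Xmx" + (int) (reduceContainerMb * 0.8) + "m");

        Job job = Job.getInstance(conf, "memory-config-example");
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```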
The second OOM issue, which comes from the shuffle phase, is much harder to address. I saw error logs like the following:
    2017-06-21 20:22:42.294 STDIO [ERROR] [WORKER] [100] Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1
This is not a new problem; it can be found here and here. Most of the solutions suggest tuning three parameters:
- mapreduce.reduce.shuffle.input.buffer.percent (default 0.7): the fraction of the reducer JVM heap that the shuffle can use to hold map outputs for the in-memory sort.
- mapreduce.reduce.shuffle.memory.limit.percent (default 0.25): the fraction of that shuffle buffer that a single shuffle (one map output being fetched) can occupy in memory.
- mapreduce.reduce.shuffle.parallelcopies (default 10): the number of fetcher threads that copy map outputs in parallel.
Some solutions claim that we should have
    mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent * mapreduce.reduce.shuffle.parallelcopies < 1
which is actually not correct. The MergeManager allocates memory to fetcher threads for copying map outputs into memory. Each time a fetcher requests memory for a copy, the MergeManager decides whether to grant the request by checking: (1) whether the requested size exceeds the maximum memory a single shuffle may hold, which is controlled by mapreduce.reduce.shuffle.input.buffer.percent * mapreduce.reduce.shuffle.memory.limit.percent; with the default settings and a 3.5 GB reducer heap, each shuffle can request at most 3500 * 0.7 * 0.25 = 612 MB; (2) whether usedMemory already exceeds memoryLimit, where usedMemory accounts for the memory held by in-flight shuffles plus the in-memory merge, and memoryLimit is 3500 * 0.7 = 2450 MB for a 3.5 GB heap. Now, if usedMemory is 2.44 GB and the granted request is 612 MB, the memory actually used by the shuffle can exceed 3 GB!
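To make the arithmetic concrete, here is a minimal sketch of that accounting, assuming a 3.5 GB reducer heap and the default percentages; the names loosely follow MergeManagerImpl, but this is not the real implementation.

```java
// Simplified sketch of the shuffle memory accounting described above.
// Variable names loosely follow MergeManagerImpl, but this is not the real code.
public class ShuffleMemorySketch {
    public static void main(String[] args) {
        long reducerHeapMb = 3500;                // assumed reducer JVM heap (-Xmx)
        double inputBufferPercent = 0.70;         // mapreduce.reduce.shuffle.input.buffer.percent
        double singleShuffleLimitPercent = 0.25;  // mapreduce.reduce.shuffle.memory.limit.percent

        // Total memory the shuffle may use for in-memory map outputs.
        long memoryLimitMb = (long) (reducerHeapMb * inputBufferPercent);                   // 2450 MB
        // Largest single map output that may be copied into memory.
        long maxSingleShuffleLimitMb = (long) (memoryLimitMb * singleShuffleLimitPercent);  // 612 MB

        // The grant check compares usedMemory against memoryLimit *before* the new
        // request is added, so usedMemory can end up about one shuffle over the limit.
        long worstCaseMb = memoryLimitMb + maxSingleShuffleLimitMb;                         // ~3062 MB
        System.out.printf("memoryLimit=%d MB, maxSingleShuffleLimit=%d MB, worst case=%d MB%n",
                memoryLimitMb, maxSingleShuffleLimitMb, worstCaseMb);
    }
}
```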
This is not a bug: there is a detailed comment in MergeManagerImpl.reserve explaining why the memory actually used can end up one shuffle larger than the limit. On the other hand, this is exactly what can cause an OOM, so there is no 100% safe way to prevent it just by tuning parameters. We can only mitigate the problem by reducing mapreduce.reduce.shuffle.input.buffer.percent and/or mapreduce.reduce.shuffle.memory.limit.percent, and one should calculate these parameters carefully against the real workload. In particular, the memory a single shuffle can use limits the maximum map output size that fits in memory: if a mapper produces a 300 MB intermediate file, a single shuffle must be able to allocate more than 300 MB; otherwise the sort will be done on disk.
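And a rough sizing check for the 300 MB example, reusing the same assumed heap and defaults (again, not actual Hadoop code):

```java
// Rough sizing check: will the largest map output fit in a single in-memory shuffle?
public class ShuffleSizingCheck {
    public static void main(String[] args) {
        long reducerHeapMb = 3500;          // assumed reducer JVM heap
        double inputBufferPercent = 0.70;
        double singleShuffleLimitPercent = 0.25;
        long largestMapOutputMb = 300;      // the 300 MB intermediate file from the example

        long maxSingleShuffleLimitMb =
                (long) (reducerHeapMb * inputBufferPercent * singleShuffleLimitPercent);

        if (largestMapOutputMb > maxSingleShuffleLimitMb) {
            System.out.println("Map output exceeds the per-shuffle limit; it will be merged on disk.");
        } else {
            System.out.println("Map output fits in memory (per-shuffle limit: "
                    + maxSingleShuffleLimitMb + " MB).");
        }
    }
}
```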
One more thing about the parquet format: it is highly compressed, so the decompressed map output is much larger than the input split size. I think this is why OOM happens more frequently with parquet files than with other file formats.