1. Background
When Hive dynamic partitioning (dynamic partition) produces a large number of partitions, the job can fail with a JVM direct memory (direct buffer memory) overflow. Suppose there is a staging table t_order_tmp with a create_time column recording when each order was placed, and we want to partition the data by date so that each day's orders land in one partition of t_order. The SQL looks like this:
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions=2000;
insert overwrite table t_order partition(dt)
select a.*,substring(a.create_time,0,10) as dt from t_order_tmp a;
When the number of output partitions is large, the job fails with the following exception:
FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at com.hadoop.compression.lzo.LzoCompressor.realloc(LzoCompressor.java:261)
at com.hadoop.compression.lzo.LzoCompressor.init(LzoCompressor.java:278)
at com.hadoop.compression.lzo.LzoCompressor.reinit(LzoCompressor.java:217)
at com.hadoop.compression.lzo.LzoCompressor.<init>(LzoCompressor.java:199)
at com.hadoop.compression.lzo.LzoCodec.createCompressor(LzoCodec.java:168)
at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
at com.hadoop.compression.lzo.LzopCodec.getCompressor(LzopCodec.java:171)
at com.hadoop.compression.lzo.LzopCodec.createOutputStream(LzopCodec.java:72)
at org.apache.hadoop.hive.ql.exec.Utilities.createCompressedStream(Utilities.java:1399)
at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:140)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:263)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:591)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:535)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:824)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:931)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:678)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:120)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:95)
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:157)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:497)
at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2. Analysis
2.1 LZO compression allocates direct memory
The bottom of the stack shows that the error comes from java.nio direct-memory operations. java.nio does not fail on its own, so the next step is to look at its caller: line 261 of LzoCompressor. Create a Maven project, add the dependency below to pom.xml, download the sources with mvn dependency:sources, and read them in IDEA.
<dependency>
    <groupId>com.hadoop.gplcompression</groupId>
    <artifactId>hadoop-lzo</artifactId>
    <version>0.4.19</version>
</dependency>
The code related to direct memory is here:
/**
 * Reallocates a direct byte buffer by freeing the old one and allocating
 * a new one, unless the size is the same, in which case it is simply
 * cleared and returned.
 *
 * NOTE: this uses unsafe APIs to manually free memory - if anyone else
 * has a reference to the 'buf' parameter they will likely read random
 * data or cause a segfault by accessing it.
 */
private ByteBuffer realloc(ByteBuffer buf, int newSize) {
  if (buf != null) {
    if (buf.capacity() == newSize) {
      // Can use existing buffer
      buf.clear();
      return buf;
    }
    try {
      // Manually free the old buffer using undocumented unsafe APIs.
      // If this fails, we'll drop the reference and hope GC finds it
      // eventually.
      Method cleanerMethod = buf.getClass().getMethod("cleaner");
      cleanerMethod.setAccessible(true);
      Object cleaner = cleanerMethod.invoke(buf);
      Method cleanMethod = cleaner.getClass().getMethod("clean");
      cleanMethod.setAccessible(true);
      cleanMethod.invoke(cleaner);
    } catch (Exception e) {
      // Perhaps a non-sun-derived JVM - contributions welcome
      LOG.warn("Couldn't realloc bytebuffer", e);
    }
  }
  return ByteBuffer.allocateDirect(newSize);
}

private void init(CompressionStrategy strategy, int compressionLevel, int directBufferSize) {
  this.strategy = strategy;
  this.lzoCompressionLevel = compressionLevel;
  this.directBufferSize = directBufferSize;

  uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
  compressedDirectBuf = realloc(compressedDirectBuf, directBufferSize);
  compressedDirectBuf.position(directBufferSize);
  reset();

  /**
   * Initialize {@link #lzoCompress} and {@link #workingMemoryBufLen}
   */
  init(this.strategy.getCompressor());
  workingMemoryBuf = realloc(workingMemoryBuf, workingMemoryBufLen);
}
The code above shows that LzoCompressor keeps freeing and re-allocating direct memory during compression: each compressor instance holds an uncompressed buffer, a compressed buffer and a working-memory buffer, all allocated with ByteBuffer.allocateDirect. Using direct memory instead of the JVM heap is a performance choice in the LZO codec, but direct memory is not managed by the JVM garbage collector in the usual way, which makes problems like this hard to diagnose. For background on JVM runtime memory, see my other post on the JVM memory model.
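To make the failure mode concrete, here is a minimal standalone sketch (not the Hive code path; the 64 KB buffer size and the 128 MB cap are illustrative assumptions). It keeps allocating pairs of direct buffers, the way each live compressor holds on to its buffers while its output file stays open, until the direct-memory limit is hit:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Run with a small cap to reproduce quickly, e.g.
//   java -XX:MaxDirectMemorySize=128m DirectBufferExhaustionDemo
public class DirectBufferExhaustionDemo {
    public static void main(String[] args) {
        int bufferSize = 64 * 1024;                 // assumed per-buffer size, for illustration only
        List<ByteBuffer> held = new ArrayList<>();  // keeps buffers reachable, like open output files do
        int compressors = 0;
        try {
            while (true) {
                // Each simulated compressor holds one uncompressed and one compressed direct buffer.
                held.add(ByteBuffer.allocateDirect(bufferSize));
                held.add(ByteBuffer.allocateDirect(bufferSize));
                compressors++;
            }
        } catch (OutOfMemoryError e) {
            System.out.println("Direct buffer memory exhausted after "
                    + compressors + " simulated compressors: " + e.getMessage());
        }
    }
}

With a 128 MB cap the loop stops after roughly a thousand iterations, which is the same order of magnitude as the partition limits set at the top of the script.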
2.2 Compression happens while writing the output files
Walking further up the stack, Hive uses the LzoCompressor while writing its output. Pull in the hive-exec sources the same way, with the following dependency:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.0</version>
</dependency>
The core code is in HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter:
public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
    Class<? extends Writable> valueClass, boolean isCompressed,
    Properties tableProperties, Progressable progress) throws IOException {
  int rowSeparator = 0;
  String rowSeparatorString = tableProperties.getProperty(
      serdeConstants.LINE_DELIM, "\n");
  try {
    rowSeparator = Byte.parseByte(rowSeparatorString);
  } catch (NumberFormatException e) {
    rowSeparator = rowSeparatorString.charAt(0);
  }
  final int finalRowSeparator = rowSeparator;
  FileSystem fs = outPath.getFileSystem(jc);
  final OutputStream outStream = Utilities.createCompressedStream(jc,
      fs.create(outPath, progress), isCompressed);
  return new RecordWriter() {
    @Override
    public void write(Writable r) throws IOException {
      if (r instanceof Text) {
        Text tr = (Text) r;
        outStream.write(tr.getBytes(), 0, tr.getLength());
        outStream.write(finalRowSeparator);
      } else {
        // DynamicSerDe always writes out BytesWritable
        BytesWritable bw = (BytesWritable) r;
        outStream.write(bw.get(), 0, bw.getSize());
        outStream.write(finalRowSeparator);
      }
    }

    @Override
    public void close(boolean abort) throws IOException {
      outStream.close();
    }
  };
}
So each output file is written to HDFS through a compressed stream, and every dynamically created partition that is still open holds its own record writer and therefore its own compressor. The direct-memory failure happens on the write path, not while reading the input, which is why the commonly suggested fix of increasing the JVM heap does not help.
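A rough estimate (the 64 KB figure is the hadoop-lzo default buffer size, assuming it has not been changed) shows how quickly this adds up when many partitions are open at once:

1000 open partition files x 2 direct buffers x 64 KB per buffer ≈ 125 MB

which already approaches a 128 MB direct-memory cap before the per-compressor working-memory buffer is even counted.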
3. Solutions
With the root cause identified, there are roughly two ways to fix it.
3.1 Increase the direct memory available to the Hive job
On Hadoop 2.x the JVM options for the job's tasks are defined in /hadoop/etc/hadoop/mapred-site.xml; these are the cluster defaults.
In our mapred-site.xml the task JVM options included -XX:MaxDirectMemorySize=128m; with direct memory capped at only 128 MB, the failure is hardly surprising.
These settings can be overridden per job through the Configuration. Since this dynamic-partition job has no reduce stage, adding the following before the Hive statements is enough:
set mapreduce.map.java.opts=-XX:MaxDirectMemorySize=1024m;
If both the map and the reduce stages touch direct memory, set mapreduce.map.java.opts and mapreduce.reduce.java.opts together. Note that these properties replace the entire JVM option string, so keep any existing options such as -Xmx in the new value.
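To confirm whether the task JVMs actually picked up the new option, the JVM's startup arguments can be printed from inside the job. A minimal sketch, assuming you can run a bit of Java in the task (for example from a UDF or a test main); the class name is made up:

import java.lang.management.ManagementFactory;

public class PrintJvmArgs {
    public static void main(String[] args) {
        // Lists the flags the current JVM was started with, e.g. -Xmx... and
        // -XX:MaxDirectMemorySize=..., which shows whether mapreduce.map.java.opts
        // actually took effect in the task JVM.
        for (String arg : ManagementFactory.getRuntimeMXBean().getInputArguments()) {
            System.out.println(arg);
        }
    }
}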
In my case, after adding the setting to the Hive script the job still failed with the same error. Going through the complete job logs finally turned up the following suspicious line:
WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.map.java.opts; Ignoring.
Taken literally, the mapreduce.map.java.opts value I set was being ignored. The logic behind the warning lives in org.apache.hadoop.conf.Configuration:
private Set<String> finalParameters = new HashSet<String>();

private void loadProperty(Properties properties, String name, String attr,
    String value, boolean finalParameter, String[] source) {
  if (value != null) {
    if (!finalParameters.contains(attr)) {
      properties.setProperty(attr, value);
      updatingResource.put(attr, source);
    } else if (!value.equals(properties.getProperty(attr))) {
      LOG.warn(name+":an attempt to override final parameter: "+attr
          +"; Ignoring.");
    }
  }
  if (finalParameter) {
    finalParameters.add(attr);
  }
}
Configuration keeps a set named finalParameters; much like a final variable in Java, once a property lands in that set it can no longer be changed by later resources. That explains why the value set in Hive had no effect: the cluster default had been declared final. So what makes a configuration item final? The Configuration javadoc spells it out:
Configuration parameters may be declared final. Once a resource declares a value final, no subsequently-loaded resource can alter that value. For example, one might define a final parameter with:
<property>
    <name>dfs.hosts.include</name>
    <value>/etc/hadoop/conf/hosts.include</value>
    <final>true</final>
</property>
Administrators typically define parameters as final in core-site.xml for values that user applications may not alter.
In plain terms, a configuration item is final when the resource that defines it carries the <final>true</final> tag; once the cluster's mapred-site.xml marks mapreduce.map.java.opts as final, a set statement in the Hive script cannot override it, and the value can only be changed where it is declared.
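To see which resource supplied (and is pinning) a given value, Configuration records the source of every property. Below is a small standalone sketch, assuming the cluster's client-side *-site.xml files are on the classpath; the class name is made up for illustration:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

public class ShowPropertySource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();   // loads core-default.xml / core-site.xml
        conf.addResource("mapred-site.xml");        // also load the MapReduce site settings
        String key = "mapreduce.map.java.opts";
        // Print the effective value and the resources that set it, in the order applied.
        System.out.println(key + " = " + conf.get(key));
        System.out.println("set by: " + Arrays.toString(conf.getPropertySources(key)));
    }
}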
3.2 Switch the compression codec to Gzip
I went through the source of org.apache.hadoop.io.compress.GzipCodec and it does not allocate direct memory, so the Hive script becomes:
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions=2000;
-- new settings --
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
insert overwrite table t_order partition(dt)
select a.*,substring(a.create_time,0,10) as dt from t_order_tmp a ;
Running the job again, it succeeded. Other codecs such as Snappy can also work, as long as you confirm that the codec does not go through direct memory.
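One way to check a codec yourself is to watch the JVM's "direct" buffer pool while the compressor is created. A minimal standalone sketch; the allocateDirect call is only a stand-in for whatever codec setup you want to test, and the class name is made up:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectPoolProbe {
    // Returns the bytes currently held by the JVM's "direct" buffer pool.
    static long directUsed() {
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    // Reports how much the direct pool grows while the given work runs.
    static void measure(String label, Runnable work) {
        long before = directUsed();
        work.run();
        long after = directUsed();
        System.out.println(label + ": direct pool grew by " + (after - before) + " bytes");
    }

    public static void main(String[] args) {
        // Stand-in workload; a codec that uses direct memory would show a similar jump.
        measure("allocateDirect(64 KB)", () -> ByteBuffer.allocateDirect(64 * 1024));
    }
}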
Finally, a few references on Hive configuration options:
https://www.cnblogs.com/binguo2008/p/7487782.html
https://blog.csdn.net/djd1234567/article/details/51581354