
Hive Dynamic Partitioning "Direct buffer memory" Overflow: Analysis and Solution

hive · liuxuecheng · 7270 views · 0 comments

1. Background

When using Hive dynamic partitioning, a job can fail with a JVM direct memory ("direct buffer memory") overflow if it ends up producing a large number of partitions. Suppose there is an order table t_order_tmp with a column create_time recording when each order was placed, and we want to partition the data by date, placing each day's orders into one partition of t_order. The SQL looks like this:

SET hive.exec.dynamic.partition=true;  
SET hive.exec.dynamic.partition.mode=nonstrict; 
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions=2000;

insert overwrite table t_order partition(dt) 
select a.*,substring(a.create_time,0,10) as dt from t_order_tmp a;

When the job writes out a large number of partitions, it fails with the following exception:

FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at com.hadoop.compression.lzo.LzoCompressor.realloc(LzoCompressor.java:261)
    at com.hadoop.compression.lzo.LzoCompressor.init(LzoCompressor.java:278)
    at com.hadoop.compression.lzo.LzoCompressor.reinit(LzoCompressor.java:217)
    at com.hadoop.compression.lzo.LzoCompressor.<init>(LzoCompressor.java:199)
    at com.hadoop.compression.lzo.LzoCodec.createCompressor(LzoCodec.java:168)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
    at com.hadoop.compression.lzo.LzopCodec.getCompressor(LzopCodec.java:171)
    at com.hadoop.compression.lzo.LzopCodec.createOutputStream(LzopCodec.java:72)
    at org.apache.hadoop.hive.ql.exec.Utilities.createCompressedStream(Utilities.java:1399)
    at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:140)
    at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:263)
    at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:591)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:535)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:824)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:931)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:678)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
    at org.apache.hadoop.hive.ql.exec.FilterOperator.processOp(FilterOperator.java:120)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:95)
    at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:157)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:497)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

2. Analysis

2.1 LZO compression allocates direct memory

The bottom frames of the stack trace show that the error originates in java.nio's direct-memory operations. java.nio does not fail out of nowhere, so the next step is to look at its caller: line 261 of LzoCompressor. Create a Maven project, add the dependency below to pom.xml, run mvn dependency:sources to download the sources, and inspect them in IDEA.

<dependency>
   <groupId>com.hadoop.gplcompression</groupId>
   <artifactId>hadoop-lzo</artifactId>
   <version>0.4.19</version>
</dependency>

The direct-memory-related code is here:

/**
   * Reallocates a direct byte buffer by freeing the old one and allocating
   * a new one, unless the size is the same, in which case it is simply
   * cleared and returned.
   *
   * NOTE: this uses unsafe APIs to manually free memory - if anyone else
   * has a reference to the 'buf' parameter they will likely read random
   * data or cause a segfault by accessing it.
   */
 private ByteBuffer realloc(ByteBuffer buf, int newSize) {
    if (buf != null) {
      if (buf.capacity() == newSize) {
        // Can use existing buffer
        buf.clear();
        return buf;
      }
      try {
        // Manually free the old buffer using undocumented unsafe APIs.
        // If this fails, we'll drop the reference and hope GC finds it
        // eventually.
        Method cleanerMethod = buf.getClass().getMethod("cleaner");
        cleanerMethod.setAccessible(true);
        Object cleaner = cleanerMethod.invoke(buf);
        Method cleanMethod = cleaner.getClass().getMethod("clean");
        cleanMethod.setAccessible(true);
        cleanMethod.invoke(cleaner);
      } catch (Exception e) {
        // Perhaps a non-sun-derived JVM - contributions welcome
        LOG.warn("Couldn't realloc bytebuffer", e);
      }
    }
    return ByteBuffer.allocateDirect(newSize);
  }

  private void init(CompressionStrategy strategy, int compressionLevel, int directBufferSize) {
    this.strategy = strategy;
    this.lzoCompressionLevel = compressionLevel;
    this.directBufferSize = directBufferSize;

    uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
    compressedDirectBuf = realloc(compressedDirectBuf, directBufferSize);
    compressedDirectBuf.position(directBufferSize);
    reset();

    /**
     * Initialize {@link #lzoCompress} and {@link #workingMemoryBufLen}
     */
    init(this.strategy.getCompressor());
    workingMemoryBuf = realloc(workingMemoryBuf, workingMemoryBufLen);
  }

The code above shows that LzoCompressor repeatedly frees and reallocates direct memory during compression. LZO uses direct memory instead of the JVM heap for performance, but direct memory is not managed by the JVM, which makes problems hard to diagnose. For an introduction to the JVM runtime memory areas, see my other article: JVM memory model.
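To get a feel for why many dynamic partitions exhaust direct memory: each open output file gets its own compressor, and each LzoCompressor instance, as seen in init() above, holds three direct buffers (uncompressedDirectBuf, compressedDirectBuf, workingMemoryBuf). A rough back-of-the-envelope sketch follows; the 64 KB buffer size is an assumption here (the real size is configurable in hadoop-lzo), and it treats the working-memory buffer as the same size for simplicity:

```java
public class DirectMemoryEstimate {
    // Each LzoCompressor allocates roughly three direct buffers
    // (uncompressed, compressed, working memory), so N concurrent
    // writers need about N * 3 * bufferSize bytes of direct memory.
    static long estimateBytes(int openWriters, int bufferSizeBytes) {
        return (long) openWriters * 3 * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int bufferSize = 64 * 1024;               // assumed LZO buffer size
        long demand = estimateBytes(2000, bufferSize);
        System.out.println(demand / (1024 * 1024) + " MB"); // 375 MB
        // far above a 128 MB -XX:MaxDirectMemorySize cap
    }
}
```

The exact numbers depend on buffer sizes and on how many writers stay open per task, but the shape of the problem is clear: demand grows linearly with the number of open partition files.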

2.2 Compression happens during file output

Following the stack further up shows that Hive uses LzoCompressor while writing output. Pull in the sources of the hive-exec module the same way, with this dependency:

 <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.0</version>
 </dependency>

The core code is here:

public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress) throws IOException {
    int rowSeparator = 0;
    String rowSeparatorString = tableProperties.getProperty(
        serdeConstants.LINE_DELIM, "\n");
    try {
      rowSeparator = Byte.parseByte(rowSeparatorString);
    } catch (NumberFormatException e) {
      rowSeparator = rowSeparatorString.charAt(0);
    }

    final int finalRowSeparator = rowSeparator;
    FileSystem fs = outPath.getFileSystem(jc);
    final OutputStream outStream = Utilities.createCompressedStream(jc,
    fs.create(outPath, progress), isCompressed);
    return new RecordWriter() {
      @Override
      public void write(Writable r) throws IOException {
        if (r instanceof Text) {
          Text tr = (Text) r;
          outStream.write(tr.getBytes(), 0, tr.getLength());
          outStream.write(finalRowSeparator);
        } else {
          // DynamicSerDe always writes out BytesWritable
          BytesWritable bw = (BytesWritable) r;
          outStream.write(bw.get(), 0, bw.getSize());
          outStream.write(finalRowSeparator);
        }
      }
      @Override
      public void close(boolean abort) throws IOException {
        outStream.close();
      }
    };
  }

The file is ultimately written to HDFS through a compressed stream, so the direct-memory error in dynamic partitioning does not occur while reading the input data, and the advice commonly found online to increase the JVM heap does not help.

3. Solution

Having found the root cause, there are roughly two solutions.

3.1 Increase the direct memory of the Hive job

In Hadoop 2.x, /hadoop/etc/hadoop/mapred-site.xml defines the JVM options jobs run with; these are the defaults. Among them is -XX:MaxDirectMemorySize=128m: with only 128 MB of direct memory allotted, the crash is no surprise.
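The screenshot of those defaults is not reproduced here; the relevant entry in mapred-site.xml looked roughly like this (the -Xmx value is illustrative, the direct-memory cap is the one quoted above):

```xml
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx1024m -XX:MaxDirectMemorySize=128m</value>
</property>
```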
These settings can be overridden in the job's Configuration. Since a dynamic-partition job of this shape has no reduce phase, add the following before the Hive processing logic:

set mapreduce.map.java.opts=-XX:MaxDirectMemorySize=1024m;

If both the map and reduce phases touch direct memory, set mapreduce.map.java.opts and mapreduce.reduce.java.opts together.
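For example (the -Xmx values here are illustrative; note that overriding the opts replaces the whole string, so keep any heap setting you need, and keep MaxDirectMemorySize well below the container's memory limit):

```sql
set mapreduce.map.java.opts=-Xmx2048m -XX:MaxDirectMemorySize=1024m;
set mapreduce.reduce.java.opts=-Xmx2048m -XX:MaxDirectMemorySize=1024m;
```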
However, after adding the setting above to my Hive script, the job still failed with the same error. Going through the complete job log finally turned up this suspicious line:

WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.map.java.opts; Ignoring.

Literally, the mapreduce.map.java.opts parameter I set was being ignored. The logic lives in the org.apache.hadoop.conf.Configuration class:

private Set<String> finalParameters = new HashSet<String>();

private void loadProperty(Properties properties, String name, String attr,
      String value, boolean finalParameter, String[] source) {
    if (value != null) {
      if (!finalParameters.contains(attr)) {
        properties.setProperty(attr, value);
        updatingResource.put(attr, source);
      } else if (!value.equals(properties.getProperty(attr))) {
        LOG.warn(name+":an attempt to override final parameter: "+attr
            +";  Ignoring.");
      }
    }
    if (finalParameter) {
      finalParameters.add(attr);
    }
  }

Configuration keeps a finalParameters set: like a final variable in Java, once a parameter lands in it, the parameter can no longer be changed. That explains why the setting made in Hive had no effect: the default had been declared final. But what makes a configuration item final? The javadoc of the Configuration class spells it out:

Configuration parameters may be declared final. Once a resource declares a value final, no subsequently-loaded resource can alter that value. For example, one might define a final parameter with:
  <property>
    <name>dfs.hosts.include</name>
    <value>/etc/hadoop/conf/hosts.include</value>
    <final>true</final>
  </property>
Administrators typically define parameters as final in core-site.xml for values that user applications may not alter.
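The ignore-on-override behavior of loadProperty above can be mimicked in a few lines; FinalParams here is a hypothetical stand-in for Configuration, stripped down to just this rule:

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class FinalParams {
    private final Set<String> finalParameters = new HashSet<>();
    private final Properties properties = new Properties();

    // Mirrors Configuration.loadProperty: the first value marked final
    // wins; later attempts to change it are logged and ignored.
    void loadProperty(String name, String value, boolean isFinal) {
        if (!finalParameters.contains(name)) {
            properties.setProperty(name, value);
        } else if (!value.equals(properties.getProperty(name))) {
            System.err.println(name + ": attempt to override final parameter; Ignoring.");
        }
        if (isFinal) {
            finalParameters.add(name);
        }
    }

    String get(String name) { return properties.getProperty(name); }

    public static void main(String[] args) {
        FinalParams conf = new FinalParams();
        // mapred-site.xml marks the opts final...
        conf.loadProperty("mapreduce.map.java.opts", "-XX:MaxDirectMemorySize=128m", true);
        // ...so the later SET from the hive script is ignored
        conf.loadProperty("mapreduce.map.java.opts", "-XX:MaxDirectMemorySize=1024m", false);
        System.out.println(conf.get("mapreduce.map.java.opts"));
    }
}
```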

In short, a property whose definition carries the <final>true</final> tag cannot be overridden. So the only option left is the second approach: change the compression codec.

3.2 Switch the compression codec to Gzip

A deliberate look at the source of org.apache.hadoop.io.compress.GzipCodec shows no use of direct memory, so the Hive script becomes:

SET hive.exec.dynamic.partition=true;  
SET hive.exec.dynamic.partition.mode=nonstrict; 
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions=2000;

-- additional settings --
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

insert overwrite table t_order partition(dt) 
select a.*,substring(a.create_time,0,10) as dt from t_order_tmp a ;

Trying again, the job succeeds. Other codecs such as Snappy can be used as well, as long as you first confirm that the codec does not go through direct memory.

Finally, some references on Hive configuration options:
https://www.cnblogs.com/binguo2008/p/7487782.html
https://blog.csdn.net/djd1234567/article/details/51581354

Please credit the source when reprinting: 大数据随笔 » Hive Dynamic Partitioning "Direct buffer memory" Overflow: Analysis and Solution
