
4.Flume Sink


1.HDFS Sink

The HDFS sink supports writing events to HDFS as text or SequenceFile, and can also compress the data as it is written.
Flume relies on the FileSystem utilities provided by Hadoop and flushes data to HDFS with hsync or hflush.

When writing data, HDFS has to guarantee consistency and durability. From its earliest releases up to 2.0, HDFS has offered two flush semantics:
1. Flush the data written by the client into the OS cache of every DataNode; if the DataNodes holding all replicas crash at the same time (for example, a data-center power outage), the data is lost (the sync and hflush methods).
2. Flush the data written by the client to the disk of every DataNode (the hsync method).
In Hadoop 2.0 and CDH4, DFSOutputStream exposes three methods: sync, hflush, and hsync. sync and hflush both implement semantic 1, while hsync implements semantic 2, so hsync provides a stronger durability guarantee than sync and hflush.
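To make the difference concrete, here is a minimal sketch (not Flume code) that writes a file through Hadoop's FileSystem API and calls hflush and hsync explicitly; the output path is a placeholder assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FlushSemanticsDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataOutputStream out = fs.create(new Path("/tmp/flush-demo.txt"))) {
      out.writeBytes("event-1\n");
      // Semantic 1: data reaches the OS cache of every DataNode in the pipeline;
      // it survives a client crash, but not a simultaneous crash of all replicas.
      out.hflush();

      out.writeBytes("event-2\n");
      // Semantic 2: data is forced to disk on every DataNode (stronger, slower).
      out.hsync();
    }
  }
}

Inside the HDFS sink itself, the relevant part of the process() loop looks like this: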

// Write the data to HDFS
try {
  bucketWriter.append(event);
} catch (BucketClosedException ex) {
  LOG.info("Bucket was closed while trying to append, " +
           "reinitializing bucket and writing event.");
  hdfsWriter = writerFactory.getWriter(fileType);
  bucketWriter = initializeBucketWriter(realPath, realName,
      lookupPath, hdfsWriter, closeCallback);
  synchronized (sfWritersLock) {
    sfWriters.put(lookupPath, bucketWriter);
  }
  bucketWriter.append(event);
}

// track the buckets getting written in this transaction
if (!writers.contains(bucketWriter)) {
  writers.add(bucketWriter);
}
} // end of the per-event loop

if (txnEventCount == 0) {
  sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
  sinkCounter.incrementBatchCompleteCount();
} else {
  sinkCounter.incrementBatchUnderflowCount();
}

// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
  bucketWriter.flush();
}

transaction.commit();

2.Kafka Sink

The Kafka sink writes data into Kafka so that other systems can consume it from the topic.
How Flume sends messages to Kafka, from the source code:

kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
  future.get();
}

This is the functional code with which Flume sends data to Kafka. It uses the Kafka producer, the whole send is wrapped in a Flume channel transaction, and a callback is registered for each send; unfortunately, if an exception occurs, the callback only logs the error. Its source is:

class SinkCallback implements Callback {
  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
  private long startTime;

  public SinkCallback(long startTime) {
    this.startTime = startTime;
  }

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
    }

    if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      if (metadata != null) {
        logger.debug("Acked message partition:{} ofset:{}", metadata.partition(),
                metadata.offset());
      }
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
    }
  }
}

So Flume's own transaction mechanism protects the data internally, but the interaction with Kafka itself does not guarantee delivery accuracy.
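For reference, here is a minimal standalone sketch (not Flume code) of the same send, flush, and future.get() pattern; the broker address, topic name, and serializers are placeholder assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerFlushDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 10; i++) {
        // The callback only observes the result, mirroring SinkCallback above
        futures.add(producer.send(new ProducerRecord<>("flume-demo", "event-" + i),
            (metadata, exception) -> {
              if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
              }
            }));
      }
      producer.flush();
      for (Future<RecordMetadata> future : futures) {
        future.get(); // a failed send also surfaces here as an ExecutionException
      }
    }
  }
}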

3.Hive Sink

The Hive sink supports importing delimited text or JSON data into Hive by connecting to the Hive server; as soon as data has been imported, it is immediately visible in Hive, and the sink can also create partitions in Hive.
Flume loads data into Hive over Thrift, and the writes are transactional. The core code is as follows:

 private int drainOneBatch(Channel channel)
          throws HiveWriter.Failure, InterruptedException {
    int txnEventCount = 0;
    try {
      Map<HiveEndPoint,HiveWriter> activeWriters = Maps.newHashMap();
      for (; txnEventCount < batchSize; ++txnEventCount) {
        // 0) Read event from Channel
        Event event = channel.take();
        if (event == null) {
          break;
        }

        //1) Create end point by substituting place holders
        HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
                partitionVals, event.getHeaders(), timeZone,
                needRounding, roundUnit, roundValue, useLocalTime);

        //2) Create or reuse Writer
        HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);

        //3) Write
        LOG.debug("{} : Writing event to {}", getName(), endPoint);
        writer.write(event);

      } // for

      //4) Update counters
      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }
      sinkCounter.addToEventDrainAttemptCount(txnEventCount);

      // 5) Flush all Writers
      for (HiveWriter writer : activeWriters.values()) {
        writer.flush(true);
      }

      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return txnEventCount;
    } catch (HiveWriter.Failure e) {
      // in case of error we close all TxnBatches to start clean next time
      LOG.warn(getName() + " : " + e.getMessage(), e);
      abortAllWriters();
      closeAllWriters();
      throw e;
    }
  }

  private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
                                       HiveEndPoint endPoint)
          throws HiveWriter.ConnectException, InterruptedException {
    try {
      HiveWriter writer = allWriters.get( endPoint );
      if (writer == null) {
        LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
        writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
                callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);

        sinkCounter.incrementConnectionCreatedCount();
        if (allWriters.size() > maxOpenConnections) {
          int retired = closeIdleWriters();
          if (retired == 0) {
            closeEldestWriter();
          }
        }
        allWriters.put(endPoint, writer);
        activeWriters.put(endPoint, writer);
      } else {
        if (activeWriters.get(endPoint) == null)  {
          activeWriters.put(endPoint,writer);
        }
      }
      return writer;
    } catch (HiveWriter.ConnectException e) {
      sinkCounter.incrementConnectionFailedCount();
      throw e;
    }
  }

  private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
                                    List<String> partVals, Map<String, String> headers,
                                    TimeZone timeZone, boolean needRounding,
                                    int roundUnit, Integer roundValue,
                                    boolean useLocalTime)  {
    if (partVals == null) {
      return new HiveEndPoint(metaStoreUri, database, table, null);
    }

    ArrayList<String> realPartVals = Lists.newArrayList();
    for (String partVal : partVals) {
      realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
              needRounding, roundUnit, roundValue, useLocalTime));
    }
    return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
  }

  public Status process() throws EventDeliveryException {
    // writers used in this Txn

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    boolean success = false;
    try {
      // 1 Enable Heart Beats
      if (timeToSendHeartBeat.compareAndSet(true, false)) {
        enableHeartBeatOnAllWriters();
      }

      // 2 Drain Batch
      int txnEventCount = drainOneBatch(channel);
      transaction.commit();
      success = true;

      // 3 Update Counters
      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        return Status.READY;
      }
    } catch (InterruptedException err) {
      LOG.warn(getName() + ": Thread was interrupted.", err);
      return Status.BACKOFF;
    } catch (Exception e) {
      throw new EventDeliveryException(e);
    } finally {
      if (!success) {
        transaction.rollback();
      }
      transaction.close();
    }
  }
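Under the hood, HiveWriter drives the Hive Streaming API (HiveEndPoint, StreamingConnection, TransactionBatch). Here is a minimal sketch (not Flume code) of that API, assuming the legacy org.apache.hive.hcatalog.streaming package; the metastore URI, table, partition, and column names are placeholder assumptions.

import java.util.Arrays;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HiveStreamingDemo {
  public static void main(String[] args) throws Exception {
    // End point: metastore URI, database, table and partition values
    HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore-host:9083",
        "default", "web_logs", Arrays.asList("2024-01-01"));

    // true = create the partition if it does not exist yet
    StreamingConnection connection = endPoint.newConnection(true);
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[] {"id", "msg"}, ",", endPoint);
    try {
      // Fetch a batch of transactions once and reuse it, as HiveWriter does
      TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes("UTF-8"));
      txnBatch.write("2,world".getBytes("UTF-8"));
      txnBatch.commit(); // rows become visible in Hive immediately
      txnBatch.close();
    } finally {
      connection.close();
    }
  }
}

The sink's HiveWriter wraps essentially these steps, adding the call timeouts, heartbeats, and counters visible in the code above.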
