## 4. Flume Sink

### 1. HDFS Sink

The HDFS sink writes events to HDFS as text or SequenceFile, and can compress the data as it is written. Flume uses Hadoop's FileSystem API and pushes data to HDFS with either hflush or hsync.

> When writing to HDFS, data consistency and durability have to be guaranteed. From its earliest versions up to 2.0, HDFS has offered two sync semantics:
> 1. Flush the data written by the client into the OS buffer cache of every DataNode. If the DataNodes holding all replicas crash at the same time (e.g. a machine-room power failure), the data is lost (the sync and hflush methods).
> 2. Flush the data written by the client to disk on every DataNode (the hsync method).
>
> In Hadoop 2.0 and CDH4, DFSOutputStream provides three methods: sync, hflush, and hsync. sync and hflush implement semantic 1, while hsync implements semantic 2, so hsync gives a stronger durability guarantee than sync and hflush.
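To make the two semantics concrete, here is a minimal sketch using the Hadoop FileSystem API (the path is illustrative and a reachable HDFS cluster is assumed):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SyncSemanticsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/flume-sync-demo.txt"))) {
      out.writeBytes("event-1\n");
      // Semantic 1: data reaches the OS buffer cache of every DataNode;
      // a simultaneous crash of all replicas can still lose it.
      out.hflush();

      out.writeBytes("event-2\n");
      // Semantic 2: data is forced to disk on every DataNode.
      out.hsync();
    }
  }
}
```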

The corresponding write path in the HDFS sink's process() method (abridged):

```java
      // Write the data to HDFS
      try {
        bucketWriter.append(event);
      } catch (BucketClosedException ex) {
        LOG.info("Bucket was closed while trying to append, " +
                 "reinitializing bucket and writing event.");
        hdfsWriter = writerFactory.getWriter(fileType);
        bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
        synchronized (sfWritersLock) {
          sfWriters.put(lookupPath, bucketWriter);
        }
        bucketWriter.append(event);
      }

      // track the buckets getting written in this transaction
      if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
      }
    } // end of the per-event batch loop

    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }

    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
      bucketWriter.flush();
    }

    transaction.commit();
```

### 2. Kafka Sink

The Kafka sink pushes data into Kafka so that other systems can consume it from a topic. The relevant code on Flume's send path:

```java
kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
  future.get();
}
```
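For context, here is a self-contained sketch of the same send / flush / get pattern against a plain KafkaProducer (the broker address and topic name are placeholders):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendPatternDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("demo-topic", "event-" + i);   // placeholder topic
        // async send; the callback fires when the broker acks or the send fails
        futures.add(producer.send(record, (metadata, exception) -> {
          if (exception != null) {
            exception.printStackTrace();
          }
        }));
      }
      producer.flush();                 // push everything that is still buffered
      for (Future<RecordMetadata> future : futures) {
        future.get();                   // rethrows any asynchronous send failure
      }
    }
  }
}
```

Waiting on every Future after flush() is what lets the caller notice a broker-side failure, rather than relying on the callback alone.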


The Flume snippet above is the functional core of sending data to Kafka. It uses the Kafka producer, the whole send happens inside Flume's transaction, and a callback is registered for every send; unfortunately, when an exception occurs this callback does nothing more than log it. Its source:

```java
class SinkCallback implements Callback {
  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
  private long startTime;

  public SinkCallback(long startTime) {
    this.startTime = startTime;
  }

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
    }

    if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      if (metadata != null) {
        logger.debug("Acked message partition:{} offset:{}", metadata.partition(),
                metadata.offset());
      }
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
    }
  }
}
```

So Flume's own transaction mechanism protects data inside the agent, but the interaction with Kafka itself does not guarantee that every event is delivered correctly.

### 3. Hive Sink

The Hive sink imports delimited text or JSON data into Hive. It connects to Hive (via the metastore URI), the data becomes visible to Hive queries as soon as it is written, and partitions can be created on the fly. Flume writes to Hive over Thrift and the writes are transactional (a sketch of the underlying streaming API follows the Flume code below). The core code:

```java
private int drainOneBatch(Channel channel)
    throws HiveWriter.Failure, InterruptedException {
  int txnEventCount = 0;
  try {
    Map<HiveEndPoint, HiveWriter> activeWriters = Maps.newHashMap();
    for (; txnEventCount < batchSize; ++txnEventCount) {
      // 0) Read event from Channel
      Event event = channel.take();
      if (event == null) {
        break;
      }

      // 1) Create end point by substituting place holders
      HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
              partitionVals, event.getHeaders(), timeZone,
              needRounding, roundUnit, roundValue, useLocalTime);

      // 2) Create or reuse Writer
      HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);

      // 3) Write
      LOG.debug("{} : Writing event to {}", getName(), endPoint);
      writer.write(event);

    } // for

    // 4) Update counters
    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }
    sinkCounter.addToEventDrainAttemptCount(txnEventCount);

    // 5) Flush all Writers
    for (HiveWriter writer : activeWriters.values()) {
      writer.flush(true);
    }

    sinkCounter.addToEventDrainSuccessCount(txnEventCount);
    return txnEventCount;
  } catch (HiveWriter.Failure e) {
    // in case of error we close all TxnBatches to start clean next time
    LOG.warn(getName() + " : " + e.getMessage(), e);
    abortAllWriters();
    closeAllWriters();
    throw e;
  }
}

private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
    HiveEndPoint endPoint)
    throws HiveWriter.ConnectException, InterruptedException {
  try {
    HiveWriter writer = allWriters.get(endPoint);
    if (writer == null) {
      LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
      writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
          callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);

      sinkCounter.incrementConnectionCreatedCount();
      if (allWriters.size() > maxOpenConnections) {
        int retired = closeIdleWriters();
        if (retired == 0) {
          closeEldestWriter();
        }
      }
      allWriters.put(endPoint, writer);
      activeWriters.put(endPoint, writer);
    } else {
      if (activeWriters.get(endPoint) == null) {
        activeWriters.put(endPoint, writer);
      }
    }
    return writer;
  } catch (HiveWriter.ConnectException e) {
    sinkCounter.incrementConnectionFailedCount();
    throw e;
  }
}

private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
    List<String> partVals, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding,
    int roundUnit, Integer roundValue,
    boolean useLocalTime) {
  if (partVals == null) {
    return new HiveEndPoint(metaStoreUri, database, table, null);
  }

  ArrayList<String> realPartVals = Lists.newArrayList();
  for (String partVal : partVals) {
    realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
            needRounding, roundUnit, roundValue, useLocalTime));
  }
  return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
}

public Status process() throws EventDeliveryException {
  // writers used in this Txn

  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  boolean success = false;
  try {
    // 1 Enable Heart Beats
    if (timeToSendHeartBeat.compareAndSet(true, false)) {
      enableHeartBeatOnAllWriters();
    }

    // 2 Drain Batch
    int txnEventCount = drainOneBatch(channel);
    transaction.commit();
    success = true;

    // 3 Update Counters
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      return Status.READY;
    }
  } catch (InterruptedException err) {
    LOG.warn(getName() + ": Thread was interrupted.", err);
    return Status.BACKOFF;
  } catch (Exception e) {
    throw new EventDeliveryException(e);
  } finally {
    if (!success) {
      transaction.rollback();
    }
    transaction.close();
  }
}
```
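HiveWriter in the code above wraps Hive's streaming ingest API. A minimal sketch of that underlying API, assuming the classic hive-hcatalog-streaming interface and a transactional, bucketed ORC table; the metastore URI, table, and column names are placeholders:

```java
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HiveStreamingSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder end point: metastore URI, database, table, partition values
    HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore-host:9083",
        "default", "web_logs", Arrays.asList("2024-01-01"));

    // true = create the partition if it does not exist yet
    StreamingConnection conn = endPoint.newConnection(true);
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint);
    try {
      // Ask for a batch of transactions up front, then consume them one by one
      TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes());
      txnBatch.write("2,world".getBytes());
      txnBatch.commit();   // the rows become visible to Hive queries here
      txnBatch.close();
    } finally {
      conn.close();
    }
  }
}
```

In the Flume sink, HiveWriter manages these transaction batches itself; txnsPerBatchAsk in its constructor controls how many transactions are requested per batch.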
