### 1.HDFS Sink
The HDFS sink supports writing events to HDFS as text or SequenceFile, and can optionally compress the data as it is written. Flume uses the FileSystem utilities provided by Hadoop and flushes data to HDFS with hsync or hflush.

> When writing data, HDFS must guarantee consistency and durability. From its earliest releases up to version 2.0, HDFS has offered two sync semantics:
> 1. Flush the data written by the client into the OS cache of every DataNode. If the DataNodes holding all replicas crash at the same time (for example, a machine-room power failure), the data is lost (the sync and hflush methods).
> 2. Flush the data written by the client to disk on every DataNode (the hsync method).
> In Hadoop 2.0 and CDH4, DFSOutputStream exposes three methods: sync, hflush, and hsync. sync and hflush implement semantic 1, while hsync implements semantic 2, so hsync provides a stronger durability guarantee than sync or hflush.

The core of the sink's process() method appends each event to a BucketWriter, reopening the bucket if it was closed concurrently, and flushes every bucket touched in the batch before committing the Flume transaction:
```
  // Write the data to HDFS
  try {
    bucketWriter.append(event);
  } catch (BucketClosedException ex) {
    LOG.info("Bucket was closed while trying to append, " +
             "reinitializing bucket and writing event.");
    hdfsWriter = writerFactory.getWriter(fileType);
    bucketWriter = initializeBucketWriter(realPath, realName,
        lookupPath, hdfsWriter, closeCallback);
    synchronized (sfWritersLock) {
      sfWriters.put(lookupPath, bucketWriter);
    }
    bucketWriter.append(event);
  }

  // track the buckets getting written in this transaction
  if (!writers.contains(bucketWriter)) {
    writers.add(bucketWriter);
  }
} // end of the per-event loop over the batch

if (txnEventCount == 0) {
  sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
  sinkCounter.incrementBatchCompleteCount();
} else {
  sinkCounter.incrementBatchUnderflowCount();
}

// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
  bucketWriter.flush();
}

transaction.commit();
```
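To see the difference between the two sync semantics outside of Flume, the minimal sketch below writes a file through the Hadoop 2.x FileSystem API and calls hflush and hsync directly. It is not Flume code; the output path is only a placeholder.

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SyncSemanticsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/sync-demo.txt"))) {
      out.writeBytes("first event\n");
      // Semantic 1: pushes the data to the OS cache of every DataNode in the
      // pipeline; a simultaneous crash of all replicas can still lose it.
      out.hflush();

      out.writeBytes("second event\n");
      // Semantic 2: forces the data to disk on every DataNode, so it survives
      // a whole-cluster power failure at the cost of a slower call.
      out.hsync();
    }
  }
}
```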
### 2.Kafka Sink
The Kafka sink publishes events to Kafka so that other systems can consume them by topic. A look at the source code with which Flume sends messages to Kafka: ```
kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
  future.get();
}
```
This is the functional code with which Flume delivers data to Kafka. It uses the Kafka producer, the whole send runs inside a Flume transaction, and a callback is registered for each send; unfortunately, when an exception occurs this callback only logs the error. Its source is: ```
class SinkCallback implements Callback {
  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
  private long startTime;

  public SinkCallback(long startTime) {
    this.startTime = startTime;
  }

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
    }
    if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      if (metadata != null) {
        logger.debug("Acked message partition:{} ofset:{}", metadata.partition(),
            metadata.offset());
      }
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
    }
  }
}
```
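Even though the callback only logs, the sink still learns about failed sends, because Future.get() rethrows the producer exception and lets the surrounding Flume transaction roll back. The standalone sketch below reproduces that pattern with a plain KafkaProducer; the broker address, topic, and record contents are placeholders, not values taken from Flume.

```
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndWaitDemo {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Asynchronous send with a callback that, like Flume's SinkCallback,
      // only logs; the returned Future is kept for later inspection.
      futures.add(producer.send(
          new ProducerRecord<>("demo-topic", "key", "value"),
          (metadata, exception) -> {
            if (exception != null) {
              System.err.println("send failed: " + exception.getMessage());
            }
          }));
      producer.flush();
      // get() rethrows any send exception, so the caller (in Flume, the
      // transaction in process()) can fail the batch instead of losing data.
      for (Future<RecordMetadata> future : futures) {
        future.get();
      }
    }
  }
}
```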
So while Flume's internal transaction mechanism safeguards the data, the interaction with Kafka itself cannot guarantee data accuracy.

### 3.Hive Sink
The Hive sink supports importing delimited text or JSON data into Hive by connecting to a Hive server. As soon as data is imported, it becomes visible in Hive, and partitions can also be created in Hive. Flume imports the data over Thrift, and the import is transactional. The core code is as follows: ```
private int drainOneBatch(Channel channel)
    throws HiveWriter.Failure, InterruptedException {
  int txnEventCount = 0;
  try {
    Map<HiveEndPoint, HiveWriter> activeWriters = Maps.newHashMap();
    for (; txnEventCount < batchSize; ++txnEventCount) {
      // 0) Read event from Channel
      Event event = channel.take();
      if (event == null) {
        break;
      }

      // 1) Create end point by substituting place holders
      HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
          partitionVals, event.getHeaders(), timeZone,
          needRounding, roundUnit, roundValue, useLocalTime);

      // 2) Create or reuse Writer
      HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);

      // 3) Write
      LOG.debug("{} : Writing event to {}", getName(), endPoint);
      writer.write(event);
    } // for

    // 4) Update counters
    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }
    sinkCounter.addToEventDrainAttemptCount(txnEventCount);

    // 5) Flush all Writers
    for (HiveWriter writer : activeWriters.values()) {
      writer.flush(true);
    }

    sinkCounter.addToEventDrainSuccessCount(txnEventCount);
    return txnEventCount;
  } catch (HiveWriter.Failure e) {
    // in case of error we close all TxnBatches to start clean next time
    LOG.warn(getName() + " : " + e.getMessage(), e);
    abortAllWriters();
    closeAllWriters();
    throw e;
  }
}

private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
                                     HiveEndPoint endPoint)
    throws HiveWriter.ConnectException, InterruptedException {
  try {
    HiveWriter writer = allWriters.get(endPoint);
    if (writer == null) {
      LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
      writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
          callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);
      sinkCounter.incrementConnectionCreatedCount();
      if (allWriters.size() > maxOpenConnections) {
        int retired = closeIdleWriters();
        if (retired == 0) {
          closeEldestWriter();
        }
      }
      allWriters.put(endPoint, writer);
      activeWriters.put(endPoint, writer);
    } else {
      if (activeWriters.get(endPoint) == null) {
        activeWriters.put(endPoint, writer);
      }
    }
    return writer;
  } catch (HiveWriter.ConnectException e) {
    sinkCounter.incrementConnectionFailedCount();
    throw e;
  }
}

private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
                                  List<String> partVals, Map<String, String> headers,
                                  TimeZone timeZone, boolean needRounding,
                                  int roundUnit, Integer roundValue,
                                  boolean useLocalTime) {
  if (partVals == null) {
    return new HiveEndPoint(metaStoreUri, database, table, null);
  }
  ArrayList<String> realPartVals = Lists.newArrayList();
  for (String partVal : partVals) {
    realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
        needRounding, roundUnit, roundValue, useLocalTime));
  }
  return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
}

public Status process() throws EventDeliveryException {
  // writers used in this Txn
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  boolean success = false;
  try {
    // 1 Enable Heart Beats
    if (timeToSendHeartBeat.compareAndSet(true, false)) {
      enableHeartBeatOnAllWriters();
    }

    // 2 Drain Batch
    int txnEventCount = drainOneBatch(channel);
    transaction.commit();
    success = true;

    // 3 Update Counters
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      return Status.READY;
    }
  } catch (InterruptedException err) {
    LOG.warn(getName() + ": Thread was interrupted.", err);
    return Status.BACKOFF;
  } catch (Exception e) {
    throw new EventDeliveryException(e);
  } finally {
    if (!success) {
      transaction.rollback();
    }
    transaction.close();
  }
}
```
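Under the hood, HiveWriter is built on the Hive Streaming API (the hive-hcatalog-streaming module): it opens a connection to a HiveEndPoint, fetches transaction batches, and writes and commits inside them, which is why committed data is immediately visible. The sketch below uses that API directly, outside Flume; the metastore URI, table, columns, and record are placeholders, and the exact constructors may differ between Hive versions.

```
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HiveStreamingDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder metastore URI, database, table, and partition value.
    HiveEndPoint endPoint = new HiveEndPoint("thrift://localhost:9083",
        "default", "demo_table", Arrays.asList("2024-01-01"));

    // true = create the partition if it does not exist yet
    StreamingConnection connection = endPoint.newConnection(true);
    DelimitedInputWriter writer = new DelimitedInputWriter(
        new String[]{"id", "msg"}, ",", endPoint);
    try {
      // Ask for a batch of transactions, then write and commit inside one.
      TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes("UTF-8"));
      txnBatch.commit();                 // committed rows are visible in Hive
      txnBatch.close();
    } finally {
      connection.close();
    }
  }
}
```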