2.flume sources

版权申明:转载请注明出处。
文章来源:大数据随笔

flume中提供了多种source来应对不同场景的数据传输,最常用的有exec source和spooling directory source。下面对flume的source做详细的说明。

1.Spooling Directory Source

这种方式是将要传输的文件放在磁盘的某个目录下,这个目录可以理解为一个池子,当池子中有文件的时候就会被放入channel,当确认文件已经放入channel,原始文件会被重命名或者删除。
这种方式是可靠的、并且不会丢失数据,当agent挂掉重启后也能接着传输。需要注意的是放入池子的文件不能再被写入或者不能出现重名文件,否则传输会报错。
需要注意的是尽管这种方式提供了可靠性保证,但是下游的某种失败也会导致event重复。
以下是该source的配置属性,加粗的为必选属性。

属性名称 默认值 说明
channels channel,可以为多个
type 组件的类型名称,这里为spooldir
spoolDir 读取文件的目录
fileSuffix .COMPLETED 为已经读取的文件添加的后缀
deletePolicy never 删除spool中文件的策略,有never和immediate
fileHeader false 是否在全路径的文件名上添加header
fileHeaderKey file 将一个绝对路径的文件名添加到event header时的key
basenameHeader false 是否在基本文件名上添加一个header
basenameHeaderKey basename 将一个基本文件添加到event header时的key
includePattern ^.*$ 用来标识需要被传输的文件名的正则表达式,可以与ignorePattern同时使用
如果同时命中了两条规则,则这个文件会被忽略。
ignorePattern ^$ 用来标识将要被忽略的文件的正则表达式。
trackerDir .flumespool 用来存储正在处理的文件的元数据的目录,如果这个目录不是一个绝对路径,
可以将其理解为一个和poolDir相关的目录
consumeOrder oldest 消耗spool中文件的顺序,有oldest,youngest,random,当为oldest或者youngest时,
会通过文件最后一次修改时间去排序,若修改时间一致,则按照文件名称的字典顺序排序。
这两种方式每次都会扫描整个目录,当有大量文件的时候速度会比较慢。
当为random时会随机的消耗文件,这种方式速度快但是有可能造成最早生成的文件最后消耗,对于时序有要求的不建议使用。
pollDelay 500 轮巡新文件的延时,单位为毫秒
recursiveDirectorySearch false 是否监控并读取子目录的文件。
maxBackoff 4000 两次写入channel的时间间隔,单位为毫秒,
source会以一个较大的间隔时间开始,然后每次以指数减少的方式缩短间隔时间,直到channel写满抛出异常,是一个动态的参数。
batchSize 100 每个批次的大小
inputCharset UTF-8 deserializers用来处理文件的字符集
decodeErrorPolicy FAIL 字符无法解码时的策略。
FAIL:抛出一个解码失败的异常。
REPLACE:替换字符,一般使用Unicode字符U+FFFD。
IGNORE:丢弃无法解析的队列
deserializer LINE deserializer用于将file解析成event,默认将文件的每一行解析为一个event。
deserializer.* 可以使用多个 deserializer对一个event处理。
bufferMaxLineLength 5000 每个需要提交的buffer中的最大文件行数,使用deserializer.maxLineLength替代。
selector.type replicating 取值有replicating、multiplexing
selector.* 依赖于selector.type
interceptors 拦截器,可配置多个,使用空格分割。

配置示例

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

2.Scribe Source

scribe是facebook 提供的另外一种数据传输系统,使用scribe source相当于在scribe之后又套接了flume传输数据。以下是各个数据传输系统的对比图。
传输工具对比
scribe source 的属性配置如下:

属性 默认值 说明
type org.apache.flume.source.scribe.ScribeSource
port 1499 端口
maxReadBufferBytes 16384000 thrift默认的配置
workerThreads 5 工作线程数
selector.type replicating 取值有replicating、multiplexing
selector.* 依赖于selector.type

配置示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

3.Exec Source

exec source是一种比较简单的用来实时传输的source,它会启动一个用户给定的类Unix命令,这个命令会不断的产生数据流,exec source接收它产生的数据流。一般常见的命令有tail -f ,也可以使用一些比较复杂的命令。
但是这种方式无法保证可靠性,source无法感知event放入channel时产生的错误。比如当channel写满的时候,source还会不断的发送数据到channel,这会导致数据丢失。
exec source的属性配置如下:

属性 默认值 说明
channels channel,可以为多个
type 组件的类型名称,这里为exec
command 需要执行的命令。
shell 一个用户执行命令的shell调用。
restartThrottle 10000 重启之前等待的时间。
restart false 当执行的命令挂掉后是否需要重启。
logStdErr false 是否要将命令的错误日志记录。
batchSize 20 读取并发送到channel的最大行数。
batchTimeout 3000 批次间隔时间。
selector.type replicating 取值有replicating、multiplexing
selector.* 依赖于selector.type
interceptors 拦截器,可配置多个,使用空格分割。

配置示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

4.Taildir Source

这种方式会监听指定的文件,并以接近实时的频率去读取文件新产生的行。这种方式是可靠的并且不会丢失数据,因为在读取文件的过程中,会周期的记录文件读取的位置到一个json格式的文件中。当agent出现问题重启时会去读取位置文件中的信息,然后重新传输,也可以从指定的位置进行传输。taildir source不支持传输二进制的文件。
Taildir source的属性配置如下:

属性 默认值 说明
channels channel,可以为多个
type 组件的类型名称,这里为TAILDIR
filegroups 空格分割的多个文件组,每个组中有一组需要被tail的文件。
filegroupName 文件组的绝对路径。
positionFile ~/.flume/taildir_position.json 用于tail文件时记录最新的位置。
headers 多个header可以用来标识一个文件组
byteOffsetHeader false 是否为已经tail的行添加一个叫byteoffset的字节偏移
skipToEnd false 在位置文件没有写入的情况下是否跳过位置信息直接到末尾
idleTimeout 120000 当一个文件不活动的时间达到idleTimeout指定的时间,
则关闭对该文件的传输,
当有新的行写入时,会自动重新传输。
writePosInterval 3000 将每个文件最后一次读取位置写入位置文件的间隔时间。
batchSize 100 一次读取并发送到channel的文件行数的大小。
backoffSleepIncrement 1000 上一次没有读取到新数据,再一次读取的时延的增量,单位毫秒。
maxBackoffSleep 5000 上一次没有读取到新数据,再一次读取数据时间隔的最大时间,单位毫秒。
cachePatternMatching true 当一个目录有成千上万个文件,使用正则表达式匹配比较耗时,
缓存文件列表能改善这个问题。文件消耗的顺序也会被缓存
要求文件系统保持至少一秒的间隔去跟踪文件的修改时间。
fileHeader false 是否添加一个header去存储文件的绝对路径。
fileHeaderKey file 上述header的key

配置示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true

为您推荐

发表评论

电子邮件地址不会被公开。 必填项已用*标注