
Shipping binary logs to hdfs with log4j2 + flume

flume | liuxuecheng

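1. Background

Our service currently stores logs as TextFile, growing by about 5 TB per day. Cluster storage is limited and TextFile logs are very hard to extend, so we decided to serialize the data with protocol buffer and keep it binary through transport and downstream processing. A round of searching on Baidu and Google turned up no ready-made example, so I built one myself.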

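2. Questions

1. How does log4j write binary data to flume?
2. How does flume transfer binary data to hdfs?
3. What is the structure of the files written to hdfs, and how are they read?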

3. Related background

3.1 log4j2

Overview: a powerful logging framework
Website: https://logging.apache.org/log4j/2.x/manual/index.html
Example: log4j2 usage guide

3.2 flume

Overview: a distributed log collection system
Website: https://flume.apache.org/FlumeUserGuide.html
Reference articles:
1. Introduction to flume
2. flume source
3. flume channel
4. flume sink

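3.3 protocol buffer

Overview: a library developed by Google for encoding and decoding data for storage and network communication. Like XML and JSON, it stores data in a defined format; unlike them, it is a binary format, which makes transmitting, packing and unpacking more efficient.
Reference: Protobuf syntax guide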

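3.4 avro

Overview: similar to pb, avro supports binary serialization and can process large volumes of data conveniently and quickly. It is also friendly to dynamic languages: Avro provides mechanisms that let dynamic languages work with Avro data easily.
Reference: Getting started with avro (Java implementation)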

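4. Environment setup

4.1 Installing hadoop locally

See my other article: Installing and configuring hadoop on Windows

4.2 Deploying flume locally

Deploying flume locally is fairly simple; see my other article: Configuring flume on Windows
In addition, because flume writes to hdfs, the following hadoop jars must be copied into flume's lib directory; otherwise flume reports errors.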

/hadoop/share/hadoop/common/.jar
/hadoop/share/hadoop/common/lib/
.jar
/share/hadoop/hdfs/hadoop-hdfs-2.5.2.jar

5. The solution

The overall flow of the solution is shown in the figure:
[Figure: flow]

5.1 Defining the protocol buffer schema

Define the pb schema and compile it to generate the AdxWinLog class.

option java_outer_classname="AdxWinLog";
// Compile: protoc --java_out=./ adxlog.proto

message AdxLog {
    optional string version = 1;
    required string displayId = 2;
    optional string ts = 3;
    optional string cookieId = 4;
    optional string imei = 5;
    optional string idfa = 6;
    optional string sessionId = 7;
    optional string userId = 8;
    optional string httpxForward = 9;
    optional string requestSource = 10;
}
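
To make the size advantage of the binary format concrete, here is a minimal sketch of how protobuf lays out a string field on the wire: a tag byte built from the field number and wire type 2, a varint length, then the UTF-8 bytes. It uses only the JDK rather than the generated AdxWinLog class; the class and method names (ProtoWireSketch, encodeStringField) are made up for illustration, and only the field numbers are taken from the schema above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class ProtoWireSketch {
    /** Writes an unsigned varint: 7 bits per byte, high bit set on all but the last byte. */
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    /** Encodes one string field: tag (fieldNumber << 3 | wire type 2), length, UTF-8 bytes. */
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (fieldNumber << 3) | 2);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // displayId is field 2 and ts is field 3 in the AdxLog message above.
        byte[] displayId = encodeStringField(2, "displayid1");
        byte[] ts = encodeStringField(3, "ts1531983230209");
        String asJson = "{\"displayId\":\"displayid1\",\"ts\":\"ts1531983230209\"}";
        System.out.println("binary bytes: " + (displayId.length + ts.length));
        System.out.println("JSON bytes:   " + asJson.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

Field names never appear in the encoded bytes, only field numbers, which is where most of the saving over a text format comes from.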

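5.2 Custom FlumeByteAppender

Three points need explaining before the code:
(1) log4j's Logger.info() accepts an argument of type Object, which is wrapped in a LogEvent as it passes through log4j. The built-in FlumeAppender, however, calls toString on that Object, formats the result through the layout, and puts it into the body of the flumeEvent before sending. So if a non-String object that does not override toString is passed to log.info(), what arrives is only the class name and hash code of the object.
(2) When an object is passed to log.info(), the Message in the corresponding LogEvent is an ObjectMessage. This class is a little odd: the value is stored in a field named obj, and it is read back through a getter named getParameters, which returns it as an Object[].
(3) The body of a flumeEvent takes a byte[]; normally the layout's toByteArray converts the content of the LogEvent into that byte[].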
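
Point (1) can be reproduced with the JDK alone. The sketch below does not touch log4j; it only shows, under that simplification, what happens when binary data is forced through a text path. BinaryThroughStringSketch and roundTripThroughString are illustrative names, not part of any library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class BinaryThroughStringSketch {
    /** Simulates a text-oriented path: bytes -> String -> bytes. */
    static byte[] roundTripThroughString(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = {(byte) 0xFF, (byte) 0xFE, 0x01};
        // An array does not override toString(), so this prints a type tag and hash, not the content.
        System.out.println(payload.toString());
        // Invalid UTF-8 sequences are replaced while decoding, so the bytes do not survive.
        System.out.println(Arrays.equals(payload, roundTripThroughString(payload)));
    }
}
```

Either way the serialized message is lost, which is why the payload has to reach the flumeEvent body as a byte[] without passing through toString or a text layout.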

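We can therefore pass a byte[] straight into log.info() and, in the Appender, put it directly into the body of the flumeEvent. Conversion through a layout is unnecessary, because binary data needs no layout.
The code is as follows: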

package net.bigdataer.demo.log4j.appender;

import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.*;
import org.apache.logging.log4j.core.layout.Rfc5424Layout;
import org.apache.logging.log4j.core.net.Facility;
import org.apache.logging.log4j.core.util.Booleans;
import org.apache.logging.log4j.core.util.Integers;
import org.apache.logging.log4j.flume.appender.*;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ObjectMessage;
import org.apache.logging.log4j.message.SimpleMessage;

import java.io.Serializable;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/**
 * Created by liuxuecheng on 2018/7/3.
 * Custom FlumeByteAppender based on FlumeAppender.
 * A byte[] payload is sent without a layout, so no layout needs to be configured in log4j2.xml.
 * Passing a String is also supported, because the string is converted to byte[] in the event too.
 * @58corp
 */
@Plugin(name = "FlumeByte", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
public final class FlumeByteAppender extends AbstractAppender implements FlumeEventFactory {
    private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
    private static final int DEFAULT_MAX_DELAY = 60000;

    private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;

    private final AbstractFlumeManager manager;

    private final String mdcIncludes;
    private final String mdcExcludes;
    private final String mdcRequired;

    private final String eventPrefix;

    private final String mdcPrefix;

    private final boolean compressBody;

    private final FlumeEventFactory factory;

    /**
     * Which Manager will be used by the appender instance.
     */
    private enum ManagerType {
        AVRO, EMBEDDED, PERSISTENT;

        public static FlumeByteAppender.ManagerType getType(final String type) {
            return valueOf(type.toUpperCase(Locale.US));
        }
    }

    private FlumeByteAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
                              final boolean ignoreExceptions, final String includes, final String excludes,
                              final String required, final String mdcPrefix, final String eventPrefix,
                              final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
        super(name, filter, layout, ignoreExceptions);
        this.manager = manager;
        this.mdcIncludes = includes;
        this.mdcExcludes = excludes;
        this.mdcRequired = required;
        this.eventPrefix = eventPrefix;
        this.mdcPrefix = mdcPrefix;
        this.compressBody = compress;
        this.factory = factory == null ? this : factory;
    }

    /**
     * Publish the event.
     * @param event The LogEvent.
     */
    @Override
    public void append(final LogEvent event) {
        final String name = event.getLoggerName();
        if (name != null) {
            for (final String pkg : EXCLUDED_PACKAGES) {
                if (name.startsWith(pkg)) {
                    return;
                }
            }
        }
        final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
                eventPrefix, compressBody);

        /*
         * !!! Only this part differs from FlumeAppender.
         * If the message is an ObjectMessage, event.getMessage().getParameters() returns the object
         * passed to log.info(); it holds a single element, so the index is zero.
         * A byte[] payload becomes the body as-is; every other message goes through the layout,
         * which is the default FlumeAppender logic.
         */
        Message message = event.getMessage();
        Object[] parameters = message instanceof ObjectMessage ? message.getParameters() : null;
        if (parameters != null && parameters.length > 0 && parameters[0] instanceof byte[]) {
            flumeEvent.setBody((byte[]) parameters[0]);
        } else {
            flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
        }

        /*
         * By the way, extra information can be added to the headers, for example:
         */
        /*flumeEvent.getHeaders().put("dir","/home/hadoop/hdp_lbg_ectech/");
        flumeEvent.getHeaders().put("user","hdp_lbg_ectech");
        flumeEvent.getHeaders().put("topic","flume_topic");*/
        manager.send(flumeEvent);
    }

    @Override
    public boolean stop(final long timeout, final TimeUnit timeUnit) {
        setStopping();
        boolean stopped = super.stop(timeout, timeUnit, false);
        stopped &= manager.stop(timeout, timeUnit);
        setStopped();
        return stopped;
    }

    /**
     * Create a Flume event.
     * @param event The Log4j LogEvent.
     * @param includes comma separated list of mdc elements to include.
     * @param excludes comma separated list of mdc elements to exclude.
     * @param required comma separated list of mdc elements that must be present with a value.
     * @param mdcPrefix The prefix to add to MDC key names.
     * @param eventPrefix The prefix to add to event fields.
     * @param compress If true the body will be compressed.
     * @return A Flume Event.
     */
    @Override
    public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
                                  final String required, final String mdcPrefix, final String eventPrefix,
                                  final boolean compress) {
        return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
                eventPrefix, compressBody);
    }

    /**
     * Create a Flume Avro Appender.
     * @param agents An array of Agents.
     * @param properties Properties to pass to the embedded agent.
     * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
     * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
     * @param type Avro (default), Embedded, or Persistent.
     * @param dataDir The directory where the Flume FileChannel should write its data.
     * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is
     *                          1000.
     * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
     * @param agentRetries The number of times to retry an agent before failing to the next agent.
     * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch.
     * @param name The name of the Appender.
     * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
     *               they are propagated to the caller.
     * @param excludes A comma separated list of MDC elements to exclude.
     * @param includes A comma separated list of MDC elements to include.
     * @param required A comma separated list of MDC elements that are required.
     * @param mdcPrefix The prefix to add to MDC key names.
     * @param eventPrefix The prefix to add to event key names.
     * @param compressBody If true the event body will be compressed.
     * @param batchSize Number of events to include in a batch. Defaults to 1.
     * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
     * @param factory The factory to use to create Flume events.
     * @param layout The layout to format the event.
     * @param filter A Filter to filter events.
     *
     * @return A Flume Avro Appender.
     */
    @PluginFactory
    public static FlumeByteAppender createAppender(@PluginElement("Agents") final Agent[] agents,
                                               @PluginElement("Properties") final Property[] properties,
                                               @PluginAttribute("hosts") final String hosts,
                                               @PluginAttribute("embedded") final String embedded,
                                               @PluginAttribute("type") final String type,
                                               @PluginAttribute("dataDir") final String dataDir,
                                               @PluginAliases("connectTimeout")
                                               @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
                                               @PluginAliases("requestTimeout")
                                               @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
                                               @PluginAttribute("agentRetries") final String agentRetries,
                                               @PluginAliases("maxDelay") // deprecated
                                               @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
                                               @PluginAttribute("name") final String name,
                                               @PluginAttribute("ignoreExceptions") final String ignore,
                                               @PluginAttribute("mdcExcludes") final String excludes,
                                               @PluginAttribute("mdcIncludes") final String includes,
                                               @PluginAttribute("mdcRequired") final String required,
                                               @PluginAttribute("mdcPrefix") final String mdcPrefix,
                                               @PluginAttribute("eventPrefix") final String eventPrefix,
                                               @PluginAttribute("compress") final String compressBody,
                                               @PluginAttribute("batchSize") final String batchSize,
                                               @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
                                               @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginElement("Filter") final Filter filter) {

        final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
                (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0;
        final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
        final boolean compress = Booleans.parseBoolean(compressBody, true);
        FlumeByteAppender.ManagerType managerType;
        if (type != null) {
            if (embed && embedded != null) {
                try {
                    managerType = FlumeByteAppender.ManagerType.getType(type);
                    LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
                } catch (final Exception ex) {
                    LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
                            " is invalid.");
                    managerType = FlumeByteAppender.ManagerType.EMBEDDED;
                }
            } else {
                try {
                    managerType = FlumeByteAppender.ManagerType.getType(type);
                } catch (final Exception ex) {
                    LOGGER.warn("Type " + type + " is invalid.");
                    managerType = FlumeByteAppender.ManagerType.EMBEDDED;
                }
            }
        }  else if (embed) {
            managerType = FlumeByteAppender.ManagerType.EMBEDDED;
        }  else {
            managerType = FlumeByteAppender.ManagerType.AVRO;
        }

        final int batchCount = Integers.parseInt(batchSize, 1);
        final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
        final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
        final int retries = Integers.parseInt(agentRetries, 0);
        final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
        final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);

        if (layout == null) {
            final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
            layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
                    mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
                    null);
        }

        if (name == null) {
            LOGGER.error("No name provided for Appender");
            return null;
        }

        AbstractFlumeManager manager;

        switch (managerType) {
            case EMBEDDED:
                manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
                break;
            case AVRO:
                manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
                break;
            case PERSISTENT:
                manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries,
                        connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
                break;
            default:
                LOGGER.debug("No manager type specified. Defaulting to AVRO");
                manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
        }

        if (manager == null) {
            return null;
        }

        return new FlumeByteAppender(name, filter, layout,  ignoreExceptions, includes,
                excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
    }

    private static Agent[] getAgents(Agent[] agents, final String hosts) {
        if (agents == null || agents.length == 0) {
            if (hosts != null && !hosts.isEmpty()) {
                LOGGER.debug("Parsing agents from hosts parameter");
                final String[] hostports = hosts.split(",");
                agents = new Agent[hostports.length];
                for(int i = 0; i < hostports.length; ++i) {
                    final String[] h = hostports[i].split(":");
                    agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null);
                }
            } else {
                LOGGER.debug("No agents provided, using defaults");
                agents = new Agent[] {Agent.createAgent(null, null)};
            }
        }

        LOGGER.debug("Using agents {}", agents);
        return agents;
    }
}
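
Stripped of the log4j and flume types, the branch in append() comes down to one decision about the payload. The sketch below isolates that decision using only the JDK; selectBody is a hypothetical helper, and the UTF-8 text fallback merely stands in for whatever the configured layout would produce.

```java
import java.nio.charset.StandardCharsets;

final class BodySelectionSketch {
    /**
     * Hypothetical helper mirroring the branch in append():
     * raw bytes are used as the body unchanged, anything else is rendered as text.
     */
    static byte[] selectBody(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload; // binary: bypass any formatting
        }
        // text fallback, standing in for the layout in the real appender
        return String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
    }
}
```

Keeping the text path alongside the binary one means the same appender can still serve ordinary log.info("...") calls.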

5.3 Defining the log-sending client

To test the custom Appender above, create a project. The core code is as follows:
log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>

<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <!-- use the custom FlumeByteAppender -->
        <FlumeByte name="eventLogger" compress="false" type="Avro">
            <!-- more than one Agent can be configured for HA;
                 make sure the host and port match your flume configuration -->
            <Agent host="0.0.0.0" port="7777"/>
        </FlumeByte>
        <!-- it is better to reference FlumeByte through an AsyncAppender -->
        <Async name="Async">
            <AppenderRef ref="eventLogger"/>
        </Async>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Async"/>
            <AppenderRef ref="Console"></AppenderRef>
        </Root>
    </Loggers>
</Configuration>

The main class. The AdxLog class it depends on is generated from the pb schema file above.

package net.bigdataer.demo.log4j.logwriter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Created by liuxuecheng on 2018/6/29.
 */
public class WriteLogToFlume {

    public static void main(String args[]) throws InterruptedException {
        Logger log = LogManager.getLogger(WriteLogToFlume.class);
        // the default buffer size of AsyncAppender is 1024, so the loop count must be larger than 1024
        for(int i = 0;i<10000;i++){
            AdxWinLog.AdxLog adxLog = AdxWinLog.AdxLog.newBuilder()
                    .setTs("ts"+System.currentTimeMillis())
                    .setUserId("userid"+i)
                    .setImei("ime"+i)
                    .setDisplayId("displayid"+i)
                    .setCookieId("cookie=="+i)
                    .build();
            // log.info() accepts an Object argument; toByteArray() converts adxLog to byte[]
            log.info(adxLog.toByteArray());
        }
    }
}

Full project: https://github.com/bigdataer01/log4j2-flumebyte-appender

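5.4 The custom CustomAvroToHdfsSerializer

(1) flume's hdfs sink provides FlumeEventAvroEventSerializer and can write files to hdfs in one of three types: SequenceFile, DataStream or CompressedStream.
(2) In a flume agent whose source type is avro, data exists as avro_event, an avro structure with this schema:

{"type":"record","name":"Event","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}

Serializing to hdfs in this format carries the header information along, which makes it impossible to parse the byte[] in the body.
(3) Override the serializer to drop the header information; the code has to redefine the avro_event format.
The code is fairly simple: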

package net.bigdataer.demo.flume.sink.serializer;

import org.apache.avro.Schema;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.AbstractAvroEventSerializer;
import org.apache.flume.serialization.EventSerializer;

import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
 * Created by liuxuecheng on 2018/7/2.
 */
public class CustomAvroToHdfsSerializer extends AbstractAvroEventSerializer<RawData> {

    private static final Schema SCHEMA = (new Schema.Parser()).parse("{\"type\":\"record\",\"name\":\"RawData\",\"fields\":[{\"name\":\"body\",\"type\":\"bytes\"}]}");
    private final OutputStream out;

    private CustomAvroToHdfsSerializer(OutputStream out) {
        this.out = out;
    }

    protected Schema getSchema() {
        return SCHEMA;
    }

    protected OutputStream getOutputStream() {
        return this.out;
    }

    protected RawData convert(Event event) {
        /*
         * Drop the headers of the avro_event.
         * When the event is written to hdfs, it goes into a SequenceFile whose format is <LongWritable,BytesWritable>.
         */
        return RawData.newBuilder().setBody(ByteBuffer.wrap(event.getBody())).build();
    }

    public static class Builder implements EventSerializer.Builder {
        public Builder() {
        }

        public EventSerializer build(Context context, OutputStream out) {
            CustomAvroToHdfsSerializer writer = new CustomAvroToHdfsSerializer(out);
            writer.configure(context);
            return writer;
        }
    }
}

Full project: https://github.com/bigdataer01/flume-hdfs-sink-serializer
Package the project above and put the resulting jar into flume's lib directory.
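
To see what dropping the headers changes at the byte level, the sketch below hand-encodes one record under each schema using Avro's binary encoding rules (zigzag varint longs, length-prefixed bytes and strings, maps written as counted blocks ending with a zero count). It is an illustration only: it uses the JDK instead of the avro library, ignores the Avro container-file framing around the records, and all names in it (AvroShapeSketch, encodeEvent, encodeRawData) are made up.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class AvroShapeSketch {
    /** Avro long: zigzag-encode, then write as a variable-length integer. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    /** Avro bytes and string values: length as a long, followed by the raw bytes. */
    static void writeBytes(ByteArrayOutputStream out, byte[] data) {
        writeLong(out, data.length);
        out.write(data, 0, data.length);
    }

    /** Record {headers: map of string, body: bytes}: the headers come first. */
    static byte[] encodeEvent(Map<String, String> headers, byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!headers.isEmpty()) {
            writeLong(out, headers.size()); // one block holding all entries
            for (Map.Entry<String, String> e : headers.entrySet()) {
                writeBytes(out, e.getKey().getBytes(StandardCharsets.UTF_8));
                writeBytes(out, e.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
        writeLong(out, 0); // a zero count ends the map
        writeBytes(out, body);
        return out.toByteArray();
    }

    /** Record {body: bytes}: only the length-prefixed payload. */
    static byte[] encodeRawData(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeBytes(out, body);
        return out.toByteArray();
    }
}
```

With the Event schema, a reader has to skip a variable number of header bytes before reaching the payload; with the single-field schema, the payload follows its length directly.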

5.5 Configuring flume

Create hdfs.conf in flume's conf directory with the following configuration:

a1.sources=source1
a1.channels=channel1
a1.sinks=sink1

a1.sources.source1.type=avro
a1.sources.source1.bind=0.0.0.0
a1.sources.source1.port=7777
a1.sources.source1.channels=channel1

a1.channels.channel1.type=memory
a1.channels.channel1.capacity=1000
a1.channels.channel1.transactionCapacity=1000
a1.channels.channel1.keep-alive=30

a1.sinks.sink1.type=hdfs
a1.sinks.sink1.channel=channel1

# directory to write to on hdfs
a1.sinks.sink1.hdfs.path=hdfs://localhost:9000/home/rawdata/flume/
# file type written to hdfs
a1.sinks.sink1.hdfs.fileType=SequenceFile
a1.sinks.sink1.hdfs.rollInterval=0
a1.sinks.sink1.hdfs.rollSize=10240
a1.sinks.sink1.hdfs.rollCount=0
a1.sinks.sink1.hdfs.idleTimeout=60
# use the custom serializer here
a1.sinks.sink1.hdfs.serializer=net.bigdataer.demo.flume.sink.serializer.CustomAvroToHdfsSerializer
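
The roll settings above decide when the sink closes one file and starts the next. In the flume user guide, a value of 0 for rollInterval, rollSize or rollCount disables that trigger, so with this configuration only the size threshold of 10240 bytes causes a roll, while idleTimeout=60 closes a file that has received nothing for 60 seconds. The sketch below models just the size and count checks; it is a simplified illustration with made-up names, not flume's own code, and leaves the timers out.

```java
final class RollPolicySketch {
    /**
     * Returns true when the current file should be closed and a new one started.
     * A threshold of 0 disables that trigger, matching rollCount=0 in the config above.
     */
    static boolean shouldRoll(long rollSize, long rollCount, long bytesWritten, long eventsWritten) {
        boolean sizeReached = rollSize > 0 && bytesWritten >= rollSize;
        boolean countReached = rollCount > 0 && eventsWritten >= rollCount;
        return sizeReached || countReached;
    }

    public static void main(String[] args) {
        // With rollSize=10240 and rollCount=0, only the byte count triggers a roll.
        System.out.println(shouldRoll(10240, 0, 9000, 500));
        System.out.println(shouldRoll(10240, 0, 10240, 3));
    }
}
```

A 10 KB roll size is convenient for a local test because files appear quickly; a production setup would normally choose a much larger value to avoid filling hdfs with small files.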

5.6 Starting flume

Go to flume's bin directory and run the following command:

flume-ng.cmd agent -conf ../conf -conf-file ../conf/hdfs.conf -name a1 -property flume.root.logger=INFO,console

5.7 End-to-end test

(1) Run the WriteLogToFlume class from the log4j2-flumebyte-appender project directly in the IDE. It prints the following:
[Figure: log4j]
This appears because the logs are also configured to go to the console.
(2) The flume console prints the following:

2018-07-19 14:54:42,035 (hdfs-sink1-call-runner-2) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:655)] Renaming hdfs://localhost:9000/home/rawdata/flume/FlumeData.1531983230209.tmp to hdfs://localhost:9000/home/rawdata/flume/FlumeData.1531983230209
2018-07-19 14:54:42,068 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:251)] Creating hdfs://localhost:9000/home/rawdata/flume//FlumeData.1531983230210.tmp
2018-07-19 14:54:42,139 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:393)] Closing hdfs://localhost:9000/home/rawdata/flume//FlumeData.1531983230210.tmp

(3) Check hdfs: the files have been written.
[Figure: hdfs]

[Figure: binaryfile]

6. Deserializing with spark

Get the files above to the local disk and deserialize them with the following spark code:

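import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.{SparkConf, SparkContext}
// AdxLog is the class generated from the pb schema in section 5.1 (nested in AdxWinLog)

object SparkReadAvro {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("read").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // each record is <LongWritable, BytesWritable>; the value holds the serialized AdxLog
    sc.sequenceFile[LongWritable, BytesWritable]("file:///d:/logs/log")
      .map(x => x._2.copyBytes()) // copy out only the valid bytes of the reused buffer
      .map(bytes => AdxLog.parseFrom(bytes))
      .foreach(x => println(x.toString))
    sc.stop()
  }
}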
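
AdxLog.parseFrom does the real decoding in the spark job. As a rough picture of what that involves for a message whose fields are all strings, as in AdxLog, the sketch below walks the bytes and recovers each field number with its text. It relies only on the JDK, handles nothing but length-delimited fields, and its names (ProtoDecodeSketch, decodeStringFields) are illustrative rather than part of protobuf.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class ProtoDecodeSketch {
    /** Decodes a message made only of length-delimited (wire type 2) fields into fieldNumber -> text. */
    static Map<Integer, String> decodeStringFields(byte[] data) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        int[] pos = {0};
        while (pos[0] < data.length) {
            int tag = readVarint(data, pos);
            if ((tag & 0x7) != 2) {
                throw new IllegalArgumentException("only wire type 2 is handled in this sketch");
            }
            int length = readVarint(data, pos);
            fields.put(tag >>> 3, new String(data, pos[0], length, StandardCharsets.UTF_8));
            pos[0] += length;
        }
        return fields;
    }

    /** Reads an unsigned varint starting at pos[0] and advances pos[0] past it. */
    private static int readVarint(byte[] data, int[] pos) {
        int result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }
}
```

The decoder only ever sees field numbers, so mapping 2 back to displayId or 3 back to ts requires the same .proto definition that produced the data.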

[Figure: deserialize]

When reposting, please credit: 大数据随笔 » log4j2+flume传输二进制日志到hdfs

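Latest comments (4)

  1. A very detailed article; it solved a lot of problems.
    zhaiaosen (2018-07-19)
    • thanks
      liuxuecheng (2018-07-19)
  2. Hello, my code is similar to yours, but I ran into a problem: the agent is configured in the .xml file, yet when the logs are sent I get an error saying the agent cannot be found.
    (03-13)
    • Is the agent running normally? The flume agent's input must match the log4j output.
      liuxuecheng (03-19)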