Flume日志采集简介

Flume日志采集简介
一、flume分布flume的核心组件分为数据源source、管道channel和目的地sink。source对接数据源channel用于中间缓存数据sink对接目的地在整个数据传输的过程中流动的是 event它是 Flume 内部数据传输的最基本单元。event 将传输的数据进行封装。如果是文本文件,通常是一行记录event 也是事务的基本单位。event 从 source流向 channel再到 sink本身为一个字节数组并可携带 headers(头信息)信息。event 代表着一个数据的最小完整单元从外部数据源来向外部的目的地去。一个完整的 event 包括event headers、event body、event 信息其中event 信息就是 flume 收集到的日记记录。二、flume的结构flume的内部结构分为标准和串联标准结构单台flume传输单管道单目的地。串联多台flume传输给一台flume的数据源然后这台进行汇总传输到最终目的地hdfs。flume的原理​ flume本身是一个Java进程启动以后会生成一个叫agent的进程。​ agent进程中包含了sourcesink和channel。​ 在flume中数据被包装为event。真实的数据存放在event body中event是flume中最小的数据单元​三、flume的安装与部署flume官网(https://flume.apache.org/)Flume 1.11.0 用户指南 — Apache Flumeflume部署步骤解压配置环境变量改名flume-env.sh、在flume-env.sh输入Java路径删除lib下guava可能导致的版本冲突问题编辑conf文件启动flumeflume监视端口案例官网案例# example.conf: A single-node Flume configuration# 定义agent组件的名称a1.sourcesr1 a1.sinksk1 a1.channelsc1# 定义sources组件属性r1# 设定type为netcat用处为监听端口的数据转换log为英文文本a1.sources.r1.typenetcat a1.sources.r1.bindlocalhost a1.sources.r1.port44444# 定义sinks组件属性k1# 定义type打印方式这里为logger即屏幕上还可为hdfs等类型a1.sinks.k1.typelogger# 定义channels组件属性c1a1.channels.c1.typememory# 设定最多存放1000个、设定事务容量为100个一起成功或失败a1.channels.c1.capacity1000a1.channels.c1.transactionCapacity100# 设置sources和sinks的管道即channelsa1.sources.r1.channelsc1 a1.sinks.k1.channelc1flume启动案例flume-ng agent-nagent-c$FLUME_HOME/conf-f$FLUME_HOME/conf/flume-properties.conf-Dflume.root.loggerINFO,consoleflume监视文件目录案例a1.sourcesr1a1.sinksk1a1.channelsc1sources# 设定sources属性r1# 设定sources的type为spooldir可以监视文件目录# 注意不能有同名的文件在hdfs下产生否则报错罢工所以一般使用时间戳来命名进行采集数据a1.sources.r1.typespooldir a1.sources.r1.spoolDir/root/dirlogs a1.sources.r1.fileHeadertruesinks# 定义sinks组件属性k1# 定义type打印方式这里为hdfsa1.sinks.k1.typehdfs# 设置动态获取当前时间a1.sinks.k1.hdfs.useLocalTimeStamptruea1.sinks.k1.hdfs.path/tmp/flume/%y-%m-%d/%H:%M/%S# filePrefix设定文件前缀a1.sinks.k1.hdfs.filePrefix-events# fileSuffix设置文件后缀a1.sinks.k1.hdfs.fileSuffix.log# 设置文件类型为datastream普通文件默认为序列化文件a1.sinks.k1.hdfs.fileTypeDatastream# round 控制文件夹以多少时间滚动、是否开启时间舍弃# 这里设置为10分钟则10分钟内采集的文件都会在一个文件夹下面a1.sinks.k1.hdfs.roundtruea1.sinks.k1.hdfs.roundValue10a1.sinks.k1.hdfs.roundUnitminute# roll 控制写入hdfs文件以何种方式滚动a1.sinks.k1.hdfs.rollInterval3# 以时间间隔、默认30秒a1.sinks.k1.hdfs.rollSize20# 以文件大小、默认1024a1.sinks.k1.hdfs.rollCount5# 以event数量、默认10个# 如果三个都设置谁先满足谁触发# 如果不想以某种属性为滚动设置为0即可channels# 定义channels组件属性c1# 设定使用内存来缓存a1.channels.c1.typememory# 设定最多存放1000个、设定事务容量为100个event一起成功或失败a1.channels.c1.capacity1000a1.channels.c1.transactionCapacity100设定连接# 设置sources和sinks的管道即channelsa1.sources.r1.channelsc1 a1.sinks.k1.channelc1flume监视文件日志实时监视、实时更新a1.sourcesr1a1.sinksk1a1.channelsc1sources# 设定sources属性r1# 设置exec可以运行cat或者tail -F命令的结果作为数据进行收集a1.sources.r1.typeexeca1.sources.r1.commandcat/root/logs/tests.logsinks# 定义sinks组件属性k1a1.sinks.k1.typehdfs a1.sinks.k1.hdfs.useLocalTimeStamptruea1.sinks.k1.hdfs.path/tmp/flume/%y-%m-%d/%H:%M/%S a1.sinks.k1.hdfs.filePrefix-eventsa1.sinks.k1.hdfs.fileSuffix.log a1.sinks.k1.hdfs.fileTypeDatastream a1.sinks.k1.hdfs.roundtruea1.sinks.k1.hdfs.roundValue10a1.sinks.k1.hdfs.roundUnitminute a1.sinks.k1.hdfs.rollInterval3a1.sinks.k1.hdfs.rollSize20a1.sinks.k1.hdfs.rollCount5channels# 定义channels组件属性c1a1.channels.c1.typememory a1.channels.c1.capacity1000a1.channels.c1.transactionCapacity100设定连接# 设置sources和sinks的管道即channelsa1.sources.r1.channelsc1 a1.sinks.k1.channelc1spooldir监视文件a1.sourcess1a1.channelsc1a1.sinkss2a1.sources.s1.typespooldira1.sources.s1.spoolDir/root/csv/a1.sources.s1.fileHeadertruea1.channels.c1.typememorya1.channels.c1.capacity100000000a1.channels.c1.transactionCapacity50000000a1.sinks.s2.typehdfsa1.sinks.s2.hdfs.pathhdfs://master:9000/inputa1.sinks.s2.hdfs.rollSize100000000a1.sinks.s2.hdfs.rollCount0a1.sinks.s2.hdfs.rollInterval0a1.sinks.s2.hdfs.batchSize50000000a1.sources.s1.channelsc1a1.sinks.s2.channelc1四、flume的load-balance、failoverflume的负载均衡flume在串联的过程中如果前面的agent能够一次处理100个event、后面一个则只能一次处理10个event、那么就会出现堆积情况。这时候可以多个进程进行处理前面的agent。同一个请求只能交给一个进行处理避免数据重复。如何分配请求就涉及到了负载均衡的算法轮询(round_robin) 随机(random) 权重负载均衡是用于解决一台机器一个进程无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能如下图Agent1是一个路由节点负责将Channel暂存的Event均衡到对应的多个Sink组件上而每个Sink组件分别连接到一个独立的Agent 上示例配置如下所示a1.sinkgroupsg1 a1.sinkgroups.g1.sinksk1 k2 k3 a1.sinkgroups.g1.processortypeload_balance a1.sinkgroups.g1.processor.backofftrue#如果开启则将失败的sink放入黑名单a1.sinkgroups.g1.processor.selectorround_robin#另外还支持randoma1.sinkgroups.g1.processor.selector.maxTimeOut10000#在黑名单放置的超时时间超时结束时若仍然无法接收则超时时间呈指数增长flume串联跨网络传输数据avro sinkavro source使用上述两个组件指定绑定的端口ip 就可以满足数据跨网络传递 通常用于flume串联架构中。agent1.sinks.k1.channelc1 agent1.sinks.k1.typeavro agent1.sinks.k1.hostnamenode-2 agent1.sinks.k1.port52020# set sink2agent1.sinks.k2.channelc1 agent1.sinks.k2.typeavro agent1.sinks.k2.hostnamenode-3 agent1.sinks.k2.port52020#set sink groupagent1.sinkgroups.gl.sinksk1 k2#set failoveragent1.sinkgroups.gl.processor.typeload_balance agent1.sinkgroups.gl.processor.backofftrueagent1.sinkgroups.gl.processor.selectorround robin|agent1.sinkgroups.gl.processor.selector.maxTimeOut10000al.sourcesrl al.sinkskl al.channelscl# Describe/configure the sourceal.sources.rl.typeavro al.sources.rl.channelscl al.sources.rl.bindnode-3 al.sources.rl.port52020# Describe the sinkal.sinks.kl.typelogger# Use a channel which buffers events in memoryal.channels.cl.typememory al.channels.cl.capacity1000al.channels.cl.transactionCapacity100# Bind the source and sink to the channelal.sources.rl.channelscl al.sinks.kl.channelcl