Flume
Concepts
Flume is a distributed, reliable, and highly available system for aggregating large volumes of log data. It lets you plug in custom data senders to collect data, performs simple processing on that data, and writes it out to a variety of data receivers.
Architecture
Flume's smallest independently running unit is the agent
An agent is a single JVM process
A single agent is made up of three main components: source, channel, and sink
agent: a JVM process that can carry out a data-collection task on its own
source: the component an agent uses to connect to a data source
Common types:
avro source: receives data from a network port (Avro RPC)
exec source: runs a command such as tail -F to follow a file and pick up newly appended log lines
spooldir source: monitors a directory and collects files as they are added to it
custom source: a user-defined source
sink: the component an agent uses to connect to the data destination
Common types:
logger sink: used for testing
HDFS sink: sink for offline (batch) data
Kafka sink: sink for streaming data
channel: the buffering component inside an agent that sits between source and sink
Common types:
memory: kept in memory; fast but not durable
file: kept on the file system; relatively safe but slower
jdbc: stores data in a database
event: the message object that flows through source, channel, and sink (the basic unit of data transfer); data is carried from its origin to the destination in the form of events
header: carries descriptive metadata
body: carries the actual data
client: an entity that wraps raw logs (data) into events and sends them to one or more agents; it decouples the source system from Flume and is not required in a Flume topology
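These components are wired together in a plain properties file, one agent per configuration. A minimal sketch of the pattern (the names a1, r1, c1, k1 are just placeholders; the cases below follow the same layout):

# name the components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# give each component a type plus its own settings
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.sinks.k1.type = logger

# wire them up: a source may feed several channels, a sink drains exactly one
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1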
Three Flume examples:
Case 1: the official example, monitoring port data
1. Create a directory under the Flume installation directory
[root@hadoop0 flume]# mkdir job
[root@hadoop0 flume]# cd job
2. Define the configuration file telnet-logger.conf
[root@hadoop0 job]# vim telnet-logger.conf
Add the following content:
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start Flume listening on the port
Go back to the Flume directory first
Official example: bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Actual command: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
4. Run telnet localhost 44444
It may first complain that telnet is not found
If so, install it with yum -y install telnet
5. Send a few lines through telnet to test
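For example, in a second terminal (the text sent is arbitrary):

telnet localhost 44444
hello flume

Each line typed into the telnet session should show up as an event in the console of the agent started with -Dflume.root.logger=INFO,console.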
Case 2: monitoring files in a directory and writing them to HDFS
1. Create the configuration file dir-hdfs.conf
In the job directory: vim dir-hdfs.conf
Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2. Start the agent that monitors the directory
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf
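A simple way to test it (the file name below is only an illustration): make sure the spool directory exists, copy a file into it, and then look at HDFS:

mkdir -p /software/flume/upload
echo "hello flume" > /tmp/test.log
cp /tmp/test.log /software/flume/upload/
# once the file has been consumed it should be renamed test.log.COMPLETED
hdfs dfs -ls /flume/upload/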
Case 3: monitoring a file and writing it to HDFS
1. Create a script that generates data automatically
[root@hadoop0 job]# vim mydateauto.sh
Write the following:
#!/bin/bash
while true
do
  echo `date`
  sleep 1
done
Then run it to test:
[root@hadoop0 job]# sh mydateauto.sh
Wed Aug 19 18:34:19 CST 2020
Wed Aug 19 18:34:20 CST 2020
Then modify the script so that the output is appended to a file
#!/bin/bash
while true
do
  echo `date` >> /software/flume/mydate.txt
  sleep 1
done
Run it again: [root@hadoop0 job]# sh mydateauto.sh
This generates the file mydate.txt under the Flume directory
In another terminal, watch the output with tail -f mydate.txt while the script keeps running.
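To keep the generator running while the agent is started from the same session, it can also be left running in the background (an ordinary shell idiom, not something Flume requires):

nohup sh mydateauto.sh > /dev/null 2>&1 &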
2. Create the configuration file file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /software/flume/mydate.txt
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop0:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the agent
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf
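With mydateauto.sh still appending to mydate.txt, the HDFS sink should begin writing files under the configured path. One way to check, assuming the %Y%m%d/%H layout from hdfs.path above:

hdfs dfs -ls /flume/$(date +%Y%m%d)
hdfs dfs -ls /flume/$(date +%Y%m%d)/$(date +%H)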