Contents
  1. Flume
    1.1. Concepts
    1.2. Architecture
  2. Three Flume examples
Flume Notes

Flume

Concepts

Flume is a distributed, reliable, highly available system for aggregating massive volumes of log data. It lets you plug in custom data senders to collect data, performs simple processing on the collected data, and writes it out to a variety of data receivers.

Architecture

The agent is Flume's smallest independently running unit;
each agent is a single JVM process.
A single agent is made up of three core components: source, channel, and sink.

agent: a JVM process that can carry out a data-collection task on its own
source: the component of an agent that connects to the data source
  Common types:
    avro source: receives data arriving on a network port
    exec source: runs a command such as tail -F to pick up lines newly appended to a file
    spooldir source: watches a directory and collects files as they appear in it
    custom source: a user-defined source
sink: the component of an agent that connects to the data destination
  Common types:
    logger sink: used for testing
    HDFS sink: sink for offline (batch) data
    Kafka sink: sink for streaming data
channel: the buffer component inside an agent, sitting between source and sink
  Common types:
    memory: buffers events in memory; fast, but events are lost on failure
    file: buffers events on the file system; safer, but slower
    jdbc: stores events in a database

event: the envelope for data flowing through source, channel, and sink (the basic unit of transfer); data travels from the source system to the destination as a stream of events
  header: carries metadata describing the event
  body: carries the actual payload
client: an entity that wraps raw log data into events and sends them to one or more agents; it decouples the originating system from Flume and is optional in a Flume topology
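The event model above (headers plus body) can be sketched as follows. This is an illustrative Python sketch only; Flume's real Event is a Java interface, and the class and field names here are assumptions for demonstration:

```python
# Illustrative sketch of Flume's event model. Flume's actual Event is a Java
# interface (getHeaders()/getBody()); this class only mirrors the idea.

class Event:
    def __init__(self, body, headers=None):
        self.headers = headers or {}  # metadata describing the event
        self.body = body              # the actual payload, as bytes

# A source would wrap one raw log line into an event like this:
e = Event(b"GET /index.html 200",
          headers={"host": "hadoop0", "timestamp": "1597833259000"})
print(e.headers["host"])  # hadoop0
print(e.body)             # b'GET /index.html 200'
```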

Three Flume examples:

Example 1: monitoring port data (the official example)
1. Create a job directory under the Flume home directory
[root@hadoop0 flume]# mkdir job
[root@hadoop0 flume]# cd job
2. Create the configuration file telnet-logger.conf
[root@hadoop0 job]# vim telnet-logger.conf
and add the following content:

#example.conf: A single-node Flume configuration

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Describe the sink
a1.sinks.k1.type = logger

#Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
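The two memory-channel settings above control different limits: capacity is the total number of events the channel can hold, while transactionCapacity caps how many events a source or sink may put or take in one transaction. A toy Python model of that behavior (assumed semantics for illustration, not Flume's implementation):

```python
from collections import deque

class MemoryChannel:
    """Toy model of a Flume memory channel (illustrative, not Flume's code)."""
    def __init__(self, capacity=1000, transaction_capacity=100):
        self.capacity = capacity                        # total events held
        self.transaction_capacity = transaction_capacity  # per-transaction cap
        self.queue = deque()

    def put_batch(self, events):
        if len(events) > self.transaction_capacity:
            raise ValueError("batch exceeds transactionCapacity")
        if len(self.queue) + len(events) > self.capacity:
            raise RuntimeError("channel full")  # Flume raises a ChannelException
        self.queue.extend(events)

    def take_batch(self, n):
        n = min(n, self.transaction_capacity, len(self.queue))
        return [self.queue.popleft() for _ in range(n)]

ch = MemoryChannel(capacity=1000, transaction_capacity=100)
ch.put_batch([f"event-{i}" for i in range(100)])
print(len(ch.take_batch(100)))  # 100
```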

3. Start Flume listening on the port
Change back to the Flume home directory first.
Official example: bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
Actual command: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console

4. Run telnet localhost 44444
If this fails because telnet is not installed,
install it with yum -y install telnet and retry.
5. Type a few lines into the telnet session; they should appear in the Flume console log.
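Any TCP client can stand in for telnet here, since the netcat source simply reads newline-terminated lines from the socket. Below is a hypothetical Python sender; a stand-in server is included so the sketch runs without Flume, and pointing send_lines at localhost:44444 would hit the real source instead:

```python
import socket
import threading

def send_lines(host, port, lines):
    """Send newline-terminated lines, the way telnet would
    (hypothetical helper; Flume itself ships no such client)."""
    with socket.create_connection((host, port)) as sock:
        for line in lines:
            sock.sendall(line.encode() + b"\n")

# Stand-in server so this sketch runs without Flume: it just collects lines.
received = []
server = socket.create_server(("localhost", 0))   # port 0: pick a free port
port = server.getsockname()[1]

def accept_once():
    conn, _ = server.accept()
    with conn:
        data = b""
        while not data.endswith(b"bye\n"):
            data += conn.recv(1024)
        received.extend(data.decode().splitlines())

t = threading.Thread(target=accept_once)
t.start()
send_lines("localhost", port, ["hello flume", "bye"])
t.join()
server.close()
print(received)  # ['hello flume', 'bye']
```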

Example 2: monitoring files in a directory and writing them to HDFS
1. Create the configuration file dir-hdfs.conf
In the job directory, run vim dir-hdfs.conf
and add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

#Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

#Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

#Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

#Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
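The ignorePattern setting above makes the spooldir source skip file names matching a Java regular expression, here anything ending in .tmp (files still being written). For this particular pattern the syntax behaves the same in Python, so the effect can be checked quickly:

```python
import re

# Same pattern as a3.sources.r3.ignorePattern in the config above;
# Flume matches it against the whole file name.
ignore = re.compile(r"[^ ]*\.tmp")

for name in ["access.log", "upload.tmp", "part-0001.tmp", "done.COMPLETED"]:
    skipped = ignore.fullmatch(name) is not None
    print(name, "->", "skipped" if skipped else "ingested")
```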

2. Start the agent that monitors the directory
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

Example 3: monitoring a file and writing it to HDFS
1. Create a script that generates data
[root@hadoop0 job]# vim mydateauto.sh
with the following content:

#!/bin/bash

while true
do
    echo "$(date)"
    sleep 1
done

Then run it to test:
[root@hadoop0 job]# sh mydateauto.sh
Wed Aug 19 18:34:19 CST 2020
Wed Aug 19 18:34:20 CST 2020

Then modify the script so that its output is appended to a file:

#!/bin/bash

while true
do
    echo "$(date)" >> /software/flume/mydate.txt
    sleep 1
done

Run [root@hadoop0 job]# sh mydateauto.sh again;
this creates the file mydate.txt in the Flume directory.
Watch the new lines arrive with tail -f mydate.txt while the script is running.

2. Create the configuration file file-hdfs.conf with the following content:

#Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

#Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /software/flume/mydate.txt
a2.sources.r2.shell = /bin/bash -c

#Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop0:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

#Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

#Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
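The %Y%m%d/%H escapes in hdfs.path above are filled in from the event timestamp, which useLocalTimeStamp = true takes from the agent's local clock. For these particular escapes the names coincide with strftime codes, so the resulting directory layout can be previewed in Python:

```python
from datetime import datetime

def expand_hdfs_path(template, ts):
    # Expand the date escapes shared between Flume's hdfs.path and strftime
    # (%Y, %m, %d, %H); Flume also supports escapes that strftime lacks.
    return ts.strftime(template)

# Timestamp taken from the sample run of mydateauto.sh above.
ts = datetime(2020, 8, 19, 18, 34, 19)
print(expand_hdfs_path("hdfs://hadoop0:8020/flume/%Y%m%d/%H", ts))
# hdfs://hadoop0:8020/flume/20200819/18
```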

3. Start the agent
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

Author: sovwcwsfm
Link: http://sovwcwsfm.com/blog/page/flume.html
Copyright: unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit _L when reposting.