# 案例:Flume采集文件夹内数据上传至HDFS

官方文档:采集文件的source配置 (opens new window)基于文件的Channel (opens new window)hdfs sink (opens new window)

/usr/local/soft/flume/apache-flume-1.11.0-bin/conf目录下创建file-to-hdfs.conf文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source组件配置
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/test # 指定要监控的文件目录

# channels组件配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/soft/flume/apache-flume-1.11.0-bin/data/testDir/checkpoint # 存放检查点目录
a1.channels.c1.dataDirs = /usr/local/soft/flume/apache-flume-1.11.0-bin/data/testDir/data # 存放数据的目录

# sink组件配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/flume/testDir  # 数据存储在HDFS中的目录路径
a1.sinks.k1.hdfs.filePrefix = test-  # 文件名的前缀
a1.sinks.k1.hdfs.fileType = DataStream  # 要写入HDFS的文件类型
a1.sinks.k1.hdfs.writeFormat = Text  # 数据写入文件的格式
a1.sinks.k1.hdfs.rollInterval = 3600  # 创建新文件的时间间隔(以秒为单位)
a1.sinks.k1.hdfs.rollSize = 4194304  # 在创建新文件之前的文件最大大小(以字节为单位),4MB
a1.sinks.k1.hdfs.rollCount = 0  # 单个文件可以写入的最大事件数。0表示无限制。

# 将3个组件关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令:bin/flume-ng agent --conf conf --conf-file conf/file-to-hdfs.conf --name a1

# 案例:采集网站日志上传至HDFS

现在来做一个复杂点的案例,分布式网站日志数据采集,核心说明如下:

  1. 将A和B两台机器实时产生的日志数据汇总到机器C中
  2. 通过机器C将数据统一上传至HDFS的指定目录中
  3. 要求HDFS中的目录是按天生成的,每天一个目录

flume采集多台网站数据到HDFS

# 服务器A&B安装解压Flume

[root@bigdata05 ~]# cd /usr/local/soft/flume/
[root@bigdata05 flume]# tar -zxvf apache-flume-1.11.0-bin.tar.gz 
[root@bigdata05 flume]# cd apache-flume-1.11.0-bin/conf/
[root@bigdata05 conf]# mv flume-env.sh.template flume-env.sh

# 编写配置文件

编写配置文件file-to-avro-104.conf,104是IP地址的后3位,方便区分,各机器文件内容如下:

服务器A的配置文件,file-to-avro-104.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source组件配置
a1.sources.r1.type = exec
a1.sources.r1.spoolDir = tail -F /data/log/access.log

# channels组件配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink组件配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdfs://192.168.182.106 
a1.sinks.k1.port = 45454

# 将3个组件关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
服务器B的配置文件,file-to-avro-105.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source组件配置
a1.sources.r1.type = exec
a1.sources.r1.spoolDir = tail -F /data/log/access.log

# channels组件配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink组件配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdfs://192.168.182.106 
a1.sinks.k1.port = 45454

# 将3个组件关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
服务器C的配置文件,file-to-avro-106.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source组件配置
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454

# channels组件配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink组件配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/access/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 4194304
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 将3个组件关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意:服务器C应最先启动,然后启动剩余两个服务,因为A和B需要连接到C发送数据