Flume框架常用的source interceptors有以下几种:

  1. imestamp Interceptor:向event中的header里面添加timestamp 时间戳信息
  2. Host Interceptor:向event中的header里面添加host属性,host的值为当前机器的主机名或者ip
  3. Search and Replace Intercepto (opens new window):根据指定的规则查询Event中body里面的数据,然后进行替换, 这个拦截器会修改event中body的值,也就是会修改原始采集到的数据内容
  4. Static Interceptor:向event中的header里面添加固定的key和value
  5. Regex Extractor Interceptor:根据指定的规则从Event中的body里面抽取数据,生成key和value, 再把key和value添加到header中

# 实战:替换日志内容

假设日志会产生如下两种数据,注意看type的内容。

{
    "id": 86,
    "title": "数据结构和算法极速上手-java版",
    "coverImg": "https://file.aaa.net/video/2023/cover/86.png",
    "oldAmount": 99.00,
    "type": "video_info"
}
{
    "id":2,
    "name": "张三",
    "headImg": "https://file.ttt.net/user/2023/cover/2.png",
    "type": "user_info"
}

现在有的需求如下:

  1. 日志按时间和日志类型(type)存储,存储到如下路径:
    • hdfs://192.168.133.103:9000/moreType/20231001/videInfo
    • hdfs://192.168.133.103:9000/moreType/20231001/userInfo
  2. 类型的下划线转换成驼峰形式书写,整理执行逻辑如下图所示:

SourcelInterceptors处理流程案例

在安装了flume服务器的flume/conf目录下创建file-to-hdfs-moreType.conf文件

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source组件配置
a1.sources.r1.type = exec
a1.sources.r1.spoolDir = tail -F /data/log/moreType.log

# score拦截器配置 [多个拦截器按照顺序依次执行]
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"

a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"

a1.sources.r1.interceptors.i3.type = regex_extractor
a1.sources.r1.interceptors.i3.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i3.serializers = s1
a1.sources.r1.interceptors.i3.serializers.s1.name = logType

# channels组件配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/soft/flume/apache-flume-1.11.0-bin/data/moreType/checkpoint
a1.channels.c1.dataDirs = /usr/local/soft/flume/apache-flume-1.11.0-bin/data/moreType/data

# sink组件配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hdfs://192.168.133.106 
a1.sinks.k1.port = 45454

# 将3个组件关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#