Flume Advanced Components: Sink Processors

2023-10-12 Big Data flume

# Load-Balancing Sink Processor

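As shown in the figure, there are three servers. Server C has two sinks, and the load-balancing processor distributes events across them, so that data is sent to servers A and B, which then write it to HDFS. If either A or B goes down, the pipeline keeps running on the remaining server, only with lower throughput than when both are available. With `backoff = true`, a sink that fails is temporarily excluded from the rotation instead of being retried on every event.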

Figure: Load-balancing Sink

Server C configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

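# Sink configuration: two Avro sinks. batch-size = 1 forwards each event as soon as it arrives (the default is 100); it is lowered to 1 only to make the demo easy to observe.
# hostname is the plain address of the Avro source on server A (104) and server B (105).
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.133.104
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.133.105
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1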

# Sink group: load balancing with round-robin selection and backoff for failed sinks
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
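The snippet below is a simplified Python model of what `processor.selector = round_robin` combined with `processor.backoff = true` does. It shows how events alternate between k1 and k2, and how a failed sink is skipped until its backoff expires. This is a sketch, not Flume's Java implementation. Several details are assumptions for illustration:

- the class name `RoundRobinBackoff`;
- the injected millisecond clock;
- the backoff schedule, which starts at 1 s, doubles per consecutive failure, and is capped at 30 s. The cap is loosely modeled on the documented default of `processor.selector.maxTimeOut`.

```python
from typing import Callable, Dict, List, Optional


class RoundRobinBackoff:
    """Hypothetical, simplified model of a round-robin sink selector with backoff."""

    def __init__(self, sinks: List[str], clock: Callable[[], float],
                 base_ms: int = 1000, max_timeout_ms: int = 30000) -> None:
        self.sinks = sinks
        self.clock = clock  # returns the current time in milliseconds
        self.base_ms = base_ms
        self.max_timeout_ms = max_timeout_ms
        self.next_index = 0
        self.blocked_until: Dict[str, float] = {s: 0.0 for s in sinks}
        self.failures: Dict[str, int] = {s: 0 for s in sinks}

    def select(self) -> Optional[str]:
        """Return the next sink in rotation that is not backed off, or None if all are."""
        now = self.clock()
        for _ in range(len(self.sinks)):
            sink = self.sinks[self.next_index]
            self.next_index = (self.next_index + 1) % len(self.sinks)
            if self.blocked_until[sink] <= now:
                return sink
        return None

    def report_success(self, sink: str) -> None:
        # A successful delivery clears the sink's failure history.
        self.failures[sink] = 0
        self.blocked_until[sink] = 0.0

    def report_failure(self, sink: str) -> None:
        # Exponential backoff: base, 2*base, 4*base, ... capped at max_timeout_ms.
        self.failures[sink] += 1
        delay = min(self.base_ms * 2 ** (self.failures[sink] - 1), self.max_timeout_ms)
        self.blocked_until[sink] = self.clock() + delay


now = [0.0]
sel = RoundRobinBackoff(["k1", "k2"], clock=lambda: now[0])
print([sel.select() for _ in range(4)])  # ['k1', 'k2', 'k1', 'k2']
sel.report_failure("k1")                 # server A becomes unreachable
print([sel.select() for _ in range(3)])  # ['k2', 'k2', 'k2']
now[0] = 1000.0                          # the 1000 ms backoff has elapsed
print(sel.select())                      # k1
```

In this model, the caller is responsible for retrying an event with the next `select()` result after a failure.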

Server A configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (the sink is named k1 to match a1.sinks = k1)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 4194304
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data104
a1.sinks.k1.hdfs.fileSuffix = .log

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Server B configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (the sink is named k1 to match a1.sinks = k1)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 4194304
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data105
a1.sinks.k1.hdfs.fileSuffix = .log

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

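Note: start servers A and B first, then server C, so that C's Avro sinks can connect to the Avro sources listening on port 41414 of A and B.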
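Once all three agents are running, test data can be fed into server C's netcat source on port 44444, for example with telnet or nc. The Python sketch below does the same thing programmatically. The function name `send_lines` is hypothetical. The sketch assumes the source keeps its default `ack-every-event` behavior, which replies with `OK` for every event, so it reads one reply line per line sent.

```python
import socket
from typing import Iterable, List


def send_lines(host: str, port: int, lines: Iterable[str], timeout: float = 5.0) -> List[str]:
    """Send each line to a Flume netcat source and collect one reply line per event."""
    replies: List[str] = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                # The netcat source turns each newline-terminated line into one event.
                sock.sendall((line + "\n").encode("utf-8"))
                # With acknowledgements enabled, the source answers every event.
                replies.append(reader.readline().strip())
    return replies
```

Run on server C itself, `send_lines("localhost", 44444, ["hello 1", "hello 2"])` should return one acknowledgement per line. The events should then appear under `/load_balance` in HDFS, split between the `data104` and `data105` files.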

# Failover Sink Processor

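With the failover processor, only one sink is active at a time; the two sinks do not share the load. If the active sink stops working, the processor switches to the other one. Flume activates the sink with the larger priority value first. With the configuration below, k2 (priority 10, server B) therefore receives all events, and k1 (priority 5, server A) takes over only if k2 fails.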

Figure: Failover Sink

Server C configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration: two Avro sinks pointing at server A (104) and server B (105)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.133.104
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.133.105
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

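# Sink group: failover. The sink with the higher priority is active; maxpenalty caps a failed sink's backoff at 10000 ms.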
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
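The priority and penalty settings above can be illustrated with a simplified Python model of a failover processor. The model always uses the highest-priority sink that is not currently penalized. A failed sink receives a penalty, and once the penalty expires the higher-priority sink takes over again. This is a sketch, not Flume's own code. Several details are assumptions:

- the class name `FailoverModel`;
- the injected millisecond clock;
- the penalty schedule, which starts at 1 s, doubles per consecutive failure, and is capped at `max_penalty_ms` (standing in for `processor.maxpenalty`).

Flume's exact schedule may differ.

```python
from typing import Callable, Dict, Optional


class FailoverModel:
    """Hypothetical, simplified model of a priority-based failover sink processor."""

    def __init__(self, priorities: Dict[str, int], clock: Callable[[], float],
                 base_penalty_ms: int = 1000, max_penalty_ms: int = 10000) -> None:
        self.priorities = priorities
        self.clock = clock  # returns the current time in milliseconds
        self.base_penalty_ms = base_penalty_ms
        self.max_penalty_ms = max_penalty_ms
        self.penalty_until: Dict[str, float] = {s: 0.0 for s in priorities}
        self.failures: Dict[str, int] = {s: 0 for s in priorities}

    def active_sink(self) -> Optional[str]:
        """Highest-priority sink whose penalty has expired, or None if all are penalized."""
        now = self.clock()
        available = [s for s in self.priorities if self.penalty_until[s] <= now]
        if not available:
            return None
        return max(available, key=lambda s: self.priorities[s])

    def report_failure(self, sink: str) -> None:
        # The penalty doubles per consecutive failure and never exceeds max_penalty_ms.
        self.failures[sink] += 1
        penalty = min(self.base_penalty_ms * 2 ** (self.failures[sink] - 1), self.max_penalty_ms)
        self.penalty_until[sink] = self.clock() + penalty

    def report_success(self, sink: str) -> None:
        # A successful delivery resets the consecutive-failure count.
        self.failures[sink] = 0


now = [0.0]
fo = FailoverModel({"k1": 5, "k2": 10}, clock=lambda: now[0])
print(fo.active_sink())  # k2 (priority 10 wins)
fo.report_failure("k2")  # server B goes down
print(fo.active_sink())  # k1
now[0] = 1000.0          # k2's 1000 ms penalty has expired
print(fo.active_sink())  # k2
```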

Server A configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (the sink is named k1 to match a1.sinks = k1)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 4194304
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data104
a1.sinks.k1.hdfs.fileSuffix = .log

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Server B configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (the sink is named k1 to match a1.sinks = k1)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.133.103:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 4194304
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data105
a1.sinks.k1.hdfs.fileSuffix = .log

# Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

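Apart from the HDFS path (`/failover` instead of `/load_balance`), the configurations of servers A and B are the same as in the load-balancing setup.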
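Because the collector configs on A and B differ only in `hdfs.path` and `hdfs.filePrefix`, one option is to render them from a single template. The Python sketch below does that. The function `collector_config` and its parameters are hypothetical helpers, not part of Flume. The sink is named `k1` throughout so that every property matches the `a1.sinks = k1` declaration, and no value carries trailing whitespace.

```python
def collector_config(hdfs_path: str, file_prefix: str, port: int = 41414) -> str:
    """Render an Avro source -> memory channel -> HDFS sink agent config (hypothetical helper)."""
    props = [
        ("a1.sources", "r1"),
        ("a1.channels", "c1"),
        ("a1.sinks", "k1"),
        ("a1.sources.r1.type", "avro"),
        ("a1.sources.r1.bind", "0.0.0.0"),
        ("a1.sources.r1.port", str(port)),
        ("a1.channels.c1.type", "memory"),
        ("a1.channels.c1.capacity", "1000"),
        ("a1.channels.c1.transactionCapacity", "100"),
        ("a1.sinks.k1.type", "hdfs"),
        ("a1.sinks.k1.hdfs.path", hdfs_path),
        ("a1.sinks.k1.hdfs.fileType", "DataStream"),
        ("a1.sinks.k1.hdfs.writeFormat", "Text"),
        ("a1.sinks.k1.hdfs.rollInterval", "3600"),
        ("a1.sinks.k1.hdfs.rollSize", "4194304"),
        ("a1.sinks.k1.hdfs.rollCount", "0"),
        ("a1.sinks.k1.hdfs.useLocalTimeStamp", "true"),
        ("a1.sinks.k1.hdfs.filePrefix", file_prefix),
        ("a1.sinks.k1.hdfs.fileSuffix", ".log"),
        ("a1.sources.r1.channels", "c1"),
        ("a1.sinks.k1.channel", "c1"),
    ]
    # One "key = value" line per property.
    return "\n".join(f"{key} = {value}" for key, value in props) + "\n"
```

For the failover setup, `collector_config("hdfs://192.168.133.103:9000/failover", "data104")` produces server A's file and `"data105"` produces server B's. Passing the `/load_balance` path instead gives the load-balancing variants.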
