前言
数据采集与数据传输是大数据/分布式计算技术的两项基本需求。对应于这两项需求,开源社区有很多成熟的解决方案,例如 Scribe,Flume,Chukwa,Kafka,各类MQ等等,基于不同的业务场景可以灵活选择。考虑到我们的实际业务模式与技术特点,在分布式计算系统中需要选择轻量级、可定制性好、扩展性强、易于维护的数据采集与传输单元。其中,采集单元选用模块化程度极高的 Flume-ng,传输单元选择高吞吐率的 Kafka,将两者结合共同构成分布式计算集群的基础数据输入组件。
0. 材料准备
- Flume 安装程序(版本:Flume-ng 1.5.3)
- Kafka 安装程序(版本:Kafka 0.8.2)
- Flume-Kafka 插件
1. 配置 Flume
# 配置 flume agent
vi conf/flume-conf.properties
## 定义 agent
# 本agent的名称为 “agent181”
agent181.sources = src1
agent181.channels = ch1
agent181.sinks = k1
## 定义 sources
agent181.sources.src1.type = avro
agent181.sources.src1.channels = ch1
# 本地 flume 服务器地址,需要在 hosts 中注册
agent181.sources.src1.bind = hd181
# source 绑定端口
agent181.sources.src1.port = 41414
## 定义 sinks
agent181.sinks.k1.type = com.thilinamb.flume.sink.KafkaSink
# 需要连接的 topic 名称
# 注意:如果此 topic 不存在(即在 Kafka 集群中未创建)则默认连接到一个名为 “default-flume-topic” 的 topic
agent181.sinks.k1.custom-topic = flumeTopic
agent181.sinks.k1.preprocessor = com.thilinamb.flume.sink.example.SimpleMessagePreprocessor
# 需要连接到的 Kafka 服务器地址与端口(这里是 kafka182 )
agent181.sinks.k1.kafka.metadata.broker.list=kafka182:9092
agent181.sinks.k1.kafka.serializer.class = kafka.serializer.StringEncoder
agent181.sinks.k1.kafka.request.required.acks = 1
agent181.sinks.k1.channel = ch1
## 定义 channels
agent181.channels.ch1.type = memory
agent181.channels.ch1.capacity = 1000
2. 准备 flume-kafka 插件
下载插件:(该插件即将集成到1.6版本的 Flume 官方程序中,但在1.5及以下的版本中仍然需要手动配置)
https://github.com/thilinamb/flume-ng-kafka-sink
编译:mvn clean install
或者 mvn package
编译完成之后在 dist/target
目录下会生成 flume-kafka-sink-dist-0.5.0-bin.zip
,解压缩后,在 lib 目录下有四个依赖jar包:
- flume-kafka-sink-impl-x.x.x.jar
- kafka_x.x.-x.x.x.x.jar
- metrics-core-x.x.x.jar
- scala-library-x.x.x.jar
添加依赖包:在Flume的安装目录下建立 plugins.d
文件夹,再在该文件夹下建立 kafka-sink
文件夹,然后在kafka-sink文件夹下建立 lib
与 libext
两个文件夹,将 flume-kafka-sink-impl-0.5.0.jar 拷贝到 lib 下,其他三个jar包拷贝到 libext 下,整个目录结构如下所示:
${FLUME_HOME}
|-- plugins.d
|-- kafka-sink
|-- lib
|-- flume-kafka-sink-impl-x.x.x.jar
|-- libext
|-- kafka_x.x.-x.x.x.x.jar
|-- metrics-core-x.x.x.jar
|-- scala-library-x.x.x.jar
NOTES
- 上述 Flume 配置文件中提到的 "默认连接到一个名为 ‘default-flume-topic’ 的 topic" 实际上是在
flume-ng-kafka-sink
项目中定义的,如果需要修改默认名称等属性,可以修改Constants
与MessagePreprocessor
接口实现类的extractTopic
方法。Key 和 Message 的处理也可以根据需要通过MessagePreprocessor
接口的另外两个方法类似实现。由于插件作者写的SimpleMessagePreprocessor
中定义了属性名为custom-topic
的 topic名称,会对使用者造成一定的混淆,同时额外的 example module 也不便于集成到编译后的插件中,因此,我在原始插件代码的基础上做了一点修改,并更新到 weyo/flume-ng-kafka-sink 项目中,可以直接下载并使用 Maven 编译生成需要的插件。
3. 运行
- 启动 Kafka server(每个 flume agent 对应的 Kafka broker,本例中为 kafka182)
bin/kafka-server-start.sh config/server.properties &
- 创建 Kafka topic(上面 Flume 配置文件中对应的topic名称)
bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication 1 --partition 1 --topic flumeTopic
- 启动 Kafka consumer
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic flumeTopic --from-beginning
- 启动 Flume (本例中为 agent181)
flume-ng agent -c /home/flume/conf/ -f /home/flume/conf/flume-conf.properties -n agent181 &
TIPS
- 如果需要监控 agent 配置信息,可以添加
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
参数,通过http://agenthost:34545
访问 agent 配置信息。
-
启动 Flume 数据源发送数据
-
在 Kafka consumer 客户端观察数据接收情况
Comments
comments powered by Disqus