前言

数据采集与数据传输是大数据/分布式计算技术的两项基本需求。对应于这两项需求,开源社区有很多成熟的解决方案,例如 Scribe,Flume,Chukwa,Kafka,各类MQ等等,基于不同的业务场景可以灵活选择。考虑到我们的实际业务模式与技术特点,在分布式计算系统中需要选择轻量级、可定制性好、扩展性强、易于维护的数据采集与传输单元。其中,采集单元选用模块化程度极高的 Flume-ng,传输单元选择高吞吐率的 Kafka,将两者结合共同构成分布式计算集群的基础数据输入组件。


0. 材料准备

  • Flume 安装程序(版本:Flume-ng 1.5.3)
  • Kafka 安装程序(版本:Kafka 0.8.2)
  • Flume-Kafka 插件

1. 配置 Flume

# 配置 flume agent
vi conf/flume-conf.properties
## 定义 agent
# 本agent的名称为 “agent181”
agent181.sources = src1
agent181.channels = ch1
agent181.sinks = k1

## 定义 sources
agent181.sources.src1.type = avro
agent181.sources.src1.channels = ch1
# 本地 flume 服务器地址,需要在 hosts 中注册
agent181.sources.src1.bind = hd181
# source 绑定端口
agent181.sources.src1.port = 41414

## 定义 sinks
agent181.sinks.k1.type = com.thilinamb.flume.sink.KafkaSink
# 需要连接的 topic 名称
# 注意:如果此 topic 不存在(即在 Kafka 集群中未创建)则默认连接到一个名为 “default-flume-topic” 的 topic
agent181.sinks.k1.custom-topic = flumeTopic
agent181.sinks.k1.preprocessor = com.thilinamb.flume.sink.example.SimpleMessagePreprocessor
# 需要连接到的 Kafka 服务器地址与端口(这里是 kafka182 )
agent181.sinks.k1.kafka.metadata.broker.list=kafka182:9092
agent181.sinks.k1.kafka.serializer.class = kafka.serializer.StringEncoder
agent181.sinks.k1.kafka.request.required.acks = 1
agent181.sinks.k1.channel = ch1

## 定义 channels
agent181.channels.ch1.type = memory
agent181.channels.ch1.capacity = 1000

2. 准备 flume-kafka 插件

下载插件:(该插件即将集成到1.6版本的 Flume 官方程序中,但在1.5及以下的版本中仍然需要手动配置)

https://github.com/thilinamb/flume-ng-kafka-sink

编译mvn clean install 或者 mvn package

编译完成之后在 dist/target 目录下会生成 flume-kafka-sink-dist-0.5.0-bin.zip,解压缩后,在 lib 目录下有四个依赖jar包:

  • flume-kafka-sink-impl-x.x.x.jar
  • kafka_x.x.-x.x.x.x.jar
  • metrics-core-x.x.x.jar
  • scala-library-x.x.x.jar

添加依赖包:在Flume的安装目录下建立 plugins.d 文件夹,再在该文件夹下建立 kafka-sink 文件夹,然后在kafka-sink文件夹下建立 liblibext 两个文件夹,将 flume-kafka-sink-impl-0.5.0.jar 拷贝到 lib 下,其他三个jar包拷贝到 libext 下,整个目录结构如下所示:

${FLUME_HOME}
 |-- plugins.d
        |-- kafka-sink
            |-- lib
                |-- flume-kafka-sink-impl-x.x.x.jar
            |-- libext
                |-- kafka_x.x.-x.x.x.x.jar
                |-- metrics-core-x.x.x.jar
                |-- scala-library-x.x.x.jar

NOTES

  • 上述 Flume 配置文件中提到的 "默认连接到一个名为 ‘default-flume-topic’ 的 topic" 实际上是在flume-ng-kafka-sink项目中定义的,如果需要修改默认名称等属性,可以修改 ConstantsMessagePreprocessor 接口实现类的 extractTopic 方法。Key 和 Message 的处理也可以根据需要通过 MessagePreprocessor 接口的另外两个方法类似实现。由于插件作者写的 SimpleMessagePreprocessor 中定义了属性名为 custom-topic 的 topic名称,会对使用者造成一定的混淆,同时额外的 example module 也不便于集成到编译后的插件中,因此,我在原始插件代码的基础上做了一点修改,并更新到 weyo/flume-ng-kafka-sink 项目中,可以直接下载并使用 Maven 编译生成需要的插件。

3. 运行

  • 启动 Kafka server(每个 flume agent 对应的 Kafka broker,本例中为 kafka182)
bin/kafka-server-start.sh config/server.properties & 
  • 创建 Kafka topic(上面 Flume 配置文件中对应的topic名称)
bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication 1 --partition 1 --topic flumeTopic 
  • 启动 Kafka consumer
bin/kafka-console-consumer.sh --zookeeper zk1:2181 --topic flumeTopic --from-beginning
  • 启动 Flume (本例中为 agent181)
flume-ng agent -c /home/flume/conf/ -f /home/flume/conf/flume-conf.properties -n agent181 &

TIPS

  • 如果需要监控 agent 配置信息,可以添加 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 参数,通过 http://agenthost:34545 访问 agent 配置信息。
  • 启动 Flume 数据源发送数据

  • 在 Kafka consumer 客户端观察数据接收情况




Comments

comments powered by Disqus