业务场景
最近在开发过程中遇到了一个比较复杂的场景,需要实现如下的业务逻辑:
-
有多个连续的 Bolt,例如
BoltA-->BoltB-->BoltC-->BoltD...
,每一级 Bolt 均按照fieldGrouping
的方式进行处理; -
BoltA 对数据流进行定时处理,BoltA 处理后数据发送到 BoltB;
-
BlotA 一批数据发送完成之后发送
“发送完成”
信号给 BoltB; -
BoltB 接收 BoltA 所发送的数据并存储在内存(Map)中,当收到 BoltA 发送的
“发送完成”
信号后,开始处理内存中的数据,并将结果依次发送到 BoltC,同样,在内存中数据处理完成之后发送“发送完成”
信号给 BoltC; -
BoltC, BoltD, ... 采用与 BoltB 相同的批处理方式。
技术难点
这个场景看上去挺简单,只需要分别实现定时处理和批处理的逻辑就可以了,实际上有很多坑:
-
由于每一级 Bolt 的数据处理算法逻辑较复杂,不能直接使用 Trident;
-
在生产环境中集群的并发度 >> 1,每一个 Bolt 都有多个工作线程,并且分布在不同的 worker 节点上,无法确保
“发送完成”
信号能够恰好发送到下一级 Bolt 的所有的 task 上; -
同理,由于集群的分布式,每一级 Bolt 的缓存数据同样分布在不同的 worker 上,而且由于算法需要,每一级 Bolt 实际处理的域均有所不同,不能简单使用 fieldGrouping 方式进行汇总处理;而且也不能使用 allGrouping 的方式,否则会造成数据的重复处理;
-
可以使用 globalGrouping 的方式依次汇总每一级 Bolt 的数据流,但是这样就严重浪费了 Strom 的并发性能。
设计实现
这个问题着实困扰了我好几天的时间,期间尝试了多种方式均告失败:
-
直接串联 BoltA 与 BoltB,但是 BoltA 的
“发送完成”
信号始终无法保证发送到 BoltB 的所有 executor(task) 上; -
在两个连续 Bolt 中间设计一个中介 Bolt,用于接收数据并发送信号,结果发现这是与 1 相同的方式;
-
听说 DRPC 的组件
CoordinatedBolt
具有批处理的功能,使用 CoordinatedBolt 来包装 Bolt,结果与普通的串联 Bolt 相同;
这样的结果确实够打击人的,唯一的收获是了解到新版的 Storm 支持通过 tick tuple
来实现定时功能,这样 BoltA 的定时处理问题就能解决了(比我一开始时设计的 TimingSpout
效果要好得多,虽然还是达不到普通的 Java 程序时间控制的精确程度,但总是聊胜于无)。
郁闷了好一段时间之后,决定还是从那个号称能够实现完全批处理功能的 CoordinatedBolt
入手。在了解了 CoordinatedBolt
的实现原理后 (Twitter Storm源代码分析之CoordinatedBolt),隐隐约约感觉到这应该是一个设计方向。于是尝试在 BoltA 中同时使用 emit
与 emitDirect
来发送消息,其中,前者仍然发送普通数据,而后者用于发送信号,但是实际运行时发现 Storm 并不支持在同一个 Bolt 中同时采用普通的 emit
与 emitDirect
两种数据传输方式,topology 无法运行,这次尝试再告失败。
Let there be light: and there was light.
既然在同一个 Bolt 中同时发送两种数据无法成功,那么分成两个 Bolt 分别发送可不可行呢?
解决方案
Yes, it works.
在两级 Bolt 中加入一个并联(不是之前的串联方式)的中介 Bolt —— InterMediateBolt
,这个 Bolt 只负责发送 “发送完成”
信号,结构如下:
Stream1
BoltA --------------------------->BoltB
| ^
|Stream2 |
+--------> InterMediateBolt ------+
Strem1 代表普通的数据流,Stream2 代表 BoltA 发送的批处理信号。
InterMediateBolt
的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | public class InterMediateBolt extends BaseRichBolt {
private List<Integer> _countOutTasks;
private List<String> targetComponents;
private OutputCollector _collector;
private int sourceNumber;
private int count = 0;
public InterMediateBolt(int number, String... targetComponentIds) {
this._countOutTasks = new ArrayList<Integer>();
this.targetComponents = new ArrayList<String>();
for (String id : targetComponentIds) {
this.targetComponents.add(id);
}
this.sourceNumber = number;
}
@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
TopologyContext context, OutputCollector collector) {
for (String component : this.targetComponents) {
for (Integer task : context.getComponentTasks(component)) {
_countOutTasks.add(task);
}
}
this._collector = collector;
}
@Override
public void execute(Tuple input) {
boolean signal = input.getBoolean(0);
if (++count == sourceNumber) {
System.out.println("InterMediateBolt received: ++++++++" + signal);
for (int task : _countOutTasks) {
_collector.emitDirect(task, input, new Values(true));
}
// 信号消息需要确保能够发送成功
_collector.ack(input);
count = 0;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("signal"));
}
}
|
这时上游的 Bolt 需要发送两个数据流(这里以 word-count 为例,代码中省略了根据 tick tuple 判断定时信号的逻辑):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public void execute(Tuple input) {
String word = input.getString(0);
int count = 0;
if (words.containsKey(word)) {
count = words.get(word);
}
words.put(word, ++count);
// 达到指定条件时批量发送缓存数据
if (condition) {
Iterator<Entry<String, Integer>> it = words.entrySet()
.iterator();
while (it.hasNext()) {
Entry<String, Integer> entry = it.next();
_collector.emit("stream1", input, new Values(false, entry.getKey(),
entry.getValue()));
}
// 缓存数据发送结束后发送“发送完成”信号
_collector.emit("stream2", input, new Values(true));
words.clear();
}
}
|
其中,stream1
仍是普通的数据流,stream2
则是信号数据流。
这样,下游的 Bolt 就可以很容易根据信号对数据流进行处理了:
1 2 3 4 5 6 7 8 9 10 11 12 13 | public void execute(Tuple input) {
boolean signal = input.getBoolean(0);
if (signal) {
System.out.println("Handle-task-" + taskId
+ ": received ***signal***");
} else {
String word = input.getString(1);
int count = input.getInteger(2);
System.out.println("Handle-task-" + taskId + ": received "
+ word + "-" + count);
}
_collector.ack(input);
}
|
在 topology 中可以这样定义各个 Bolt:
1 2 3 4 5 6 7 8 9 10 11 12 13 | int an = 3;
int bn = 5;
String boltA = "boltA";
String boltB = "boltB";
String intermediate = "intermediate";
builder.setBolt(boltA, new BoltA(), an)
.fieldsGrouping("words", new Fields("word"));
builder.setBolt(intermediate, new InterMediateBolt(an, boltB), 1)
.shuffleGrouping("boltA", "stream2");
builder.setBolt(boltB, new HandleBolt(), bn)
.fieldsGrouping(boltA, "stream1", new Fields("word"))
.directGrouping(intermediate);
|
说明
-
为了使得
InterMediateBolt
能够获取下游 Bolt 的各个 taskId,这里在InterMediateBolt
的构造器中传入了下游 Bolt 的componentId
参数,根据componentId
就可以在初始化阶段从TopologyContext
中获取到所需要的 taskId。 -
在
InterMediateBolt
的构造器中传入了上游 Bolt 的并发度(parallelism_hint)参数,是为了确保上游任务处理结果的一致性,使得下游任务必须在上游所有任务全部完成之后才能开始进行。
[注意] 这里的
InterMediateBolt
的parallelism_hint
必须 设置为 1,这是为了保证上游 Bolt 的所有的 executor 都能够统一将信号发送到InterMediateBolt
的唯一一个 task。由于InterMediateBolt
只处理单一的信号消息,数据的吞吐量极小,并行度为 1 并不会降低集群的处理性能。
Comments
comments powered by Disqus