业务场景

最近在开发过程中遇到了一个比较复杂的场景,需要实现如下的业务逻辑:

  • 有多个连续的 Bolt,例如 BoltA-->BoltB-->BoltC-->BoltD...,每一级 Bolt 均按照 fieldGrouping 的方式进行处理;

  • BoltA 对数据流进行定时处理,BoltA 处理后数据发送到 BoltB;

  • BlotA 一批数据发送完成之后发送 “发送完成” 信号给 BoltB;

  • BoltB 接收 BoltA 所发送的数据并存储在内存(Map)中,当收到 BoltA 发送的 “发送完成” 信号后,开始处理内存中的数据,并将结果依次发送到 BoltC,同样,在内存中数据处理完成之后发送 “发送完成” 信号给 BoltC;

  • BoltC, BoltD, ... 采用与 BoltB 相同的批处理方式。


技术难点

这个场景看上去挺简单,只需要分别实现定时处理和批处理的逻辑就可以了,实际上有很多坑:

  • 由于每一级 Bolt 的数据处理算法逻辑较复杂,不能直接使用 Trident;

  • 在生产环境中集群的并发度 >> 1,每一个 Bolt 都有多个工作线程,并且分布在不同的 worker 节点上,无法确保 “发送完成” 信号能够恰好发送到下一级 Bolt 的所有的 task 上;

  • 同理,由于集群的分布式,每一级 Bolt 的缓存数据同样分布在不同的 worker 上,而且由于算法需要,每一级 Bolt 实际处理的域均有所不同,不能简单使用 fieldGrouping 方式进行汇总处理;而且也不能使用 allGrouping 的方式,否则会造成数据的重复处理;

  • 可以使用 globalGrouping 的方式依次汇总每一级 Bolt 的数据流,但是这样就严重浪费了 Strom 的并发性能。


设计实现

这个问题着实困扰了我好几天的时间,期间尝试了多种方式均告失败:

  1. 直接串联 BoltA 与 BoltB,但是 BoltA 的 “发送完成” 信号始终无法保证发送到 BoltB 的所有 executor(task) 上;

  2. 在两个连续 Bolt 中间设计一个中介 Bolt,用于接收数据并发送信号,结果发现这是与 1 相同的方式;

  3. 听说 DRPC 的组件 CoordinatedBolt 具有批处理的功能,使用 CoordinatedBolt 来包装 Bolt,结果与普通的串联 Bolt 相同;

这样的结果确实够打击人的,唯一的收获是了解到新版的 Storm 支持通过 tick tuple 来实现定时功能,这样 BoltA 的定时处理问题就能解决了(比我一开始时设计的 TimingSpout 效果要好得多,虽然还是达不到普通的 Java 程序时间控制的精确程度,但总是聊胜于无)。

郁闷了好一段时间之后,决定还是从那个号称能够实现完全批处理功能的 CoordinatedBolt 入手。在了解了 CoordinatedBolt 的实现原理后 (Twitter Storm源代码分析之CoordinatedBolt),隐隐约约感觉到这应该是一个设计方向。于是尝试在 BoltA 中同时使用 emitemitDirect 来发送消息,其中,前者仍然发送普通数据,而后者用于发送信号,但是实际运行时发现 Storm 并不支持在同一个 Bolt 中同时采用普通的 emitemitDirect 两种数据传输方式,topology 无法运行,这次尝试再告失败。

Let there be light: and there was light.

既然在同一个 Bolt 中同时发送两种数据无法成功,那么分成两个 Bolt 分别发送可不可行呢?


解决方案

Yes, it works.

在两级 Bolt 中加入一个并联(不是之前的串联方式)的中介 Bolt —— InterMediateBolt,这个 Bolt 只负责发送 “发送完成” 信号,结构如下:

                 Stream1
BoltA --------------------------->BoltB
  |                                 ^
  |Stream2                          |
  +--------> InterMediateBolt ------+

Strem1 代表普通的数据流,Stream2 代表 BoltA 发送的批处理信号。

InterMediateBolt 的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class InterMediateBolt extends BaseRichBolt {
    private List<Integer> _countOutTasks;
    private List<String> targetComponents;
    private OutputCollector _collector;
    private int sourceNumber;
    private int count = 0;

    public InterMediateBolt(int number, String... targetComponentIds) {
        this._countOutTasks = new ArrayList<Integer>();
        this.targetComponents = new ArrayList<String>();
        for (String id : targetComponentIds) {
            this.targetComponents.add(id);
        }
        this.sourceNumber = number;
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
            TopologyContext context, OutputCollector collector) {
        for (String component : this.targetComponents) {
            for (Integer task : context.getComponentTasks(component)) {
                _countOutTasks.add(task);
            }
        }
        this._collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        boolean signal = input.getBoolean(0);
        if (++count == sourceNumber) {
            System.out.println("InterMediateBolt received: ++++++++" + signal);
            for (int task : _countOutTasks) {
                _collector.emitDirect(task, input, new Values(true));
            }
            // 信号消息需要确保能够发送成功
            _collector.ack(input);
            count = 0;
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("signal"));
    }   
}


这时上游的 Bolt 需要发送两个数据流(这里以 word-count 为例,代码中省略了根据 tick tuple 判断定时信号的逻辑):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void execute(Tuple input) {
    String word = input.getString(0);
    int count = 0;
    if (words.containsKey(word)) {
        count = words.get(word);
    }
    words.put(word, ++count);

    // 达到指定条件时批量发送缓存数据
    if (condition) {
        Iterator<Entry<String, Integer>> it = words.entrySet()
                .iterator();
        while (it.hasNext()) {
            Entry<String, Integer> entry = it.next();
            _collector.emit("stream1", input, new Values(false, entry.getKey(),
                    entry.getValue()));
        }
        // 缓存数据发送结束后发送“发送完成”信号
        _collector.emit("stream2", input, new Values(true));
        words.clear();
    }
}

其中,stream1 仍是普通的数据流,stream2 则是信号数据流。


这样,下游的 Bolt 就可以很容易根据信号对数据流进行处理了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public void execute(Tuple input) {
    boolean signal = input.getBoolean(0);
    if (signal) {
        System.out.println("Handle-task-" + taskId
                + ": received ***signal***");
    } else {
        String word = input.getString(1);
        int count = input.getInteger(2);
        System.out.println("Handle-task-" + taskId + ": received "
                + word + "-" + count);
    }
    _collector.ack(input);
}


在 topology 中可以这样定义各个 Bolt:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
int an = 3;
int bn = 5;
String boltA = "boltA";
String boltB = "boltB";
String intermediate = "intermediate";

builder.setBolt(boltA, new BoltA(), an)
        .fieldsGrouping("words", new Fields("word"));
builder.setBolt(intermediate, new InterMediateBolt(an, boltB), 1)
        .shuffleGrouping("boltA", "stream2");
builder.setBolt(boltB, new HandleBolt(), bn)
        .fieldsGrouping(boltA, "stream1", new Fields("word"))
        .directGrouping(intermediate);

说明

  1. 为了使得 InterMediateBolt 能够获取下游 Bolt 的各个 taskId,这里在 InterMediateBolt 的构造器中传入了下游 Bolt 的 componentId 参数,根据 componentId 就可以在初始化阶段从 TopologyContext 中获取到所需要的 taskId。

  2. InterMediateBolt 的构造器中传入了上游 Bolt 的并发度(parallelism_hint)参数,是为了确保上游任务处理结果的一致性,使得下游任务必须在上游所有任务全部完成之后才能开始进行。


[注意] 这里的 InterMediateBoltparallelism_hint 必须 设置为 1,这是为了保证上游 Bolt 的所有的 executor 都能够统一将信号发送到 InterMediateBolt 的唯一一个 task。由于 InterMediateBolt 只处理单一的信号消息,数据的吞吐量极小,并行度为 1 并不会降低集群的处理性能。




Comments

comments powered by Disqus