Storm DRPC

(翻译自 Storm 官网)分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。下图是 DRPC 的原理示意图。

drpc

下面结合笔者个人的实践经验介绍 Storm DRPC 的使用。


集群配置

  • nimbus: "hd124"
  • DRPC server: "hd181"

Server 端

启动 DRPC server

在安装有 Storm 环境的服务器启动一个 DRPC server,注意,此 server 的基本配置(nimbus,ZooKeeper,……)应该与集群其他机器相同。

nohup storm drpc >> /home/storm/apache-storm-0.9.3/logs/nohup_drpc.out 2>&1 &

配置集群 DRPC server 列表

在集群其他机器上添加配置:

drpc.servers:
  - "hd181"
  - "otherdrpcservers"

hd181/otherdrpcservers 是上面启动的 DRPC server 地址。

定义 DRPC topology

Local mode

使用 LocalDRPC 定义本地 RPC 服务模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();

DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3)
        .shuffleGrouping("exclaim");

LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("exclaim", conf, builder.createTopology());

// local mode 测试代码
System.out.println(drpc.execute("exclamation", "aaa"));

Remote mode

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
TopologyBuilder builder = new TopologyBuilder();

DRPCSpout spout = new DRPCSpout("drpcFunc");
builder.setSpout("drpc", spout, 3);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 7)
        .shuffleGrouping("exclaim");

Config conf = new Config();
conf.setNumWorkers(2);

StormSubmitter.submitTopology("drpc-test", conf, builder.createTopology());

Client 端

在 DRPC server 启动完成、DPRC Topology 提交到 Storm 集群运行之后,就可以启动 DRPC 客户端。

1
2
3
4
5
6
7
8
public class DRPCClientDemo {

    public static void main(String[] args) throws TException, DRPCExecutionException {
        DRPCClient client = new DRPCClient("hd181", 3772);
        String result = client.execute("drpcFunc", "aaa");
        System.out.println(result);
    }
}

说明

  • 3772 是 Storm 集群中配置的 DRPC 端口(drpc.port: 3772)。
  • 客户端调用的函数名称必须与 Topology 中定义的 DRPCSpout 的 componentId 相同。

如果一切正常客户端会很快收到结果;如果发生错误,就会报错:

Exception in thread "main" DRPCExecutionException(msg:Request timed out)
    at backtype.storm.generated.DistributedRPC$execute_result.read(DistributedRPC.java:904)
    at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
    at backtype.storm.generated.DistributedRPC$Client.recv_execute(DistributedRPC.java:92)
    at backtype.storm.generated.DistributedRPC$Client.execute(DistributedRPC.java:78)
    at backtype.storm.utils.DRPCClient.execute(DRPCClient.java:71)

在服务器间网络连接正常的情况下,这种错误一般是集群或 Topology 的配置错误,例如 DRPC 服务端 IP、端口配置错误,或执行的 function 名称错误等,客户端会等待服务端的响应很长时间(默认配置为 600s,可以在 storm.yaml 中修改),此时需要检查集群的部署是否正确。


Questions

修改配置或重新启动 DRPC server 后有一段时间出现错误

Exclamation-6:aaa203!!!
Exception in thread "main" DRPCExecutionException(msg:Request failed)
    at backtype.storm.generated.DistributedRPC$execute_result.read(DistributedRPC.java:904)
    at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
    at backtype.storm.generated.DistributedRPC$Client.recv_execute(DistributedRPC.java:92)
    at backtype.storm.generated.DistributedRPC$Client.execute(DistributedRPC.java:78)
    at backtype.storm.utils.DRPCClient.execute(DRPCClient.java:71)
    at com.enjoyor.storm.kafka.example.DRPCClientDemo.main(DRPCTest.java:18)

Client 的代码如下

1
2
3
4
5
6
7
    public static void main(String[] args) throws TException, DRPCExecutionException {
        DRPCClient client = new DRPCClient("hd181", 3772);

        for (int i = 0; i < 100; ++i) {
            System.out.println(client.execute("drpcFunc", "aaa"));
        }
    }

即在重启 DRPC server 后会存在一段服务不稳定的时间,这段时间内 DRPC server 会时而正常执行 RPC 请求,时而连接不正常,大约在十分钟后恢复正常。这个问题已经提交给 Storm 社区了,暂时还没有收到正式的回应。




Comments

comments powered by Disqus