Storm DRPC
(翻译自 Storm 官网)分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。
DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。下图是 DRPC 的原理示意图。
下面结合笔者个人的实践经验介绍 Storm DRPC 的使用。
集群配置
- nimbus: "hd124"
- DRPC server: "hd181"
Server 端
启动 DRPC server
在安装有 Storm 环境的服务器启动一个 DRPC server,注意,此 server 的基本配置(nimbus,ZooKeeper,……)应该与集群其他机器相同。
nohup storm drpc >> /home/storm/apache-storm-0.9.3/logs/nohup_drpc.out 2>&1 &
配置集群 DRPC server 列表
在集群其他机器上添加配置:
drpc.servers:
- "hd181"
- "otherdrpcservers"
hd181/otherdrpcservers 是上面启动的 DRPC server 地址。
定义 DRPC topology
Local mode
使用 LocalDRPC
定义本地 RPC 服务模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
.shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3)
.shuffleGrouping("exclaim");
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("exclaim", conf, builder.createTopology());
// local mode 测试代码
System.out.println(drpc.execute("exclamation", "aaa"));
|
Remote mode
1 2 3 4 5 6 7 8 9 10 11 12 13 | TopologyBuilder builder = new TopologyBuilder();
DRPCSpout spout = new DRPCSpout("drpcFunc");
builder.setSpout("drpc", spout, 3);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
.shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 7)
.shuffleGrouping("exclaim");
Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopology("drpc-test", conf, builder.createTopology());
|
Client 端
在 DRPC server 启动完成、DPRC Topology 提交到 Storm 集群运行之后,就可以启动 DRPC 客户端。
1 2 3 4 5 6 7 8 | public class DRPCClientDemo {
public static void main(String[] args) throws TException, DRPCExecutionException {
DRPCClient client = new DRPCClient("hd181", 3772);
String result = client.execute("drpcFunc", "aaa");
System.out.println(result);
}
}
|
说明
- 3772 是 Storm 集群中配置的 DRPC 端口(drpc.port: 3772)。
- 客户端调用的函数名称必须与 Topology 中定义的
DRPCSpout
的 componentId 相同。
如果一切正常客户端会很快收到结果;如果发生错误,就会报错:
Exception in thread "main" DRPCExecutionException(msg:Request timed out)
at backtype.storm.generated.DistributedRPC$execute_result.read(DistributedRPC.java:904)
at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
at backtype.storm.generated.DistributedRPC$Client.recv_execute(DistributedRPC.java:92)
at backtype.storm.generated.DistributedRPC$Client.execute(DistributedRPC.java:78)
at backtype.storm.utils.DRPCClient.execute(DRPCClient.java:71)
在服务器间网络连接正常的情况下,这种错误一般是集群或 Topology 的配置错误,例如 DRPC 服务端 IP、端口配置错误,或执行的 function 名称错误等,客户端会等待服务端的响应很长时间(默认配置为 600s,可以在 storm.yaml 中修改),此时需要检查集群的部署是否正确。
Questions
修改配置或重新启动 DRPC server 后有一段时间出现错误
Exclamation-6:aaa203!!!
Exception in thread "main" DRPCExecutionException(msg:Request failed)
at backtype.storm.generated.DistributedRPC$execute_result.read(DistributedRPC.java:904)
at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
at backtype.storm.generated.DistributedRPC$Client.recv_execute(DistributedRPC.java:92)
at backtype.storm.generated.DistributedRPC$Client.execute(DistributedRPC.java:78)
at backtype.storm.utils.DRPCClient.execute(DRPCClient.java:71)
at com.enjoyor.storm.kafka.example.DRPCClientDemo.main(DRPCTest.java:18)
Client 的代码如下
1 2 3 4 5 6 7 | public static void main(String[] args) throws TException, DRPCExecutionException {
DRPCClient client = new DRPCClient("hd181", 3772);
for (int i = 0; i < 100; ++i) {
System.out.println(client.execute("drpcFunc", "aaa"));
}
}
|
即在重启 DRPC server 后会存在一段服务不稳定的时间,这段时间内 DRPC server 会时而正常执行 RPC 请求,时而连接不正常,大约在十分钟后恢复正常。这个问题已经提交给 Storm 社区了,暂时还没有收到正式的回应。
Comments
comments powered by Disqus