storm drpc实例

Easter79
• 阅读 656

本文主要演示一下storm drpc实例

配置

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
  • 对于drpc服务,则暴露drpc.port(好让外部的DRPCClient访问)、drpc.invocations.port(让worker访问)

TridentTopology

    @Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        //远程提交 mvn clean package -Dmaven.test.skip=true
        //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
  • 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
  • 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery

DRPCClient

    @Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
  • 注意这里的配置项不能少,否则会引发空指针
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
  • 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
  • DRPCClient配置了drpc的地址及port
  • client.execute这里要传入newDRPCStream指定的function名称

小结

  • 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(DRPCClient使用的是thrift协议调用)
  • supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
  • DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称

doc

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k