Trident学习笔记(一)

Easter79
• 阅读 669

1. Trident入门

Trident

-------------------

 三叉戟

 storm高级抽象,支持有状态流处理;

 好处是确保消费被处理一次;

 以小批次方式处理输入流,得到精准一次性处理 ;

 不再使用bolt,使用functions、aggreates、filters以及states。

 Trident Tuple: trident top的数据模型,trident处理数据的单元;

        每个tuple有预定义的字段列表构成,字段类型可以是byte;

        character,integer,long,float,double,Boolean or byte array。

 Trident functions: 包含修改tuple的业务逻辑,输入的是tuple的字段,输出多个tuple。

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

/**
 * 求和函数
 */
public class SumFunction extends BaseFunction { @Override public void execute(TridentTuple input, TridentCollector collector) { Integer num1 = input.getInteger(0); Integer num2 = input.getInteger(1); int sum = num1 + num2; collector.emit(new Values(sum)); } }

如果tuple有a, b, c, d四个field,只有a和b作为输入传给function,functions会生成新的sum字段,

sum字段和输入的元祖进行合并,生成一个完成tuple,因此,新的tuple的总和字段个数是a, b, c, d, sum。

Trident Filter

--------------------

  1. 描述

  获取字段集合作为输入,输出boolean,如果反悔true,tuple在流中保留,否则删除,

  a, b, c, d, sum是元祖的字段,sum作为输入传递给filter,判断sum是否为偶数,

  如果是偶数,tuple(a, b, c, d, sum)保留,否则tuple删除。

  2. 代码

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * 校验是否是偶数的过滤器
 */
public class CheckEvenFilter extends BaseFilter {

    @Override
    public boolean isKeep(TridentTuple input) {
        Integer sum = input.getInteger(0); if (sum % 2 == 0) { return true; } return false; } }

Trident projections

--------------------

  1. 描述

   投影操作中,trident值保留在投影中制定的字段,

   x, y, z --> projection(x) --> x

  2. 调用投影的方式

   mystream.project(new fields("x"));

Trident学习笔记(一)

Trident学习笔记(一)

 Trident学习笔记(一)

Trident学习笔记(一)

写一个topology

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class PrintFunction extends BaseFunction {

    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        Integer sum = input.getInteger(0);
        System.out.println(this.getCLass.getSimpleName + ": " + sum);
    }
    
}

import com.google.common.collect.ImmutableList;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FeederBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentTopologyApp {

    public static void main(String[] args) {
        // 创建topology
        TridentTopology topology = new TridentTopology();

        // 创建spout
        FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d"));

        // 创建流
        Stream stream = topology.newStream("spout", testSpout);
        stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1)
                .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1)
                .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("xxx")).parallelismHint(1);

        // 本地提交
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TridentDemo", new Config(), topology.build());

        // 测试数据
        testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4)));
        testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5)));
        testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6)));
        testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7)));
    }

}

输出结果

SumFunction:1, 2
CheckEvenFilter:3
PrintFunction: 3
SumFunction:2, 3
CheckEvenFilter:5
PrintFunction: 5
SumFunction:3, 4
CheckEvenFilter:7
PrintFunction: 7
SumFunction:4, 5
CheckEvenFilter:9
PrintFunction: 9

加入一个求平均数的函数

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * 求平均值方法
 */
public class AverageFunction extends BaseFunction {

    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        int a = input.getIntegerByField("a");
        int b = input.getIntegerByField("b");
        int c = input.getIntegerByField("c");
        int d = input.getIntegerByField("d");
        int sum = input.getIntegerByField("sum");
        float avg = (float) ((a+b+c+d+sum) / 5.0);
        System.out.println(this.getClass().getSimpleName() + ": avg = " + avg);
    }

}

import com.google.common.collect.ImmutableList;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FeederBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentTopologyApp {

    public static void main(String[] args) {
        // 创建topology
        TridentTopology topology = new TridentTopology();

        // 创建spout
        FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d"));

        // 创建流
        Stream stream = topology.newStream("spout", testSpout);
        stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1)
                .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1)
                .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("res")).parallelismHint(1)
                .shuffle().each(new Fields("a", "b", "c", "d", "sum"), new AverageFunction(), new Fields("avg")).parallelismHint(1);

        // 本地提交
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TridentDemo", new Config(), topology.build());

        // 测试数据
        testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4)));
        testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5)));
        testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6)));
        testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7)));
    }

}

2. Trident聚合函数

Trident学习笔记(一)

Trident学习笔记(一)

Trident学习笔记(一)

Trident学习笔记(一)

Trident学习笔记(一)

 分区聚合

import com.google.common.collect.ImmutableList;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FeederBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentTopologyApp2 {

    public static void main(String[] args) {
        // 创建topology
        TridentTopology topology = new TridentTopology();

        // 创建spout
        FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b"));

        // 创建流
        Stream stream = topology.newStream("testSpout", testSpout);
        stream.shuffle().each(new Fields("a", "b"), new MyFilter1()).parallelismHint(1)
                .global().each(new Fields("a", "b"), new MyFilter2()).parallelismHint(1)
                .partitionBy(new Fields("a"))
                //.each(new Fields("a", "b"), new MyFunction1(), new Fields("none")).parallelismHint(1)
                .partitionAggregate(new Fields("a"), new MyCount(), new Fields("count"))
                .each(new Fields("count"), new MyPrintFunction1(), new Fields("xxx")).parallelismHint(1);

        // 本地提交
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TridentDemo2", new Config(), topology.build());

        // 测试数据
        testSpout.feed(ImmutableList.of(new Values(1, 2)));
        testSpout.feed(ImmutableList.of(new Values(2, 3)));
        testSpout.feed(ImmutableList.of(new Values(2, 4)));
        testSpout.feed(ImmutableList.of(new Values(3, 5)));
    }

}

Trident学习笔记(一)

批次聚合

 Trident学习笔记(一)

3. 自定义聚合函数-Sum-SumAsAggregator

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
Oracle一张表中实现对一个字段不同值和总值的统计(多个count)
需求:统计WAIT\_ORDER表中的工单总数、未处理工单总数、已完成工单总数、未完成工单总数。表结构:为了举例子方便,WAIT\_ORDER表只有两个字段,分别是ID、STATUS,其中STATUS为工单的状态。1表示未处理,2表示已完成,3表示未完成总数。 SQL:  1.SELECT   2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Easter79
Easter79
Lv1
今生可爱与温柔,每一样都不能少。
文章
2.8k
粉丝
6
获赞
1.2k