ES Pipeline Aggregation

Stella981

WeChat official account: [中间件兴趣圈]
About the author: author of the book 《RocketMQ技术内幕》.

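Pipeline aggregations work on the output produced by other aggregations rather than on sets of documents, and add information to the output tree.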

Note: script-based aggregations are not covered in this article.

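There are two families of pipeline aggregations:

  • parent: receives the output of its parent aggregation and computes new values that are added to the parent's existing buckets.

  • sibling: receives the output of a sibling aggregation and computes a new aggregation at the same level as that sibling.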

The pipeline aggregations defined by ES are introduced one by one below.

Avg Bucket Aggregation

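A sibling pipeline aggregation that calculates the average value of a specified metric across all buckets of a sibling aggregation. The sibling aggregation must be a multi-bucket aggregation, and the specified value must be a numeric metric produced by a metric aggregation.
Syntax: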

{
    "avg_bucket": {
        "buckets_path": "the_sum"
    }
}
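  • buckets_path: the path to the metric whose average is computed. Nested aggregations are addressed level by level with > (for example sales_per_month>sales).

Other parameters:

  • gap_policy
    The policy applied when the pipeline aggregation encounters a missing value, somewhat like the missing setting of terms and similar aggregations. Allowed values: skip (the default) and insert_zeros.

    • skip: treats missing data as if the bucket does not exist. The bucket is skipped and the calculation continues with the next available value.

    • insert_zeros: replaces the missing value with 0 and continues the calculation as normal.

  • format
    A format pattern applied to the output value of this aggregation (returned as value_as_string).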
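To make the two gap_policy values concrete, here is a minimal plain-Java sketch (no Elasticsearch dependency) that averages one value per bucket the way avg_bucket is described above. It is an illustration, not ES's implementation: a missing metric is modelled as a null list entry, and the names GapPolicy, AvgBucketSketch and avgBucket are hypothetical.

```java
import java.util.List;

// Sketch only: an in-memory model of avg_bucket, not Elasticsearch code.
// GapPolicy and AvgBucketSketch are hypothetical names used for illustration.
enum GapPolicy { SKIP, INSERT_ZEROS }

final class AvgBucketSketch {
    private AvgBucketSketch() {}

    /**
     * Averages one metric value per bucket. A null entry stands for a bucket
     * whose metric is missing (a "gap").
     */
    static double avgBucket(List<Double> bucketValues, GapPolicy policy) {
        double sum = 0.0;
        int count = 0;
        for (Double v : bucketValues) {
            if (v == null && policy == GapPolicy.SKIP) {
                continue;                      // skip: behave as if the bucket did not exist
            }
            sum += (v == null) ? 0.0 : v;      // insert_zeros: a gap counts as 0
            count++;
        }
        // With no usable buckets there is nothing to average; this sketch returns NaN.
        return count == 0 ? Double.NaN : sum / count;
    }
}
```

With the monthly sums 550, 60 and 375 the sketch gives 985 / 3 ≈ 328.33. With a gap between 550 and 60, skip averages only the two real values (305), while insert_zeros also counts the gap and divides 610 by 3.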

Example:

POST /_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {                  // @1
      "date_histogram": {
        "field": "date",
        "interval": "month"
      },
      "aggs": {                           // @2
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "avg_monthly_sales": {                // @3
      "avg_bucket": {
        "buckets_path": "sales_per_month>sales"
      }
    }
  }
}

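@1: defines the first-level aggregation, a date histogram with one bucket per month.
@2: defines the second-level aggregation, which sums price over the documents in each monthly bucket.
@3: a sibling avg_bucket pipeline aggregation that averages those monthly sums; its buckets_path sales_per_month>sales goes from the histogram down to its sales sub-aggregation.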
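The buckets_path string is what wires a pipeline to its input. The plain-Java sketch below splits a path such as sales_per_month>sales into its parts, following the grammar given in the ES reference documentation: > between aggregation names, and . before the metric name of a multi-value metric aggregation (for example my_bucket>my_stats.avg). BucketsPath is a hypothetical illustration, not ES's parser; it ignores the bracketed-key syntax and does not special-case _count or _key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: a simplified, hypothetical parser for buckets_path strings.
final class BucketsPath {
    final List<String> aggNames;   // aggregation names, outermost first
    final String metric;           // metric of a multi-value metric agg, or null

    private BucketsPath(List<String> aggNames, String metric) {
        this.aggNames = Collections.unmodifiableList(aggNames);
        this.metric = metric;
    }

    static BucketsPath parse(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("empty buckets_path");
        }
        // '>' separates an aggregation from the aggregation nested inside it.
        List<String> parts = new ArrayList<>(Arrays.asList(path.split(">", -1)));
        for (String p : parts) {
            if (p.isEmpty()) {
                throw new IllegalArgumentException("empty element in " + path);
            }
        }
        // A '.' in the last element selects one metric of a multi-value metric aggregation.
        String last = parts.get(parts.size() - 1);
        int dot = last.indexOf('.');
        String metric = null;
        if (dot >= 0) {
            metric = last.substring(dot + 1);
            parts.set(parts.size() - 1, last.substring(0, dot));
        }
        return new BucketsPath(parts, metric);
    }
}
```

For the request above, parse("sales_per_month>sales") yields the names [sales_per_month, sales] and no metric, i.e. "go into each sales_per_month bucket and read its sales value".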

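The response is shown below. The third monthly bucket (2015/03/01, sales 375.0, shown in the Cumulative Sum example later) is omitted from this listing, which is why avg_monthly_sales is (550 + 60 + 375) / 3 ≈ 328.33 rather than the average of the two buckets shown: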

{
    ...  // omitted
    "aggregations": {
        "sales_per_month": {
            "buckets": [
                {
                    "key_as_string": "2015/01/01 00:00:00",
                    "key": 1420070400000,
                    "doc_count": 3,
                    "sales": {
                        "value": 550.0
                    }
                },
                {
                    "key_as_string": "2015/02/01 00:00:00",
                    "key": 1422748800000,
                    "doc_count": 2,
                    "sales": {
                        "value": 60.0
                    }
                }
            ]
        },
        "avg_monthly_sales": {   // average of the second-level (monthly sum) results
            "value": 328.33333333333333
        }
    }
}

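The corresponding Java example. It runs against a different index (aggregations_index02): a terms aggregation on sellerId with a sum of num per seller, and an avg_bucket pipeline over those per-seller sums: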

public static void test_pipeline_avg_bucket_aggregation() {
    // EsClient is the author's own helper that creates/closes a RestHighLevelClient
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // terms bucket aggregation on sellerId, with a sum of num per seller
        AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
                .field("sellerId")
                .subAggregation(AggregationBuilders.sum("seller_num_agg")
                        .field("num"));
        sourceBuilder.aggregation(aggregationBuild);

        // add the avg_bucket pipeline as a sibling of seller_agg
        sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
        sourceBuilder.size(0);

        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}

Percentiles Bucket Aggregation

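A sibling pipeline aggregation that calculates percentiles of a specified metric across all buckets of a sibling aggregation. If no percents are specified, it returns the 1st, 5th, 25th, 50th, 75th, 95th and 99th percentiles, as in the response below. Java example: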

public static void test_Percentiles_bucket_aggregation() {
    // EsClient is the author's own helper that creates/closes a RestHighLevelClient
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // terms bucket aggregation on sellerId, with a sum of num per seller
        AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
                .field("sellerId")
                .subAggregation(AggregationBuilders.sum("seller_num_agg")
                        .field("num"));
        sourceBuilder.aggregation(aggregationBuild);

        // add the percentiles_bucket pipeline as a sibling of seller_agg
        sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
        sourceBuilder.size(0);

        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}

The response:

{
    ...  // other properties omitted
    "aggregations": {
        "lterms#seller_agg": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 12,
            "buckets": [
                {
                    "key": 45,
                    "doc_count": 567,
                    "sum#seller_num_agg": {
                        "value": 911
                    }
                },
                {
                    "key": 31,
                    "doc_count": 324,
                    "sum#seller_num_agg": {
                        "value": 353
                    }
                }  // remaining buckets omitted
            ]
        },
        "percentiles_bucket#seller_num_agg_av": {
            "values": {
                "1.0": 5,
                "5.0": 5,
                "25.0": 10,
                "50.0": 20,
                "75.0": 290,
                "95.0": 911,
                "99.0": 911
            }
        }
    }
}
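The ES reference describes percentiles_bucket as returning an actual bucket value for each percentile rather than interpolating between values, which is why every entry above (5, 10, 20, 290, 911) looks like a per-seller sum. The plain-Java sketch below illustrates that idea with a simple nearest-rank rule, index = round(p / 100 × (n − 1)) over the sorted values. That exact rank rule is an assumption of this sketch, not something stated in the article, and your ES version may pick ranks differently; PercentilesBucketSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: nearest-rank percentiles over per-bucket values, not Elasticsearch code.
final class PercentilesBucketSketch {
    private PercentilesBucketSketch() {}

    /** The default percents, matching the keys of the response above. */
    static final double[] DEFAULT_PERCENTS = {1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0};

    /**
     * For each requested percent, returns one of the input values chosen by
     * rounding to the nearest rank (assumed rule; no interpolation).
     */
    static Map<Double, Double> percentilesBucket(double[] bucketValues, double[] percents) {
        double[] sorted = bucketValues.clone();
        Arrays.sort(sorted);
        Map<Double, Double> result = new LinkedHashMap<>();   // keeps the order of percents
        for (double p : percents) {
            if (sorted.length == 0) {
                result.put(p, Double.NaN);                     // no buckets: nothing to pick
                continue;
            }
            int index = (int) Math.round((p / 100.0) * (sorted.length - 1));
            result.put(p, sorted[index]);
        }
        return result;
    }
}
```

Because the result is always an element of the input, small bucket counts make several percentiles collapse onto the same value, as 95.0 and 99.0 both do in the response above.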

Cumulative Sum Aggregation

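A parent pipeline aggregation that calculates the cumulative (running) sum of a specified metric, here a sum sub-aggregation, across the buckets of its parent histogram or date_histogram aggregation.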

Syntax (REST):

{
    "cumulative_sum": {
        "buckets_path": "the_sum"
    }
}

Supported parameters:

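  • buckets_path
    The path to the metric whose cumulative sum is computed; this is the input of the pipeline aggregation.

  • format
    A format pattern applied to the output value of this aggregation.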

Example:

POST /sales/_search
{
    "size": 0,
    "aggs": {
        "sales_per_month": {
            "date_histogram": {
                "field": "date",
                "interval": "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "cumulative_sales": {
                    "cumulative_sum": {
                        "buckets_path": "sales"
                    }
                }
            }
        }
    }
}

The response:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               },
               "cumulative_sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               },
               "cumulative_sales": {
                  "value": 610.0
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               },
               "cumulative_sales": {
                  "value": 985.0
               }
            }
         ]
      }
   }
}

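As the result shows, each bucket's cumulative_sales equals the previous bucket's cumulative_sales plus the current bucket's sales sum: 550, then 550 + 60 = 610, then 610 + 375 = 985.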
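The running total described above is simple enough to state directly. This plain-Java sketch (not Elasticsearch code; CumulativeSumSketch is a hypothetical name) takes the per-bucket sums in bucket order and returns the value each bucket's cumulative_sum would carry.

```java
// Sketch only: the cumulative_sum rule over per-bucket values, not Elasticsearch code.
final class CumulativeSumSketch {
    private CumulativeSumSketch() {}

    /** Returns the running total after each bucket, in bucket order. */
    static double[] cumulativeSum(double[] bucketValues) {
        double[] out = new double[bucketValues.length];
        double running = 0.0;
        for (int i = 0; i < bucketValues.length; i++) {
            running += bucketValues[i];   // previous total + current bucket's value
            out[i] = running;
        }
        return out;
    }
}
```

Feeding it the monthly sales sums 550, 60 and 375 from the response above gives 550, 610 and 985, the three cumulative_sales values.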

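Only the output of the Java version is shown here. It comes from a date histogram on createTime with a sum of num (seller_num_agg) and a CumulativeSumPipelineAggregationBuilder named Cumulative_Seller_num_agg, built the same way as in the Java example of the Bucket Sort section below: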

{
    "aggregations": {
        "date_histogram#createTime_histogram": {
            "buckets": {
                "2015-12-01 00:00:00": {
                    "key_as_string": "2015-12-01 00:00:00",
                    "key": 1448928000000,
                    "doc_count": 6,
                    "sum#seller_num_agg": {
                        "value": 16
                    },
                    "simple_value#Cumulative_Seller_num_agg": {
                        "value": 16
                    }
                },
                "2016-01-01 00:00:00": {
                    "key_as_string": "2016-03-01 00:00:00",
                    "key": 1456790400000,
                    "doc_count": 10,
                    "sum#seller_num_agg": {
                        "value": 11
                    },
                    "simple_value#Cumulative_Seller_num_agg": {
                        "value": 31
                    }
                }
                // ... remaining buckets omitted
            }
        }
    }
}

Bucket Sort Aggregation

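A parent pipeline aggregation that sorts the buckets of its parent multi-bucket aggregation. Multiple sort fields can be specified, and each bucket can be sorted by its _key, its _count or a sub-aggregation. In addition, from and size can be set to truncate the resulting buckets.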

Syntax:

{
    "bucket_sort": {
        "sort": [
            {"sort_field_1": {"order": "asc"}},
            {"sort_field_2": {"order": "desc"}},
            "sort_field_3"
        ],
        "from": 1,
        "size": 3
    }
}

Supported parameters:

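  • sort
    The list of fields to sort on, each with an optional order (asc or desc).

  • from
    Buckets in positions before this value are dropped from the result; the truncation is applied to the sorted buckets. Defaults to 0.

  • size
    The number of buckets to return. Defaults to all buckets of the parent aggregation.

  • gap_policy
    The policy applied when the pipeline aggregation encounters a missing value, somewhat like the missing setting of terms and similar aggregations. Allowed values: skip (the default) and insert_zeros.

    • skip: treats missing data as if the bucket does not exist. The bucket is skipped and the calculation continues with the next available value.

    • insert_zeros: replaces the missing value with 0 and continues the calculation as normal.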
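The sort/from/size combination can be pictured as "sort, then cut a window out of the sorted list". The plain-Java sketch below models that on in-memory buckets; it is not Elasticsearch code, and BucketSortSketch, Bucket and bucketSort are hypothetical names. A null comparator means "no sort fields", so only the truncation applies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: models bucket_sort on in-memory buckets; not Elasticsearch code.
final class BucketSortSketch {
    private BucketSortSketch() {}

    /** Hypothetical bucket: a key, a doc count and one sub-aggregation value. */
    static final class Bucket {
        final String key;
        final long docCount;
        final double metric;

        Bucket(String key, long docCount, double metric) {
            this.key = key;
            this.docCount = docCount;
            this.metric = metric;
        }
    }

    /**
     * Sorts the buckets (if a comparator is given), then keeps at most `size`
     * buckets starting at position `from`. A null size keeps all remaining buckets.
     */
    static List<Bucket> bucketSort(List<Bucket> buckets, Comparator<Bucket> sort, int from, Integer size) {
        List<Bucket> sorted = new ArrayList<>(buckets);
        if (sort != null) {
            sorted.sort(sort);                 // List.sort is stable: ties keep their input order
        }
        int start = Math.min(from, sorted.size());
        int end = (size == null) ? sorted.size() : Math.min(start + size, sorted.size());
        return new ArrayList<>(sorted.subList(start, end));
    }
}
```

Several sort fields map onto Comparator.thenComparing; for example, Comparator.comparingDouble((Bucket b) -> b.metric).reversed().thenComparing(b -> b.key) sorts by the metric descending and breaks ties by key.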

Example from the official documentation:

POST /sales/_search
{
    "size": 0,
    "aggs": {
        "sales_per_month": {
            "date_histogram": {
                "field": "date",
                "interval": "month"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "sales_bucket_sort": {
                    "bucket_sort": {
                        "sort": [
                          {"total_sales": {"order": "desc"}}
                        ],
                        "size": 3
                    }
                }
            }
        }
    }
}

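This request sorts the monthly buckets by total_sales in descending order and returns only the top 3. The corresponding Java example sorts seller_num_agg in ascending order instead, and also registers a cumulative_sum pipeline on the same date histogram: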

public static void test_bucket_sort_Aggregation() {
    // EsClient is the author's own helper that creates/closes a RestHighLevelClient
    RestHighLevelClient client = EsClient.getClient();
    try {
        // interval of the date histogram: one calendar month in this example
        DateHistogramInterval interval = new DateHistogramInterval("1M");
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")
                .field("createTime")
                .dateHistogramInterval(interval)
                .keyed(true)
                .subAggregation(AggregationBuilders.sum("seller_num_agg")
                        .field("num"))
                // BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts)
                .subAggregation(new BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList(
                        new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC)))
                        .from(0)
                        .size(3))
                .subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg"))
                // .format("yyyy-MM-dd") // formats the bucket keys
                ;
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}

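Response. The buckets come back ordered by seller_num_agg ascending rather than chronologically, and Cumulative_Seller_num_agg accumulates in that sorted order (2, then 2 + 3 = 5, then 5 + 4 = 9):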

{
    "aggregations": {
        "date_histogram#createTime_histogram": {
            "buckets": {
                "2016-04-01 00:00:00": {
                    "key_as_string": "2016-04-01 00:00:00",
                    "key": 1459468800000,
                    "doc_count": 2,
                    "sum#seller_num_agg": {
                        "value": 2
                    },
                    "simple_value#Cumulative_Seller_num_agg": {
                        "value": 2
                    }
                },
                "2017-05-01 00:00:00": {
                    "key_as_string": "2017-05-01 00:00:00",
                    "key": 1493596800000,
                    "doc_count": 3,
                    "sum#seller_num_agg": {
                        "value": 3
                    },
                    "simple_value#Cumulative_Seller_num_agg": {
                        "value": 5
                    }
                },
                "2017-02-01 00:00:00": {
                    "key_as_string": "2017-02-01 00:00:00",
                    "key": 1485907200000,
                    "doc_count": 4,
                    "sum#seller_num_agg": {
                        "value": 4
                    },
                    "simple_value#Cumulative_Seller_num_agg": {
                        "value": 9
                    }
                }
            }
        }
    }
}
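The response above suggests that, in this request, bucket_sort reordered and truncated the histogram buckets before cumulative_sum walked them. The plain-Java sketch below reproduces that ordering on in-memory data so the numbers can be checked by hand. It assumes that sort-then-accumulate order (inferred from the output, not stated in the article) and is not Elasticsearch code; SortThenCumulateSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: bucket_sort (ASC, from 0, size n) followed by cumulative_sum,
// applied to in-memory (key -> sum) buckets. Not Elasticsearch code.
final class SortThenCumulateSketch {
    private SortThenCumulateSketch() {}

    /**
     * Returns key -> {sum, cumulative} in output order.
     */
    static Map<String, double[]> sortThenCumulate(Map<String, Double> sums, int size) {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(sums.entrySet());
        entries.sort(Map.Entry.comparingByValue());              // like sorting seller_num_agg ASC
        List<Map.Entry<String, Double>> kept =
                entries.subList(0, Math.min(size, entries.size())); // like from(0).size(size)
        Map<String, double[]> out = new LinkedHashMap<>();
        double running = 0.0;
        for (Map.Entry<String, Double> e : kept) {
            running += e.getValue();                             // running total in sorted order
            out.put(e.getKey(), new double[]{e.getValue(), running});
        }
        return out;
    }
}
```

With the three monthly sums from the response (2016-04: 2, 2017-02: 4, 2017-05: 3) plus any larger bucket that size(3) cuts off, the sketch returns 2016-04, 2017-05, 2017-02 with running totals 2, 5 and 9.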

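Max Bucket Aggregation

Similar to avg_bucket: a sibling pipeline aggregation that returns the maximum value of the specified metric across the buckets of a sibling aggregation, together with the key(s) of the bucket(s) holding that value.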
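A plain-Java sketch of the max_bucket idea: scan the per-bucket values, keep the largest, and remember every key that reaches it (ties are all reported). Missing or NaN values are skipped, as with gap_policy skip. This is an in-memory illustration, not Elasticsearch code; MaxBucketSketch and Result are hypothetical names. Min Bucket is the mirror image with the comparison reversed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: max_bucket over (bucket key -> metric value); not Elasticsearch code.
final class MaxBucketSketch {
    private MaxBucketSketch() {}

    /** Hypothetical result shape: the maximum value plus the key(s) holding it. */
    static final class Result {
        final double value;
        final List<String> keys;

        Result(double value, List<String> keys) {
            this.value = value;
            this.keys = keys;
        }
    }

    /** bucketValues maps bucket key -> metric value, in bucket order. */
    static Result maxBucket(Map<String, Double> bucketValues) {
        double max = Double.NEGATIVE_INFINITY;     // stays -Infinity (with no keys) if nothing is usable
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Double> e : bucketValues.entrySet()) {
            Double v = e.getValue();
            if (v == null || v.isNaN()) {
                continue;                          // gap: skipped
            }
            if (v > max) {
                max = v;
                keys.clear();
                keys.add(e.getKey());
            } else if (v == max) {
                keys.add(e.getKey());              // tie: report this key too
            }
        }
        return new Result(max, keys);
    }
}
```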

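Min Bucket Aggregation

Similar to avg_bucket: a sibling pipeline aggregation that returns the minimum value of the specified metric across the buckets of a sibling aggregation, together with the key(s) of the bucket(s) holding that value.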

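Sum Bucket Aggregation

Similar to avg_bucket: a sibling pipeline aggregation that returns the sum of the specified metric across all buckets of a sibling aggregation.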

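Stats Bucket Aggregation

Similar to avg_bucket: a sibling pipeline aggregation that returns count, min, max, avg and sum of the specified metric across all buckets of a sibling aggregation.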
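The five values stats_bucket reports map directly onto Java's own java.util.DoubleSummaryStatistics. The sketch below feeds it the per-bucket values, skipping missing or NaN entries as gap_policy skip would; it is an in-memory illustration, not Elasticsearch code, and StatsBucketSketch is a hypothetical name. One difference to keep in mind: with no values, DoubleSummaryStatistics reports an average of 0.0, whereas ES would not report a meaningful average.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Objects;

// Sketch only: stats_bucket-style summary over per-bucket values; not Elasticsearch code.
final class StatsBucketSketch {
    private StatsBucketSketch() {}

    /** count, min, max, average and sum over the usable bucket values. */
    static DoubleSummaryStatistics statsBucket(List<Double> bucketValues) {
        return bucketValues.stream()
                .filter(Objects::nonNull)          // gap: skipped
                .filter(v -> !v.isNaN())
                .mapToDouble(Double::doubleValue)
                .summaryStatistics();
    }
}
```

For the monthly sums 550, 60 and 375 this gives count 3, min 60, max 550, sum 985 and an average of about 328.33, the same value avg_bucket produced earlier.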



This article was shared from the WeChat official account 中间件兴趣圈 (dingwpmz_zjj).