微信公众号:[中间件兴趣圈]
关于作者:《RocketMQ技术内幕》作者;
管道聚合处理来自其他聚合而不是文档集的输出,将信息添加到输出树中。
注:关于脚本聚合目前在本文中暂时不会涉及。
主要有如下两种管道聚合方式:
parent
sibling
下面一一介绍ES定义的管道聚合。
Avg Bucket 聚合
同级管道聚合,它计算同级聚合中指定度量的平均值。同级聚合必须是多桶聚合,针对的是度量聚合(metric Aggregation)。
示例如下:
1{2 "avg_bucket": {3 "buckets_path": "the_sum" // @14 }5}
buckets_path:指定聚合的名称,支持多级嵌套聚合。
其他参数:gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
insert_zeros:默认使用0代替。
format
用于格式化聚合桶的输出(key)。
示例如下:
1POST /_search 2{ 3 "size": 0, 4 "aggs": { 5 "sales_per_month": { // @1 6 "date_histogram": { 7 "field": "date", 8 "interval": "month" 9 },10 "aggs": { // @211 "sales": {12 "sum": {13 "field": "price"14 }15 }16 }17 },18 "avg_monthly_sales": { // @319 "avg_bucket": {20 "buckets_path": "sales_per_month>sales" 21 }22 }23 }24}
代码@1:首先定义第一级聚合(按月)直方图聚合。
代码@2:定义第二级聚合,在按月聚合的基础上,对每个月的文档求sum。
代码@3:对上面的聚合求平均值。
其返回结果如下:
1{ 2 ... // 省略 3 "aggregations": { 4 "sales_per_month": { 5 "buckets": [ 6 { 7 "key_as_string": "2015/01/01 00:00:00", 8 "key": 1420070400000, 9 "doc_count": 3,10 "sales": {11 "value": 550.012 }13 },14 {15 "key_as_string": "2015/02/01 00:00:00",16 "key": 1422748800000,17 "doc_count": 2,18 "sales": {19 "value": 60.020 }21 }22 ]23 },24 "avg_monthly_sales": { // 这是对二级聚合的结果再进行一次求平均值聚合。25 "value": 328.3333333333333326 }27 }28}
对应的JAVA示例如下:
1public static void test_pipeline_avg_buncket_aggregation() { 2 RestHighLevelClient client = EsClient.getClient(); 3 try { 4 SearchRequest searchRequest = new SearchRequest(); 5 searchRequest.indices("aggregations_index02"); 6 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 7 AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg") 8 .field("sellerId") 9 .subAggregation(AggregationBuilders.sum("seller_num_agg")10 .field("num")11 )12 ;13 sourceBuilder.aggregation(aggregationBuild);1415 // 添加 avg bucket pipeline16 sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));17 sourceBuilder.size(0);1819 searchRequest.source(sourceBuilder);20 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);21 System.out.println(result);22 } catch (Throwable e) {23 e.printStackTrace();24 } finally {25 EsClient.close(client);26 }27 }
Percentiles Bucket 聚合
同级管道聚合,百分位管道聚合。其JAVA示例如下:
1public static void test_Percentiles_buncket_aggregation() { 2 RestHighLevelClient client = EsClient.getClient(); 3 try { 4 SearchRequest searchRequest = new SearchRequest(); 5 searchRequest.indices("aggregations_index02"); 6 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 7 AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg") 8 .field("sellerId") 9 .subAggregation(AggregationBuilders.sum("seller_num_agg")10 .field("num")11 )12 ;13 sourceBuilder.aggregation(aggregationBuild);1415 // 添加 avg bucket pipeline16 sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));17 sourceBuilder.size(0);1819 searchRequest.source(sourceBuilder);20 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);21 System.out.println(result);22 } catch (Throwable e) {23 e.printStackTrace();24 } finally {25 EsClient.close(client);26 }27 }
其返回值如下:
1{ 2 ... // 省略其他属性 3 "aggregations":{ 4 "lterms#seller_agg":{ 5 "doc_count_error_upper_bound":0, 6 "sum_other_doc_count":12, 7 "buckets":[ 8 { 9 "key":45,10 "doc_count":567,11 "sum#seller_num_agg":{12 "value":91113 }14 },15 {16 "key":31,17 "doc_count":324,18 "sum#seller_num_agg":{19 "value":35320 }21 } // 省略其他桶的显示22 ]23 },24 "percentiles_bucket#seller_num_agg_av":{25 "values":{26 "1.0":5,27 "5.0":5,28 "25.0":10,29 "50.0":20,30 "75.0":290,31 "95.0":911,32 "99.0":91133 }34 }35 }36}
Cumulative Sum 聚合
累积管道聚合,就是就是依次将每个管道的sum聚合进行累加。
其语法(restfull)如下:
1{2 "cumulative_sum": {3 "buckets_path": "the_sum"4 }5}
支持的参数说明:
buckets_path
桶聚合名称,作为管道聚合的输入信息。format
格式化key。
使用示例如下:
1POST /sales/_search 2{ 3 "size": 0, 4 "aggs" : { 5 "sales_per_month" : { 6 "date_histogram" : { 7 "field" : "date", 8 "interval" : "month" 9 },10 "aggs": {11 "sales": {12 "sum": {13 "field": "price"14 }15 },16 "cumulative_sales": {17 "cumulative_sum": {18 "buckets_path": "sales" 19 }20 }21 }22 }23 }24}
其返回结果如下:
1{ 2 "took": 11, 3 "timed_out": false, 4 "_shards": ..., 5 "hits": ..., 6 "aggregations": { 7 "sales_per_month": { 8 "buckets": [ 9 {10 "key_as_string": "2015/01/01 00:00:00",11 "key": 1420070400000,12 "doc_count": 3,13 "sales": {14 "value": 550.015 },16 "cumulative_sales": {17 "value": 550.018 }19 },20 {21 "key_as_string": "2015/02/01 00:00:00",22 "key": 1422748800000,23 "doc_count": 2,24 "sales": {25 "value": 60.026 },27 "cumulative_sales": {28 "value": 610.029 }30 },31 {32 "key_as_string": "2015/03/01 00:00:00",33 "key": 1425168000000,34 "doc_count": 2,35 "sales": {36 "value": 375.037 },38 "cumulative_sales": {39 "value": 985.040 }41 }42 ]43 }44 }45}
从结果可知,cumulative_sales的值等于上一个cumulative_sales + 当前桶的sum聚合。
对应的JAVA示例如下:
1{ 2 "aggregations":{ 3 "date_histogram#createTime_histogram":{ 4 "buckets":{ 5 "2015-12-01 00:00:00":{ 6 "key_as_string":"2015-12-01 00:00:00", 7 "key":1448928000000, 8 "doc_count":6, 9 "sum#seller_num_agg":{10 "value":1611 },12 "simple_value#Cumulative_Seller_num_agg":{13 "value":1614 }15 },16 "2016-01-01 00:00:00":{17 "key_as_string":"2016-03-01 00:00:00",18 "key":1456790400000,19 "doc_count":10,20 "sum#seller_num_agg":{21 "value":1122 },23 "simple_value#Cumulative_Seller_num_agg":{24 "value":3125 }26 }27 // ... 忽略28 }29 }30 }31}
Bucket Sort 聚合
一种父管道聚合,它对其父多桶聚合的桶进行排序。并可以指定多个排序字段。每个bucket可以根据它的_key、_count或子聚合进行排序。此外,可以设置from和size的参数,以便截断结果桶。
使用语法如下:
1{ 2 "bucket_sort": { 3 "sort": [ 4 {"sort_field_1": {"order": "asc"}}, 5 {"sort_field_2": {"order": "desc"}}, 6 "sort_field_3" 7 ], 8 "from": 1, 9 "size": 310 }11}
支持的参数说明如下:
sort
定义排序结构。from
用与对父聚合的桶进行截取,该值之前的所有桶将忽略,也就是不参与排序,默认为0。size
返回的桶数。默认为父聚合的所有桶。gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
insert_zeros:默认使用0代替。
官方示例如下:
1POST /sales/_search 2{ 3 "size": 0, 4 "aggs" : { 5 "sales_per_month" : { 6 "date_histogram" : { 7 "field" : "date", 8 "interval" : "month" 9 },10 "aggs": {11 "total_sales": {12 "sum": {13 "field": "price"14 }15 },16 "sales_bucket_sort": {17 "bucket_sort": {18 "sort": [19 {"total_sales": {"order": "desc"}}20 ],21 "size": 322 }23 }24 }25 }26 }27}
对应的JAVA示例如下:
1public static void test_bucket_sort_Aggregation() { 2 RestHighLevelClient client = EsClient.getClient(); 3 try { 4 5 //构建日期直方图聚合 时间间隔,示例中按月统计 6 DateHistogramInterval interval = new DateHistogramInterval("1M"); 7 SearchRequest searchRequest = new SearchRequest(); 8 searchRequest.indices("aggregations_index02"); 9 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();10 AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")11 .field("createTime")12 .dateHistogramInterval(interval)13 .keyed(true)14 .subAggregation(AggregationBuilders.sum("seller_num_agg")15 .field("num")16 )17 .subAggregation(new BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList(18 new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC)))19 .from(0)20 .size(3))21 // BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts)22 .subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg"))23 // .format("yyyy-MM-dd") // 对key的格式化24 ;25 sourceBuilder.aggregation(aggregationBuild);26 sourceBuilder.size(0);27 sourceBuilder.query(28 QueryBuilders.termQuery("sellerId", 24)29 );30 searchRequest.source(sourceBuilder);31 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);32 System.out.println(result);33 } catch (Throwable e) {34 e.printStackTrace();35 } finally {36 EsClient.close(client);37 }38 }
返回值:
1{ 2 "aggregations":{ 3 "date_histogram#createTime_histogram":{ 4 "buckets":{ 5 "2016-04-01 00:00:00":{ 6 "key_as_string":"2016-04-01 00:00:00", 7 "key":1459468800000, 8 "doc_count":2, 9 "sum#seller_num_agg":{10 "value":211 },12 "simple_value#Cumulative_Seller_num_agg":{13 "value":214 }15 },16 "2017-05-01 00:00:00":{17 "key_as_string":"2017-05-01 00:00:00",18 "key":1493596800000,19 "doc_count":3,20 "sum#seller_num_agg":{21 "value":322 },23 "simple_value#Cumulative_Seller_num_agg":{24 "value":525 }26 },27 "2017-02-01 00:00:00":{28 "key_as_string":"2017-02-01 00:00:00",29 "key":1485907200000,30 "doc_count":4,31 "sum#seller_num_agg":{32 "value":433 },34 "simple_value#Cumulative_Seller_num_agg":{35 "value":936 }37 }38 }39 }40 }
Max Bucket 聚合
与 avg类似。
Min Bucket 聚合
与 avg类似。
Sum Bucket 聚合
与 avg类似。
Stats Bucket 聚合
与 avg类似。
更多文章请关注公众号:
一波广告来袭:作者新书《RocketMQ技术内幕》出版上市啦。
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
本文分享自微信公众号 - 中间件兴趣圈(dingwpmz_zjj)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。