Elasticsearch Multi Get、 Bulk API详解、原理与示例

Stella981
• 阅读 807

本文将详细介绍批量获取API(Multi Get API)与Bulk API。

1、Multi Get API

  • public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException

  • public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener listener)

其核心需要关注MultiGetRequest 。

Elasticsearch Multi Get、 Bulk API详解、原理与示例

从上面所知,mget及批量获取文档,通过add方法添加多个Item,每一个item代表一个文件获取请求,其相关字段已在get API中详细介绍,这里就不做过多详解。

Mget API使用示例

public static void testMget() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            MultiGetRequest request = new MultiGetRequest();
            request.add("twitter", "_doc", "10");
            request.add("twitter", "_doc", "11");
            request.add("twitter", "_doc", "12");
            request.add("gisdemo", "_doc", "10");
            MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

返回的结果其本质是一个 GetResponse的数组,不会因为其中一个失败,整个请求失败,但其结果中会标明每一个是否成功。其返回结果类图如下:

Elasticsearch Multi Get、 Bulk API详解、原理与示例

其字段过滤(Source filtering)、路由等机制与Get API相同,故不重复讲解。

2、Bluk API详解

Bulk API可以在一次API调用中包含多个索引操作,例如更新索引,删除索引等。其API定义如下:

  • public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException

  • public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener)

其核心需要关注BulkRequest。

2.1BulkRequest详解

Elasticsearch Multi Get、 Bulk API详解、原理与示例

  • List requests:单个命令容器,DocWriteRequest的子类包括:IndexRequest、UpdateRequest、DeleteRequest。

  • private final Set indices:requests涉及到的索引。

  • List payloads :有效载荷,6.4.0版本,貌似该字段意义不大,通常命令的请求体(负载数据)存放在DocWriteRequest对象中,例如IndexRequest的source字段。

  • protected TimeValue timeout:timeout机制,针对一个Bulk请求生效。

  • ActiveShardCount waitForActiveShards:针对整个Bulk请求有效。

  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:刷新策略。

  • private long sizeInBytes = 0:整个Bulk请求的大小。

  • 通过add api为BulkRequest添加一个请求。

    2.2 Bulk API请求格式详解

    Bulk Rest请求协议基于如下格式:

    POST _bulk
    { "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
    { "field1" : "value1" }
    { "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
    { "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
    { "field1" : "value3" }
    { "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
    { "doc" : {"field2" : "value2"} }
    

    其请求格式定义如下(restfull):

    • POST请求,其Content-Type为application/x-ndjson。

    • 每一个命令占用两行,每行的结束字符为\r\n。

    • 第一行为元数据,"opType" : {元数据}。

    • 第二行为有效载体(非必选),例如Index操作,其有效载荷为IndexRequest#source字段。

    • opType可选值 index、create、update、delete。

    • 公用元数据(index、create、update、delete)如下

    1)_index :索引名

    2)_type:类型名

    3)_id:文档ID

    4)routing:路由值

    5)parent

    6)version:数据版本号

    7)version_type:版本类型

    • 各操作特有元数据

    1、index | create

    1)pipeline

    2、update

    1)retry_on_conflict :更新冲突时重试次数。

    2)_source:字段过滤。

    • 有效载荷说明

    1、index | create

    其有效载荷为_source字段。

    2、update

    其有效载荷为:partial doc, upsert and script。

    3、delete

    没有有效载荷。

    对请求格式为什么要设计成metdata+有效载体的方式,主要是为了在接受端节点(所谓的接受端节点是指收到命令的第一节点),只需解析metadata,然后将请求直接转发给对应的数据节点。

    2.3 bulk API通用特性分析

    2.3.1 版本管理

    每一个Bulk条目拥有独自的version,存在于请求条目的item的元数据中。

    2.3.2 路由

    每一个Bulk条目各自生效。

    2.3.3 Wait For Active Shards

    通常可以设置BulkRequest#waitForActiveShards来要求Bulk批量执行之前要求处于激活的最小副本数。

    2.3.4 Bulk Demo

    public static final void testBulk() {
            RestHighLevelClient client = EsClient.getClient();
            try {
                IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
                        .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));
    
                UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
                            .doc(new IndexRequest("twitter", "_doc", "11")
                                    .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));
    
                BulkRequest request = new BulkRequest();
                request.add(indexRequest);
                request.add(updateRequest);
                BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
                for (BulkItemResponse bulkItemResponse : bulkResponse) { 
                    if (bulkItemResponse.isFailed()) { 
                        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); 
                        System.out.println(failure);
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        System.out.println(indexRequest);
                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        System.out.println(updateRequest);
                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        System.out.println(deleteResponse);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                EsClient.close(client);
            }
        }
    

    更多文章请关注公众号中间件兴趣圈:

    Elasticsearch Multi Get、 Bulk API详解、原理与示例

    本文分享自微信公众号 - 中间件兴趣圈(dingwpmz_zjj)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    点赞
    收藏
    评论区
    推荐文章
    blmius blmius
    3年前
    MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
    文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
    Wesley13 Wesley13
    3年前
    java将前端的json数组字符串转换为列表
    记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
    皕杰报表之UUID
    ​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
    待兔 待兔
    5个月前
    手写Java HashMap源码
    HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
    Jacquelyn38 Jacquelyn38
    3年前
    2020年前端实用代码段,为你的工作保驾护航
    有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
    Wesley13 Wesley13
    3年前
    mysql设置时区
    mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
    Wesley13 Wesley13
    3年前
    00:Java简单了解
    浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
    Stella981 Stella981
    3年前
    Django中Admin中的一些参数配置
    设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
    Wesley13 Wesley13
    3年前
    MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
    背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
    Python进阶者 Python进阶者
    11个月前
    Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
    大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这