Redis使用pipeLine批量获取数据加快接口响应速度

Stella981
• 阅读 819

一、背景

  • 需求:

    redis通过tcp来对外提供服务,client通过socket连接发起请求,每个请求在命令发出后会阻塞等待redis服务器进行处理,处理完毕后将结果返回给client。

    其实和一个http的服务器类似,一问一答,请求一次给一次响应。而这个过程在排除掉redis服务本身做复杂操作时的耗时的话,可以看到最耗时的就是这个网络传输过程。每一个命令都对应了发送、接收两个网络传输,假如一个流程需要0.1秒,那么一秒最多只能处理10个请求,将严重制约redis的性能。

    在很多场景下,我们要完成一个业务,可能会对redis做连续的多个操作,譬如库存减一、订单加一、余额扣减等等,这有很多个步骤是需要依次连续执行的。

  • 潜在隐患:这样的场景,网络传输的耗时将是限制redis处理量的主要瓶颈。循环key,获取value,可能会造成连接池的连接数增多,连接的创建和摧毁,消耗性能

  • 解决方法:

    可以引入pipeline了,pipeline管道就是解决执行大量命令时、会产生大量同学次数而导致延迟的技术。

    其实原理很简单,pipeline就是把所有的命令一次发过去,避免频繁的发送、接收带来的网络开销,redis在打包接收到一堆命令后,依次执行,然后把结果再打包返回给客户端。

    根据项目中的缓存数据结构的实际情况,数据结构为string类型的,使用RedisTemplate的multiGet方法;数据结构为hash,使用Pipeline(管道),组合命令,批量操作redis。

二、操作

  1. RedisTemplate的multiGet的操作

    • 针对数据结构为String类型

    • 示例代码

    List keys = new ArrayList<>(); for (Book e : booklist) { String key = generateKey.getKey(e); keys.add(key); } List resultStr = template.opsForValue().multiGet(

    2.RedisTemplate的Pipeline使用

    为什么Pipelining这么快?    

    先看看原来的多条命令,是如何执行的:    

    Redis Client->>Redis Server: 发送第1个命令

    Redis Server->>Redis Client: 响应第1个命令

    Redis Client->>Redis Server: 发送第2个命令

    Redis Server->>Redis Client: 响应第2个命令

    Redis Client->>Redis Server: 发送第n个命令

    Redis Server->>Redis Client: 响应第n个命令

 Pipeling机制是怎样的呢:
    Redis Client->>Redis Server: 发送第1个命令(缓存在Redis Client,未即时发送)
    Redis Client->>Redis Server: 发送第2个命令(缓存在Redis Client,未即时发送)
    Redis Client->>Redis Server: 发送第n个命令(缓存在Redis Client,未即时发送)
    Redis Client->>Redis Server: 发送累积的命令
    Redis Server->>Redis Client: 响应第1、2、n个命令

  • 示例代码

    package cn.chinotan.controller;

    import cn.chinotan.service.RedisService; import lombok.extern.java.Log; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

    import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit;

    /**

    • @program: test
    • @description: redis批量数据测试
    • @author: xingcheng
    • @create: 2019-03-16 16:26

    **/ @RestController @RequestMapping("/redisBatch") @Log public class RedisBatchController {

    @Autowired
    StringRedisTemplate redisTemplate;
    
    @Autowired
    Map<String, RedisService> redisServiceMap;
    
    /**
     * VALUE缓存时间 3分钟
     */
    public static final Integer VALUE_TIME = 1;
    
    /**
     * 测试列表长度
     */
    public static final Integer SIZE = 100000;
    
    @GetMapping(value = "/test/{model}")
    public Object hello(@PathVariable("model") String model) {
        List<Map<String, String>> saveList = new ArrayList<>(SIZE);
        List<String> keyList = new ArrayList<>(SIZE);
        for (int i = 0; i < SIZE; i++) {
            Map<String, String> objectObjectMap = new HashMap<>();
            String key = String.valueOf(i);
            objectObjectMap.put("key", key);
            StringBuilder sb = new StringBuilder();
            objectObjectMap.put("value", sb.append("value").append(i).toString());
            saveList.add(objectObjectMap);
            // 记录全部key
            keyList.add(key);
        }
        
        // 获取对应的实现
        RedisService redisService = redisServiceMap.get(model);
        
        long saveStart = System.currentTimeMillis();
        redisService.batchInsert(saveList, TimeUnit.MINUTES, VALUE_TIME);
        long saveEnd = System.currentTimeMillis();
        log.info("插入耗时:" + (saveEnd - saveStart) + " ms");
        // 批量获取
        long getStart = System.currentTimeMillis();
        List<String> valueList = redisService.batchGet(keyList);
        long getEnd = System.currentTimeMillis();
        log.info("获取耗时:" + (getEnd - getStart) + " ms");
        return valueList;
    }
    

    }

    package cn.chinotan.controller;

    import cn.chinotan.service.RedisService; import lombok.extern.java.Log; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

    import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit;

    /**

    • @program: test
    • @description: redis批量数据测试
    • @author: xingcheng
    • @create: 2019-03-16 16:26

    **/ @RestController @RequestMapping("/redisBatch") @Log public class RedisBatchController {

    @Autowired
    StringRedisTemplate redisTemplate;
    
    @Autowired
    Map<String, RedisService> redisServiceMap;
    
    /**
     * VALUE缓存时间 3分钟
     */
    public static final Integer VALUE_TIME = 1;
    
    /**
     * 测试列表长度
     */
    public static final Integer SIZE = 100000;
    
    @GetMapping(value = "/test/{model}")
    public Object hello(@PathVariable("model") String model) {
        List<Map<String, String>> saveList = new ArrayList<>(SIZE);
        List<String> keyList = new ArrayList<>(SIZE);
        for (int i = 0; i < SIZE; i++) {
            Map<String, String> objectObjectMap = new HashMap<>();
            String key = String.valueOf(i);
            objectObjectMap.put("key", key);
            StringBuilder sb = new StringBuilder();
            objectObjectMap.put("value", sb.append("value").append(i).toString());
            saveList.add(objectObjectMap);
            // 记录全部key
            keyList.add(key);
        }
        
        // 获取对应的实现
        RedisService redisService = redisServiceMap.get(model);
        
        long saveStart = System.currentTimeMillis();
        redisService.batchInsert(saveList, TimeUnit.MINUTES, VALUE_TIME);
        long saveEnd = System.currentTimeMillis();
        log.info("插入耗时:" + (saveEnd - saveStart) + " ms");
        // 批量获取
        long getStart = System.currentTimeMillis();
        List<String> valueList = redisService.batchGet(keyList);
        long getEnd = System.currentTimeMillis();
        log.info("获取耗时:" + (getEnd - getStart) + " ms");
        return valueList;
    }
    

    }

    package cn.chinotan.service;

    import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.SessionCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;

    import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors;

    /**

    • @program: test
    • @description: redis管道操作
    • @author: xingcheng
    • @create: 2019-03-16 16:47

    **/ @Service("pipe") public class RedisPipelineService implements RedisService {

    @Autowired
    StringRedisTemplate redisTemplate;
    
    @Override
    public void batchInsert(List<Map<String, String>> saveList, TimeUnit unit, int timeout) {
        /* 插入多条数据 */
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                for (Map<String, String> needSave : saveList) {
                    redisTemplate.opsForValue().set(needSave.get("key"), needSave.get("value"), timeout,unit);
                }
                return null;
            }
        });
    }
    
    @Override
    public List<String> batchGet(List<String> keyList) {
        /* 批量获取多条数据 */
        List<Object> objects = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                StringRedisConnection stringRedisConnection = (StringRedisConnection) redisConnection;
                for (String key : keyList) {
                    stringRedisConnection.get(key);
                }
                return null;
            }
        });
    
        List<String> collect = objects.stream().map(val -> String.valueOf(val)).collect(Collectors.toList());
    
        return collect;
    }
    

    }

    package cn.chinotan.service;

    import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;

    import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit;

    /**

    • @program: test
    • @description: redis普通遍历操作
    • @author: xingcheng
    • @create: 2019-03-16 16:47

    **/ @Service("generic") public class RedisGenericService implements RedisService {

    @Autowired
    StringRedisTemplate redisTemplate;
    
    @Override
    public void batchInsert(List<Map<String, String>> saveList, TimeUnit unit, int timeout) {
        for (Map<String, String> needSave : saveList) {
            redisTemplate.opsForValue().set(needSave.get("key"), needSave.get("value"), timeout,unit);
        }
    }
    
    @Override
    public List<String> batchGet(List<String> keyList) {
        List<String> values = new ArrayList<>(keyList.size());
        for (String key : keyList) {
            String value = redisTemplate.opsForValue().get(key);
            values.add(value);
        }
        return values;
    }
    

    }

测试结果:

Redis使用pipeLine批量获取数据加快接口响应速度

Redis使用pipeLine批量获取数据加快接口响应速度

Redis使用pipeLine批量获取数据加快接口响应速度

Redis使用pipeLine批量获取数据加快接口响应速度

可以看到性能提升了20倍之多

基于其特性,它有两个明显的局限性:

  • 鉴于Pipepining发送命令的特性,Redis服务器是以队列来存储准备执行的命令,而队列是存放在有限的内存中的,所以不宜一次性发送过多的命令。如果需要大量的命令,可分批进行,效率不会相差太远滴,总好过内存溢出嘛~~
  • 由于pipeline的原理是收集需执行的命令,到最后才一次性执行。所以无法在中途立即查得数据的结果(需待pipelining完毕后才能查得结果),这样会使得无法立即查得数据进行条件判断(比如判断是非继续插入记录)。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这