Redis编程实践【pipeline和事务】

Stella981
• 阅读 688

本文转载自 http://shift-alt-ctrl.iteye.com/blog/1863790,感谢原作者分享

Redis或许已经在很多企业开始推广并试水,本文也根据个人的实践,简单描述一下Redis在实际开发过程中的使用(部署与架构,稍后介绍),程序执行环境为java + jedis,关于spring下如何集成redis-api,稍后介绍吧。

前言:下载redis-2.6.2,安装好redis之后,请在redis.conf文件中,将如下3个配置属性开启(仅供测试使用):

Xml代码  

  1. ##客户端链接的端口,也是server端侦听client链接的端口
  2. ##每个client实例,都将和server在此端口上建立tcp长链接
  3. port 6379
  4. server端绑定的ip地址,如果一个物理机器有多个网络接口时,可以明确指定为某个网口的ip地址

  5. bind 127.0.0.1
  6. ##链接中io操作空闲时间,如果在指定时间内,没有IO操作,链接将会被关闭
  7. ##此属性和TCP链接中的timeout选项一样,建议设置为0,很多时候,我们一个应用也只会有一个redis实例
  8. ##不过,如果你使用连接池的话,你需要对此参数做额外的考虑。
  9. timeout 0

1. Pipeline:“管道”,和很多设计模式中的“管道”具有同样的概念,pipleline的操作,将明确client与server端的交互,都是“单向的”:你可以将多个command,依次发给server,但在此期间,你将无法获得单个command的响应数据,此后你可以关闭“请求”,然后依次获取每个command的响应结果。

    从简单来说,在IO操作层面,对于client而言,就是一次批量的连续的“write”请求,然后是批量的连续的“read”操作。其实对于底层Socket-IO而言,对于client而言,只不过是多次write,然后一次read的操作;对于server端,input通道上read到数据之后,就会立即被实施,也会和非pipeline一样在output通道上输出执行的结果,只不过此时数据会被阻塞在网络缓冲区上,直到client端开始read或者关闭链接。神秘的面纱已被解开,或许你也能创造一个pipeline的实现。

    非pipleline模式:

    Request---->执行

---->Response

Request---->执行

---->Response

    Pipeline模式下:

    Request---->执行,Server将响应结果队列化

    Request---->执行,Server将响应结果队列化

    ---->Response

    ---->Response

    Client端根据Redis的数据协议,将响应结果进行解析,并将结果做类似于“队列化”的操作。

Java代码  

  1. public void pipeline(){

  2. String key = "pipeline-test";

  3. String old = jedis.get(key);

  4. if(old != null){

  5. System.out.println("Key:" + key + ",old value:" + old);

  6. }

  7. //代码模式1,这种模式是最常见的方式

  8. Pipeline p1 = jedis.pipelined();

  9. p1.incr(key);

  10. System.out.println("Request incr");

  11. p1.incr(key);

  12. System.out.println("Request incr");

  13. //结束pipeline,并开始从相应中获得数据

  14. List responses = p1.syncAndReturnAll();

  15. if(responses == null || responses.isEmpty()){

  16. throw new RuntimeException("Pipeline error: no response...");

  17. }

  18. for(Object resp : responses){

  19. System.out.println("Response:" + resp.toString());//注意,此处resp的类型为Long

  20. }

  21. //代码模式2

  22. Pipeline p2 = jedis.pipelined();

  23. Response r1 = p2.incr(key);

  24. try{

  25. r1.get();

  26. }catch(Exception e){

  27. System.out.println("Error,you cant get() before sync,because IO of response hasn't begin..");

  28. }

  29. Response r2 = p2.incr(key);

  30. p2.sync();

  31. System.out.println("Pipeline,mode 2,--->" + r1.get());

  32. System.out.println("Pipeline,mode 2,--->" + r2.get());

  33. }

  34.      不过需要明确一下,pipeline和“事务”是两个完全不同的概念,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败(事实上,Redis的事务,仍然不像严格意义上的事务,稍后介绍)。

    Java代码  

    1. public void txPipeline(){
    2. String key = "pipeline-test";
    3. String old = jedis.get(key);
    4. if(old != null){
    5. System.out.println("Key:" + key + ",old value:" + old);
    6. }
    7. Pipeline p1 = jedis.pipelined();
    8. p1.multi();//开启事务
    9. p1.incr(key);
    10. System.out.println("Request incr");
    11. p1.incr(key);
    12. System.out.println("Request incr");
    13. Response<List> txresult= p1.exec();//提交事务
    14. p1.sync();//关闭pipeline
    15. //结束pipeline,并开始从相应中获得数据
    16. List responses = txresult.get();
    17. if(responses == null || responses.isEmpty()){
    18. throw new RuntimeException("Pipeline error: no response...");
    19. }
    20. for(Object resp : responses){
    21. System.out.println("Response:" + resp.toString());//注意,此处resp的类型为Long
    22. }
    23. }
    24.      Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。

          不过在编码时请注意,pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;比如在上述代码中间,使用jedis.set(key,value)等操作都将抛出异常。

          如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力。

          使用场景举例:因为业务需要,我们需要把用户的操作过程记录在日志中以方便以后的统计,每隔3个小时生成一个新的日志文件,那么后台处理线程,将会扫描日志文件并将每条日志输出为“operation”:1,即表示操作次数为1;如果每个operation都发送一个command,事实上性能是很差的,而且是没有必要的;那么我们就可以使用pipeline批量提交即可。

      2.Transaction(事务):

          Redis提供了简单的“事务”能力,MULTI,EXEC,DISCARD,WATCH/UNWATCH指令用来操作事务。

          1) MUTIL:开启事务,此后所有的操作将会添加到当前链接的事务“操作队列”中。

          2) EXEC:提交事务

          3) DISCARD:取消事务,记住,此指令不是严格意义上的“事务回滚”,只是表达了“事务操作被取消”的语义,将会导致事务的操作队列中的操作不会被执行,且事务关闭。

          4) WATCH/UNWATCH:“观察”,这个操作也可以说是Redis的特殊功能,但是也可说是Redis不能提供“绝对意义上”的事务能力而增加的一个“补充特性”(比如事务隔离,多事务中操作冲突解决等);在事务开启前,可以对某个KEY注册“WATCH”,如果在事务提交后,将会首先检测“WATCH”列表中的KEY集合是否被其他客户端修改,如果任意一个KEY 被修改,都将会导致事务直接被“DISCARD”;即使事务中没有操作某个WATCH KEY,如果此KEY被外部修改,仍然会导致事务取消。事务执行成功或者被DISCARD,都将会导致WATCH KEY被“UNWATCH”,因此事务之后,你需要重新WATCH。WATCH需要在事务开启之前执行。

          WATCH所注册的KEY,事实上无论是被其他Client修改还是当前Client修改,如果不重新WATCH,都将无法在事务中正确执行。WATCH指令本身就是为事务而生,你或许不会在其他场景下使用WATCH;例如:

      Java代码  

      1. String key = "transaction-key";
      2. jedis.set(key, "20");
      3. jedis.watch(key);//注册key,此后key将会被监控,如果在事务执行前被修改,则导致事务被DISCARD。
      4. jedis.incr(key);//此key被修改,即使是自己,也会导致watch在事务中执行失效
      5. jedis.unwatch();//取消注册
      6. jedis.watch(key);//重新注册,在重新注册前,必须unwatch
      7. Transaction tx = jedis.multi();//开启事务
      8. ....

           Redis中,如果一个事务被提交,那么事务中的所有操作将会被顺序执行,且在事务执行期间,其他client的操作将会被阻塞;Redis采取了这种简单而“粗鲁”的方式来确保事务的执行更加的快速和更少的外部干扰因素。

          EXEC指令将会触发事务中所有的操作被写入AOF文件(如果开启了AOF),然后开始在内存中实施这些数据变更操作;Redis将会尽力确保事务中所有的操作都能够执行,如果redis环境故障,有可能导致事务未能成功执行,那么需要在redis重启后增加额外的校验工作。

         如果在EXEC指令被提交之前,Redis-server即检测到提交的某个指令存在语法错误,那么此事务将会被提前标记为DISCARD,此后事务提交也将直接被驳回;但是如果在EXEC提交后,在实施数据变更时(Redis将不会预检测数据类型,比如你对一个“非数字”类型的key执行INCR操作),某个操作导致了ERROR,那么redis仍然不会回滚此前已经执行成功的操作,而且也不会中断ERROR之后的其他操作继续执行。对于开发者而言,你务必关注事务执行后返回的结果(结果将是一个集合,按照操作提交的顺序排列,对于执行失败的操作,结果将是一个ERROR)。

          Redis的事务之所以如此设计,它为了确保本身的性能,同时不引入“关系型数据库”的设计复杂度;你不能完全希望Redis能为你交付完美的事务操作,只能说,你选择了错误的工具。

      Java代码  

      1. public void transaction(){
      2. String key = "transaction-key";
      3. jedis.set(key, "20");
      4. jedis.watch(key);
      5. Transaction tx = jedis.multi();
      6. tx.incr(key);
      7. tx.incr(key);
      8. tx.incr(key);
      9. List result = tx.exec();
      10. if(result == null || result.isEmpty()){
      11. System.out.println("Transaction error...");//可能是watch-key被外部修改,或者是数据操作被驳回
      12. return;
      13. }
      14. for(Object rt : result){
      15. System.out.println(rt.toString());
      16. }
      17. }
      18. 关于这两个主题,暂且到此,如有错误,请不吝赐教。谢谢。

        点赞
        收藏
        评论区
        推荐文章
        blmius blmius
        3年前
        MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
        文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
        皕杰报表之UUID
        ​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
        待兔 待兔
        6个月前
        手写Java HashMap源码
        HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
        Jacquelyn38 Jacquelyn38
        3年前
        2020年前端实用代码段,为你的工作保驾护航
        有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
        Stella981 Stella981
        3年前
        KVM调整cpu和内存
        一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
        Stella981 Stella981
        3年前
        Python之time模块的时间戳、时间字符串格式化与转换
        Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
        Wesley13 Wesley13
        3年前
        00:Java简单了解
        浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
        Wesley13 Wesley13
        3年前
        MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
        背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
        Python进阶者 Python进阶者
        1年前
        Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
        大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
        美凌格栋栋酱 美凌格栋栋酱
        2小时前
        Oracle 分组与拼接字符串同时使用
        SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(