Redis(四)——消息队列

Stella981
• 阅读 443

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。

**性质:**由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

(列表常用命令)

RPUSH : RPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表右端

LPUSH : LPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表左端

RPOP : RPOP key-name----------移除并返回列表最右端元素

LPOP :LPOP key-name----------移除并返回列表最左端元素

LINDEX : LINDEX key-name offset --------------返回列表中偏移量为offset的元素

LRANGE : LRANGE key-name start end -------------返回列表中偏移量从start到end范围内的元素

LTRIM : LTRIM key-name start end ----------------对列表进行修剪,只保留偏移量从start到end范围内的元素

其中简单示例如下: 首先连接redis服务器,其中我应用了Jedispool,代码如下:

package redis;

import java.io.IOException;
import java.util.Properties;

import org.springframework.core.io.support.PropertiesLoaderUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * redis单例连接池
 * @author admin
 *
 */
public class RedisPool {

    private static  int TIMEOUT = 1000*30;
    private static  int MAXTOTAL = 1024;
    private static  int MAXIDLE = 100;
    private static  String REDISIP = "bei1";
    private static  int PORT = 6379;
    private static  String PASSWORD ="default";

    static {
        try {
            Properties prop = PropertiesLoaderUtils.loadAllProperties("redis.properties");
            TIMEOUT = Integer.parseInt(prop.getProperty("TIMEOUT","300000"));
            MAXTOTAL = Integer.parseInt(prop.getProperty("MAXTOTAL","1024"));
            MAXIDLE = Integer.parseInt(prop.getProperty("MAXIDLE","100"));
            REDISIP = prop.getProperty("REDISIP","127.0.0.1");
            PORT = Integer.parseInt(prop.getProperty("PORT","6379"));
            PASSWORD = prop.getProperty("PASSWORD","default");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static JedisPool[] pool = new JedisPool[10];

    private  RedisPool() {}

    private static JedisPool getPool(int database) {
        if(database>10) {
            return null;
        }
        if(pool[database] == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(MAXTOTAL);
            config.setMaxIdle(MAXIDLE);
            config.setMaxWaitMillis(TIMEOUT);
            config.setTestOnBorrow(true);
            pool[database] = new JedisPool(config,REDISIP,PORT,TIMEOUT,PASSWORD,database);
        }
        return pool[database];
    }
    //单例获取redis连接资源
    public static Jedis getResource(int database) {
        if(database>10) {
            return null;
        }
        Jedis jedis = null;
        if(pool[database] == null) {
            synchronized(RedisPool.class) {
                try {
                    if(pool[database] == null) {
                        pool[database] = getPool(database);
                        try {
                            if (pool[database] != null) {
                                jedis = pool[database].getResource();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }else {
            jedis = pool[database].getResource();
        }
        return jedis;
    }

}

定义一个生产者,代码:

package RedisMq;

import com.sun.deploy.util.StringUtils;
import redis.RedisPool;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
 * <p>  </p>
 *
 * @author ly
 * @since 2019/1/5
 */
public class Producer extends Thread{

    public static final String MESSAGE_KEY = "queue";
    private Jedis jedis;
    private String produceName;
    private volatile int count;

    public Producer(String name){
        this.produceName = name;
        init();
    }
    private void init(){
        jedis = RedisPool.getResource(1);

    }
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(produceName + ": 当前未被处理消息条数为:" + size);
        count++;
    }

    public int getCount() {
        return count;
    }
    @Override
    public void run() {
        try {
            while (true) {
                putMessage("hello world");
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer("myProducer");
        producer.start();

        for (; ; ) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

再定义一个消费者

package RedisMq;

import redis.RedisPool;
import redis.clients.jedis.Jedis;
/**
 * <p>  </p>
 *
 * @author ly
 * @since 2019/1/7
 */


    /**
     * 消息消费者
     * @author yamikaze
     */
    public class Customer extends Thread{

        private String customerName;
        private volatile int count;
        private Jedis jedis;

        public Customer(String name) {
            this.customerName = name;
            init();
        }

        private void init() {
            jedis = RedisPool.getResource(1);
        }

        public void processMessage() {
            String message = jedis.rpop(Producer.MESSAGE_KEY);
            if(message != null) {
                count++;
                handle(message);
            }
        }

        public void handle(String message) {
            System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
        }

        @Override
        public void run() {
            while (true) {
                processMessage();
            }
        }

        public static void main(String[] args) {
            Customer customer = new Customer("小花");
            customer.start();
        }
    }

运行后 生产者和消费者控制台信息分别如下: Redis(四)——消息队列 Redis(四)——消息队列

Redis 发布与订阅

redis 支持消息队列。发布订阅即是一种消息通信模式:发送者发送消息,订阅者订阅消息。

redis 客户端可以订阅任意数量的频道

(一)发布订阅 使用 publish 指令,格式为 publish channel message

127.0.0.1:6379> publish fruit "apple"
(integer) 0
 

该返回值为0,说明没有人订阅

(二)订阅消息 使用subscribe指令接受消息,格式为 subscribe channel

127.0.0.1:6379> subscribe fruit
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "fruit"
3) (integer) 1

可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。 回复信息分为3类: 1 如果为subscribe,第二个值表示订阅的频道,如上述代码 Redis(四)——消息队列

2 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息,如图: Redis(四)——消息队列

3 如果退订消息 ,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。则接受信息如下 Redis(四)——消息队列

(三)取消订阅 使用Unsubscribe 指令,格式为 UNSUBSCRIBE channel [channel ...]

127.0.0.1:6379>  unsubscribe fruit
1) "unsubscribe"
2) "fruit"
3) (integer) 0

参考文章https://blog.csdn.net/qq_34212276/article/details/78455004

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
3年前
Redis发布订阅(Pub
一、redis做消息队列1\.redis存储的list数据是双向链表实现的,可以作为队列2\.使用lpush和rpop实现入队和出队3\.每次使用lpush和rpop都要发起一次连接,性能不好4\.这是一次生产,一次消费的队列二、发布/订阅模式(publish/subscribe),也是作为消息队列1\.可以一次生产
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这