Redis最全详解

Stella981
• 阅读 661

1.Redis核心数据结构与核心原理

Redis安装

下载地址:http://redis.io/download

安装步骤:

# 安装gcc

yum install gcc

# 把下载好的redis-5.0.3.tar.gz放在/usr/local文件夹下,并解压

wget http://download.redis.io/releases/redis-5.0.3.tar.gz

tar xzf redis-5.0.3.tar.gz

cd redis-5.0.3

# 进入到解压好的redis-5.0.3目录下,进行编译与安装

make

# 启动并指定配置文件

src/redis-server redis.conf(注意要使用后台启动,所以修改redis.conf里的daemonize改为yes)

# 验证启动是否成功

ps -ef | grep redis

# 进入redis客户端

src/redis-cli

# 退出客户端

quit

# 退出redis服务:

(1)pkill redis-server

(2)kill 进程号

(3)src/redis-cli shutdown

Redis的单线程和高性能

Redis 单线程为什么还能这么快?

因为它所有的数据都在内存中,所有的运算都是内存级别的运算,而且单线程避免了多线程的切换性能损耗问题。正因为 Redis 是单线程,所以要小心使用 Redis 指令,对于那些耗时的指令(比如keys),一定要谨慎使用,一不小心就可能会导致 Redis 卡顿。

Redis 单线程如何处理那么多的并发客户端连接?

Redis的IO多路复用:redis利用epoll来实现IO多路复用,将连接信息和事件放到队列中,依次放到文件事件分派器,事件分派器将事件分发给事件处理器。

Redis最全详解

# 查看redis支持的最大连接数,在redis.conf文件中可修改,# maxclients 10000

127.0.0.1:6379> CONFIG GET maxclients

##1) "maxclients"

##2) "10000"

其他高级命令

keys:全量遍历键,用来列出所有满足特定正则字符串规则的key,当redis数据量比较大时,性能比较差,要避免使用

Redis最全详解

scan:渐进式遍历键

SCAN cursor [MATCH pattern] [COUNT count]

scan 参数提供了三个参数,第一个是 cursor 整数值,第二个是 key 的正则模式,第三个是一次遍历的key的数量,并不是符合条件的结果数量。第一次遍历时,cursor 值为 0,然后将返回结果中第一个整数值作为下一次遍历的 cursor。一直遍历到返回的 cursor 值为 0 时结束。

Redis最全详解

Redis最全详解

Info:查看redis服务运行信息,分为 9 大块,每个块都有非常多的参数,这 9 个块分别是:

Server 服务器运行的环境参数

Clients 客户端相关信息

Memory 服务器运行内存统计数据

Persistence 持久化信息

Stats 通用统计数据

Replication 主从复制相关信息

CPU CPU 使用情况

Cluster 集群信息

KeySpace 键值对统计数量信息

Redis最全详解

2.Redis持久化、主从与哨兵架构详解

Redis持久化

RDB快照(snapshot)

在默认情况下, Redis 将内存数据库快照保存在名字为 dump.rdb 的二进制文件中。

你可以对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动保存一次数据集。

比如说, 以下设置会让 Redis 在满足“ 60 秒内有至少有 1000 个键被改动”这一条件时, 自动保存一次数据集:

# save 60 1000

关闭RDB只需要将所有的save保存策略注释掉即可

还可以手动执行命令生成RDB快照,进入redis客户端执行命令save或bgsave可以生成dump.rdb文件,每次命令执行都会将所有redis内存快照到一个新的rdb文件里,并覆盖原有rdb快照文件。

save是同步命令,bgsave是异步命令,bgsave会从redis主进程fork(fork()是linux函数)出一个子进程专门用来生成rdb快照文件

save与bgsave对比:

命令

save

bgsave

IO类型

同步

异步

是否阻塞redis其它命令

否(在生成子进程执行调用fork函数时会有短暂阻塞)

复杂度

O(n)

O(n)

优点

不会消耗额外内存

不阻塞客户端命令

缺点

阻塞客户端命令

需要fork子进程,消耗内存

配置自动生成rdb文件后台使用的是bgsave方式。

AOF(append-only file)

快照功能并不是非常耐久(durable): 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、且仍未保存到快照中的那些数据。从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化,将修改的每一条指令记录进文件appendonly.aof中

你可以通过修改配置文件来打开 AOF 功能:

# appendonly yes

从现在开始, 每当 Redis 执行一个改变数据集的命令时(比如 SET), 这个命令就会被追加到 AOF 文件的末尾。

这样的话, 当 Redis 重新启动时, 程序就可以通过重新执行 AOF 文件中的命令来达到重建数据集的目的。

你可以配置 Redis 多久才将数据 fsync 到磁盘一次。

有三个选项:

  • appendfsync always:每次有新命令追加到 AOF 文件时就执行一次 fsync ,非常慢,也非常安全。
  • appendfsync everysec:每秒 fsync 一次,足够快(和使用 RDB 持久化差不多),并且在故障时只会丢失 1 秒钟的数据。
  • appendfsync no:从不 fsync ,将数据交给操作系统来处理。更快,也更不安全的选择。

推荐(并且也是默认)的措施为每秒 fsync 一次, 这种 fsync 策略可以兼顾速度和安全性。

AOF重写

AOF文件里可能有太多没用指令,所以AOF会定期根据内存的最新数据生成aof文件

例如,执行了如下几条命令:

127.0.0.1:6379> incr readcount

(integer) 1

127.0.0.1:6379> incr readcount

(integer) 2

127.0.0.1:6379> incr readcount

(integer) 3

127.0.0.1:6379> incr readcount

(integer) 4

127.0.0.1:6379> incr readcount

(integer) 5

重写后AOF文件里变成

*3

$3

SET

$2

readcount

$1

5

如下两个配置可以控制AOF自动重写频率

# auto-aof-rewrite-min-size 64mb //aof文件至少要达到64M才会自动重写,文件太小恢复速度本来就很快,重写的意义不大

# auto-aof-rewrite-percentage 100 //aof文件自上一次重写后文件大小增长了100%则再次触发重写

当然AOF还可以手动重写,进入redis客户端执行命令bgrewriteaof重写AOF

注意,AOF重写redis会fork出一个子进程去做,不会对redis正常命令处理有太多影响

RDB 和 AOF ,我应该用哪一个?

命令

RDB

AOF

启动优先级

体积

恢复速度

数据安全性

容易丢数据

根据策略决定

redis启动时如果既有rdb文件又有aof文件则优先选择aof文件恢复数据,因为aof一般来说数据更全一点。

Redis 4.0 混合持久化

重启 Redis 时,我们很少使用 RDB来恢复内存状态,因为会丢失大量数据。我们通常使用 AOF 日志重放,但是重放 AOF 日志性能相对 RDB来说要慢很多,这样在 Redis 实例很大的情况下,启动需要花费很长的时间。 Redis 4.0 为了解决这个问题,带来了一个新的持久化选项——混合持久化。

通过如下配置可以开启混合持久化:

# aof-use-rdb-preamble yes

如果开启了混合持久化,AOF在重写时,不再是单纯将内存数据转换为RESP命令写入AOF文件,而是将重写这一刻之前的内存做RDB快照处理,并且将RDB快照内容和增量的AOF修改内存数据的命令存在一起,都写入新的AOF文件,新的文件一开始不叫appendonly.aof,等到重写完新的AOF文件才会进行改名,原子的覆盖原有的AOF文件,完成新旧两个AOF文件的替换。

于是在 Redis 重启的时候,可以先加载 RDB 的内容,然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放,因此重启效率大幅得到提升。

混合持久化AOF文件结构

Redis最全详解

Redis主从架构

Redis最全详解

Redis主从工作原理

如果你为master配置了一个slave,不管这个slave是否是第一次连接上Master,它都会发送一个SYNC命令(redis2.8版本之前的命令)给master请求复制数据。

master收到SYNC命令后,会在后台进行数据持久化通过bgsave生成最新的rdb快照文件,持久化期间,master会继续接收客户端的请求,它会把这些可能修改数据集的请求缓存在内存中。当持久化进行完毕以后,master会把这份rdb文件数据集发送给slave,slave会把接收到的数据进行持久化生成rdb,然后再加载到内存中。然后,master再将之前缓存在内存中的命令发送给slave。

当master与slave之间的连接由于某些原因而断开时,slave能够自动重连Master,如果master收到了多个slave并发连接请求,它只会进行一次持久化,而不是一个连接一次,然后再把这一份持久化的数据发送给多个并发连接的slave。

当master和slave断开重连后,一般都会对整份数据进行复制。但从redis2.8版本开始,master和slave断开重连后支持部分复制。

数据部分复制

从2.8版本开始,slave与master能够在网络连接断开重连后只进行部分数据复制。

master会在其内存中创建一个复制数据用的缓存队列,缓存最近一段时间的数据,master和它所有的slave都维护了复制的数据下标offset和master的进程id,因此,当网络连接断开后,slave会请求master继续进行未完成的复制,从所记录的数据下标开始。如果master进程id变化了,或者从节点数据下标offset太旧,已经不在master的缓存队列里了,那么将会进行一次全量数据的复制。

从2.8版本开始,redis改用可以支持部分数据复制的命令PSYNC去master同步数据

主从复制(全量复制)流程图:

Redis最全详解

主从复制(部分复制)流程图:

Redis最全详解

redis主从架构搭建,配置从节点步骤:

1、复制一份redis.conf文件

2、将相关配置修改为如下值:

port 6380

pidfile /var/run/redis_6380.pid

logfile "6380.log"

dir /usr/local/redis-5.0.3/data/6380

3、配置主从复制

replicaof 192.168.0.60 6379   # 从本机6379的redis实例复制数据

replica-read-only yes

4、启动从节点

redis-server redis.conf

5、连接从节点

redis-cli -p 6380

6、测试在6379实例上写数据,6380实例是否能及时同步新修改数据

7、可以自己再配置一个6381的从节点

Jedis连接代码示例:

1、引入相关依赖:

<dependency>

<groupId>redis.clients</groupId>

<artifactId>jedis</artifactId>

<version>2.9.0</version>

</dependency>

访问代码:

public class JedisSingleTest {

public static void main(String[] args) throws IOException {

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

jedisPoolConfig.setMaxTotal(20);

jedisPoolConfig.setMaxIdle(10);

jedisPoolConfig.setMinIdle(5);

// timeout,这里既是连接超时又是读写超时,从Jedis 2.8开始有区分connectionTimeout和soTimeout的构造函数

JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6379, 3000, null);

Jedis jedis = null;

try {

//从redis连接池里拿出一个连接执行命令

jedis = jedisPool.getResource();

System.out.println(jedis.set("single", "zhuge"));

System.out.println(jedis.get("single"));

//管道示例

//管道的命令执行方式:cat redis.txt | redis-cli -h 127.0.0.1 -a password - p 6379 --pipe

/*Pipeline pl = jedis.pipelined();

for (int i = 0; i < 10; i++) {

pl.incr("pipelineKey");

pl.set("zhuge" + i, "zhuge");

}

List<Object> results = pl.syncAndReturnAll();

System.out.println(results);*/

//lua脚本模拟一个商品减库存的原子操作

//lua脚本命令执行方式:redis-cli --eval /tmp/test.lua , 10

/*jedis.set("product_count_10016", "15");  //初始化商品10016的库存

String script = " local count = redis.call('get', KEYS[1]) " +

" local a = tonumber(count) " +

" local b = tonumber(ARGV[1]) " +

" if a >= b then " +

"   redis.call('set', KEYS[1], a-b) " +

"   return 1 " +

" end " +

" return 0 ";

Object obj = jedis.eval(script, Arrays.asList("product_count_10016"), Arrays.asList("10"));

System.out.println(obj);*/

} catch (Exception e) {

e.printStackTrace();

} finally {

//注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。

if (jedis != null)

jedis.close();

}

}

}

顺带讲下redis管道与调用lua脚本,代码示例上面已经给出:

管道(Pipeline)

客户端可以一次性发送多个请求而不用等待服务器的响应,待所有命令都发送完后再一次性读取服务的响应,这样可以极大的降低多条命令执行的网络传输开销,管道执行多条命令的网络开销实际上只相当于一次命令执行的网络开销。需要注意到是用pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。

pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的响应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行。

详细代码示例见上面jedis连接示例:

Pipeline pl = jedis.pipelined();

for (int i = 0; i < 10; i++) {

pl.incr("pipelineKey");

pl.set("zhuge" + i, "zhuge");

}

List<Object> results = pl.syncAndReturnAll();

System.out.println(results);

Redis Lua脚本

Redis在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到Redis中执行。使用脚本的好处如下:

1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延。这点跟管道类似。

2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的。

3、替代redis的事务功能:redis自带的事务功能很鸡肋,报错不支持回滚,而redis的lua脚本几乎实现了常规的事务功能,支持报错回滚操作,官方推荐如果要使用redis的事务功能可以用redis lua替代。

官网文档上有这样一段话:

A Redis script is transactional by definition, so everything you can do with a Redis transaction, you can also do with a script,

and usually the script will be both simpler and faster.

从Redis2.6.0版本开始,通过内置的Lua解释器,可以使用EVAL命令对Lua脚本进行求值。EVAL命令的格式如下:

EVAL script numkeys key [key ...] arg [arg ...] 

script参数是一段Lua脚本程序,它会被运行在Redis服务器上下文中,这段脚本不必(也不应该)定义为一个Lua函数。numkeys参数用于指定键名参数的个数。键名参数 key [key ...] 从EVAL的第三个参数开始算起,表示在脚本中所用到的那些Redis键(key),这些键名参数可以在 Lua中通过全局变量KEYS数组,用1为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。

在命令的最后,那些不是键名参数的附加参数 arg [arg ...] ,可以在Lua中通过全局变量ARGV数组访问,访问的形式和KEYS变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)。例如

127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second

1) "key1"

2) "key2"

3) "first"

4) "second"

其中 "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 是被求值的Lua脚本,数字2指定了键名参数的数量, key1和key2是键名参数,分别使用 KEYS[1] 和 KEYS[2] 访问,而最后的 first 和 second 则是附加参数,可以通过 ARGV[1] 和 ARGV[2] 访问它们。

在 Lua 脚本中,可以使用redis.call()函数来执行Redis命令

Jedis调用示例详见上面jedis连接示例:

jedis.set("product_stock_10016", "15");  //初始化商品10016的库存

String script = " local count = redis.call('get', KEYS[1]) " +

" local a = tonumber(count) " +

" local b = tonumber(ARGV[1]) " +

" if a >= b then " +

"   redis.call('set', KEYS[1], count-b) " +

"   return 1 " +

" end " +

" return 0 ";

Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));

System.out.println(obj);

注意,不要在Lua脚本中出现死循环和耗时的运算,否则redis会阻塞,将不接受其他的命令, 所以使用时要注意不能出现死循环、耗时的运算。redis是单进程、单线程执行脚本。管道不会阻塞redis。

Redis哨兵高可用架构

Redis最全详解

sentinel哨兵是特殊的redis服务,不提供读写服务,主要用来监控redis实例节点。

哨兵架构下client端第一次从哨兵找出redis的主节点,后续就直接访问redis的主节点,不会每次都通过sentinel代理访问redis的主节点,当redis的主节点发生变化,哨兵会第一时间感知到,并且将新的redis主节点通知给client端(这里面redis的client端一般都实现了订阅功能,订阅sentinel发布的节点变动消息)

redis哨兵架构搭建步骤:

1、复制一份sentinel.conf文件

cp sentinel.conf sentinel-26379.conf

2、将相关配置修改为如下值:

port 26379

daemonize yes

pidfile "/var/run/redis-sentinel-26379.pid"

logfile "26379.log"

dir "/usr/local/redis-5.0.3/data"

# sentinel monitor <master-name> <ip> <redis-port> <quorum>

# quorum是一个数字,指明当有多少个sentinel认为一个master失效时(值一般为:sentinel总数/2 + 1),master才算真正失效

sentinel monitor mymaster 192.168.0.60 6379 2

3、启动sentinel哨兵实例

src/redis-sentinel sentinel-26379.conf

4、查看sentinel的info信息

src/redis-cli -p 26379

127.0.0.1:26379>info

可以看到Sentinel的info里已经识别出了redis的主从

5、可以自己再配置两个sentinel,端口26380和26381,注意上述配置文件里的对应数字都要修改

哨兵的Jedis连接代码:

public class JedisSentinelTest {

public static void main(String[] args) throws IOException {

JedisPoolConfig config = new JedisPoolConfig();

config.setMaxTotal(20);

config.setMaxIdle(10);

config.setMinIdle(5);

String masterName = "mymaster";

Set<String> sentinels = new HashSet<String>();

sentinels.add(new HostAndPort("192.168.0.60",26379).toString());

sentinels.add(new HostAndPort("192.168.0.60",26380).toString());

sentinels.add(new HostAndPort("192.168.0.60",26381).toString());

//JedisSentinelPool其实本质跟JedisPool类似,都是与redis主节点建立的连接池

//JedisSentinelPool并不是说与sentinel建立的连接池,而是通过sentinel发现redis主节点并与其建立连接

JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(masterName, sentinels, config, 3000, null);

Jedis jedis = null;

try {

jedis = jedisSentinelPool.getResource();

System.out.println(jedis.set("sentinel", "zhuge"));

System.out.println(jedis.get("sentinel"));

} catch (Exception e) {

e.printStackTrace();

} finally {

//注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。

if (jedis != null)

jedis.close();

}

}

}

哨兵的Spring Boot整合Redis连接代码见示例项目:redis-sentinel-cluster

1、引入相关依赖:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

<dependency>

<groupId>org.apache.commons</groupId>

<artifactId>commons-pool2</artifactId>

</dependency>

springboot项目核心配置:

server:

port: 8080

spring:

redis:

database: 0

timeout: 3000

sentinel:    #哨兵模式

master: mymaster #主服务器所在集群名称

nodes: 192.168.0.60:26379,192.168.0.60:26380,192.168.0.60:26381

lettuce:

pool:

max-idle: 50

min-idle: 10

max-active: 100

max-wait: 1000

访问代码:

@RestController

public class IndexController {

private static final Logger logger = LoggerFactory.getLogger(IndexController.class);

@Autowired

private StringRedisTemplate stringRedisTemplate;

/**

* 测试节点挂了哨兵重新选举新的master节点,客户端是否能动态感知到

* 新的master选举出来后,哨兵会把消息发布出去,客户端实际上是实现了一个消息监听机制,

* 当哨兵把新master的消息发布出去,客户端会立马感知到新master的信息,从而动态切换访问的masterip

*

* @throws InterruptedException

*/

@RequestMapping("/test_sentinel")

public void testSentinel() throws InterruptedException {

int i = 1;

while (true){

try {

stringRedisTemplate.opsForValue().set("zhuge"+i, i+"");

System.out.println("设置key:"+ "zhuge" + i);

i++;

Thread.sleep(1000);

}catch (Exception e){

logger.error("错误:", e);

}

}

}

}

StringRedisTemplate与RedisTemplate

spring 封装了 RedisTemplate 对象来进行对redis的各种操作,它支持所有的 redis 原生的 api。在RedisTemplate中提供了几个常用的接口方法的使用,分别是:

private ValueOperations<K, V> valueOps;

private HashOperations<K, V> hashOps;

private ListOperations<K, V> listOps;

private SetOperations<K, V> setOps;

private ZSetOperations<K, V> zSetOps;

RedisTemplate中定义了对5种数据结构操作

redisTemplate.opsForValue();//操作字符串

redisTemplate.opsForHash();//操作hash

redisTemplate.opsForList();//操作list

redisTemplate.opsForSet();//操作set

redisTemplate.opsForZSet();//操作有序set

StringRedisTemplate继承自RedisTemplate,也一样拥有上面这些操作。

StringRedisTemplate默认采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的。

RedisTemplate默认采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。

Redis客户端命令对应的RedisTemplate中的方法列表:

String类型结构

Redis

RedisTemplate rt

set key value

rt.opsForValue().set("key","value")

get key

rt.opsForValue().get("key")

del key

rt.delete("key")

strlen key

rt.opsForValue().size("key")

getset key value

rt.opsForValue().getAndSet("key","value")

getrange key start end

rt.opsForValue().get("key",start,end)

append key value

rt.opsForValue().append("key","value")

 

 

Hash结构

hmset key field1 value1 field2 value2...

rt.opsForHash().putAll("key",map) //map是一个集合对象

hset key field value

rt.opsForHash().put("key","field","value")

hexists key field

rt.opsForHash().hasKey("key","field")

hgetall key

rt.opsForHash().entries("key") //返回Map对象

hvals key

rt.opsForHash().values("key") //返回List对象

hkeys key

rt.opsForHash().keys("key") //返回List对象

hmget key field1 field2...

rt.opsForHash().multiGet("key",keyList)

hsetnx key field value

rt.opsForHash().putIfAbsent("key","field","value"

hdel key field1 field2

rt.opsForHash().delete("key","field1","field2")

hget key field

rt.opsForHash().get("key","field")

 

 

List结构

lpush list node1 node2 node3...

rt.opsForList().leftPush("list","node")

rt.opsForList().leftPushAll("list",list) //list是集合对象

rpush list node1 node2 node3...

rt.opsForList().rightPush("list","node")

rt.opsForList().rightPushAll("list",list) //list是集合对象

lindex key index

rt.opsForList().index("list", index)

llen key

rt.opsForList().size("key")

lpop key

rt.opsForList().leftPop("key")

rpop key

rt.opsForList().rightPop("key")

lpushx list node

rt.opsForList().leftPushIfPresent("list","node")

rpushx list node

rt.opsForList().rightPushIfPresent("list","node")

lrange list start end

rt.opsForList().range("list",start,end)

lrem list count value

rt.opsForList().remove("list",count,"value")

lset key index value

rt.opsForList().set("list",index,"value")

 

 

Set结构

sadd key member1 member2...

rt.boundSetOps("key").add("member1","member2",...)

rt.opsForSet().add("key", set) //set是一个集合对象

scard key

rt.opsForSet().size("key")

sidff key1 key2

rt.opsForSet().difference("key1","key2") //返回一个集合对象

sinter key1 key2

rt.opsForSet().intersect("key1","key2")//同上

sunion key1 key2

rt.opsForSet().union("key1","key2")//同上

sdiffstore des key1 key2

rt.opsForSet().differenceAndStore("key1","key2","des")

sinter des key1 key2

rt.opsForSet().intersectAndStore("key1","key2","des")

sunionstore des key1 key2

rt.opsForSet().unionAndStore("key1","key2","des")

sismember key member

rt.opsForSet().isMember("key","member")

smembers key

rt.opsForSet().members("key")

spop key

rt.opsForSet().pop("key")

srandmember key count

rt.opsForSet().randomMember("key",count)

srem key member1 member2...

rt.opsForSet().remove("key","member1","member2",...)

3.Redis缓存高可用集群

1、Redis集群方案比较

  • 哨兵模式

Redis最全详解

在redis3.0以前的版本要实现集群一般是借助哨兵sentinel工具来监控master节点的状态,如果master节点异常,则会做主从切换,将某一台slave作为master,哨兵的配置略微复杂,并且性能和高可用性等各方面表现一般,特别是在主从切换的瞬间存在访问瞬断的情况,而且哨兵模式只有一个主节点对外提供服务,没法支持很高的并发,且单个主节点内存也不宜设置得过大,否则会导致持久化文件过大,影响数据恢复或主从同步的效率

  • 高可用集群模式

Redis最全详解

redis集群是一个由多个主从节点群组成的分布式服务器群,它具有复制、高可用和分片特性。Redis集群不需要sentinel哨兵也能完成节点移除和故障转移的功能。需要将每个节点设置成集群模式,这种集群模式没有中心节点,可水平扩展,据官方文档称可以线性扩展到上万个节点(官方推荐不超过1000个节点)。redis集群的性能和高可用性均优于之前版本的哨兵模式,且集群配置非常简单

2、Redis高可用集群搭建

  • redis安装

    下载地址:http://redis.io/download

    安装步骤:

    安装gcc

    yum install gcc

    把下载好的redis-5.0.3.tar.gz放在/usr/local文件夹下,并解压

    wget http://download.redis.io/releases/redis-5.0.3.tar.gz

    tar xzf redis-5.0.3.tar.gz

    cd redis-5.0.3

    进入到解压好的redis-5.0.3目录下,进行编译与安装

    make

    启动并指定配置文件

    src/redis-server redis.conf(注意要使用后台启动,所以修改redis.conf里的daemonize改为yes)

    验证启动是否成功

    ps -ef | grep redis

    进入redis客户端

    /usr/local/redis/bin/redis-cli

    退出客户端

    quit

    退出redis服务:

    (1)pkill redis-server

    (2)kill 进程号

    (3)src/redis-cli shutdown

  • redis集群搭建

redis集群需要至少要三个master节点,我们这里搭建三个master节点,并且给每个master再搭建一个slave节点,总共6个redis节点,这里用三台机器部署6个redis实例,每台机器一主一从,搭建集群的步骤如下:

第一步:在第一台机器的/usr/local下创建文件夹redis-cluster,然后在其下面分别创建2个文件夾如下

(1)mkdir -p /usr/local/redis-cluster

(2)mkdir 8001 8004

第一步:把之前的redis.conf配置文件copy到8001下,修改如下内容:

(1)daemonize yes

(2)port 8001(分别对每个机器的端口号进行设置)

(3)dir /usr/local/redis-cluster/8001/(指定数据文件存放位置,必须要指定不同的目录位置,不然会丢失数据)

(4)cluster-enabled yes(启动集群模式)

(5)cluster-config-file nodes-8001.conf(集群节点信息文件,这里800x最好和port对应上)

(6)cluster-node-timeout 5000

 (7)# bind 127.0.0.1(去掉bind绑定访问ip信息)

 (8)protected-mode  no   (关闭保护模式)

 (9)appendonly yes

如果要设置密码需要增加如下配置:

 (10)requirepass zhuge     (设置redis访问密码)

 (11)masterauth zhuge      (设置集群节点间访问密码,跟上面一致)

第三步:把修改后的配置文件,copy到8004,修改第2、3、5项里的端口号,可以用批量替换:

:%s/源字符串/目的字符串/g 

第四步:另外两台机器也需要做上面几步操作,第二台机器用8002和8005,第三台机器用8003和8006

第五步:分别启动6个redis实例,然后检查是否启动成功

(1)/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/800*/redis.conf

(2)ps -ef | grep redis 查看是否启动成功

第六步:用redis-cli创建整个redis集群(redis5以前的版本集群是依靠ruby脚本redis-trib.rb实现)

# 下面命令里的1代表为每个创建的主服务器节点创建一个从服务器节点

# 执行这条命令需要确认三台机器之间的redis实例要能相互访问,可以先简单把所有机器防火墙关掉,如果不关闭防火墙则需要打开redis服务端口和集群节点gossip通信端口16379(默认是在redis端口号上加1W)

# 关闭防火墙

# systemctl stop firewalld # 临时关闭防火墙

# systemctl disable firewalld # 禁止开机启动

(1)/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster create --cluster-replicas 1 192.168.0.61:8001 192.168.0.62:8002 192.168.0.63:8003 192.168.0.61:8004 192.168.0.62:8005 192.168.0.63:8006 

第七步:验证集群:

(1)连接任意一个客户端即可:./redis-cli -c -h -p (-a访问服务端密码,-c表示集群模式,指定ip地址和端口号)如:/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.61 -p 800*

(2)进行验证: cluster info(查看集群信息)、cluster nodes(查看节点列表)

(3)进行数据操作验证

(4)关闭集群则需要逐个进行关闭,使用命令:

/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.60 -p 800* shutdown

3、Java操作redis集群

借助redis的java客户端jedis可以操作以上集群,引用jedis版本的maven坐标如下:

<dependency>

    <groupId>redis.clients</groupId>

    <artifactId>jedis</artifactId>

    <version>2.9.0</version>

</dependency>

Java编写访问redis集群的代码非常简单,如下所示:

public class JedisClusterTest {

    public static void main(String[] args) throws IOException {

        JedisPoolConfig config = new JedisPoolConfig();

        config.setMaxTotal(20);

        config.setMaxIdle(10);

        config.setMinIdle(5);

        Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();

        jedisClusterNode.add(new HostAndPort("192.168.0.61", 8001));

        jedisClusterNode.add(new HostAndPort("192.168.0.62", 8002));

        jedisClusterNode.add(new HostAndPort("192.168.0.63", 8003));

        jedisClusterNode.add(new HostAndPort("192.168.0.61", 8004));

        jedisClusterNode.add(new HostAndPort("192.168.0.62", 8005));

        jedisClusterNode.add(new HostAndPort("192.168.0.63", 8006));

        JedisCluster jedisCluster = null;

        try {

            //connectionTimeout:指的是连接一个url的连接等待时间

            //soTimeout:指的是连接上一个url,获取response的返回等待时间

            jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "zhuge", config);

            System.out.println(jedisCluster.set("cluster", "zhuge"));

            System.out.println(jedisCluster.get("cluster"));

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            if (jedisCluster != null)

                jedisCluster.close();

        }

    }

}

运行效果如下:

OK

zhuge

集群的Spring Boot整合Redis连接代码见示例项目:redis-sentinel-cluster

1、引入相关依赖:

<dependency>

   <groupId>org.springframework.boot</groupId>

   <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

<dependency>

   <groupId>org.apache.commons</groupId>

   <artifactId>commons-pool2</artifactId>

</dependency>

springboot项目核心配置:

server:

  port: 8080

spring:

  redis:

    database: 0

    timeout: 3000

    password: zhuge

    cluster:

      nodes: 192.168.0.61:8001,192.168.0.62:8002,192.168.0.63:8003,192.168.0.61:8004,192.168.0.62:8005,192.168.0.63:8006

   lettuce:

      pool:

        max-idle: 50

        min-idle: 10

        max-active: 100

        max-wait: 1000

访问代码:

@RestController

public class IndexController {

    private static final Logger logger = LoggerFactory.getLogger(IndexController.class);

    @Autowired

    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/test_cluster")

    public void testCluster() throws InterruptedException {

       stringRedisTemplate.opsForValue().set("zhuge", "666");

       System.out.println(stringRedisTemplate.opsForValue().get("zhuge"));

    }

}

4、Redis集群原理分析

Redis Cluster 将所有数据划分为 16384 个 slots(槽位),每个节点负责其中一部分槽位。槽位的信息存储于每个节点中。

当 Redis Cluster 的客户端来连接集群时,它也会得到一份集群的槽位配置信息并将其缓存在客户端本地。这样当客户端要查找某个 key 时,可以直接定位到目标节点。同时因为槽位的信息可能会存在客户端与服务器不一致的情况,还需要纠正机制来实现槽位信息的校验调整。

槽位定位算法

Cluster 默认会对 key 值使用 crc16 算法进行 hash 得到一个整数值,然后用这个整数值对 16384 进行取模来得到具体槽位。

HASH_SLOT = CRC16(key) mod 16384

跳转重定位

当客户端向一个错误的节点发出了指令,该节点会发现指令的 key 所在的槽位并不归自己管理,这时它会向客户端发送一个特殊的跳转指令携带目标操作的节点地址,告诉客户端去连这个节点去获取数据。客户端收到指令后除了跳转到正确的节点上去操作,还会同步更新纠正本地的槽位映射表缓存,后续所有 key 将使用新的槽位映射表。

Redis集群节点间的通信机制

redis cluster节点间采取gossip协议进行通信

  • 维护集群的元数据有两种方式:集中式和gossip

集中式:

优点在于元数据的更新和读取,时效性非常好,一旦元数据出现变更立即就会更新到集中式的存储中,其他节点读取的时候立即就可以立即感知到;不足在于所有的元数据的更新压力全部集中在一个地方,可能导致元数据的存储压力。

gossip:

Redis最全详解

gossip协议包含多种消息,包括ping,pong,meet,fail等等。

ping:每个节点都会频繁给其他节点发送ping,其中包含自己的状态还有自己维护的集群元数据,互相通过ping交换元数据;

pong: 返回ping和meet,包含自己的状态和其他信息,也可以用于信息广播和更新;

fail: 某个节点判断另一个节点fail之后,就发送fail给其他节点,通知其他节点,指定的节点宕机了。

meet:某个节点发送meet给新加入的节点,让新节点加入集群中,然后新节点就会开始与其他节点进行通信,不需要发送形成网络的所需的所有CLUSTER MEET命令。发送CLUSTER MEET消息以便每个节点能够达到其他每个节点只需通过一条已知的节点链就够了。由于在心跳包中会交换gossip信息,将会创建节点间缺失的链接。

gossip协议的优点在于元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续,打到所有节点上去更新,有一定的延时,降低了压力;缺点在于元数据更新有延时可能导致集群的一些操作会有一些滞后。

  • 10000端口

每个节点都有一个专门用于节点间通信的端口,就是自己提供服务的端口号+10000,比如7001,那么用于节点间通信的就是17001端口。 每个节点每隔一段时间都会往另外几个节点发送ping消息,同时其他几点接收到ping消息之后返回pong消息。

网络抖动

真实世界的机房网络往往并不是风平浪静的,它们经常会发生各种各样的小问题。比如网络抖动就是非常常见的一种现象,突然之间部分连接变得不可访问,然后很快又恢复正常。

为解决这种问题,Redis Cluster 提供了一种选项cluster-node-timeout,表示当某个节点持续 timeout 的时间失联时,才可以认定该节点出现故障,需要进行主从切换。如果没有这个选项,网络抖动会导致主从频繁切换 (数据的重新复制)。

Redis集群选举原理分析

当slave发现自己的master变为FAIL状态时,便尝试进行Failover,以期成为新的master。由于挂掉的master可能会有多个slave,从而存在多个slave竞争成为master节点的过程, 其过程如下:

1.slave发现自己的master变为FAIL

2.将自己记录的集群currentEpoch加1,并广播FAILOVER_AUTH_REQUEST 信息

3.其他节点收到该信息,只有master响应,判断请求者的合法性,并发送FAILOVER_AUTH_ACK,对每一个epoch只发送一次ack

4.尝试failover的slave收集master返回的FAILOVER_AUTH_ACK

5.slave收到超过半数master的ack后变成新Master(这里解释了集群为什么至少需要三个主节点,如果只有两个,当其中一个挂了,只剩一个主节点是不能选举成功的)

6.slave广播Pong消息通知其他集群节点。

从节点并不是在主节点一进入 FAIL 状态就马上尝试发起选举,而是有一定延迟,一定的延迟确保我们等待FAIL状态在集群中传播,slave如果立即尝试选举,其它masters或许尚未意识到FAIL状态,可能会拒绝投票

•延迟计算公式:

DELAY = 500ms + random(0 ~ 500ms) + SLAVE_RANK * 1000ms

•SLAVE_RANK表示此slave已经从master复制数据的总量的rank。Rank越小代表已复制的数据越新。这种方式下,持有最新数据的slave将会首先发起选举(理论上)。

集群是否完整才能对外提供服务

当redis.conf的配置cluster-require-full-coverage为no时,表示当负责一个插槽的主库下线且没有相应的从库进行故障恢复时,集群仍然可用,如果为yes则集群不可用。

Redis集群为什么至少需要三个master节点,并且推荐节点数为奇数?

因为新master的选举需要大于半数的集群master节点同意才能选举成功,如果只有两个master节点,当其中一个挂了,是达不到选举新master的条件的。

奇数个master节点可以在满足选举该条件的基础上节省一个节点,比如三个master节点和四个master节点的集群相比,大家如果都挂了一个master节点都能选举新master节点,如果都挂了两个master节点都没法选举新master节点了,所以奇数的master节点更多的是从节省机器资源角度出发说的。

Redis集群对批量操作命令的支持

对于类似mset,mget这样的多个key的原生批量操作命令,redis集群只支持所有key落在同一slot的情况,如果有多个key一定要用mset命令在redis集群上操作,则可以在key的前面加上{XX},这样参数数据分片hash计算的只会是大括号里的值,这样能确保不同的key能落到统一slot里去,示例如下:

mset {user1}:name zhuge {user1}:age 18

假设name和age计算的hash slot值不一样,但是这条命令在集群下执行,redis只会用大括号里的 user1 做hash slot计算,所以算出来的slot值肯定相同,最后都能落在同一slot。

哨兵leader选举流程

当一个master服务器被某sentinel视为客观下线状态后,该sentinel会与其他sentinel协商选出sentinel的leader进行故障转移工作。每个发现master服务器进入客观下线的sentinel都可以要求其他sentinel选自己为sentinel的leader,选举是先到先得。同时每个sentinel每次选举都会自增配置纪元(选举周期),每个纪元中只会选择一个sentinel的leader。如果所有超过一半的sentinel选举某sentinel作为leader。之后该sentinel进行故障转移操作,从存活的slave中选举出新的master,这个选举过程跟集群的master选举很类似。

哨兵集群只有一个哨兵节点,redis的主从也能正常运行以及选举master,如果master挂了,那唯一的那个哨兵节点就是哨兵leader了,可以正常选举新master。

不过为了高可用一般都推荐至少部署三个哨兵节点。为什么推荐奇数个哨兵节点原理跟集群奇数个master节点类似。

4. Redis高可用集群之水平扩展

Redis3.0以后的版本虽然有了集群功能,提供了比之前版本的哨兵模式更高的性能与可用性,但是集群的水平扩展却比较麻烦,今天就来带大家看看redis高可用集群如何做水平扩展,原始集群(见下图)由6个节点组成,6个节点分布在三台机器上,采用三主三从的模式

Redis最全详解

1、启动集群

# 启动整个集群

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8001/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8002/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8003/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8004/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8005/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8006/redis.conf

# 客户端连接8001端口的redis实例

/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.61 -p 8001

# 查看集群状态

192.168.0.61:8001> cluster  nodes

Redis最全详解

从上图可以看出,整个集群运行正常,三个master节点和三个slave节点,8001端口的实例节点存储0-5460这些hash槽,8002端口的实例节点存储5461-10922这些hash槽,8003端口的实例节点存储10923-16383这些hash槽,这三个master节点存储的所有hash槽组成redis集群的存储槽位,slave点是每个主节点的备份从节点,不显示存储槽位

2、集群操作

我们在原始集群基础上再增加一主(8007)一从(8008),增加节点后的集群参见下图,新增节点用虚线框表示

Redis最全详解

  • 增加redis实例

# 在/usr/local/redis-cluster下创建8007和8008文件夹,并拷贝8001文件夹下的redis.conf文件到8007和8008这两个文件夹下

mkdir 8007 8008

cd 8001

cp redis.conf /usr/local/redis-cluster/8007/

cp redis.conf /usr/local/redis-cluster/8008/

# 修改8007文件夹下的redis.conf配置文件

vim /usr/local/redis-cluster/8007/redis.conf

# 修改如下内容:

port:8007

dir /usr/local/redis-cluster/8007/

cluster-config-file nodes-8007.conf

# 修改8008文件夹下的redis.conf配置文件

vim /usr/local/redis-cluster/8008/redis.conf

修改内容如下:

port:8008

dir /usr/local/redis-cluster/8008/

cluster-config-file nodes8008.conf

# 启动8007和8008俩个服务并查看服务状态

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8007/redis.conf

/usr/local/redis-5.0.3/src/redis-server /usr/local/redis-cluster/8008/redis.conf

ps -el | grep redis
  • 查看redis集群的命令帮助

    cd /usr/local/redis-5.0.3

    src/redis-cli --cluster help

Redis最全详解

1.create:创建一个集群环境host1:port1 ... hostN:portN

2.call:可以执行redis命令

3.add-node:将一个节点添加到集群里,第一个参数为新节点的ip:port,第二个参数为集群中任意一个已经存在的节点的ip:port

4.del-node:移除一个节点

5.reshard:重新分片

6.check:检查集群状态

  • 配置8007为集群主节点

# 使用add-node命令新增一个主节点8007(master),前面的ip:port为新增节点,后面的ip:port为已知存在节点,看到日志最后有"[OK] New node added correctly"提示代表新节点加入成功

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster add-node 192.168.0.61:8007 192.168.0.61:8001

# 查看集群状态

/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.61 -p 8001

192.168.0.61:8001> cluster nodes

Redis最全详解

注意:当添加节点成功以后,新增的节点不会有任何数据,因为它还没有分配任何的slot(hash槽),我们需要为新节点手工分配hash槽

# 使用redis-cli命令为8007分配hash槽,找到集群中的任意一个主节点,对其进行重新分片工作。

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster reshard 192.168.0.61:8001

输出如下:

... ...

How many slots do you want to move (from 1 to 16384)? 600

(ps:需要多少个槽移动到新的节点上,自己设置,比如600个hash槽)

What is the receiving node ID? 2728a594a0498e98e4b83a537e19f9a0a3790f38

(ps:把这600个hash槽移动到哪个节点上去,需要指定节点id)

Please enter all the source node IDs.

Type 'all' to use all the nodes as source nodes for the hash slots.

Type 'done' once you entered all the source nodes IDs.

Source node 1:all

(ps:输入all为从所有主节点(8001,8002,8003)中分别抽取相应的槽数指定到新节点中,抽取的总槽数为600个)

... ...

Do you want to proceed with the proposed reshard plan (yes/no)? yes

(ps:输入yes确认开始执行分片任务)

... ...

# 查看下最新的集群状态

/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.61 -p 8001

192.168.0.61:8001> cluster nodes

Redis最全详解

如上图所示,现在我们的8007已经有hash槽了,也就是说可以在8007上进行读写数据啦!到此为止我们的8007已经加入到集群中,并且是主节点(Master)

  • 配置8008为8007的从节点

# 添加从节点8008到集群中去并查看集群状态

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster add-node 192.168.0.61:8008 192.168.0.61:8001

Redis最全详解

如图所示,还是一个master节点,没有被分配任何的hash槽。

# 我们需要执行replicate命令来指定当前节点(从节点)的主节点id为哪个,首先需要连接新加的8008节点的客户端,然后使用集群命令进行操作,把当前的8008(slave)节点指定到一个主节点下(这里使用之前创建的8007主节点)

/usr/local/redis-5.0.3/src/redis-cli -a zhuge -c -h 192.168.0.61 -p 8008

192.168.0.61:8008> cluster replicate 2728a594a0498e98e4b83a537e19f9a0a3790f38  #后面这串id为8007的节点id

# 查看集群状态,8008节点已成功添加为8007节点的从节点

Redis最全详解

  • 删除8008从节点

# 用del-node删除从节点8008,指定删除节点ip和端口,以及节点id(红色为8008节点id)

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster del-node 192.168.0.61:8008 a1cfe35722d151cf70585cee21275565393c0956

# 再次查看集群状态,如下图所示,8008这个slave节点已经移除,并且该节点的redis服务也已被停止

Redis最全详解

  • 删除8007主节点

最后,我们尝试删除之前加入的主节点8007,这个步骤相对比较麻烦一些,因为主节点的里面是有分配了hash槽的,所以我们这里必须先把8007里的hash槽放入到其他的可用主节点中去,然后再进行移除节点操作,不然会出现数据丢失问题(目前只能把master的数据迁移到一个节点上,暂时做不了平均分配功能),执行命令如下:

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster reshard 192.168.0.61:8007

输出如下:

... ...

How many slots do you want to move (from 1 to 16384)? 600

What is the receiving node ID? dfca1388f124dec92f394a7cc85cf98cfa02f86f

(ps:这里是需要把数据移动到哪?8001的主节点id)

Please enter all the source node IDs.

Type 'all' to use all the nodes as source nodes for the hash slots.

Type 'done' once you entered all the source nodes IDs.

Source node 1:2728a594a0498e98e4b83a537e19f9a0a3790f38

(ps:这里是需要数据源,也就是我们的8007节点id)

Source node 2:done

(ps:这里直接输入done 开始生成迁移计划)

... ...

Do you want to proceed with the proposed reshard plan (yes/no)? Yes

(ps:这里输入yes开始迁移)

至此,我们已经成功的把8007主节点的数据迁移到8001上去了,我们可以看一下现在的集群状态如下图,你会发现8007下面已经没有任何hash槽了,证明迁移成功!

Redis最全详解

# 最后我们直接使用del-node命令删除8007主节点即可

/usr/local/redis-5.0.3/src/redis-cli -a zhuge --cluster del-node 192.168.0.61:8007 2728a594a0498e98e4b83a537e19f9a0a3790f38

# 查看集群状态,一切还原为最初始状态啦!大功告成!

Redis最全详解

5.Redis缓存设计与性能优化

多级缓存架构

Redis最全详解

缓存设计

缓存穿透

缓存穿透是指查询一个根本不存在的数据, 缓存层和存储层都不会命中, 通常出于容错的考虑, 如果从存储层查不到数据则不写入缓存层。

缓存穿透将导致不存在的数据每次请求都要到存储层去查询, 失去了缓存保护后端存储的意义。

造成缓存穿透的基本原因有两个:

第一, 自身业务代码或者数据出现问题。

第二, 一些恶意攻击、 爬虫等造成大量空命中。

缓存穿透问题解决方案:

1、缓存空对象

String get(String key) {

    // 从缓存中获取数据

    String cacheValue = cache.get(key);

    // 缓存为空

    if (StringUtils.isBlank(cacheValue)) {

        // 从存储中获取

        String storageValue = storage.get(key);

        cache.set(key, storageValue);

        // 如果存储数据为空, 需要设置一个过期时间(300秒)

        if (storageValue == null) {

            cache.expire(key, 60 * 5);

        }

        return storageValue;

    } else {

        // 缓存非空

        return cacheValue;

    }

}

2、布隆过滤器

对于恶意攻击,向服务器请求大量不存在的数据造成的缓存穿透,还可以用布隆过滤器先做一次过滤,对于不存在的数据布隆过滤器一般都能够过滤掉,不让请求再往后端发送。当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。

Redis最全详解

布隆过滤器就是一个大型的位数组和几个不一样的无偏 hash 函数。所谓无偏就是能够把元素的 hash 值算得比较均匀。

向布隆过滤器中添加 key 时,会使用多个 hash 函数对 key 进行 hash 算得一个整数索引值然后对位数组长度进行取模运算得到一个位置,每个 hash 函数都会算得一个不同的位置。再把位数组的这几个位置都置为 1 就完成了 add 操作。

向布隆过滤器询问 key 是否存在时,跟 add 一样,也会把 hash 的几个位置都算出来,看看位数组中这几个位置是否都为 1,只要有一个位为 0,那么说明布隆过滤器中这个key 不存在。如果都是 1,这并不能说明这个 key 就一定存在,只是极有可能存在,因为这些位被置为 1 可能是因为其它的 key 存在所致。如果这个位数组比较稀疏,这个概率就会很大,如果这个位数组比较拥挤,这个概率就会降低。

这种方法适用于数据命中不高、 数据相对固定、 实时性低(通常是数据集较大) 的应用场景, 代码维护较为复杂, 但是缓存空间占用很少。

可以用guvua包自带的布隆过滤器,引入依赖:

<dependency>

    <groupId>com.google.guava</groupId>

    <artifactId>guava</artifactId>

    <version>22.0</version>

</dependency>

示例伪代码:

import com.google.common.hash.BloomFilter;

//初始化布隆过滤器

//1000:期望存入的数据个数,0.001:期望的误差率

BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), 1000, 0.001);  

//把所有数据存入布隆过滤器

void init(){

    for (String key: keys) {

        bloomFilter.put(key);

    }

}

String get(String key) {

    // 从布隆过滤器这一级缓存判断下key是否存在

    Boolean exist = bloomFilter.mightContain(key);

    if(!exist){

        return "";

    }

    // 从缓存中获取数据

    String cacheValue = cache.get(key);

    // 缓存为空

    if (StringUtils.isBlank(cacheValue)) {

        // 从存储中获取

        String storageValue = storage.get(key);

        cache.set(key, storageValue);

        // 如果存储数据为空, 需要设置一个过期时间(300秒)

        if (storageValue == null) {

            cache.expire(key, 60 * 5);

        }

        return storageValue;

    } else {

        // 缓存非空

        return cacheValue;

    }

}

缓存失效

由于大批量缓存在同一时间失效可能导致大量请求同时穿透缓存直达数据库,可能会造成数据库瞬间压力过大甚至挂掉,对于这种情况我们在批量增加缓存时最好将这一批数据的缓存过期时间设置为一个时间段内的不同时间。

示例伪代码:

String get(String key) {

    // 从缓存中获取数据

    String cacheValue = cache.get(key);

    // 缓存为空

    if (StringUtils.isBlank(cacheValue)) {

        // 从存储中获取

        String storageValue = storage.get(key);

        cache.set(key, storageValue);

        //设置一个过期时间(300到600之间的一个随机数)

        int expireTime = new Random().nextInt(300)  + 300;

        if (storageValue == null) {

            cache.expire(key, expireTime);

        }

        return storageValue;

    } else {

        // 缓存非空

        return cacheValue;

    }

}

缓存雪崩

缓存雪崩指的是缓存层支撑不住或宕掉后, 流量会像奔逃的野牛一样, 打向后端存储层。

由于缓存层承载着大量请求, 有效地保护了存储层, 但是如果缓存层由于某些原因不能提供服务(比如超大并发过来,缓存层支撑不住,或者由于缓存设计不好,类似大量请求访问bigkey,导致缓存能支撑的并发急剧下降), 于是大量请求都会达到存储层, 存储层的调用量会暴增, 造成存储层也会级联宕机的情况。

预防和解决缓存雪崩问题, 可以从以下三个方面进行着手。

1) 保证缓存层服务高可用性,比如使用Redis Sentinel或Redis Cluster。

2) 依赖隔离组件为后端限流并降级。比如使用Hystrix限流降级组件。

3) 提前演练。 在项目上线前, 演练缓存层宕掉后, 应用以及后端的负载情况以及可能出现的问题, 在此基础上做一些预案设定。

热点缓存key重建优化

开发人员使用“缓存+过期时间”的策略既可以加速数据读写, 又保证数据的定期更新, 这种模式基本能够满足绝大部分需求。 但是有两个问题如果同时出现, 可能就会对应用造成致命的危害:

  • 当前key是一个热点key(例如一个热门的娱乐新闻),并发量非常大。
  • 重建缓存不能在短时间完成, 可能是一个复杂计算, 例如复杂的SQL、 多次IO、 多个依赖等。

在缓存失效的瞬间, 有大量线程来重建缓存, 造成后端负载加大, 甚至可能会让应用崩溃。

要解决这个问题主要就是要避免大量线程同时重建缓存。

我们可以利用互斥锁来解决,此方法只允许一个线程重建缓存, 其他线程等待重建缓存的线程执行完, 重新从缓存获取数据即可。

示例伪代码:

String get(String key) {

    // 从Redis中获取数据

    String value = redis.get(key);

    // 如果value为空, 则开始重构缓存

    if (value == null) {

        // 只允许一个线程重建缓存, 使用nx, 并设置过期时间ex

        String mutexKey = "mutext:key:" + key;

        if (redis.set(mutexKey, "1", "ex 180", "nx")) {

             // 从数据源获取数据

            value = db.get(key);

            // 回写Redis, 并设置过期时间

            redis.setex(key, timeout, value);

            // 删除key_mutex

            redis.delete(mutexKey);

        }// 其他线程休息50毫秒后重试

        else {

            Thread.sleep(50);

            get(key);

        }

    }

    return value;

}

开发规范与性能优化

一、键值设计

1. key名设计

  • (1)【建议】: 可读性和可管理性

以业务名(或数据库名)为前缀(防止key冲突),用冒号分隔,比如业务名:表名:id

trade:order:1
  • (2)【建议】:简洁性

保证语义的前提下,控制key的长度,当key较多时,内存占用也不容忽视,例如:

user:{uid}:friends:messages:{mid} 简化为 u:{uid}:fr:m:{mid}
  • (3)【强制】:不要包含特殊字符

反例:包含空格、换行、单双引号以及其他转义字符

2. value设计

  • (1)【强制】:拒绝bigkey(防止网卡流量、慢查询)

在Redis中,一个字符串最大512MB,一个二级数据结构(例如hash、list、set、zset)可以存储大约40亿个(2^32-1)个元素,但实际中如果下面两种情况,我就会认为它是bigkey。

  1. 字符串类型:它的big体现在单个value值很大,一般认为超过10KB就是bigkey。
  2. 非字符串类型:哈希、列表、集合、有序集合,它们的big体现在元素个数太多。

一般来说,string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000。

反例:一个包含200万个元素的list。

非字符串的bigkey,不要使用del删除,使用hscan、sscan、zscan方式渐进式删除,同时要注意防止bigkey过期时间自动删除问题(例如一个200万的zset设置1小时过期,会触发del操作,造成阻塞)

bigkey的危害:

1.导致redis阻塞

2.网络拥塞

bigkey也就意味着每次获取要产生的网络流量较大,假设一个bigkey为1MB,客户端每秒访问量为1000,那么每秒产生1000MB的流量,对于普通的千兆网卡(按照字节算是128MB/s)的服务器来说简直是灭顶之灾,而且一般服务器会采用单机多实例的方式来部署,也就是说一个bigkey可能会对其他实例也造成影响,其后果不堪设想。

3. 过期删除

有个bigkey,它安分守己(只执行简单的命令,例如hget、lpop、zscore等),但它设置了过期时间,当它过期后,会被删除,如果没有使用Redis 4.0的过期异步删除(lazyfree-lazy-expire yes),就会存在阻塞Redis的可能性。

bigkey的产生:

一般来说,bigkey的产生都是由于程序设计不当,或者对于数据规模预料不清楚造成的,来看几个例子:

(1) 社交类:粉丝列表,如果某些明星或者大v不精心设计下,必是bigkey。

(2) 统计类:例如按天存储某项功能或者网站的用户集合,除非没几个人用,否则必是bigkey。

(3) 缓存类:将数据从数据库load出来序列化放到Redis里,这个方式非常常用,但有两个地方需要注意,第一,是不是有必要把所有字段都缓存;第二,有没有相关关联的数据,有的同学为了图方便把相关数据都存一个key下,产生bigkey。

如何优化bigkey

1. 拆

big list: list1、list2、...listN

big hash:可以讲数据分段存储,比如一个大的key,假设存了1百万的用户数据,可以拆分成200个key,每个key下面存放5000个用户数据

2. 如果bigkey不可避免,也要思考一下要不要每次把所有元素都取出来(例如有时候仅仅需要hmget,而不是hgetall),删除也是一样,尽量使用优雅的方式来处理。

  • (2)【推荐】:选择适合的数据类型。

例如:实体类型(要合理控制和使用数据结构,但也要注意节省内存和性能之间的平衡)

反例:

set user:1:name tom

set user:1:age 19

set user:1:favor football

正例:

hmset user:1 name tom age 19 favor football

3.【推荐】:控制key的生命周期,redis不是垃圾桶。

建议使用expire设置过期时间(条件允许可以打散过期时间,防止集中过期)。

二、命令使用

1.【推荐】 O(N)命令关注N的数量

例如hgetall、lrange、smembers、zrange、sinter等并非不能使用,但是需要明确N的值。有遍历的需求可以使用hscan、sscan、zscan代替。

2.【推荐】:禁用命令

禁止线上使用keys、flushall、flushdb等,通过redis的rename机制禁掉命令,或者使用scan的方式渐进式处理。

3.【推荐】合理使用select

redis的多数据库较弱,使用数字进行区分,很多客户端支持较差,同时多业务用多数据库实际还是单线程处理,会有干扰。

4.【推荐】使用批量操作提高效率

原生命令:例如mget、mset。

非原生命令:可以使用pipeline提高效率。

但要注意控制一次批量操作的元素个数(例如500以内,实际也和元素字节数有关)。

注意两者不同:

1. 原生是原子操作,pipeline是非原子操作。

2. pipeline可以打包不同的命令,原生做不到

3. pipeline需要客户端和服务端同时支持。

5.【建议】Redis事务功能较弱,不建议过多使用,可以用lua替代

三、客户端使用

1.【推荐】

避免多个应用使用一个Redis实例

正例:不相干的业务拆分,公共数据做服务化。

2.【推荐】

使用带有连接池的数据库,可以有效控制连接,同时提高效率,标准使用方式:

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

jedisPoolConfig.setMaxTotal(5);

jedisPoolConfig.setMaxIdle(2);

jedisPoolConfig.setTestOnBorrow(true);

JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6379, 3000, null);

Jedis jedis = null;

try {

    jedis = jedisPool.getResource();

    //具体的命令

    jedis.executeCommand()

} catch (Exception e) {

    logger.error("op key {} error: " + e.getMessage(), key, e);

} finally {

    //注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。

    if (jedis != null) 

        jedis.close();

}

连接池参数含义:

序号

参数名

含义

默认值

使用建议

1

maxTotal

资源池中最大连接数

8

设置建议见下面

2

maxIdle

资源池允许最大空闲的连接数

8

设置建议见下面

3

minIdle

资源池确保最少空闲的连接数

0

设置建议见下面

4

blockWhenExhausted

当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

true

建议使用默认值

5

maxWaitMillis

当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

-1:表示永不超时

不建议使用默认值

6

testOnBorrow

向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

false

业务量很大时候建议设置为false(多一次ping的开销)。

7

testOnReturn

向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

false

业务量很大时候建议设置为false(多一次ping的开销)。

8

jmxEnabled

是否开启jmx监控,可用于监控

true

建议开启,但应用本身也要开启

优化建议:

1)maxTotal:最大连接数,早期的版本叫maxActive

实际上这个是一个很难回答的问题,考虑的因素比较多:

  • 业务希望Redis并发量
  • 客户端执行命令时间
  • Redis资源:例如 nodes(例如应用个数) * maxTotal 是不能超过redis的最大连接数maxclients。
  • 资源开销:例如虽然希望控制空闲连接(连接池此刻可马上使用的连接),但是不希望因为连接池的频繁释放创建连接造成不必靠开销。

以一个例子说明,假设:

  • 一次命令时间(borrow|return resource + Jedis执行命令(含网络) )的平均耗时约为1ms,一个连接的QPS大约是1000
  • 业务期望的QPS是50000

那么理论上需要的资源池大小是50000 / 1000 = 50个。但事实上这是个理论值,还要考虑到要比理论值预留一些资源,通常来讲maxTotal可以比理论值大一些。

但这个值不是越大越好,一方面连接太多占用客户端和服务端资源,另一方面对于Redis这种高QPS的服务器,一个大命令的阻塞即使设置再大资源池仍然会无济于事。

2)maxIdle和minIdle

maxIdle实际上才是业务需要的最大连接数,maxTotal是为了给出余量,所以maxIdle不要设置过小,否则会有new Jedis(新连接)开销。

连接池的最佳性能是maxTotal = maxIdle,这样就避免连接池伸缩带来的性能干扰。但是如果并发量不大或者maxTotal设置过高,会导致不必要的连接资源浪费。一般推荐maxIdle可以设置为按上面的业务期望QPS计算出来的理论连接数,maxTotal可以再放大一倍。

minIdle(最小空闲连接数),与其说是最小空闲连接数,不如说是"至少需要保持的空闲连接数",在使用连接的过程中,如果连接数超过了minIdle,那么继续建立连接,如果超过了maxIdle,当超过的连接执行完业务后会慢慢被移出连接池释放掉。

如果系统启动完马上就会有很多的请求过来,那么可以给redis连接池做预热,比如快速的创建一些redis连接,执行简单命令,类似ping(),快速的将连接池里的空闲连接提升到minIdle的数量。

连接池预热示例代码:

List<Jedis> minIdleJedisList = new ArrayList<Jedis>(jedisPoolConfig.getMinIdle());

for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {

    Jedis jedis = null;

    try {

        jedis = pool.getResource();

        minIdleJedisList.add(jedis);

        jedis.ping();

    } catch (Exception e) {

        logger.error(e.getMessage(), e);

    } finally {

        //注意,这里不能马上close将连接还回连接池,否则最后连接池里只会建立1个连接。。

        //jedis.close();

    }

}

//统一将预热的连接还回连接池

for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {

    Jedis jedis = null;

    try {

        jedis = minIdleJedisList.get(i);

        //将连接归还回连接池

        jedis.close();

    } catch (Exception e) {

        logger.error(e.getMessage(), e);

    } finally {

    }

}

总之,要根据实际系统的QPS和调用redis客户端的规模整体评估每个节点所使用的连接池大小。

3.【建议】

高并发下建议客户端添加熔断功能(例如netflix hystrix)

4.【推荐】

设置合理的密码,如有必要可以使用SSL加密访问

5.【建议】

Redis对于过期键有三种清除策略:

  • 被动删除:当读/写一个已经过期的key时,会触发惰性删除策略,直接删除掉这个过期key
  • 主动删除:由于惰性删除策略无法保证冷数据被及时删掉,所以Redis会定期主动淘汰一批已过期的key
  • 当前已用内存超过maxmemory限定时,触发主动清理策略

当REDIS运行在主从模式时,只有主结点才会执行被动和主动这两种过期删除策略,然后把删除操作”del key”同步到从结点。

第三种策略的情况如下:

当前已用内存超过maxmemory限定时,会触发主动清理策略

根据自身业务类型,选好maxmemory-policy(最大内存淘汰策略),设置好过期时间。如果不设置最大内存,当 Redis 内存超出物理内存限制时,内存的数据会开始和磁盘产生频繁的交换 (swap),会让 Redis 的性能急剧下降。

默认策略是volatile-lru,即超过最大内存后,在过期键中使用lru算法进行key的剔除,保证不过期数据不被删除,但是可能会出现OOM问题。

其他策略如下:

  • allkeys-lru:根据LRU算法删除键,不管数据有没有设置超时属性,直到腾出足够空间为止。
  • allkeys-random:随机删除所有键,直到腾出足够空间为止。
  • volatile-random: 随机删除过期键,直到腾出足够空间为止。
  • volatile-ttl:根据键值对象的ttl属性,删除最近将要过期数据。如果没有,回退到noeviction策略。
  • noeviction:不会剔除任何数据,拒绝所有写入操作并返回客户端错误信息"(error) OOM command not allowed when used memory",此时Redis只响应读操作。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这