Redis cluster的思路是:当需要存储一个key时,先将key映射(虚拟分布)到一个slot槽上,再将key实际存储在该slot所关联的服务器上.
下面从头到尾看一下跟key读写相关的源码.
/* Excerpt of main(): only the cluster-related startup check is shown.
 * The lines consisting of a single "." stand for code omitted from the
 * excerpt. */
int main(int argc, char **argv)
{
.
.
.
/* In cluster mode, validate the data set against the cluster
 * configuration. If verifyClusterConfigWithData() returns REDIS_ERR,
 * log that keys exist in a DB other than DB 0 and exit with status 1. */
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == REDIS_ERR) {
redisLog(REDIS_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
.
.
.
}
/* Check that the keys held by this node agree with the cluster slot table,
 * repairing the table where they do not.
 *
 * Returns REDIS_ERR if any database other than DB 0 contains keys, and
 * REDIS_OK otherwise (including immediately when this node is a slave).
 *
 * For every slot that holds keys but is neither assigned to this node nor
 * already marked as being imported:
 *   - if the slot has no owner, this node takes ownership of it;
 *   - if the slot belongs to another node, the slot is put in importing
 *     state with that node as the source.
 * When at least one slot was fixed, clusterSaveConfigOrDie(1) is called. */
int verifyClusterConfigWithData(void) {
    int db;
    int slot;
    int fixed_slots = 0;

    /* Slave nodes are not checked at all. */
    if (nodeIsSlave(myself)) {
        return REDIS_OK;
    }

    /* Only DB 0 may contain keys. */
    for (db = 1; db < server.dbnum; db++) {
        if (dictSize(server.db[db].dict)) {
            return REDIS_ERR;
        }
    }

    /* Walk every slot and reconcile populated slots with the slot table. */
    for (slot = 0; slot < REDIS_CLUSTER_SLOTS; slot++) {
        /* Empty slot: nothing to reconcile. */
        if (countKeysInSlot(slot) == 0) {
            continue;
        }
        /* Slot is ours: configuration and data agree. */
        if (server.cluster->slots[slot] == myself) {
            continue;
        }
        /* Slot is already being imported: also consistent. */
        if (server.cluster->importing_slots_from[slot] != NULL) {
            continue;
        }

        /* Data and configuration disagree: we hold keys for a slot that is
         * neither assigned to us nor being imported. */
        fixed_slots++;

        if (server.cluster->slots[slot] != NULL) {
            /* Another node owns the slot: mark it as being imported from
             * that owner. */
            redisLog(REDIS_WARNING, "I've keys about slot %d that is "
                                    "already assigned to a different node. "
                                    "Setting it in importing state.",slot);
            server.cluster->importing_slots_from[slot] =
                server.cluster->slots[slot];
        } else {
            /* Nobody owns the slot: claim it for this node. */
            redisLog(REDIS_WARNING, "I've keys about slot %d that is "
                                    "unassigned. Taking responsability "
                                    "for it.",slot);
            clusterAddSlot(myself,slot);
        }
    }

    /* Persist the configuration only if something was changed. */
    if (fixed_slots != 0) {
        clusterSaveConfigOrDie(1);
    }
    return REDIS_OK;
}
当客户端传来一条命令时,会执行processCommand()函数,该函数与cluster相关的代码如下:
/* Excerpt of processCommand(): only the cluster redirection logic is shown.
 * The lines consisting of a single "." stand for code omitted from the
 * excerpt. In every branch shown here that replies with an error or a
 * redirection, the function returns REDIS_OK without executing the command. */
int processCommand(redisClient *c)
{
.
.
.
/*
 * If cluster mode is enabled, perform redirection here.
 * Redirection is skipped in the following cases:
 * 1) The client is flagged REDIS_MASTER (the sender is our master).
 * 2) The command has no key arguments (getkeys_proc is NULL and
 *    firstkey is 0).
 */
if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != REDIS_CLUSTER_OK) {// The cluster state is not OK: reply with -CLUSTERDOWN
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
return REDIS_OK;
/* The cluster state is OK. */
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
// No node can serve the request: report why, based on error_code
if (n == NULL) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spans multiple keys in the same slot,
 * but the slot is not "stable" currently as there is
 * a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;
// The slot is served by a different node: reply with a redirection
} else if (n != server.cluster->myself) {
flagTransaction(c);
/* e.g. "-ASK 10086 127.0.0.1:12345"; anything other than
 * REDIS_CLUSTER_REDIR_ASK is reported as MOVED. */
addReplySds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d\r\n", (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
return REDIS_OK;
}
// Reaching this point means getNodeByQuery() returned this node,
// so the command is served locally.
}
}
.
.
.
}
重点就是getNodeByQuery():该函数根据客户端输入的命令,计算其中的key映射到哪个slot槽,并返回负责处理该槽的节点.
/* Return the cluster node that should serve the command described by
 * cmd/argv/argc on behalf of client c. If the command is EXEC, every
 * command queued in the client's MULTI state is examined instead.
 *
 * Outputs (both pointers may be NULL, in which case they are not written):
 *   *hashslot   - the slot of the first key found. Only written when at
 *                 least one key was found and no cross-slot error occurred.
 *   *error_code - REDIS_CLUSTER_REDIR_NONE by default, otherwise one of the
 *                 codes listed below.
 *
 * Return value:
 *   - myself: EXEC without REDIS_MULTI set; no key found in the request;
 *     the slot is being imported and the request is flagged ASKING (unless
 *     the UNSTABLE case below applies); or a READONLY client sends a
 *     read-only command to a slave whose master serves the slot.
 *   - NULL with REDIS_CLUSTER_REDIR_CROSS_SLOT: the keys hash to different
 *     slots.
 *   - NULL with REDIS_CLUSTER_REDIR_UNSTABLE: importing slot, ASKING
 *     request, several distinct keys, and at least one is missing locally.
 *   - migrating_slots_to[slot] with REDIS_CLUSTER_REDIR_ASK: we are
 *     migrating the slot and at least one key is missing locally.
 *   - otherwise the node owning the slot, with REDIS_CLUSTER_REDIR_MOVED
 *     set when that node is not myself.
 *
 * NOTE(review): an unassigned slot (slots[slot] == NULL) trips
 * redisAssertWithInfo() below rather than producing an error reply;
 * confirm this is acceptable for callers. */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
// Starts as NULL and stays NULL if no key is found in the request.
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;
/* We handle all the cases as if they were EXEC commands, so we have
 * a common code path for everything */
// Transactions are allowed in cluster mode, but all the keys touched by
// the queued commands must hash to the same slot. This "if" and the
// "for" loop that follows perform that validity check.
if (cmd->proc == execCommand) {
/* If REDIS_MULTI flag is not set EXEC is just going to return an
 * error. */
if (!(c->flags & REDIS_MULTI)) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
 * structure if the client is not in MULTI/EXEC state, this way
 * we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}
/* Check that all the keys are in the same hash slot, and obtain this
 * slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, *keyindex, numkeys, j;
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
// Locate the positions of the key arguments of this command.
// keyindex is released with getKeysFreeResult() on every exit path.
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
// Iterate over every key of the command.
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
// This is the first key seen in the whole request: record its
// slot and the node the slot table assigns to that slot.
/* This is the first key we see. Check what is the slot
 * and node. */
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
redisAssertWithInfo(c,firstkey,n != NULL);
/* If we are migrating or importing this slot, we need to check
 * if we have all the keys in the request (the only way we
 * can safely serve the request, otherwise we return a TRYAGAIN
 * error). To do so we set the importing/migrating state and
 * increment a counter for every missing key. */
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
/* Not the first key: it must either be equal to the first key
 * or, if different, at least hash to the same slot. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(keyindex);
if (error_code)
*error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
/* Flag this request as one with multiple different
 * keys. */
multiple_keys = 1;
}
}
}
/* Migrating / importing slot? Count the keys we don't have
 * (looked up in DB 0). */
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
}
/* No key at all in command? then we can serve the request
 * without redirections or errors. */
if (n == NULL) return myself;
/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
/* The request is about a slot we are migrating to another instance:
 * if we hold all the keys we keep going and serve it below. */
/* If we don't have all the keys and we are migrating the slot, send
 * an ASK redirection. */
if (migrating_slot && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
/* If we are receiving the slot, and the client correctly flagged the
 * request as "ASKING", we can serve the request. However if the request
 * involves multiple keys and we don't have them all, the only option is
 * to send a TRYAGAIN error. */
if (importing_slot &&
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
/* Handle the read-only client case reading from a slave: if this
 * node is a slave and the request is about an hash slot our master
 * is serving, we can reply without redirection. */
if (c->flags & REDIS_READONLY &&
cmd->flags & REDIS_CMD_READONLY &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
/* Base case: just return the right node. However if this node is not
 * myself, set error_code to MOVED since we need to issue a redirection. */
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
// Return the node n that the slot table assigns to the slot.
return n;
}
/* Compute the hash slot of a key.
 *
 * The slot is crc16() of the hashed bytes, masked with 0x3FFF. Normally the
 * whole key is hashed. If the key contains a '{' followed later by a '}'
 * with at least one byte in between, only the bytes between the first '{'
 * and the first '}' after it are hashed. */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0; /* index of the first '{', or keylen if there is none */
    int rbrace;     /* index of the first '}' after lbrace, or keylen */

    /* Find the first opening brace. */
    while (lbrace < keylen && key[lbrace] != '{') {
        lbrace++;
    }
    /* Base case: no '{' in the key, hash all of it. */
    if (lbrace == keylen) {
        return crc16(key,keylen) & 0x3FFF;
    }

    /* Find the matching closing brace to the right of the opening one. */
    rbrace = lbrace + 1;
    while (rbrace < keylen && key[rbrace] != '}') {
        rbrace++;
    }
    /* No '}' found, or "{}" with nothing inside: hash the whole key. */
    if (rbrace == keylen || rbrace == lbrace + 1) {
        return crc16(key,keylen) & 0x3FFF;
    }

    /* Hash only the bytes strictly between '{' and '}'. */
    return crc16(key + lbrace + 1, rbrace - lbrace - 1) & 0x3FFF;
}