10.cluster读写key

可莉
• 阅读 606

Redis cluster的思路是,当需要存储一个key时,先将key模拟发布在一个slot槽,在将key实际分布在slot关联的实际服务器上.

10.cluster读写key

从头到尾看一下,跟key读写相关的源码.

int main(int argc, char **argv) 
{
    .
    .
    .
    /* 确保只有0号数据库里有数据 */
    if (server.cluster_enabled) {
        if (verifyClusterConfigWithData() == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "You can't have keys in a DB different than DB 0 when in "
                "Cluster mode. Exiting.");
            exit(1);
        }
    }
    .
    .
    .
}



/* 检查当前节点的节点配置是否正确,包含的数据是否正确 */
int verifyClusterConfigWithData(void) {
    int j;
    int update_config = 0;

    if (nodeIsSlave(myself)) return REDIS_OK;//不对从节点进行检查

    /* 确保只有0号数据库有数据 */
    for (j = 1; j < server.dbnum; j++) {
        if (dictSize(server.db[j].dict)) return REDIS_ERR;
    }

    /* 检查槽表是否都有相应的节点,如果不是的话,进行修复 */
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
        if (!countKeysInSlot(j)) continue; //空槽直接跳过
        
        if (server.cluster->slots[j] == myself ||server.cluster->importing_slots_from[j] != NULL) continue;// 跳过正在导入的槽

        /* If we are here data and cluster config don't agree, and we have
         * slot 'j' populated even if we are not importing it, nor we are
         * assigned to this slot. Fix this condition. */

        update_config++;
        if (server.cluster->slots[j] == NULL) {
            /* 处理未被分配的槽 */
            redisLog(REDIS_WARNING, "I've keys about slot %d that is "
                                    "unassigned. Taking responsability "
                                    "for it.",j);
            clusterAddSlot(myself,j);
        } else {
            /* 如果一个槽已经被其他节点接管,那么将槽中的资料发送给对方 */
            redisLog(REDIS_WARNING, "I've keys about slot %d that is "
                                    "already assigned to a different node. "
                                    "Setting it in importing state.",j);
            server.cluster->importing_slots_from[j] = server.cluster->slots[j];
        }
    }

    if (update_config) clusterSaveConfigOrDie(1);//保存 nodes.conf 文件

    return REDIS_OK;
}

当客户端传来一条命令时,会执行processCommand()函数,该函数与cluster相关的代码如下:

int processCommand(redisClient *c) 
{
    .
    .
    .
    /*
     * 如果开启了集群模式,那么在这里进行转向操作。
     * 不过,如果有以下情况出现,那么节点不进行转向:
     * 1) 命令的发送者是本节点的主节点
     * 2) 命令没有 key 参数 
     */
    if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {//集群已下线
            flagTransaction(c);
            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

        /* 集群运作正常 */
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            // 不能执行多键处理命令
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;

            // 命令针对的槽和键不是本节点处理的,进行转向
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                /* 例如 -ASK 10086 127.0.0.1:12345 */
                addReplySds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d\r\n", (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                    hashslot,n->ip,n->port));

                return REDIS_OK;
            }

            // 如果执行到这里,说明键 key 所在的槽由本节点处理
            // 或者客户端执行的是无参数命令
        }
    }
    .
    .
    .
}

重点就是getNodeByQuery(),该函数可以根据客户端输入的命令,计算要映射到哪个slot槽上

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    // 初始化为 NULL ,
    // 如果输入命令是无参数命令,那么 n 就会继续为 NULL
    clusterNode *n = NULL;

    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    // 集群可以执行事务,
    // 但必须确保事务中的所有命令都是针对某个相同的键进行的
    // 这个 if 和接下来的 for 进行的就是这一合法性检测
    if (cmd->proc == execCommand) {
        /* If REDIS_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & REDIS_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        // 定位命令的键位置
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        // 遍历命令中的所有键
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                // 这是事务中第一个被处理的键
                // 获取该键的槽和负责处理该槽的节点
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];
                redisAssertWithInfo(c,firstkey,n != NULL);
                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }
            }

            /* Migarting / Improrting slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors. */
    if (n == NULL) return myself;

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* This request is about a slot we are migrating into another instance?
     * Then if we have all the keys. */

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & REDIS_READONLY &&
        cmd->flags & REDIS_CMD_READONLY &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a rediretion. */
    if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;

    // 返回负责处理槽 slot 的节点 n
    return n;
}



// 计算给定键应该被分配到那个槽
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing betweeen {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
GoJS API学习
varnode{};node"key""节点Key";node"loc""00";//节点坐标node"text""节点名称";//添加节点通过按钮点击,添加新的节点到画布myDiagram.model.addNodeData(nod
Stella981 Stella981
3年前
SpringBoot使用RedisTemplate操作Redis时,key值出现 -xac-xed-x00-x05t-x00-tb
原因分析原因与RedisTemplate源码中的默认序列化方式有关defaultSerializernewJdkSerializationRedisSerializer(classLoader!null?classLoader:this.getClass().getClassLoader()
Stella981 Stella981
3年前
JS 对象数组Array 根据对象object key的值排序sort,很风骚哦
有个js对象数组varary\{id:1,name:"b"},{id:2,name:"b"}\需求是根据name或者id的值来排序,这里有个风骚的函数函数定义:function keysrt(key,desc) {  return function(a,b){    return desc ? ~~(ak
Stella981 Stella981
3年前
CAS 实现站内单点登录及实现第三方 OAuth、OpenId 登录(四)
一、OAuth配置1.配置OAuth提供商<bean id"weibo" class"com.buession.oauth.provider.impl.WeiboProvider"    <property name"key" value"the_key_for_
Easter79 Easter79
3年前
SpringBoot使用RedisTemplate操作Redis时,key值出现 -xac-xed-x00-x05t-x00-tb
原因分析原因与RedisTemplate源码中的默认序列化方式有关defaultSerializernewJdkSerializationRedisSerializer(classLoader!null?classLoader:this.getClass().getClassLoader()
Wesley13 Wesley13
3年前
mysql select将多个字段横向合拼到一个字段
表模式:CREATE TABLE tbl_user (  id int(11) NOT NULL AUTO_INCREMENT,  name varchar(255) DEFAULT NULL,  age int(11) DEFAULT NULL,  PRIMARY KEY (id)
Wesley13 Wesley13
3年前
JS自定义结构体数组
废话不多说,直接上代码<script   vars\_keySearch{     key\_name:Array(),     key\_index:Array(),     key\_count:Array(),     key\_scount:Array()  };    for