Redis原理-源码解析:数据结构3 sorted set(zset))

  created  by  鱼鱼 {{tag}}
创建于 2021年02月25日 19:04:02 最后修改于 2021年02月28日 00:11:31

    Redis的set数据结构在此不多讲,同Java中原理一样,set也可以理解为是hash剥离了value的数据结构,即同为dic。但是zset(有序集合)其实在底层原理上完全不同于set。

    所有原理实现基于Redis版本6.0.9。

创建命令实现分析数据结构

    先看一下基本的指令实现,着重注意中文注解的地方。

    t_zset.c

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    int added = 0;     
    int updated = 0;    
    int processed = 0; 
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_LT;
        else break;
        scoreidx++;
    }
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;
    int gt = (flags & ZADD_GT) != 0;
    int lt = (flags & ZADD_LT) != 0;
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    //判断数据大小并选择数据类型
    if (zobj == NULL) {
        if (xx) goto reply_to_client; 
        //server.zset_max_ziplist_entries 可配置,最小节点数 默认128
        //server.zset_max_ziplist_value 可配置,最小字符串 默认64
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {    
         //节点多或字符串大时,创建zset
            zobj = createZsetObject();
        } else {
         //创建ziplist
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    }
        //遍历要插入的数据
    for (j = 0; j < elements; j++) {
        double newscore;
        //第j条数据的score
        score = scores[j];
        int retflags = flags;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        //调用zadd 实际执行插入的地方
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

    可以看出zset的数据结构不是固定的,在其元素数或是元素的字符串过长时,其结构为zset;否则使用ziplist数据结构(像hash一样为了节省空间),二者的创建方法如下:

//元素不大
robj *createZsetObject(void) {
    //创建了一个zset结构
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    //为zset创建一个dict
    zs->dict = dictCreate(&zsetDictType,NULL);
    //为zset创建一个skiplist
    zs->zsl = zslCreate();
    o = createObject(OBJ_ZSET,zs);
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}
//元素过大
robj *createZsetZiplistObject(void) {
    //新建ziplist 没什么好说的
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_ZSET,zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

    ziplist的代码和原理可以参考我的博客Redis原理-源码解析:数据结构2 list-鱼鱼的Java小站,就是一个节省内存的压缩的链表结构。这里着重分析skiplist。

zset

    这是zset的数据结构,包括一个skiplist和一个dic:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

    其中zskiplist是跳跃表,用于存储skiplist的数据,dict用于存储value和score的映射指针,是一个冗余的结构,以便于以O(1)的时间复杂度查询score;

skiplist

    zskiplist的数据结构其实就是一个以链表为基础的跳跃表:

//level代表最高的层数
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    //可能有多个向后的链表,span表示跨度
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

skiplist是在链表的基础上,每个节点有不定数量的指针指向后面的跨越了多个的链表节点。大致结构如图:

图1 skiplist结构举例

可以看到每一个节点的指针数(高度)是非常随机的。

插入数据源码逻辑

    通过插入数据的源码,我们可以看到查找元素和生成随机高度的逻辑。在插入时,判断是新数据并且是zset数据结构,会执行如下代码:

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
        //…………
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        //……
    ele = sdsdup(ele);
    //插入数据
    znode = zslInsert(zs->zsl,score,ele);
    serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
    *flags |= ZADD_ADDED;
    if (newscore) *newscore = score;
    return 1;

    
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    //暂存遍历中的前一个节点以便于插入
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    /上述前一个节点的排名(跳跃表节点从小到达排序,遍历过程中跨度的累计值即为遍历节点的排名)
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    //从高层向低层查找插入节点的前一节点
    // 这个遍历是单向的 如果会一直遍历到最下层链表
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        //若遍历的节点存在前向节点,并且节点均小于插入节点, 继续循环,直至找到插入节点的前向节点
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //继续向后遍历 推算span
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    //随机生成层数,方法在下面
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    //插入节点
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
                //更新跨度值
        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
//生成层数 这里的ZSKIPLIST_MAXLEVEL是32 ,很足够用了
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

    即如果我们在上面的图1数据结构中试图找节点10(这样称score为10的节点),则顺序为:

第4层节点1->第3层节点1->第3层节点4->第3层节点6->第2层节点6->第2层节点9->第1层节点9->第1层节点10

    上下层切换可以忽略不计,我们总共跳跃4次变找到了节点10,相比最下层节省了很多时间。

总结

    Redis中的sorted set采用了ziplist(当元素没那么多字符串没那么大时),以便于节省内存,这一点和hash类似,此种情况下查询时间复杂度为O(n),但是一般都不是太大的数据影响不大。

    当数据达到一定大小时,便会从ziplist转换为zset,其包含一skiplist和dic,dic主要存储键值和score的映射,以便于在O(1)的效率下查询score。skiplist主要是一个多排链表,一个节点可能存储了向后推进多个元素的指针,使用skiplist做插入查询的时候,会从最高层逐渐遍历到最下层,一般都能有效缩短查询时间,每一个节点的高度都是完全随机生成的1-32。

评论区
评论
{{comment.creator}}
{{comment.createTime}} {{comment.index}}楼
评论

Redis原理-源码解析:数据结构3 sorted set(zset))

Redis原理-源码解析:数据结构3 sorted set(zset))

    Redis的set数据结构在此不多讲,同Java中原理一样,set也可以理解为是hash剥离了value的数据结构,即同为dic。但是zset(有序集合)其实在底层原理上完全不同于set。

    所有原理实现基于Redis版本6.0.9。

创建命令实现分析数据结构

    先看一下基本的指令实现,着重注意中文注解的地方。

    t_zset.c

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    int added = 0;     
    int updated = 0;    
    int processed = 0; 
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_LT;
        else break;
        scoreidx++;
    }
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;
    int gt = (flags & ZADD_GT) != 0;
    int lt = (flags & ZADD_LT) != 0;
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    //判断数据大小并选择数据类型
    if (zobj == NULL) {
        if (xx) goto reply_to_client; 
        //server.zset_max_ziplist_entries 可配置,最小节点数 默认128
        //server.zset_max_ziplist_value 可配置,最小字符串 默认64
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {    
         //节点多或字符串大时,创建zset
            zobj = createZsetObject();
        } else {
         //创建ziplist
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    }
        //遍历要插入的数据
    for (j = 0; j < elements; j++) {
        double newscore;
        //第j条数据的score
        score = scores[j];
        int retflags = flags;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        //调用zadd 实际执行插入的地方
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

    可以看出zset的数据结构不是固定的,在其元素数或是元素的字符串过长时,其结构为zset;否则使用ziplist数据结构(像hash一样为了节省空间),二者的创建方法如下:

//元素不大
robj *createZsetObject(void) {
    //创建了一个zset结构
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;
    //为zset创建一个dict
    zs->dict = dictCreate(&zsetDictType,NULL);
    //为zset创建一个skiplist
    zs->zsl = zslCreate();
    o = createObject(OBJ_ZSET,zs);
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}
//元素过大
robj *createZsetZiplistObject(void) {
    //新建ziplist 没什么好说的
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_ZSET,zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

    ziplist的代码和原理可以参考我的博客Redis原理-源码解析:数据结构2 list-鱼鱼的Java小站,就是一个节省内存的压缩的链表结构。这里着重分析skiplist。

zset

    这是zset的数据结构,包括一个skiplist和一个dic:

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

    其中zskiplist是跳跃表,用于存储skiplist的数据,dict用于存储value和score的映射指针,是一个冗余的结构,以便于以O(1)的时间复杂度查询score;

skiplist

    zskiplist的数据结构其实就是一个以链表为基础的跳跃表:

//level代表最高的层数
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    //可能有多个向后的链表,span表示跨度
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

skiplist是在链表的基础上,每个节点有不定数量的指针指向后面的跨越了多个的链表节点。大致结构如图:

图1 skiplist结构举例

可以看到每一个节点的指针数(高度)是非常随机的。

插入数据源码逻辑

    通过插入数据的源码,我们可以看到查找元素和生成随机高度的逻辑。在插入时,判断是新数据并且是zset数据结构,会执行如下代码:

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
        //…………
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        //……
    ele = sdsdup(ele);
    //插入数据
    znode = zslInsert(zs->zsl,score,ele);
    serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
    *flags |= ZADD_ADDED;
    if (newscore) *newscore = score;
    return 1;

    
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    //暂存遍历中的前一个节点以便于插入
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    /上述前一个节点的排名(跳跃表节点从小到达排序,遍历过程中跨度的累计值即为遍历节点的排名)
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    //从高层向低层查找插入节点的前一节点
    // 这个遍历是单向的 如果会一直遍历到最下层链表
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        //若遍历的节点存在前向节点,并且节点均小于插入节点, 继续循环,直至找到插入节点的前向节点
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //继续向后遍历 推算span
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    //随机生成层数,方法在下面
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    //插入节点
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
                //更新跨度值
        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
//生成层数 这里的ZSKIPLIST_MAXLEVEL是32 ,很足够用了
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

    即如果我们在上面的图1数据结构中试图找节点10(这样称score为10的节点),则顺序为:

第4层节点1->第3层节点1->第3层节点4->第3层节点6->第2层节点6->第2层节点9->第1层节点9->第1层节点10

    上下层切换可以忽略不计,我们总共跳跃4次变找到了节点10,相比最下层节省了很多时间。

总结

    Redis中的sorted set采用了ziplist(当元素没那么多字符串没那么大时),以便于节省内存,这一点和hash类似,此种情况下查询时间复杂度为O(n),但是一般都不是太大的数据影响不大。

    当数据达到一定大小时,便会从ziplist转换为zset,其包含一skiplist和dic,dic主要存储键值和score的映射,以便于在O(1)的效率下查询score。skiplist主要是一个多排链表,一个节点可能存储了向后推进多个元素的指针,使用skiplist做插入查询的时候,会从最高层逐渐遍历到最下层,一般都能有效缩短查询时间,每一个节点的高度都是完全随机生成的1-32。


Redis原理-源码解析:数据结构3 sorted set(zset))2021-02-28鱼鱼

{{commentTitle}}

评论   ctrl+Enter 发送评论