Redis的set数据结构在此不多讲,同Java中原理一样,set也可以理解为是hash剥离了value的数据结构,即同为dic。但是zset(有序集合)其实在底层原理上完全不同于set。
所有原理实现基于Redis版本6.0.9。
创建命令实现分析数据结构
先看一下基本的指令实现,着重注意中文注解的地方。
t_zset.c
void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_NONE); } void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL; int j, elements; int scoreidx = 0; int added = 0; int updated = 0; int processed = 0; scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else if (!strcasecmp(opt,"gt")) flags |= ZADD_GT; else if (!strcasecmp(opt,"lt")) flags |= ZADD_LT; else break; scoreidx++; } int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; int gt = (flags & ZADD_GT) != 0; int lt = (flags & ZADD_LT) != 0; elements = c->argc-scoreidx; if (elements % 2 || !elements) { addReply(c,shared.syntaxerr); return; } elements /= 2; /* Now this holds the number of score-element pairs. */ if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } if ((gt && nx) || (lt && nx) || (gt && lt)) { addReplyError(c, "GT, LT, and/or NX options at the same time are not compatible"); return; } if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } /* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; //判断数据大小并选择数据类型 if (zobj == NULL) { if (xx) goto reply_to_client; //server.zset_max_ziplist_entries 可配置,最小节点数 默认128 //server.zset_max_ziplist_value 可配置,最小字符串 默认64 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { //节点多或字符串大时,创建zset zobj = createZsetObject(); } else { //创建ziplist zobj = createZsetZiplistObject(); } dbAdd(c->db,key,zobj); } //遍历要插入的数据 for (j = 0; j < elements; j++) { double newscore; //第j条数据的score score = scores[j]; int retflags = flags; ele = c->argv[scoreidx+1+j*2]->ptr; //调用zadd 实际执行插入的地方 int retval = zsetAdd(zobj, score, ele, &retflags, &newscore); if (retval == 0) { addReplyError(c,nanerr); goto cleanup; } if (retflags & ZADD_ADDED) added++; if (retflags & ZADD_UPDATED) updated++; if (!(retflags & ZADD_NOP)) processed++; score = newscore; } server.dirty += (added+updated); reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReplyNull(c); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); } cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } }
可以看出zset的数据结构不是固定的,在其元素数或是元素的字符串过长时,其结构为zset;否则使用ziplist数据结构(像hash一样为了节省空间),二者的创建方法如下:
//元素不大 robj *createZsetObject(void) { //创建了一个zset结构 zset *zs = zmalloc(sizeof(*zs)); robj *o; //为zset创建一个dict zs->dict = dictCreate(&zsetDictType,NULL); //为zset创建一个skiplist zs->zsl = zslCreate(); o = createObject(OBJ_ZSET,zs); o->encoding = OBJ_ENCODING_SKIPLIST; return o; } //元素过大 robj *createZsetZiplistObject(void) { //新建ziplist 没什么好说的 unsigned char *zl = ziplistNew(); robj *o = createObject(OBJ_ZSET,zl); o->encoding = OBJ_ENCODING_ZIPLIST; return o; }
ziplist的代码和原理可以参考我的博客Redis原理-源码解析:数据结构2 list-鱼鱼的Java小站,就是一个节省内存的压缩的链表结构。这里着重分析skiplist。
zset
这是zset的数据结构,包括一个skiplist和一个dic:
typedef struct zset { dict *dict; zskiplist *zsl; } zset;
其中zskiplist是跳跃表,用于存储skiplist的数据,dict用于存储value和score的映射指针,是一个冗余的结构,以便于以O(1)的时间复杂度查询score;
skiplist
zskiplist的数据结构其实就是一个以链表为基础的跳跃表:
//level代表最高的层数 typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist; typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; //可能有多个向后的链表,span表示跨度 struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode;
skiplist是在链表的基础上,每个节点有不定数量的指针指向后面的跨越了多个的链表节点。大致结构如图:
图1 skiplist结构举例
可以看到每一个节点的指针数(高度)是非常随机的。
插入数据源码逻辑
通过插入数据的源码,我们可以看到查找元素和生成随机高度的逻辑。在插入时,判断是新数据并且是zset数据结构,会执行如下代码:
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { //………… zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; //…… ele = sdsdup(ele); //插入数据 znode = zslInsert(zs->zsl,score,ele); serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); *flags |= ZADD_ADDED; if (newscore) *newscore = score; return 1; zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { //暂存遍历中的前一个节点以便于插入 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; /上述前一个节点的排名(跳跃表节点从小到达排序,遍历过程中跨度的累计值即为遍历节点的排名) unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; //从高层向低层查找插入节点的前一节点 // 这个遍历是单向的 如果会一直遍历到最下层链表 for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; //若遍历的节点存在前向节点,并且节点均小于插入节点, 继续循环,直至找到插入节点的前向节点 while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { //继续向后遍历 推算span rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ //随机生成层数,方法在下面 level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,ele); //插入节点 for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; //更新跨度值 /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; return x; } //生成层数 这里的ZSKIPLIST_MAXLEVEL是32 ,很足够用了 int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
即如果我们在上面的图1数据结构中试图找节点10(这样称score为10的节点),则顺序为:
第4层节点1->第3层节点1->第3层节点4->第3层节点6->第2层节点6->第2层节点9->第1层节点9->第1层节点10
上下层切换可以忽略不计,我们总共跳跃4次变找到了节点10,相比最下层节省了很多时间。
总结
Redis中的sorted set采用了ziplist(当元素没那么多字符串没那么大时),以便于节省内存,这一点和hash类似,此种情况下查询时间复杂度为O(n),但是一般都不是太大的数据影响不大。
当数据达到一定大小时,便会从ziplist转换为zset,其包含一skiplist和dic,dic主要存储键值和score的映射,以便于在O(1)的效率下查询score。skiplist主要是一个多排链表,一个节点可能存储了向后推进多个元素的指针,使用skiplist做插入查询的时候,会从最高层逐渐遍历到最下层,一般都能有效缩短查询时间,每一个节点的高度都是完全随机生成的1-32。