ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Redis 6.0 源码阅读笔记(7) -- ZSet 数据类型源码分析

2021-05-09 19:32:06  阅读:277  来源: 互联网

标签:score ZSet level 数据类型 zset 源码 flags ZADD 节点


1. 存储结构
在 有序集合对象 ZSet 的介绍中已经提到 ZSet 集合的底层存储结构主要有两种,其结构示例如下:

OBJ_ENCODING_ZIPLIST
当 ziplist 作为 zset 的底层存储结构时,每个集合元素使用两个紧挨在一起的压缩列表节点来保存,第一个节点保存元素值,第二个元素保存元素的分值,而且分值小的靠近表头,大的靠近表尾

OBJ_ENCODING_SKIPLIST
底层实现是跳跃表结合字典。每个跳跃表节点都保存一个集合元素,并按分值从小到大排列,每个节点的 ele 属性保存了元素的值,score属性保存分值;字典的每个键值对保存一个集合元素,元素值包装为字典的键,元素分值保存为字典的值。注意集合的元素成员和分值是共享的,跳跃表和字典通过指针指向同一地址,不会浪费内存

2. 源码解析
2.1 存储过程分析
对有序集合 ZSet 的操作命令的处理函数在t_zset.c 文件中,向 ZSet 添加元素调用的函数为t_zset.c#zaddCommand()。从源码可以看到,这个函数其实主要调用了t_zset.c#zaddGenericCommand(),zaddGenericCommand() 的主要逻辑如下:

解析参数得到每个元素及其对应的分值
查找key对应的 ZSet 集合对象是否存在,不存在则创建。此处创建新的 ZSet 集合对象有两个配置需要注意,如果 zset_max_ziplist_entries(默认 128) 为 0 或者要存储的元素长度大于 zset_max_ziplist_value(默认 64),则创建底层结构为 OBJ_ENCODING_SKIPLIST 的有序集合对象,否则创建底层结构为 OBJ_ENCODING_ZIPLIST 的有序集合对象
对于所有的元素,循环调用 t_zset.c#zsetAdd()函数将其添加到有序集合中

void zaddCommand(client *c) {
 zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
 static char *nanerr = "resulting score is not a number (NaN)";
 robj *key = c->argv[1];
 robj *zobj;
 sds ele;
 double score = 0, *scores = NULL;
 int j, elements;
 int scoreidx = 0;
 /* The following vars are used in order to track what the command actually
  * did during the execution, to reply to the client and to trigger the
  * notification of keyspace change. */
 int added = 0;      /* Number of new elements added. */
 int updated = 0;    /* Number of elements with updated score. */
 int processed = 0;  /* Number of elements processed, may remain zero with
                        options like XX. */

 /* Parse options. At the end 'scoreidx' is set to the argument position
  * of the score of the first score-element pair. */
 scoreidx = 2;
 while(scoreidx < c->argc) {
     char *opt = c->argv[scoreidx]->ptr;
     if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
     else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
     else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
     else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
     else break;
     scoreidx++;
 }

 /* Turn options into simple to check vars. */
 int incr = (flags & ZADD_INCR) != 0;
 int nx = (flags & ZADD_NX) != 0;
 int xx = (flags & ZADD_XX) != 0;
 int ch = (flags & ZADD_CH) != 0;

 /* After the options, we expect to have an even number of args, since
  * we expect any number of score-element pairs. */
 elements = c->argc-scoreidx;
 if (elements % 2 || !elements) {
     addReply(c,shared.syntaxerr);
     return;
 }
 elements /= 2; /* Now this holds the number of score-element pairs. */

 /* Check for incompatible options. */
 if (nx && xx) {
     addReplyError(c,
         "XX and NX options at the same time are not compatible");
     return;
 }

 if (incr && elements > 1) {
     addReplyError(c,
         "INCR option supports a single increment-element pair");
     return;
 }

 /* Start parsing all the scores, we need to emit any syntax error
  * before executing additions to the sorted set, as the command should
  * either execute fully or nothing at all. */
 scores = zmalloc(sizeof(double)*elements);
 for (j = 0; j < elements; j++) {
     if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
         != C_OK) goto cleanup;
 }

 /* Lookup the key and create the sorted set if does not exist. */
 zobj = lookupKeyWrite(c->db,key);
 if (zobj == NULL) {
     if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
     if (server.zset_max_ziplist_entries == 0 ||
         server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
     {
         zobj = createZsetObject();
     } else {
         zobj = createZsetZiplistObject();
     }
     dbAdd(c->db,key,zobj);
 } else {
     if (zobj->type != OBJ_ZSET) {
         addReply(c,shared.wrongtypeerr);
         goto cleanup;
     }
 }

 for (j = 0; j < elements; j++) {
     double newscore;
     score = scores[j];
     int retflags = flags;

     ele = c->argv[scoreidx+1+j*2]->ptr;
     int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
     if (retval == 0) {
         addReplyError(c,nanerr);
         goto cleanup;
     }
     if (retflags & ZADD_ADDED) added++;
     if (retflags & ZADD_UPDATED) updated++;
     if (!(retflags & ZADD_NOP)) processed++;
     score = newscore;
 }
 server.dirty += (added+updated);

reply_to_client:
 if (incr) { /* ZINCRBY or INCR option. */
     if (processed)
         addReplyDouble(c,score);
     else
         addReplyNull(c);
 } else { /* ZADD. */
     addReplyLongLong(c,ch ? added+updated : added);
 }

cleanup:
 zfree(scores);
 if (added || updated) {
     signalModifiedKey(c,c->db,key);
     notifyKeyspaceEvent(NOTIFY_ZSET,
         incr ? "zincr" : "zadd", key, c->db->id);
 }
}

ZSet 集合对象的创建函数在 object.c中,源码如下,可以看到其两种存储结构是有明显区别的

robj *createZsetObject(void) {
 zset *zs = zmalloc(sizeof(*zs));
 robj *o;

 zs->dict = dictCreate(&zsetDictType,NULL);
 zs->zsl = zslCreate();
 o = createObject(OBJ_ZSET,zs);
 o->encoding = OBJ_ENCODING_SKIPLIST;
 return o;
}

robj *createZsetZiplistObject(void) {
 unsigned char *zl = ziplistNew();
 robj *o = createObject(OBJ_ZSET,zl);
 o->encoding = OBJ_ENCODING_ZIPLIST;
 return o;
}

t_zset.c#zsetAdd()函数实现较长,但是逻辑比较清晰:

首先判断如果存储结构是 ziplist,在执行添加的过程中需要调用函数 t_zset.c#zzlFind() 区分元素存在和不存在两种情况。另外需注意,函数 t_zset.c#zzlInsert() 将元素插入到 ziplist 中是根据 score 分数有序插入的,分数小的在 ziplist 头部,分数大的在尾部,元素和分数分两次操作插入到 ziplist 中,并且紧挨在一起
存在的情况下如果元素 score 改变了,则先删除 ziplist 中的元素然后重新添加
不存在的情况下直接向 ziplist 添加元素,插入完成后需要考虑元素的长度是否超出限制或 ziplist 存储的元素个数是否超过最大限制,进而决定是否将底层结构转为 skiplist
如果存储结构是 skiplist,同样需要区分元素存在和不存在两种情况。需要注意,在跳跃表中元素是按照分数有序存储的,这样范围查找的时间复杂度为 O(logN);使用字典则能达到单个元素查找O(1)的时间复杂度,非常高效
存在的情况并且 score 分数发生变化的话,调用 t_zset.c#zslUpdateScore() 函数在跳跃表中查找到目标节点,将其删除并重新插入一个节点,完成后返回新的跳跃表节点用于更新字典中元素的分数
不存在情况下就直接调用t_zset.c#zslInsert() 函数往跳跃表添加节点,完成后同时往字典新增一个节点

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
 /* Turn options into simple to check vars. */
 int incr = (*flags & ZADD_INCR) != 0;
 int nx = (*flags & ZADD_NX) != 0;
 int xx = (*flags & ZADD_XX) != 0;
 *flags = 0; /* We'll return our response flags. */
 double curscore;

 /* NaN as input is an error regardless of all the other parameters. */
 if (isnan(score)) {
     *flags = ZADD_NAN;
     return 0;
 }

 /* Update the sorted set according to its encoding. */
 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
     unsigned char *eptr;

     if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
         /* NX? Return, same element already exists. */
         if (nx) {
             *flags |= ZADD_NOP;
             return 1;
         }

         /* Prepare the score for the increment if needed. */
         if (incr) {
             score += curscore;
             if (isnan(score)) {
                 *flags |= ZADD_NAN;
                 return 0;
             }
             if (newscore) *newscore = score;
         }

         /* Remove and re-insert when score changed. */
         if (score != curscore) {
             zobj->ptr = zzlDelete(zobj->ptr,eptr);
             zobj->ptr = zzlInsert(zobj->ptr,ele,score);
             *flags |= ZADD_UPDATED;
         }
         return 1;
     } else if (!xx) {
         /* Optimize: check if the element is too large or the list
          * becomes too long *before* executing zzlInsert. */
         zobj->ptr = zzlInsert(zobj->ptr,ele,score);
         if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
             sdslen(ele) > server.zset_max_ziplist_value)
             zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
         if (newscore) *newscore = score;
         *flags |= ZADD_ADDED;
         return 1;
     } else {
         *flags |= ZADD_NOP;
         return 1;
     }
 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
     zset *zs = zobj->ptr;
     zskiplistNode *znode;
     dictEntry *de;

     de = dictFind(zs->dict,ele);
     if (de != NULL) {
         /* NX? Return, same element already exists. */
         if (nx) {
             *flags |= ZADD_NOP;
             return 1;
         }
         curscore = *(double*)dictGetVal(de);

         /* Prepare the score for the increment if needed. */
         if (incr) {
             score += curscore;
             if (isnan(score)) {
                 *flags |= ZADD_NAN;
                 return 0;
             }
             if (newscore) *newscore = score;
         }

         /* Remove and re-insert when score changes. */
         if (score != curscore) {
             znode = zslUpdateScore(zs->zsl,curscore,ele,score);
             /* Note that we did not removed the original element from
              * the hash table representing the sorted set, so we just
              * update the score. */
             dictGetVal(de) = &znode->score; /* Update score ptr. */
             *flags |= ZADD_UPDATED;
         }
         return 1;
     } else if (!xx) {
         ele = sdsdup(ele);
         znode = zslInsert(zs->zsl,score,ele);
         serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
         *flags |= ZADD_ADDED;
         if (newscore) *newscore = score;
         return 1;
     } else {
         *flags |= ZADD_NOP;
         return 1;
     }
 } else {
     serverPanic("Unknown sorted set encoding");
 }
 return 0; /* Never reached. */
}

2.2 数据存储结构 zskiplist
2.2.1 存储结构定义
Redis 中的有序集合 ZSet 有属于它自己的数据结构,其定义在server.h文件中,源码如下:

可以看到 OBJ_ENCODING_SKIPLIST 编码的 ZSet 以 zset 结构体封装数据,其内部包含了两个关键的数据结构

dict :字典,键为成员,值为分值,可用于支持 O(1) 复杂度的按元素取分值操作
zsl: 跳跃表,按分值排序元素,用于支持平均复杂度为 O(log N) 的按分值定位元素的操作以及范围操作

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

数据结构 zskiplist 的定义如下,可以看到其包含的关键数据如下:

header: 跳跃表头节点
tail: 跳跃表尾节点
length: 即链表包含的节点总数。新创建的 skiplist 包含一个空的头指针,这个头指针不包含在 length 计数中
level: 表中层数最大的节点的层数

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

数据结构 zskiplistNode 是zskiplist 的单个节点的定义,其包含的关键属性如下:

  1. ele: 动态字符串对象,用于存储元素
  2. score: 元素的分值
  3. backward: 当前节点的上一个节点
  4. level[]: 跳跃表层数数组,保存了每一层指向的下一个节点
typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

2.2.2 关键函数

2.2.2.1 跳跃表的构建

t_zset.c#zslInsert() 是跳跃表中非常关键的函数,这里体现了跳跃表的构建过程及其查找策略。可以看到 skiplist 为每个节点随机出一个层数(level),比如一个节点随机出的层数是3,那么就把它链入到从第1层到第3层这三层链表中

首先通过分数 score 在跳跃表中查找到新增的节点应该插入的位置上的节点
调用 zslRandomLevel() 函数随机生成节点层数,使用该层数新建一个跳跃表节点
更新新增的节点应插入位置上的节点前后指针,将新增节点插入到跳跃表中
 

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 unsigned int rank[ZSKIPLIST_MAXLEVEL];
 int i, level;

 serverAssert(!isnan(score));
 x = zsl->header;
 for (i = zsl->level-1; i >= 0; i--) {
     /* store rank that is crossed to reach the insert position */
     rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
     while (x->level[i].forward &&
             (x->level[i].forward->score < score ||
                 (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele,ele) < 0)))
     {
         rank[i] += x->level[i].span;
         x = x->level[i].forward;
     }
     update[i] = x;
 }
 /* we assume the element is not already inside, since we allow duplicated
  * scores, reinserting the same element should never happen since the
  * caller of zslInsert() should test in the hash table if the element is
  * already inside or not. */
 level = zslRandomLevel();
 if (level > zsl->level) {
     for (i = zsl->level; i < level; i++) {
         rank[i] = 0;
         update[i] = zsl->header;
         update[i]->level[i].span = zsl->length;
     }
     zsl->level = level;
 }
 x = zslCreateNode(level,score,ele);
 for (i = 0; i < level; i++) {
     x->level[i].forward = update[i]->level[i].forward;
     update[i]->level[i].forward = x;

     /* update span covered by update[i] as x is inserted here */
     x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
     update[i]->level[i].span = (rank[0] - rank[i]) + 1;
 }

 /* increment span for untouched levels */
 for (i = level; i < zsl->level; i++) {
     update[i]->level[i].span++;
 }

 x->backward = (update[0] == zsl->header) ? NULL : update[0];
 if (x->level[0].forward)
     x->level[0].forward->backward = x;
 else
     zsl->tail = x;
 zsl->length++;
 return x;
}

int zslRandomLevel(void) {
 int level = 1;
 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
     level += 1;
 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

2.2.2.2 跳跃表的查找

在以上三层链表结构上,如果需要查找15,查找的路径是沿着图中的红色箭头所指向的方向进行的15 首先和 10 比较,比 10 大,继续与后继节点比较
10 的后继节点为 NULL,说明最上层的链表已经结束了,回到第二层链表继续比较。此时 10 的后继节点为 20,15 比 20 小,回到 10 的第一层链表
10 的一层链表后继节点为 15,待查数据找到了
以上过程中沿着最上层链表首先要比较的是10,发现待查数据比10大,接下来就只需要到 10 的后面去继续查找,从而跳过了 10 前面的所有节点。这样当链表足够长的时候,这种多层链表的查找方式就能跳过很多下层节点,大大加快查找的速度
 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:score,ZSet,level,数据类型,zset,源码,flags,ZADD,节点
来源: https://blog.csdn.net/alpha_love/article/details/116568705

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有