Java学习

  • 首页
  • 文章归档
  • 默认分类
  • 关于页面

  • 搜索
CAP 分布式 计算机网络 MySQL 源码 备份 Redis

Redis5数据结构-跳跃表

发表于 2020-10-20 | 分类于 Redis | 0 | 阅读次数 431

有序集合在生活中较常见,如根据成绩对学生进行排名、根据得分对游戏玩家进行排名等。对于有序集合的底层实现,我们可以使用数组、链表、平衡树等结构。数组不便于元素的插人和删除;链表的查询效率低,需要遍历所有元素;平衡树或者红黑树等结构虽然效率高但实现复杂。Redis采用了一种新型的数据结构————跳跃表。跳跃表的效率堪比红黑树,然而其实现却远比红黑树简单。

1、数据结构

跳跃表是建立在有序链表的基础上,对于这样的链表其查询操作的时间复杂度位O(N)。而有序链表的其它非查询操作本身基本不消耗时间,但是需要先找到需要操作的元素,所以整个有序链表的时间消耗主要集中在查找元素上。

如果将有序链表每隔几个提出一个元素,最后组成一个新的有序链表,查找元素的时候先在新链表中查找,当找不到或者要找的元素介于新链表的两个元素之间时,再返回原链表查询,这样相当于扩大了搜索范围,然后再精确定位:

分层有序链表

比如想要找到51节点,如果直接利用0层查找,那么需要查找6次。如果从第2层开始查找,先查找两次,发现21<51,进入第1层,再查找两次,发现41<51<61,最后进入第0层查找一次发现51,一共5次,比直接查找少了一次。数据量越大,查找区别越明显。

跳跃表就是利用这种原理:抽取有序链表部分节点,组成新一层有序链表,这一行为可重复多次,组成多层有序链表。查找时从节点最少,也就是最上层开始,如果本层找到目标就直接返回,否则就从最接近目标的节点开始往下一层开始查找,一直到找到目标或者目标不存在返回NULL。

1.1、源码实现

跳跃表的实现过程

1.1.1 跳跃表

typedef struct zskiplist {
    /* 结构体zskiplistNode就是跳跃表的节点,每个跳跃表由多个跳跃表节点组成
    header指向头结点是一个特殊节点,
    tail指向尾结点*/
    struct zskiplistNode *header, *tail;
    /*length表示跳跃表长度,该长度并不包括头结点*/
    unsigned long length;
    /*跳跃表的高度,即总共分了多少层*/
    int level;
} zskiplist;

1.1.2 跳跃表节点

typedef struct zskiplistNode {
    /*ele存储字符串类型数据*/
    sds ele;
    /*分数,用来排序*/
    double score;
    /*后退指针,指向当前节点在最底层时的前一个节点,头结点和第一个节点的该字段指向NULL
    该字段可用于从后往前遍历*/
    struct zskiplistNode *backward;
    /*这是一个数组,用来存储不同层的有序链表*/
    struct zskiplistLevel {
        /*指向当前节点在本层的下一个节点,尾结点会指向NULL*/
        struct zskiplistNode *forward;
        /*本节点和forward指向的节点之间元素个数,一般层数越高,该值越大,跳过的元素也越多*/
        unsigned long span;
    } level[];
} zskiplistNode;

前面在跳跃表中有个头节点struct zskiplistNode *header,这个节点的level[]数组长度为64表示最高64层。头结点ele为NULL,score值为0,并且不计入跳跃表总长度。头结点初始化时,64个元素的forward都是想NULL,span为0。

2、基本操作

2.1、创建跳跃表

2.1.1 、层级高度

在Redis5中,层级最小为1,最高为64。有一个函数专门用来

int zslRandomLevel(void) {
    int level = 1;
    // #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
    /* 0xFFFF二进制1111 1111 1111 1111 共16位1,ZSKIPLIST_P * 0xFFFF结果为0011 1111 1111 1111
    即高两位变为0。在while循环的判断条件中,random()&0xFFFF就是取一个随机数的低16位,将其与0.25倍的0xFFFF比 	较,小于则层级+1,否则退出while循环。
    */
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    // #define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

显然,while每次循环的概率为ZSKIPLIST_P = 0.25,那么有如下情况:

  1. 循环0次,层级高度为1,概率为:1-ZSKIPLIST_P。
  2. 循环1次,层级高度为2,概率为:ZSKIPLIST_P*(1-ZSKIPLIST_P)。
  3. 循环2次,层级高度为3,概率为:ZSKIPLIST_P2*(1-ZSKIPLIST_P)。
  4. 循环n-1次,层级高度为n,概率为:ZSKIPLIST_Pn-1(1-ZSKIPLIST_P)。
  5. 循环n次,层级高度为n+1,概率为:ZSKIPLIST_Pn(1-ZSKIPLIST_P)。

设ZSKIPLIST_P=p

由此可得出节点期望层高:
$$
(1-p)\sum_
\infty=1/(1-p)
$$
已知p=0.25,得出期望层高为4/3,约为1.33。虽然在该函数的最后,将大于64的结果转为64,但考虑到64及以上循环的概率过低,所以可以直接认为期望层高为1.33。

2.1.2、创建跳跃表的节点

来看一下跳跃表节点的创建:

zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    /*struct zskiplistLevel是一个数组 大小由传入的level决定*/
    zskiplistNode *zn =
        zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    // 对各字段赋值 初始化
    zn->score = score;
    zn->ele = ele;
    return zn;
}

2.1.3 、创建跳跃表

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    // 申请内存
    zsl = zmalloc(sizeof(*zsl));
    // 初始化层级为1,长度为0,创建头结点,并将header指向头结点
    zsl->level = 1;
    zsl->length = 0;
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    /*对头节点的level数组的所有forward置为NULL,span置为0*/
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    /*设置头结点的backward为NULL,设置尾结点为NULL*/
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

2.2、添加节点

所有非查询操作都要先依赖于查询,所以第一步就是要查找添加的位置。

2.2.1、查找节点新增的位置

前面提到过,跳跃表本身就是为了快速查找而设计的数据结构,对于跳跃表查找的基本原理,之前已经提及,以下是Redis具体实现的函数

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // update用于存储被插入节点每层的前一个节点,这样就将每层需要更新的节点记录了下来
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // rank用于记录当前层级的header节点到对应update节点的步长,用于设置update和新插入节点的span
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;
	// 判断score是否为一个数字
    serverAssert(!isnan(score));
    // 首先将x指向header
    x = zsl->header;
    /*这个循环主要记录需要update和rank*/
    for (i = zsl->level-1; i >= 0; i--) {
        /*第一次循环的时候,i就是zsl->level-1,所以rank[i]就等于level-1*/
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        /*第二层循环:首先判断x的level[i]是否有下一个节点forward,其次该forward的分数是否小于新增的分数
        ,如果两个分数相等,那么就要进一步判断新插入的值是否比forward的值要更好
        */
        /*
        对于两个sds哪个更好的的判断函数:利用memcmp函数和字符串长度比较
        int sdscmp(const sds s1, const sds s2) {
    		size_t l1, l2, minlen;
    		int cmp;

    		l1 = sdslen(s1);
    		l2 = sdslen(s2);
    		minlen = (l1 < l2) ? l1 : l2;
    		cmp = memcmp(s1,s2,minlen);
    		if (cmp == 0) return l1-l2;
    		return cmp;
		}
        */
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            //*进入第二层循环后,更新rank的值,x指向forward
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // update记录下要更新的node指针
        update[i] = x;
    }
    // 重新调整跳跃表高度
    level = zslRandomLevel();
    /*如果要插入节点的高度大于跳跃表高度,那么就要根据rank和update更新跳跃表*/
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    // 调整完跳跃表,开始创建并插入节点
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 当插入节点高度小于原跳跃表高度时,调整update中level层的span值
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    // 每个节点只有一个backward指针,当插入节点不是最后一个节点,更新被插入节点的backward为update[0]
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
     // 如果插入的就是最后一个节点,更新跳跃表tail为新插入节点
    else
        zsl->tail = x;
    // 跳跃表长度+1
    zsl->length++;
    return x;
}

2.2.2、删除节点

删除节点需要先找到节点,然后重新设置需要更新节点的span和forward

2.2.3、删除跳跃表

void zslFree(zskiplist *zsl) {
    /*从头节点0层开始,利用forward指针一个个遍历并释放其中的节点,释放完节点后再释放跳跃表*/
    zskiplistNode *node = zsl->header->level[0].forward, *next;

    zfree(zsl->header);
    while(node) {
        next = node->level[0].forward;
        zslFreeNode(node);
        node = next;
    }
    zfree(zsl);
}

3、跳跃表的应用

跳跃表主要用于有序集合的底层实现,不过有序集合默认先使用压缩列表实现。使用压缩列表有两个配置:

  1. zset-max-ziplist-entries 128:当列表元素个数不超过128时,使用压缩列表。
  2. zset-max-ziplist-entries 64:当ele字符串长度不超过64时,使用压缩列表。

由以上条件可知,当元素个数过多或存储的ele过长时,zset转为跳跃表实现,并且这种转变是单向,即使元素个数被删除到阈值以下,也不会重新转为压缩列表实现。

  • 本文作者: fanchw
  • 本文链接: https://www.fanchw.xyz/archives/redis5数据结构-跳跃表
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# CAP # 分布式 # 计算机网络 # MySQL # 源码 # 备份 # Redis
重新开始的博客
Redis5数据结构-字典dict
  • 文章目录
  • 站点概览
fanchw

fanchw

11 日志
5 分类
7 标签
Creative Commons
© 2023 fanchw
由 Halo 强力驱动
|
主题 - NexT.Pisces v5.1.4
皖ICP备19014634号-1

皖公网安备 34180202000448号