有序集合在生活中较常见,如根据成绩对学生进行排名、根据得分对游戏玩家进行排名等。对于有序集合的底层实现,我们可以使用数组、链表、平衡树等结构。数组不便于元素的插人和删除;链表的查询效率低,需要遍历所有元素;平衡树或者红黑树等结构虽然效率高但实现复杂。Redis采用了一种新型的数据结构————跳跃表。跳跃表的效率堪比红黑树,然而其实现却远比红黑树简单。
1、数据结构
跳跃表是建立在有序链表的基础上,对于这样的链表其查询操作的时间复杂度位O(N)。而有序链表的其它非查询操作本身基本不消耗时间,但是需要先找到需要操作的元素,所以整个有序链表的时间消耗主要集中在查找元素上。
如果将有序链表每隔几个提出一个元素,最后组成一个新的有序链表,查找元素的时候先在新链表中查找,当找不到或者要找的元素介于新链表的两个元素之间时,再返回原链表查询,这样相当于扩大了搜索范围,然后再精确定位:
比如想要找到51节点,如果直接利用0层查找,那么需要查找6次。如果从第2层开始查找,先查找两次,发现21<51,进入第1层,再查找两次,发现41<51<61,最后进入第0层查找一次发现51,一共5次,比直接查找少了一次。数据量越大,查找区别越明显。
跳跃表就是利用这种原理:抽取有序链表部分节点,组成新一层有序链表,这一行为可重复多次,组成多层有序链表。查找时从节点最少,也就是最上层开始,如果本层找到目标就直接返回,否则就从最接近目标的节点开始往下一层开始查找,一直到找到目标或者目标不存在返回NULL。
1.1、源码实现
1.1.1 跳跃表
typedef struct zskiplist {
/* 结构体zskiplistNode就是跳跃表的节点,每个跳跃表由多个跳跃表节点组成
header指向头结点是一个特殊节点,
tail指向尾结点*/
struct zskiplistNode *header, *tail;
/*length表示跳跃表长度,该长度并不包括头结点*/
unsigned long length;
/*跳跃表的高度,即总共分了多少层*/
int level;
} zskiplist;
1.1.2 跳跃表节点
typedef struct zskiplistNode {
/*ele存储字符串类型数据*/
sds ele;
/*分数,用来排序*/
double score;
/*后退指针,指向当前节点在最底层时的前一个节点,头结点和第一个节点的该字段指向NULL
该字段可用于从后往前遍历*/
struct zskiplistNode *backward;
/*这是一个数组,用来存储不同层的有序链表*/
struct zskiplistLevel {
/*指向当前节点在本层的下一个节点,尾结点会指向NULL*/
struct zskiplistNode *forward;
/*本节点和forward指向的节点之间元素个数,一般层数越高,该值越大,跳过的元素也越多*/
unsigned long span;
} level[];
} zskiplistNode;
前面在跳跃表中有个头节点struct zskiplistNode *header,这个节点的level[]数组长度为64表示最高64层。头结点ele为NULL,score值为0,并且不计入跳跃表总长度。头结点初始化时,64个元素的forward都是想NULL,span为0。
2、基本操作
2.1、创建跳跃表
2.1.1 、层级高度
在Redis5中,层级最小为1,最高为64。有一个函数专门用来
int zslRandomLevel(void) {
int level = 1;
// #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* 0xFFFF二进制1111 1111 1111 1111 共16位1,ZSKIPLIST_P * 0xFFFF结果为0011 1111 1111 1111
即高两位变为0。在while循环的判断条件中,random()&0xFFFF就是取一个随机数的低16位,将其与0.25倍的0xFFFF比 较,小于则层级+1,否则退出while循环。
*/
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
// #define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
显然,while每次循环的概率为ZSKIPLIST_P = 0.25,那么有如下情况:
- 循环0次,层级高度为1,概率为:1-ZSKIPLIST_P。
- 循环1次,层级高度为2,概率为:ZSKIPLIST_P*(1-ZSKIPLIST_P)。
- 循环2次,层级高度为3,概率为:ZSKIPLIST_P2*(1-ZSKIPLIST_P)。
- 循环n-1次,层级高度为n,概率为:ZSKIPLIST_Pn-1(1-ZSKIPLIST_P)。
- 循环n次,层级高度为n+1,概率为:ZSKIPLIST_Pn(1-ZSKIPLIST_P)。
设ZSKIPLIST_P=p
由此可得出节点期望层高:
$$
(1-p)\sum_\infty=1/(1-p)
$$
已知p=0.25,得出期望层高为4/3,约为1.33。虽然在该函数的最后,将大于64的结果转为64,但考虑到64及以上循环的概率过低,所以可以直接认为期望层高为1.33。
2.1.2、创建跳跃表的节点
来看一下跳跃表节点的创建:
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
/*struct zskiplistLevel是一个数组 大小由传入的level决定*/
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
// 对各字段赋值 初始化
zn->score = score;
zn->ele = ele;
return zn;
}
2.1.3 、创建跳跃表
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 申请内存
zsl = zmalloc(sizeof(*zsl));
// 初始化层级为1,长度为0,创建头结点,并将header指向头结点
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
/*对头节点的level数组的所有forward置为NULL,span置为0*/
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
/*设置头结点的backward为NULL,设置尾结点为NULL*/
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
2.2、添加节点
所有非查询操作都要先依赖于查询,所以第一步就是要查找添加的位置。
2.2.1、查找节点新增的位置
前面提到过,跳跃表本身就是为了快速查找而设计的数据结构,对于跳跃表查找的基本原理,之前已经提及,以下是Redis具体实现的函数
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update用于存储被插入节点每层的前一个节点,这样就将每层需要更新的节点记录了下来
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// rank用于记录当前层级的header节点到对应update节点的步长,用于设置update和新插入节点的span
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 判断score是否为一个数字
serverAssert(!isnan(score));
// 首先将x指向header
x = zsl->header;
/*这个循环主要记录需要update和rank*/
for (i = zsl->level-1; i >= 0; i--) {
/*第一次循环的时候,i就是zsl->level-1,所以rank[i]就等于level-1*/
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
/*第二层循环:首先判断x的level[i]是否有下一个节点forward,其次该forward的分数是否小于新增的分数
,如果两个分数相等,那么就要进一步判断新插入的值是否比forward的值要更好
*/
/*
对于两个sds哪个更好的的判断函数:利用memcmp函数和字符串长度比较
int sdscmp(const sds s1, const sds s2) {
size_t l1, l2, minlen;
int cmp;
l1 = sdslen(s1);
l2 = sdslen(s2);
minlen = (l1 < l2) ? l1 : l2;
cmp = memcmp(s1,s2,minlen);
if (cmp == 0) return l1-l2;
return cmp;
}
*/
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
//*进入第二层循环后,更新rank的值,x指向forward
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// update记录下要更新的node指针
update[i] = x;
}
// 重新调整跳跃表高度
level = zslRandomLevel();
/*如果要插入节点的高度大于跳跃表高度,那么就要根据rank和update更新跳跃表*/
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 调整完跳跃表,开始创建并插入节点
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 当插入节点高度小于原跳跃表高度时,调整update中level层的span值
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 每个节点只有一个backward指针,当插入节点不是最后一个节点,更新被插入节点的backward为update[0]
if (x->level[0].forward)
x->level[0].forward->backward = x;
// 如果插入的就是最后一个节点,更新跳跃表tail为新插入节点
else
zsl->tail = x;
// 跳跃表长度+1
zsl->length++;
return x;
}
2.2.2、删除节点
删除节点需要先找到节点,然后重新设置需要更新节点的span和forward
2.2.3、删除跳跃表
void zslFree(zskiplist *zsl) {
/*从头节点0层开始,利用forward指针一个个遍历并释放其中的节点,释放完节点后再释放跳跃表*/
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
3、跳跃表的应用
跳跃表主要用于有序集合的底层实现,不过有序集合默认先使用压缩列表实现。使用压缩列表有两个配置:
- zset-max-ziplist-entries 128:当列表元素个数不超过128时,使用压缩列表。
- zset-max-ziplist-entries 64:当ele字符串长度不超过64时,使用压缩列表。
由以上条件可知,当元素个数过多或存储的ele过长时,zset转为跳跃表实现,并且这种转变是单向,即使元素个数被删除到阈值以下,也不会重新转为压缩列表实现。