用户工具

站点工具


about_redis_8

Redis那些事儿 第八章 hash相关命令的实现

作者:陈科

联系方式:chenke1818@gmail.com

转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_redis_8

本章我们来分析下和hash相关的命令,在分析之前我们先来看下和hash表相关的数据结构的实现:

1.ziplist

关于ziplist的数据结构,作者在ziplist.c的最前面注释里已经写的非常清楚了。我们来看下:

<zlbytes><zltail><zllen><entry><entry><zlend>

<zlbytes>:是一个4字节无符号整数,用来存储整个ziplist占用的字节数

<zltail>:是一个4字节无符号整数,用来存储ziplist最后一个节点的相对于ziplist首地址偏移量

<zllen>:是一个2字节无符号整数,存储ziplist中节点的数目,最大值为(2^16 - 2),当zllen大于最大值时,需要遍历整个ziplist才能获取ziplist节点的数目

<entry>:ziplist存储的节点,各个节点的字节数根据内容而定

<zlend>:是一个1字节无符号整数,值为255(11111111),作为ziplist的结尾符

<entry>的数据结构为:

<previous entry length><encode&length><content>

<previous entry length>:代表前一个entry的长度,假如第一个字节大于254,那么后面四个字节为上一个entry的长度,否则这个部分为一个字节。

<encode&length>:这个部分信息量稍微多些:

|00pppppp| - 如果encode的前面2位为00,则后面6位表示length。

|01pppppp|qqqqqqqq| - 如果encode的前面2位为01,则后面14位表示length。

|10__|qqqqqqqq|rrrrrrrr|ssssssss|tttttttt| - 如果encode的前面2位为10,则跳过6位后,接下去的4个字节代表length。

|11000000| - 如果encode为:11000000,length代表2个字节的整数。

|11010000| - 如果encode为:11010000,length代表4个字节的整数。

|11100000| - 如果encode为:11100000,length代表8个字节的整数。

|11110000| - 如果encode为:11110000,length代表3个字节的整数。

|11111110| - 如果encode为:11111110,length代表1个字节的整数。

要是搞晕了,我们直接看解码部分的代码:

#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {                    \
    ZIP_ENTRY_ENCODING((ptr), (encoding));                                     \
    if ((encoding) < ZIP_STR_MASK) {                                           \
        if ((encoding) == ZIP_STR_06B) {                                       \
            (lensize) = 1;                                                     \
            (len) = (ptr)[0] & 0x3f;                                           \
        } else if ((encoding) == ZIP_STR_14B) {                                \
            (lensize) = 2;                                                     \
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];                       \
        } else if (encoding == ZIP_STR_32B) {                                  \
            (lensize) = 5;                                                     \
            (len) = ((ptr)[1] << 24) |                                         \
                    ((ptr)[2] << 16) |                                         \
                    ((ptr)[3] <<  8) |                                         \
                    ((ptr)[4]);                                                \
        } else {                                                               \
            assert(NULL);                                                      \
        }                                                                      \
    } else {                                                                   \
        (lensize) = 1;                                                         \
        (len) = zipIntSize(encoding);                                          \
    }                                                                          \
} while(0);
static unsigned int zipIntSize(unsigned char encoding) {
    switch(encoding) {
    case ZIP_INT_8B:  return 1;
    case ZIP_INT_16B: return 2;
    case ZIP_INT_24B: return 3;
    case ZIP_INT_32B: return 4;
    case ZIP_INT_64B: return 8;
    default: return 0; /* 4 bit immediate */
    }
    assert(NULL);
    return 0;
}

以下为entry的数据结构代码:

typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen;
    unsigned int lensize, len;
    unsigned int headersize;
    unsigned char encoding;
    unsigned char *p;
} zlentry;

接着我们看一下ziplist的初始化代码:

unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

接着看下关键操作宏的实现:

#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl))) //获取zlbytes值
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) //获取zltail值
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) //获取zllen值
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t)) //head的长度为4+4+2
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE) //指针指向第一个entry
#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) //获取ziplist最后一个entry的首地址
#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1) //获取ziplist的末端,即zlend首地址

2.hash表中<K,V>键值对的默认实现

我们通过hset命令的实现来看下hash表中<K,V>键值对的默认实现:

void hsetCommand(redisClient *c) {
    int update;
    robj *o;
 
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,3);
    hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
    update = hashTypeSet(o,c->argv[2],c->argv[3]);
    addReply(c, update ? shared.czero : shared.cone);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}
robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        o = createHashObject();
        dbAdd(c->db,key,o);
    } else {
        if (o->type != REDIS_HASH) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}
robj *createHashObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(REDIS_HASH, zl);
    o->encoding = REDIS_ENCODING_ZIPLIST;
    return o;
}

从上面代码可知,<K,V>键值对默认是ziplist.

3.hash相关命令的实现

接下来我们通过分析和hash相关的命令来具体展开:

3.1 HSET

先上代码:

void hsetCommand(redisClient *c) {
    int update;
    robj *o;
 
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,3);
    hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
    update = hashTypeSet(o,c->argv[2],c->argv[3]);
    addReply(c, update ? shared.czero : shared.cone);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}

这里需要关注下hashTypeTryConversion这个步骤:

void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
    int i;
 
    if (o->encoding != REDIS_ENCODING_ZIPLIST) return;
 
    for (i = start; i <= end; i++) {
        if (argv[i]->encoding == REDIS_ENCODING_RAW &&
            sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
        {
            hashTypeConvert(o, REDIS_ENCODING_HT);
            break;
        }
    }
}

由于ziplist是通过文本来实现了一个双向的链表,如果链表过长的话,就会影响性能,所以,假如ziplist长度大于server.hash_max_ziplist_value则会把代表<k,v>键值对的ziplist转换成dict类型。hashTypeConvert的转化过程可自行分析。

我们接着看hset相关的关键函数hashTypeSet:

int hashTypeSet(robj *o, robj *field, robj *value) {
    int update = 0;
 
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;
 
        field = getDecodedObject(field);
        value = getDecodedObject(value);
 
        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = ziplistNext(zl, fptr);
                redisAssert(vptr != NULL);
                update = 1;
 
                /* Delete value */
                zl = ziplistDelete(zl, &vptr);
 
                /* Insert new value */
                zl = ziplistInsert(zl, vptr, value->ptr, sdslen(value->ptr));
            }
        }
 
        if (!update) {
            /* Push new field/value pair onto the tail of the ziplist */
            zl = ziplistPush(zl, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
            zl = ziplistPush(zl, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
        }
        o->ptr = zl;
        decrRefCount(field);
        decrRefCount(value);
 
        /* Check if the ziplist needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, REDIS_ENCODING_HT);
    } else if (o->encoding == REDIS_ENCODING_HT) {
        if (dictReplace(o->ptr, field, value)) { /* Insert */
            incrRefCount(field);
        } else { /* Update */
            update = 1;
        }
        incrRefCount(value);
    } else {
        redisPanic("Unknown hash encoding");
    }
    return update;
}

3.1.1 ziplist相关函数

我们接着分析上面和ziplist相关的几个函数:

3.1.1.1 ziplistIndex

该函数通过index值来查找entry对象:

unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    if (index < 0) {
        index = (-index)-1;
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                p -= prevlen;
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    } else {
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            p += zipRawEntryLength(p);
        }
    }
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}

3.1.1.2 ziplistFind

该函数也比较简单,通过字符串值来查找entry:

unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;
 
    while (p[0] != ZIP_END) {
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;
 
        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
        q = p + prevlensize + lensize;
 
        if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(encoding)) {
                if (len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                /* Find out if the searched field can be encoded. Note that
                 * we do it only the first time, once done vencoding is set
                 * to non-zero and vll is set to the integer value. */
                if (vencoding == 0) {
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }
 
                /* Compare current entry with specified entry, do it only
                 * if vencoding != UCHAR_MAX because if there is no encoding
                 * possible for the field it can't be a valid integer. */
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }
 
            /* Reset skip count */
            skipcnt = skip;
        } else {
            /* Skip entry */
            skipcnt--;
        }
 
        /* Move to next entry */
        p = q + len;
    }
 
    return NULL;
}

3.1.1.3 ziplistDelete

删除的过程比较复杂:

//从指针 p 开始,删除 num 个节点
static unsigned char *_ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;
 
    first = zipEntry(p); //删除的首个节点
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p); //偏移到下个节点
        deleted++;
    }
 
    totlen = p-first.p;// 被删除的节点的总字节数
    if (totlen > 0) {
        if (p[0] != ZIP_END) {
            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            //计算删除的第一个节点first的prevrawlensize与p节点prevrawlensize的差值
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
            p -= nextdiff; //根据nextdiff值,对p进行向前或向后偏移,留取的字节来保存first.prevrawlen
            zipPrevEncodeLength(p,first.prevrawlen);//将first.prevrawlen值存储在p的prevrawlensize中
 
            /* Update offset for tail */
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
 
            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            //如果p不是尾节点,那么尾节点指针的首地址还需要加上nextdiff
            tail = zipEntry(p);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
            }
 
            /* Move tail to the front of the ziplist */
            //first.p至p之间的节点都是需要删除的,因此需要将p开始的数据向前偏移,zlend不需要处理,因此需要-1
            memmove(first.p,p,intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
        } else {
            /* The entire tail was deleted. No need to move memory. */
            //如果已经删除到zlend,那么尾节点指针应该指向被删除的first之前的节点首地址
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe((first.p-zl)-first.prevrawlen);
        }
 
        /* Resize and update length */
        offset = first.p-zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl,-deleted);
        p = zl+offset;
 
        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        /**
            如果nextdiff不等于0,说明现在的p节点的长度变了,需要级联更新下个节点能否保存
            p节点的长度值
        */
        if (nextdiff != 0)
            zl = _ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

3.1.1.4 ziplistInsert

插入的过程类似删除:

//添加保存给定元素s的新节点插入到地址p之前,然后原有的数据向后偏移
//zl: ziplist首地址,p:插入位置指针,s:待插入的字符串首地址,slen:待插入字符串长度
static unsigned char *_ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry entry, tail;
 
    /* Find out prevlen for the entry that is inserted. */
    // 那么取出节点相关资料,以及 prevlen
    if (p[0] != ZIP_END) {//p之后存在节点
        entry = zipEntry(p);//取出p节点的相关资料
        prevlen = entry.prevrawlen;//得到p节点前一个节点所占字节数
    } else {//p之后没有节点,达到zlend,那么就取出尾节点
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {//如果存在尾节点
            prevlen = zipRawEntryLength(ptail);//得到尾节点的总字节数,然后在尾节点之后insert
        }//否则就应该是在一个空的ziplist第一个insert一个节点
    }
 
    /* See if the entry can be encoded */
    // 查看能否将新值保存为整数,如果可以的话返回 1 ,
    // 并将新值保存到 value ,编码形式保存到 encoding
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        //s 可以保存为整数,那么继续计算保存它所需的空间
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        // 不能保存为整数,直接使用字符串长度
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    // 计算编码 prevlen 所需的长度
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    //计算编码slen所需的长度
    reqlen += zipEncodeLength(NULL,encoding,slen);
 
    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    //当插入的位置不为尾部时,需要确保下一个节点的存储前一个
    //节点所占自己数的空间能够存储即将插入节点的长度
    // zipPrevLenByteDiff 的返回值有三种可能:
    // 1)新旧两个节点的编码长度相等,返回 0
    // 2)新节点编码长度 > 旧节点编码长度,返回 5 - 1 = 4
    // 3)旧节点编码长度 > 新编码节点长度,返回 1 - 5 = -4
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
 
    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;//保存当前的偏移量,在这偏移量之前的数据不需要改变,只需要改变在此之后的数据
    // 重分配空间,并更新长度属性和表尾
    // 新空间长度 = 现有长度 + 新节点所需长度 + 编码新节点长度所需的长度差
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;
 
    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        //将原有从p-nextdiff开始全部向后偏移,余留出reqlen保存即将insert的数据
        /**
            nextdiff = -4:原来p有5个字节来存储上个节点的长度,而现在只需要1个,
                          因此只需要将p+4后面的字节偏移到p+reqlen即可,这样就
                          只保留1个字节保存reqlen的长度了
            nextdiff = 4: 原来p只有1个字节来存储上个节点的长度,现在需要5个,
                          那就将p-4后面的字节偏移到p+reqlen,这样p原来有1个字节
                          加上多偏移来的4个字节就可以保存reqlen的长度了
            nextdiff = 0: 不需要考虑
        */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
 
        /* Encode this entry's raw length in the next entry. */
        zipPrevEncodeLength(p+reqlen,reqlen);//下个节点保存即将insert数据的所占字节数
 
        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
 
        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        // 有需要的话,将 nextdiff 也加上到 zltail 上
        tail = zipEntry(p+reqlen);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        // 更新 ziplist 的 zltail 属性,现在新添加节点为表尾节点
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }
 
    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    /**
        如果nextdiff不等于0,说明现在的p+reqlen节点的长度变了,需要级联更新下个节点能否保存
        p+reqlen节点的长度值
    */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = _ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }
 
    /* Write the entry */
    p += zipPrevEncodeLength(p,prevlen);//填写保存上一个节点长度的字节数
    p += zipEncodeLength(p,encoding,slen);//填写保存当前节点长度的字节数
    if (ZIP_IS_STR(encoding)) {//保存当前节点的字符串
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);//整数
    }
    ZIPLIST_INCR_LENGTH(zl,1);//length + 1
    return zl;
}

3.1.1.5 ziplistPush

push其实是插入的过程:

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return _ziplistInsert(zl,p,s,slen);
}

从上面可以看到push的过程只支持从头插入或者从尾插入。

3.1.2 hashTypeSet过程

我们再回到前面的hashTypeSet函数来分析步骤:

1.对传入的filed value进行编码

2.在ziplist链表中查找filed,假如已经存在,则删除旧的value,插入新的value.

3.假如filed不存在,那么则按照filed,value的顺序插入到ziplist中

3.2 HDEL

该命令格式为:

HDEL key field [field …]

void hdelCommand(redisClient *c) {
    robj *o;
    int j, deleted = 0, keyremoved = 0;
 
    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
 
    for (j = 2; j < c->argc; j++) {
        if (hashTypeDelete(o,c->argv[j])) {
            deleted++;
            if (hashTypeLength(o) == 0) {
                dbDelete(c->db,c->argv[1]);
                keyremoved = 1;
                break;
            }
        }
    }
    if (deleted) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hdel",c->argv[1],c->db->id);
        if (keyremoved)
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],
                                c->db->id);
        server.dirty += deleted;
    }
    addReplyLongLong(c,deleted);
}

执行过程如下:

1.查找key是否在数据库中存在对象

2.循环执行hashTypeDelete过程,该过程其实就是对ziplist查找和删除的过程,具体代码不在黏贴了。

3.如果删除成功则signalModifiedKey&notifyKeyspaceEvent&server.dirty += deleted

4.返回客户端删除成功的field数量

3.3 HEXISTS

只要知道了插入的过程那么exists也很简单了:

void hexistsCommand(redisClient *c) {
    robj *o;
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
 
    addReply(c, hashTypeExists(o,c->argv[2]) ? shared.cone : shared.czero);

3.4 HGET

这个直接看源码吧:

void hgetCommand(redisClient *c) {
    robj *o;
 
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
 
    addHashFieldToReply(c, o, c->argv[2]);
}
static void addHashFieldToReply(redisClient *c, robj *o, robj *field) {
    int ret;
 
    if (o == NULL) {
        addReply(c, shared.nullbulk);
        return;
    }
 
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *vstr = NULL;
        unsigned int vlen = UINT_MAX;
        long long vll = LLONG_MAX;
 
        ret = hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll);
        if (ret < 0) {
            addReply(c, shared.nullbulk);
        } else {
            if (vstr) {
                addReplyBulkCBuffer(c, vstr, vlen);
            } else {
                addReplyBulkLongLong(c, vll);
            }
        }
 
    } else if (o->encoding == REDIS_ENCODING_HT) {
        robj *value;
 
        ret = hashTypeGetFromHashTable(o, field, &value);
        if (ret < 0) {
            addReply(c, shared.nullbulk);
        } else {
            addReplyBulk(c, value);
        }
 
    } else {
        redisPanic("Unknown hash encoding");
    }
}

3.5 HGETALL/HKEYS/HVALS

这三个比较类似,就是遍历ziplist的过程:

void hkeysCommand(redisClient *c) {
    genericHgetallCommand(c,REDIS_HASH_KEY);
}
 
void hvalsCommand(redisClient *c) {
    genericHgetallCommand(c,REDIS_HASH_VALUE);
}
 
void hgetallCommand(redisClient *c) {
    genericHgetallCommand(c,REDIS_HASH_KEY|REDIS_HASH_VALUE);
}

3.6 HINCRBY

只要知道了原理,其实源码都很简单了:

void hincrbyCommand(redisClient *c) {
    long long value, incr, oldvalue;
    robj *o, *current, *new;
 
    if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    if ((current = hashTypeGetObject(o,c->argv[2])) != NULL) {
        if (getLongLongFromObjectOrReply(c,current,&value,
            "hash value is not an integer") != REDIS_OK) {
            decrRefCount(current);
            return;
        }
        decrRefCount(current);
    } else {
        value = 0;
    }
 
    oldvalue = value;
    if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
        (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }
    value += incr;
    new = createStringObjectFromLongLong(value);
    hashTypeTryObjectEncoding(o,&c->argv[2],NULL);
    hashTypeSet(o,c->argv[2],new);
    decrRefCount(new);
    addReplyLongLong(c,value);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hincrby",c->argv[1],c->db->id);
    server.dirty++;
}

总体就4步:

1.查找

2.incr

3.设新值

4.addReplyLongLong&signalModifiedKey&notifyKeyspaceEvent&server.dirty++

3.7 HINCRBYFLOAT

这一步类似3.6,多出的最后一步在后面另外分析:

void hincrbyfloatCommand(redisClient *c) {
    double long value, incr;
    robj *o, *current, *new, *aux;

    if (getLongDoubleFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    if ((current = hashTypeGetObject(o,c->argv[2])) != NULL) {
        if (getLongDoubleFromObjectOrReply(c,current,&value,
            "hash value is not a valid float") != REDIS_OK) {
            decrRefCount(current);
            return;
        }
        decrRefCount(current);
    } else {
        value = 0;
    }

    value += incr;
    new = createStringObjectFromLongDouble(value,1);
    hashTypeTryObjectEncoding(o,&c->argv[2],NULL);
    hashTypeSet(o,c->argv[2],new);
    addReplyBulk(c,new);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id);
    server.dirty++;

    /* Always replicate HINCRBYFLOAT as an HSET command with the final value
     * in order to make sure that differences in float pricision or formatting
     * will not create differences in replicas or after an AOF restart. */
    aux = createStringObject("HSET",4);
    rewriteClientCommandArgument(c,0,aux);
    decrRefCount(aux);
    rewriteClientCommandArgument(c,3,new);
    decrRefCount(new);
}

3.8 HLEN

该命令用于获取指定key的链表长度:

void hlenCommand(redisClient *c) {
    robj *o;
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
 
    addReplyLongLong(c,hashTypeLength(o));
}
unsigned long hashTypeLength(robj *o) {
    unsigned long length = ULONG_MAX;
 
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        length = ziplistLen(o->ptr) / 2;
    } else if (o->encoding == REDIS_ENCODING_HT) {
        length = dictSize((dict*)o->ptr);
    } else {
        redisPanic("Unknown hash encoding");
    }
 
    return length;
}

因为我们存的是<k,v>键值对,所以长度为:ziplistLen(o→ptr) / 2

3.9 HMGET

这个也很简单,无非就是遍历hget的过程:

void hmgetCommand(redisClient *c) {
    robj *o;
    int i;
 
    /* Don't abort when the key cannot be found. Non-existing keys are empty
     * hashes, where HMGET should respond with a series of null bulks. */
    o = lookupKeyRead(c->db, c->argv[1]);
    if (o != NULL && o->type != REDIS_HASH) {
        addReply(c, shared.wrongtypeerr);
        return;
    }
 
    addReplyMultiBulkLen(c, c->argc-2);
    for (i = 2; i < c->argc; i++) {
        addHashFieldToReply(c, o, c->argv[i]);
    }
}

3.10 HMSET

这是遍历hset的过程:

void hmsetCommand(redisClient *c) {
    int i;
    robj *o;
 
    if ((c->argc % 2) == 1) {
        addReplyError(c,"wrong number of arguments for HMSET");
        return;
    }
 
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,c->argc-1);
    for (i = 2; i < c->argc; i += 2) {
        hashTypeTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]);
        hashTypeSet(o,c->argv[i],c->argv[i+1]);
    }
    addReply(c, shared.ok);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}

3.11 HSETNX

这一步类似HSET,不过只有当filed不存在的时候才进行设置,否则不设置:

void hsetnxCommand(redisClient *c) {
    robj *o;
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,3);
 
    if (hashTypeExists(o, c->argv[2])) {
        addReply(c, shared.czero);
    } else {
        hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
        hashTypeSet(o,c->argv[2],c->argv[3]);
        addReply(c, shared.cone);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_HASH,"hset",c->argv[1],c->db->id);
        server.dirty++;
    }
}

3.1.12 HSCAN

参考前几章中的SCAN

void hscanCommand(redisClient *c) {
    robj *o;
    unsigned long cursor;
 
    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
    scanGenericCommand(c,o,cursor);
}

4.hash表原理总结

下图为hashtable中的链表未超过server.hash_max_ziplist_value时的结构:

否则将会转换成dict:

void hashTypeConvertZiplist(robj *o, int enc) {
    redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
 
    if (enc == REDIS_ENCODING_ZIPLIST) {
        /* Nothing to do... */
 
    } else if (enc == REDIS_ENCODING_HT) {
        hashTypeIterator *hi;
        dict *dict;
        int ret;
 
        hi = hashTypeInitIterator(o);
        dict = dictCreate(&hashDictType, NULL);
 
        while (hashTypeNext(hi) != REDIS_ERR) {
            robj *field, *value;
 
            field = hashTypeCurrentObject(hi, REDIS_HASH_KEY);
            field = tryObjectEncoding(field);
            value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
            value = tryObjectEncoding(value);
            ret = dictAdd(dict, field, value);
            if (ret != DICT_OK) {
                redisLogHexDump(REDIS_WARNING,"ziplist with dup elements dump",
                    o->ptr,ziplistBlobLen(o->ptr));
                redisAssert(ret == DICT_OK);
            }
        }
 
        hashTypeReleaseIterator(hi);
        zfree(o->ptr);
 
        o->encoding = REDIS_ENCODING_HT;
        o->ptr = dict;
 
    } else {
        redisPanic("Unknown hash encoding");
    }
}
about_redis_8.txt · 最后更改: 2018/10/14 15:31 (外部编辑)