本章我们来分析下和string相关的命令,在分析之前我们先来看下string的内部实现sds,这是redis保存string的基本数据结构:
/* An sds handle is an ordinary character pointer; sdsnewlen() returns the
 * address of the 'buf' member of the header declared below. */
typedef char *sds;

/* Bookkeeping header placed directly in front of the string bytes. */
struct sdshdr {
    unsigned int len;   /* number of bytes of buf currently used by the string */
    unsigned int free;  /* number of bytes of buf allocated but still unused */
    char buf[];         /* string bytes (flexible array member) */
};
redis的所有数据最基本的结构就是sds,所以我们来追溯一下它的来龙去脉,首先我们看客户端初始化的过程:
acceptTcpHandler→acceptCommonHandler→createClient,我们看createClient函数的代码:
/* Allocate a client structure and set every field to its initial value.
 *
 * fd: descriptor of the client connection, or -1 to build a client that is
 *     not attached to any connection.
 *
 * Returns the new client. Returns NULL when fd is not -1 and the readable
 * event handler could not be registered; in that case fd is closed and the
 * allocation is released before returning. */
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));

    /* Passing -1 as fd makes it possible to create a non connected client.
     * This is useful since all the Redis commands need to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Register readQueryFromClient for readable events on fd, with the
         * client itself as the callback argument. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    c->id = server.next_client_id++;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    /* The query buffer starts out as an empty sds string. */
    c->querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->bpop.keys = dictCreate(&setDictType,NULL);
    c->bpop.timeout = 0;
    c->bpop.target = NULL;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Only clients backed by a real descriptor are added to server.clients. */
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}
上面主要关注:
c->querybuf = sdsempty();
我们终于弄明白了client的querybuf是如何初始化了。接下来再看sdsempty的实现:
/* Build a zero-length sds string by delegating to sdsnewlen() with an empty
 * C string and a length of 0. Returns whatever sdsnewlen() returns. */
sds sdsempty(void) {
    sds empty = sdsnewlen("", 0);

    return empty;
}
其实就是调用sdsnewlen函数创建了一个空字符串,再来看sdsnewlen:
/* Create an sds string holding 'initlen' bytes.
 *
 * init:    bytes to copy into the new string; when NULL the storage is
 *          obtained with zcalloc() instead of zmalloc() and nothing is copied.
 * initlen: number of bytes the string will report as its length.
 *
 * One extra byte is reserved past the payload and set to '\0'.
 * Returns a pointer to the string bytes (not to the header), or NULL when
 * the allocation fails. */
sds sdsnewlen(const void *init, size_t initlen) {
    size_t total = sizeof(struct sdshdr) + initlen + 1;
    struct sdshdr *hdr;

    hdr = (init != NULL) ? zmalloc(total) : zcalloc(total);
    if (hdr == NULL) return NULL;

    hdr->len = initlen;
    hdr->free = 0;
    if (init != NULL && initlen != 0) {
        memcpy(hdr->buf, init, initlen);
    }
    hdr->buf[initlen] = '\0';
    return (char *)hdr->buf;
}
这回我们终于弄明白了,我们来画一下原理图:
另外我们再看下sdsMakeRoomFor函数,此函数用于字符串空间的分配和扩展:
/* Make sure at least 'addlen' more bytes can be written after the current
 * end of 's'.
 *
 * When the free space reported by sdsavail() is already sufficient, 's' is
 * returned untouched. Otherwise the header is reallocated: the required
 * length (current length + addlen) is doubled while it is below
 * SDS_MAX_PREALLOC, and enlarged by SDS_MAX_PREALLOC otherwise.
 *
 * Only the 'free' field is updated here; 'len' is left as it was.
 * Returns the (possibly moved) string, or NULL when zrealloc() fails. */
sds sdsMakeRoomFor(sds s, size_t addlen) {
    struct sdshdr *oldhdr, *grown;
    size_t used, capacity;

    if (sdsavail(s) >= addlen) return s;

    used = sdslen(s);
    oldhdr = (void *)(s - (sizeof(struct sdshdr)));
    capacity = used + addlen;
    if (capacity < SDS_MAX_PREALLOC) {
        capacity *= 2;
    } else {
        capacity += SDS_MAX_PREALLOC;
    }

    grown = zrealloc(oldhdr, sizeof(struct sdshdr) + capacity + 1);
    if (grown == NULL) return NULL;

    grown->free = capacity - used;
    return grown->buf;
}
注意:这里可以看到,假如所需的新长度(len+addlen)小于SDS_MAX_PREALLOC(1024*1024),那么会按所需长度的2倍来预分配内存;否则只在所需长度的基础上再多分配SDS_MAX_PREALLOC字节。
这几个命令最终都调用了setGenericCommand函数,至于命令如何使用,我不再过多解释了,直接看redis的官方手册吧,我复制黏贴也没什么意思,http://redis.io/commands :
/* Return non-zero when 'arg' is exactly two characters long, its first
 * character is 'lower' or 'upper', and its second character is 'x' or 'X'. */
static int setOptionMatches(const char *arg, char lower, char upper) {
    if (arg[0] != lower && arg[0] != upper) return 0;
    if (arg[1] != 'x' && arg[1] != 'X') return 0;
    return arg[2] == '\0';
}

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>]
 *
 * Scans the optional arguments starting at index 3, collecting the NX/XX
 * flags and the expire argument with its unit, then hands the work to
 * setGenericCommand(). Any unrecognized option, or EX/PX without a following
 * argument, produces a syntax error reply. */
void setCommand(redisClient *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;
    int j = 3;

    while (j < c->argc) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
        int consumed = 1; /* arguments used by the current option */

        if (setOptionMatches(a,'n','N')) {
            flags |= REDIS_SET_NX;
        } else if (setOptionMatches(a,'x','X')) {
            flags |= REDIS_SET_XX;
        } else if (setOptionMatches(a,'e','E') && next) {
            unit = UNIT_SECONDS;
            expire = next;
            consumed = 2;
        } else if (setOptionMatches(a,'p','P') && next) {
            unit = UNIT_MILLISECONDS;
            expire = next;
            consumed = 2;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
        j += consumed;
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

/* SETNX key value: set with the NX flag, replying with shared.cone on
 * success and shared.czero when the operation is aborted. */
void setnxCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    setGenericCommand(c,REDIS_SET_NX,c->argv[1],val,NULL,0,shared.cone,shared.czero);
}

/* SETEX key seconds value: argv[2] is the expire time in seconds and
 * argv[3] is the value. */
void setexCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[3]);

    c->argv[3] = val;
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],val,c->argv[2],UNIT_SECONDS,NULL,NULL);
}

/* PSETEX key milliseconds value: same as SETEX with the expire time given in
 * milliseconds. */
void psetexCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[3]);

    c->argv[3] = val;
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],val,c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}
我们再来看setGenericCommand函数:
/* Shared implementation of SET, SETNX, SETEX and PSETEX.
 *
 * c:           client issuing the command.
 * flags:       REDIS_SET_NX and/or REDIS_SET_XX, or REDIS_SET_NO_FLAGS.
 * key, val:    key to set and value to store.
 * expire:      object holding the expire time, or NULL for no expire.
 * unit:        UNIT_SECONDS or UNIT_MILLISECONDS; tells how to read 'expire'.
 * ok_reply:    reply sent on success; shared.ok is used when NULL.
 * abort_reply: reply sent when the NX/XX condition is not met;
 *              shared.nullbulk is used when NULL.
 *
 * An expire time that is not a positive integer, or that cannot be converted
 * to milliseconds without overflowing a long long, is rejected with an
 * "invalid expire time" error and nothing is modified. */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid a spurious compiler warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
            return;
        /* The second test keeps the seconds -> milliseconds conversion
         * below from overflowing, which would be undefined behavior. */
        if (milliseconds <= 0 ||
            (unit == UNIT_SECONDS && milliseconds > LLONG_MAX/1000))
        {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    /* NX: abort when the key already exists. XX: abort when it does not. */
    if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }

    setKey(c->db,key,val);
    server.dirty++;
    if (expire) setExpire(c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
该函数执行过程如下:
1.如果提供了过期时间(expire不为空),则解析后赋值给milliseconds;解析失败或者值小于等于0时直接向客户端返回错误
2.如果时间单位为秒(UNIT_SECONDS),则:milliseconds *= 1000
3.如果使用了NX选项并且可以查找到指定的key或者使用了XX选项并且查找不到指定的key,那么回复客户端abort_reply 或 shared.nullbulk
4.setKey(c→db,key,val)把(k→v)存入到指定的数据库中。
5.server.dirty++;
6.如果有过期时间则设置过期时间
7.notifyKeyspaceEvent
8.响应客户端:ok_reply 或 shared.ok
命令格式为:
SETBIT key offset bitvalue
/* SETBIT key offset bitvalue
 *
 * Sets or clears the bit at 'offset' of the string stored at key, creating
 * the key (as an empty string) when it does not exist and growing the string
 * with zero bytes when it is too short. Replies with the value the bit had
 * before the change. */
void setbitCommand(redisClient *c) {
    char *err = "bit is not an integer or out of range";
    robj *o;
    size_t bitoffset;
    long on;
    int byte, bit, mask;
    int byteval, bitval;
    uint8_t *bytes;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset) != REDIS_OK)
        return;
    if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != REDIS_OK)
        return;

    /* Only 0 and 1 are acceptable bit values. */
    if (on & ~1) {
        addReplyError(c,err);
        return;
    }

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL) {
        if (checkType(c,o,REDIS_STRING)) return;
        o = dbUnshareStringValue(c->db,c->argv[1],o);
    } else {
        o = createObject(REDIS_STRING,sdsempty());
        dbAdd(c->db,c->argv[1],o);
    }

    /* Make sure the byte holding the target bit exists. */
    byte = bitoffset >> 3;
    o->ptr = sdsgrowzero(o->ptr,byte+1);
    bytes = (uint8_t*)o->ptr;

    /* Bit 0 of the offset maps to the most significant bit of the byte. */
    byteval = bytes[byte];
    bit = 7 - (bitoffset & 0x7);
    mask = 1 << bit;
    bitval = byteval & mask;

    /* Clear the bit, then set it again when the new value is 1. */
    byteval &= ~mask;
    if (on & 0x1) byteval |= mask;
    bytes[byte] = byteval;

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c, bitval ? shared.cone : shared.czero);
}
该函数执行过程如下:
1.获取bitoffset的值
2.获取要设置的值给on
3.假如on的值非0或者1(on & ~1)那么,返回客户端err(bit is not an integer or out of range)
4.查找key对应的对象,如不存在则创建一个。
5.如果值不是string类型,则返回客户端wrongtypeerr。如果对象的编码类型不是REDIS_ENCODING_RAW或者refcount>1,那么将会解码对象并且将对象转换成string类型,同时decrRefCount,并且覆盖db中的对象。
6.如果对象字符串长度不够,则进行扩展sdsgrowzero
7.获得当前需要设置位的值:
byteval = ((uint8_t*)o->ptr)[byte]; bit = 7 - (bitoffset & 0x7); bitval = byteval & (1 << bit);
8.把on赋给该bit位:
byteval &= ~(1 << bit); byteval |= ((on & 0x1) << bit); ((uint8_t*)o->ptr)[byte] = byteval;
9.signalModifiedKey
10.notifyKeyspaceEvent
11.server.dirty++
12.返回客户端该位被修改之前的原值(bitval):原值为1则返回1,否则返回0.
这是SETBIT的一个反过程,只要理解了SETBIT这个就很容易理解:
/* GETBIT key offset
 *
 * Replies with the bit stored at 'offset' in the string held by key. A
 * missing key, or an offset past the end of the string, yields 0. Values
 * that are not RAW encoded are first rendered as a decimal string with
 * ll2string() and the bit is read from that text. */
void getbitCommand(redisClient *c) {
    robj *o;
    char llbuf[32];
    size_t bitoffset;
    size_t byte, bit;
    size_t bitval = 0;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset) != REDIS_OK)
        return;

    o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
    if (o == NULL) return;
    if (checkType(c,o,REDIS_STRING)) return;

    byte = bitoffset >> 3;
    bit = 7 - (bitoffset & 0x7);
    if (o->encoding == REDIS_ENCODING_RAW) {
        if (byte < sdslen(o->ptr))
            bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
    } else {
        size_t digits = (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr);

        if (byte < digits)
            bitval = llbuf[byte] & (1 << bit);
    }

    addReply(c, bitval ? shared.cone : shared.czero);
}
该命令用于计算被设置为1的位数总共有几个。
格式为:
BITCOUNT key [start end]
代码如下:
/* BITCOUNT key [start end]
 *
 * Replies with the number of bits set to 1 in the string stored at key,
 * optionally restricted to the inclusive byte range [start, end]. Negative
 * indexes count from the end of the string. A missing key yields 0; any
 * argument count other than 2 or 4 is a syntax error. */
void bitcountCommand(redisClient *c) {
    robj *o;
    long start, end, slen;
    unsigned char *p;
    char llbuf[32];

    /* Lookup, check for type, and return 0 for non existing keys. */
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,REDIS_STRING)) return;

    /* Set the 'p' pointer to the string, that can be just a stack allocated
     * array if our string was integer encoded. */
    if (o->encoding == REDIS_ENCODING_INT) {
        p = (unsigned char*) llbuf;
        slen = ll2string(llbuf,sizeof(llbuf),(long)o->ptr);
    } else {
        p = (unsigned char*) o->ptr;
        slen = sdslen(o->ptr);
    }

    /* Parse start/end range if any. */
    if (c->argc == 4) {
        if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != REDIS_OK)
            return;
        if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != REDIS_OK)
            return;
        /* An inverted range given with two negative indexes is empty. It
         * must be caught before clamping: otherwise both indexes may be
         * clamped to 0 and the first byte would wrongly be counted. */
        if (start < 0 && end < 0 && start > end) {
            addReply(c,shared.czero);
            return;
        }
        /* Convert negative indexes */
        if (start < 0) start = slen+start;
        if (end < 0) end = slen+end;
        if (start < 0) start = 0;
        if (end < 0) end = 0;
        if (end >= slen) end = slen-1;
    } else if (c->argc == 2) {
        /* The whole string. */
        start = 0;
        end = slen-1;
    } else {
        /* Syntax error. */
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Precondition: end < slen, so the only condition where zero can be
     * returned is: start > end (this also covers the empty string, where
     * end is -1). */
    if (start > end) {
        addReply(c,shared.czero);
    } else {
        long bytes = end-start+1;

        addReplyLongLong(c,redisPopcount(p+start,bytes));
    }
}
格式为:
BITOP op_name target_key src_key1 src_key2 src_key3 … src_keyN
该方法对指定的srckey做位操作,并把结果保存到target_key中。
op_name为:
and
or
xor
not
代码如下:
/* BITOP op_name target_key src_key1 src_key2 src_key3 ... src_keyN
 *
 * Applies the bitwise operation named by argv[1] (and/or/xor/not, matched
 * case-insensitively) to the source strings and stores the result in
 * target_key. Missing source keys are treated as empty strings, and shorter
 * strings are treated as zero padded up to the longest one. When every
 * source is empty the target key is deleted instead. The reply is the length
 * in bytes of the resulting string. */
void bitopCommand(redisClient *c) {
    char *opname = c->argv[1]->ptr;
    robj *o, *targetkey = c->argv[2];
    unsigned long op, j, numkeys;
    robj **objects;      /* Array of source objects. */
    unsigned char **src; /* Array of source strings pointers. */
    unsigned long *len, maxlen = 0; /* Array of length of src strings,
                                       and max len. */
    unsigned long minlen = 0;    /* Min len among the input keys. */
    unsigned char *res = NULL; /* Resulting string. */

    /* Parse the operation name. The first-character test is a cheap filter
     * in front of the strcasecmp() call. */
    if ((opname[0] == 'a' || opname[0] == 'A') && !strcasecmp(opname,"and"))
        op = BITOP_AND;
    else if((opname[0] == 'o' || opname[0] == 'O') && !strcasecmp(opname,"or"))
        op = BITOP_OR;
    else if((opname[0] == 'x' || opname[0] == 'X') && !strcasecmp(opname,"xor"))
        op = BITOP_XOR;
    else if((opname[0] == 'n' || opname[0] == 'N') && !strcasecmp(opname,"not"))
        op = BITOP_NOT;
    else {
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Sanity check: NOT accepts only a single key argument. */
    if (op == BITOP_NOT && c->argc != 4) {
        addReplyError(c,"BITOP NOT must be called with a single source key.");
        return;
    }

    /* Lookup keys, and store pointers to the string objects into an array. */
    numkeys = c->argc - 3;
    src = zmalloc(sizeof(unsigned char*) * numkeys);
    len = zmalloc(sizeof(long) * numkeys);
    objects = zmalloc(sizeof(robj*) * numkeys);
    for (j = 0; j < numkeys; j++) {
        o = lookupKeyRead(c->db,c->argv[j+3]);
        /* Handle non-existing keys as empty strings. Forcing minlen to 0
         * also disables the word-wise fast path below. */
        if (o == NULL) {
            objects[j] = NULL;
            src[j] = NULL;
            len[j] = 0;
            minlen = 0;
            continue;
        }
        /* Return an error if one of the keys is not a string. The objects
         * decoded so far are released before returning. */
        if (checkType(c,o,REDIS_STRING)) {
            unsigned long i;
            for (i = 0; i < j; i++) {
                if (objects[i])
                    decrRefCount(objects[i]);
            }
            zfree(src);
            zfree(len);
            zfree(objects);
            return;
        }
        objects[j] = getDecodedObject(o);
        src[j] = objects[j]->ptr;
        len[j] = sdslen(objects[j]->ptr);
        if (len[j] > maxlen) maxlen = len[j];
        if (j == 0 || len[j] < minlen) minlen = len[j];
    }

    /* Compute the bit operation, if at least one string is not empty. */
    if (maxlen) {
        res = (unsigned char*) sdsnewlen(NULL,maxlen);
        unsigned char output, byte;
        unsigned long i;

        /* Fast path: as far as we have data for all the input bitmaps we
         * can take a fast path that performs much better than the
         * vanilla algorithm. It processes four unsigned longs per
         * iteration and is limited to 16 keys by the size of 'lp'. */
        j = 0;
        if (minlen && numkeys <= 16) {
            unsigned long *lp[16];
            unsigned long *lres = (unsigned long*) res;

            /* Note: sds pointer is always aligned to 8 byte boundary. */
            memcpy(lp,src,sizeof(unsigned long*)*numkeys);
            /* Seed the result with the first source; the loops below fold
             * in sources 1..numkeys-1 (or invert in place for NOT). */
            memcpy(res,src[0],minlen);

            /* Different branches per different operations for speed (sorry). */
            if (op == BITOP_AND) {
                while(minlen >= sizeof(unsigned long)*4) {
                    for (i = 1; i < numkeys; i++) {
                        lres[0] &= lp[i][0];
                        lres[1] &= lp[i][1];
                        lres[2] &= lp[i][2];
                        lres[3] &= lp[i][3];
                        lp[i]+=4;
                    }
                    lres+=4;
                    j += sizeof(unsigned long)*4;
                    minlen -= sizeof(unsigned long)*4;
                }
            } else if (op == BITOP_OR) {
                while(minlen >= sizeof(unsigned long)*4) {
                    for (i = 1; i < numkeys; i++) {
                        lres[0] |= lp[i][0];
                        lres[1] |= lp[i][1];
                        lres[2] |= lp[i][2];
                        lres[3] |= lp[i][3];
                        lp[i]+=4;
                    }
                    lres+=4;
                    j += sizeof(unsigned long)*4;
                    minlen -= sizeof(unsigned long)*4;
                }
            } else if (op == BITOP_XOR) {
                while(minlen >= sizeof(unsigned long)*4) {
                    for (i = 1; i < numkeys; i++) {
                        lres[0] ^= lp[i][0];
                        lres[1] ^= lp[i][1];
                        lres[2] ^= lp[i][2];
                        lres[3] ^= lp[i][3];
                        lp[i]+=4;
                    }
                    lres+=4;
                    j += sizeof(unsigned long)*4;
                    minlen -= sizeof(unsigned long)*4;
                }
            } else if (op == BITOP_NOT) {
                while(minlen >= sizeof(unsigned long)*4) {
                    lres[0] = ~lres[0];
                    lres[1] = ~lres[1];
                    lres[2] = ~lres[2];
                    lres[3] = ~lres[3];
                    lres+=4;
                    j += sizeof(unsigned long)*4;
                    minlen -= sizeof(unsigned long)*4;
                }
            }
        }

        /* j is set to the next byte to process by the previous loop. The
         * remaining bytes are handled one at a time, reading 0 for any
         * source that is shorter than the current position. */
        for (; j < maxlen; j++) {
            output = (len[0] <= j) ? 0 : src[0][j];
            if (op == BITOP_NOT) output = ~output;
            for (i = 1; i < numkeys; i++) {
                byte = (len[i] <= j) ? 0 : src[i][j];
                switch(op) {
                case BITOP_AND: output &= byte; break;
                case BITOP_OR:  output |= byte; break;
                case BITOP_XOR: output ^= byte; break;
                }
            }
            res[j] = output;
        }
    }
    /* Release the decoded source objects and the temporary arrays. */
    for (j = 0; j < numkeys; j++) {
        if (objects[j])
            decrRefCount(objects[j]);
    }
    zfree(src);
    zfree(len);
    zfree(objects);

    /* Store the computed value into the target key */
    if (maxlen) {
        o = createObject(REDIS_STRING,res);
        setKey(c->db,targetkey,o);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",targetkey,c->db->id);
        decrRefCount(o);
    } else if (dbDelete(c->db,targetkey)) {
        /* Empty result: the target key is removed instead of being set. */
        signalModifiedKey(c->db,targetkey);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",targetkey,c->db->id);
    }
    server.dirty++;
    addReplyLongLong(c,maxlen); /* Return the output string length in bytes. */
}
此函数执行过程如下:
1.解析operation name是否在and/or/xor/not这4种类型,否则返回syntaxerr给客户端。
2.如果是not类型的,那么只支持一个src_key,否则返回客户端:BITOP NOT must be called with a single source key.
3.查找src_key并且保存成src数组
4.进行位运算
5.将计算结果res存到target_key中;假如所有src_key都不存在或者都是空字符串(即maxlen为0),则删除target_key
命令格式:
SETRANGE key offset value
用 value 参数覆写(overwrite)给定 key 所储存的字符串值,从偏移量 offset 开始。
/* SETRANGE key offset value
 *
 * Overwrites part of the string stored at key, starting at 'offset', with
 * 'value'. A missing key is created as an empty string and the string is
 * grown with zero bytes when needed. Replies with the length of the string
 * after the operation. An empty 'value' modifies nothing: the reply is 0 for
 * a missing key, or the current length for an existing one. */
void setrangeCommand(redisClient *c) {
    sds value = c->argv[3]->ptr;
    size_t vlen;
    long offset;
    robj *o;

    if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != REDIS_OK)
        return;

    if (offset < 0) {
        addReplyError(c,"offset is out of range");
        return;
    }

    vlen = sdslen(value);
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o != NULL) {
        size_t olen;

        /* Key exists, check type */
        if (checkType(c,o,REDIS_STRING)) return;

        /* Nothing to write: report the existing length. */
        olen = stringObjectLen(o);
        if (vlen == 0) {
            addReplyLongLong(c,olen);
            return;
        }

        /* Stop when the resulting string would exceed the allowed size. */
        if (checkStringLength(c,offset+vlen) != REDIS_OK) return;

        /* Work on a private copy when the object is shared or encoded. */
        o = dbUnshareStringValue(c->db,c->argv[1],o);
    } else {
        /* Nothing to write on a non-existing key: report 0. */
        if (vlen == 0) {
            addReply(c,shared.czero);
            return;
        }

        /* Stop when the resulting string would exceed the allowed size. */
        if (checkStringLength(c,offset+vlen) != REDIS_OK) return;

        o = createObject(REDIS_STRING,sdsempty());
        dbAdd(c->db,c->argv[1],o);
    }

    if (vlen > 0) {
        o->ptr = sdsgrowzero(o->ptr,offset+vlen);
        memcpy((char*)o->ptr+offset,value,vlen);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,
            "setrange",c->argv[1],c->db->id);
        server.dirty++;
    }
    addReplyLongLong(c,sdslen(o->ptr));
}
执行过程如下:
1.获取value
2.获取偏移量
3.如果offset<0则返回客户端:offset is out of range
4.查找key对应的对象
5.如果key的对象不存在则创建一个空的,如果要设置的value为空,则返回客户端0.
6.如果要设置的对象不是REDIS_STRING类型,直接返回。
7.根据偏移量进行值的设置。
8.signalModifiedKey & notifyKeyspaceEvent & server.dirty++
9.返回客户端字符串的长度。
这个比较简单,获取字符串的长度:
/* STRLEN key
 *
 * Replies with the length of the string stored at key. A missing key gets
 * the shared.czero reply from the lookup helper; a value of another type is
 * handled by checkType(). */
void strlenCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o == NULL) return;
    if (checkType(c,o,REDIS_STRING)) return;

    addReplyLongLong(c,stringObjectLen(o));
}
再看stringObjectLen:
/* Return the length of a string object.
 *
 * For RAW encoded objects this is the sds length. For any other encoding the
 * pointer field is treated as a long, rendered as text with ll2string() into
 * a 32 byte scratch buffer, and the length of that text is returned.
 * Asserts that the object is of type REDIS_STRING. */
size_t stringObjectLen(robj *o) {
    char digits[32];

    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
    if (o->encoding != REDIS_ENCODING_RAW) {
        return ll2string(digits,32,(long)o->ptr);
    }
    return sdslen(o->ptr);
}
可见假如o→encoding 不是REDIS_ENCODING_RAW编码类型的(即以整数形式保存),则会先用ll2string把该整数转换成十进制字符串(缓冲区大小为32字节),再返回这个字符串的长度。
这个也比较简单:
/* APPEND key value
 *
 * Appends 'value' to the string stored at key; when the key does not exist
 * the value itself is stored under the key. Replies with the length of the
 * string after the operation. */
void appendCommand(redisClient *c) {
    robj *existing = lookupKeyWrite(c->db,c->argv[1]);
    size_t totlen;

    if (existing != NULL) {
        robj *suffix;
        size_t addlen;

        /* Key exists, check type */
        if (checkType(c,existing,REDIS_STRING)) return;

        /* The appended value is a command argument, so always an sds */
        suffix = c->argv[2];
        addlen = sdslen(suffix->ptr);
        totlen = stringObjectLen(existing)+addlen;
        if (checkStringLength(c,totlen) != REDIS_OK) return;

        /* Append to a private, unencoded copy of the stored value */
        existing = dbUnshareStringValue(c->db,c->argv[1],existing);
        existing->ptr = sdscatlen(existing->ptr,suffix->ptr,addlen);
        totlen = sdslen(existing->ptr);
    } else {
        /* Create the key; the database keeps its own reference. */
        c->argv[2] = tryObjectEncoding(c->argv[2]);
        dbAdd(c->db,c->argv[1],c->argv[2]);
        incrRefCount(c->argv[2]);
        totlen = stringObjectLen(c->argv[2]);
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"append",c->argv[1],c->db->id);
    server.dirty++;
    addReplyLongLong(c,totlen);
}
最终通过sdscatlen来做字符串拼接:
/* Append 'len' bytes starting at 't' to the sds string 's'.
 *
 * Room is obtained through sdsMakeRoomFor(), so the string may move: callers
 * must use the returned pointer. The header's len/free fields are updated
 * and a terminating '\0' is written after the new content.
 * Returns the resulting string, or NULL when no room could be made. */
sds sdscatlen(sds s, const void *t, size_t len) {
    size_t oldlen = sdslen(s);
    size_t newlen = oldlen + len;
    struct sdshdr *hdr;

    s = sdsMakeRoomFor(s,len);
    if (s == NULL) return NULL;

    hdr = (void *)(s - (sizeof(struct sdshdr)));
    memcpy(s + oldlen, t, len);
    hdr->len = newlen;
    hdr->free -= len;
    s[newlen] = '\0';
    return s;
}
这几个原理相似,都是调用了incrDecrCommand函数:
/* INCR key: add 1 to the integer stored at key. */
void incrCommand(redisClient *c) {
    long long delta = 1;

    incrDecrCommand(c,delta);
}

/* DECR key: subtract 1 from the integer stored at key. */
void decrCommand(redisClient *c) {
    long long delta = -1;

    incrDecrCommand(c,delta);
}

/* INCRBY key increment: add the integer given in argv[2] to the integer
 * stored at key. Nothing is done when the argument cannot be parsed (the
 * parsing helper is expected to have replied already). */
void incrbyCommand(redisClient *c) {
    long long delta;
    int rc = getLongLongFromObjectOrReply(c, c->argv[2], &delta, NULL);

    if (rc != REDIS_OK) return;
    incrDecrCommand(c,delta);
}
我们再来看incrDecrCommand函数执行过程:
/* Shared implementation of INCR, DECR and INCRBY.
 *
 * Adds 'incr' to the integer value stored at argv[1] and stores the result
 * as a new string object, overwriting the key when it exists and adding it
 * otherwise. Replies with an error, leaving the key untouched, when the sum
 * would not fit in a long long. On success the reply is the new value framed
 * by shared.colon and shared.crlf. */
void incrDecrCommand(redisClient *c, long long incr) {
    long long current, result;
    robj *stored, *updated;
    int overflow = 0;

    stored = lookupKeyWrite(c->db,c->argv[1]);
    if (stored != NULL && checkType(c,stored,REDIS_STRING)) return;
    if (getLongLongFromObjectOrReply(c,stored,&current,NULL) != REDIS_OK) return;

    /* The limit expressions are only evaluated when both operands share the
     * same sign, so the subtractions themselves cannot overflow. */
    if (incr < 0 && current < 0) {
        overflow = incr < (LLONG_MIN-current);
    } else if (incr > 0 && current > 0) {
        overflow = incr > (LLONG_MAX-current);
    }
    if (overflow) {
        addReplyError(c,"increment or decrement would overflow");
        return;
    }

    result = current + incr;
    updated = createStringObjectFromLongLong(result);
    if (stored != NULL) {
        dbOverwrite(c->db,c->argv[1],updated);
    } else {
        dbAdd(c->db,c->argv[1],updated);
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.colon);
    addReply(c,updated);
    addReply(c,shared.crlf);
}
1.假如溢出了则返回客户端:increment or decrement would overflow
2.value += incr;
3.覆盖或者添加新值给指定的key.
4.signalModifiedKey & notifyKeyspaceEvent & server.dirty++
5.返回客户端:“:value\r\n”
这两个函数最终都调用了getGenericCommand:
/* GET key: reply with the string stored at key. The status returned by
 * getGenericCommand() is not needed here. */
void getCommand(redisClient *c) {
    (void) getGenericCommand(c);
}

/* GETSET key value: reply with the old value as GET would, then store the
 * new value. Nothing is stored when getGenericCommand() reports REDIS_ERR. */
void getsetCommand(redisClient *c) {
    int rc = getGenericCommand(c);
    robj *val;

    if (rc == REDIS_ERR) return;

    val = tryObjectEncoding(c->argv[2]);
    c->argv[2] = val;
    setKey(c->db,c->argv[1],val);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[1],c->db->id);
    server.dirty++;
}
我们再看getGenericCommand:
/* Look up argv[1] and reply with its string value.
 *
 * Returns REDIS_OK when the key is missing (the lookup helper is given
 * shared.nullbulk as the reply for that case) or when the value was sent as
 * a bulk reply. Returns REDIS_ERR, after replying with shared.wrongtypeerr,
 * when the value is not a string. */
int getGenericCommand(redisClient *c) {
    robj *found = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);

    if (found == NULL) return REDIS_OK;

    if (found->type == REDIS_STRING) {
        addReplyBulk(c,found);
        return REDIS_OK;
    }

    addReply(c,shared.wrongtypeerr);
    return REDIS_ERR;
}
/* MGET key [key ...]
 *
 * Replies with a multi bulk of argc-1 elements, one per requested key, in
 * order. Keys that are missing or that do not hold a string produce a
 * shared.nullbulk element instead of an error. */
void mgetCommand(redisClient *c) {
    int idx = 1;

    addReplyMultiBulkLen(c,c->argc-1);
    while (idx < c->argc) {
        robj *found = lookupKeyRead(c->db,c->argv[idx]);

        if (found != NULL && found->type == REDIS_STRING) {
            addReplyBulk(c,found);
        } else {
            addReply(c,shared.nullbulk);
        }
        idx++;
    }
}
其实就是一个遍历get的过程
这两个函数都调用了:msetGenericCommand
/* MSET key value [key value ...]: set every pair unconditionally. */
void msetCommand(redisClient *c) {
    int nx = 0;

    msetGenericCommand(c,nx);
}

/* MSETNX key value [key value ...]: set the pairs only when none of the keys
 * already exists. */
void msetnxCommand(redisClient *c) {
    int nx = 1;

    msetGenericCommand(c,nx);
}
我们再看msetGenericCommand:
/* Shared implementation of MSET and MSETNX.
 *
 * nx: when non-zero, nothing is set and shared.czero is sent if at least one
 *     of the keys already exists; on success the reply is shared.cone.
 *     When zero every pair is set and the reply is shared.ok.
 *
 * An even argument count (a key without its value) is rejected with an
 * error. */
void msetGenericCommand(redisClient *c, int nx) {
    int idx;

    if (c->argc % 2 == 0) {
        addReplyError(c,"wrong number of arguments for MSET");
        return;
    }

    /* NX handling: every key is looked up, without stopping at the first
     * hit, and the command is abandoned when any of them exists. */
    if (nx) {
        int busykeys = 0;

        for (idx = 1; idx < c->argc; idx += 2) {
            if (lookupKeyWrite(c->db,c->argv[idx]) != NULL) busykeys++;
        }
        if (busykeys != 0) {
            addReply(c, shared.czero);
            return;
        }
    }

    for (idx = 1; idx < c->argc; idx += 2) {
        robj *val = tryObjectEncoding(c->argv[idx+1]);

        c->argv[idx+1] = val;
        setKey(c->db,c->argv[idx],val);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[idx],c->db->id);
    }

    server.dirty += (c->argc-1)/2;
    addReply(c, nx ? shared.cone : shared.ok);
}
唯一的区别就是假如设置了NX标志位,那么只要有任意一个key已经存在键值对象,就不做任何设置,直接返回客户端0.
这是setrange的反过程,用于获取指定偏移的字符串的数据:
/* GETRANGE key start end
 *
 * Replies with the substring of the string stored at key delimited by the
 * inclusive byte offsets 'start' and 'end'. Negative offsets count from the
 * end of the string. Integer encoded values are first rendered as text with
 * ll2string(). A missing key, an empty string, or an empty range yields an
 * empty bulk reply. */
void getrangeCommand(redisClient *c) {
    robj *o;
    long long start, end;
    char *str, llbuf[32];
    size_t slen;

    if (getLongLongFromObjectOrReply(c,c->argv[2],&start,NULL) != REDIS_OK)
        return;
    if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != REDIS_OK)
        return;
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk)) == NULL ||
        checkType(c,o,REDIS_STRING)) return;

    if (o->encoding == REDIS_ENCODING_INT) {
        str = llbuf;
        slen = ll2string(llbuf,sizeof(llbuf),(long)o->ptr);
    } else {
        str = o->ptr;
        slen = sdslen(str);
    }

    /* An inverted range given with two negative offsets is empty. It must be
     * caught before clamping: otherwise both offsets may be clamped to 0 and
     * the first character would wrongly be returned. */
    if (start < 0 && end < 0 && start > end) {
        addReply(c,shared.emptybulk);
        return;
    }

    /* Convert negative indexes */
    if (start < 0) start = slen+start;
    if (end < 0) end = slen+end;
    if (start < 0) start = 0;
    if (end < 0) end = 0;
    if ((unsigned long long)end >= slen) end = slen-1;

    /* Precondition: end >= 0 && end < slen, so the only condition where
     * nothing can be returned is: start > end. */
    if (start > end || slen == 0) {
        addReply(c,shared.emptybulk);
    } else {
        addReplyBulkCBuffer(c,(char*)str+start,end-start+1);
    }
}
如果对象编码是REDIS_ENCODING_INT类型的,强制转换成字符串。
用于给指定key的值增加浮点增量:
/* INCRBYFLOAT key increment
 *
 * Adds the floating point increment in argv[2] to the value stored at key
 * and stores the result as a new string object. A result that is NaN or
 * infinite is rejected with an error and the key is left untouched. The new
 * value is sent back as a bulk reply. */
void incrbyfloatCommand(redisClient *c) {
    long double delta, total;
    robj *stored, *updated, *setname;

    stored = lookupKeyWrite(c->db,c->argv[1]);
    if (stored != NULL && checkType(c,stored,REDIS_STRING)) return;

    /* Parse the current value first, then the increment. */
    if (getLongDoubleFromObjectOrReply(c,stored,&total,NULL) != REDIS_OK)
        return;
    if (getLongDoubleFromObjectOrReply(c,c->argv[2],&delta,NULL) != REDIS_OK)
        return;

    total += delta;
    if (isnan(total) || isinf(total)) {
        addReplyError(c,"increment would produce NaN or Infinity");
        return;
    }

    updated = createStringObjectFromLongDouble(total,1);
    if (stored != NULL) {
        dbOverwrite(c->db,c->argv[1],updated);
    } else {
        dbAdd(c->db,c->argv[1],updated);
    }
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
    server.dirty++;
    addReplyBulk(c,updated);

    /* Always replicate INCRBYFLOAT as a SET command with the final value
     * in order to make sure that differences in float precision or formatting
     * will not create differences in replicas or after an AOF restart. */
    setname = createStringObject("SET",3);
    rewriteClientCommandArgument(c,0,setname);
    decrRefCount(setname);
    rewriteClientCommandArgument(c,2,updated);
}
因为浮点数计算具有不确定性,所以新的值直接转成SET命令,通过rewriteClientCommandArgument改变命令的参数来让aof和slave更新新值。