作者:陈科
联系方式:chenke1818@gmail.com
转载请说明出处:http://www.dumpcache.com/wiki/doku.php?id=about_redis_18
同步分为客户端请求和服务器处理,我们分别来分析:
这里的客户端是指slave,启动之后serverCron会周期性地(每秒一次)调用replicationCron函数:
/* Replication cron function -- used to reconnect to master and
 * to detect transfer failures. The run_with_period(1000) guard makes
 * this call fire once per second. */
run_with_period(1000) replicationCron();
下面为replicationCron函数,每秒被调用一次:
void replicationCron(void) { /* 和master连接超时处理 */ if (server.masterhost && (server.repl_state == REDIS_REPL_CONNECTING || server.repl_state == REDIS_REPL_RECEIVE_PONG) && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { redisLog(REDIS_WARNING,"Timeout connecting to the MASTER..."); undoConnectWithMaster(); } /* 从master接收rdb数据超时,调用replicationAbortSyncTransfer取消下载 */ if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER && (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); replicationAbortSyncTransfer(); } /* master没有任何响应了:freeClient */ if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED && (time(NULL)-server.master->lastinteraction) > server.repl_timeout) { redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received..."); freeClient(server.master); } /* connectWithMaster进行同步命令发送操作 */ if (server.repl_state == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); if (connectWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); } } /* slave在psync状态发送ack给master */ if (server.masterhost && server.master && !(server.master->flags & REDIS_PRE_PSYNC)) replicationSendAck(); /* ping处理 */ if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { listIter li; listNode *ln; robj *ping_argv[1]; /* First, send PING */ ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); /* Second, send a newline to all the slaves in pre-synchronization * stage, that is, slaves waiting for the master to create the RDB file. * The newline will be ignored by the slave but will refresh the * last-io timer preventing a timeout. 
*/ listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START || (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END && server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET)) { if (write(slave->fd, "\n", 1) == -1) { /* Don't worry, it's just a ping. */ } } } } /* master释放超时的slave连接 */ if (listLength(server.slaves)) { listIter li; listNode *ln; listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate != REDIS_REPL_ONLINE) continue; if (slave->flags & REDIS_PRE_PSYNC) continue; if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s", replicationGetSlaveName(slave)); freeClient(slave); } } } /* 如果master没有连接任何slave,释放repl_backlog的增量日志缓存 */ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && server.repl_backlog) { time_t idle = server.unixtime - server.repl_no_slaves_since; if (idle > server.repl_backlog_time_limit) { freeReplicationBacklog(); redisLog(REDIS_NOTICE, "Replication backlog freed after %d seconds " "without connected slaves.", (int) server.repl_backlog_time_limit); } } /* If AOF is disabled and we no longer have attached slaves, we can * free our Replication Script Cache as there is no need to propagate * EVALSHA at all. 
*/ if (listLength(server.slaves) == 0 && server.aof_state == REDIS_AOF_OFF && listLength(server.repl_scriptcache_fifo) != 0) { replicationScriptCacheFlush(); } /* 如果master处于WAIT_BGSAVE_START状态,则先进行BGSAVE操作 */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; int slaves_waiting = 0; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { idle = server.unixtime - slave->lastinteraction; if (idle > max_idle) max_idle = idle; slaves_waiting++; } } if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) { /* Start a BGSAVE. Usually with socket target, or with disk target * if there was a recent socket -> disk config change. */ if (startBgsaveForReplication() == REDIS_OK) { /* It started! We need to change the state of slaves * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case * the current target is disk. Otherwise it was already done * by rdbSaveToSlavesSockets() which is called by * startBgsaveForReplication(). */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } } } } /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); }
从上面函数可以看到,slave同步的最主要操作为connectWithMaster:
int connectWithMaster(void) { int fd; fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; server.repl_state = REDIS_REPL_CONNECTING; return REDIS_OK; }
注册的回调函数为:syncWithMaster
/* Event handler registered by connectWithMaster() on the master socket.
 *
 * It drives the slave side of the handshake:
 *   1. check the outcome of the non blocking connect;
 *   2. send PING and wait for the reply;
 *   3. AUTH (when masterauth is set) and REPLCONF listening-port;
 *   4. try a partial resynchronization, falling back to SYNC when the
 *      master does not support PSYNC;
 *   5. create the temp file and install readSyncBulkPayload() to receive
 *      the RDB payload.
 *
 * el, privdata and mask are unused. fd is the socket connected to the
 * master.
 *
 * On any failure the socket is closed and the state is set back to
 * REDIS_REPL_CONNECT so that replicationCron() retries the connection.
 * If the temp file was already created when the failure happens, it is
 * closed and removed as well, so that neither the descriptor nor the file
 * is leaked. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5; /* dfd == -1: no temp file to clean up. */
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* Connection established: send a PING and wait for the reply. */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Receive the PONG reply. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchronization: slaveTryPartialResynchronization()
     * sends PSYNC with the run id and offset to the master. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* The master does not support PSYNC: fall back to SYNC, that is, to a
     * full synchronization. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,sizeof(tmpfile),
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* The temp file is not yet owned by the server state at this point:
     * release the descriptor and remove the file we created with O_EXCL,
     * otherwise every failed attempt leaks both. */
    if (dfd != -1) {
        close(dfd);
        unlink(tmpfile);
    }
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
客户端的主要工作就是:
1.假如支持psync则发送runid和offset给服务器(master)
2.每个slave都会保存cached_master,这样假如连接断开后再次连上,只要服务器不重启,runid和offset还能恢复
3.假如不支持psync则发送sync进行全量同步
服务器的实现主要由syncCommand函数来完成,该函数用来接收psync和sync命令:
/* Handler for the SYNC and PSYNC commands, executed on the instance that
 * is asked to act as master for the client c.
 *
 * For PSYNC a partial resynchronization is attempted first via
 * masterTryPartialResynchronization(). When that is not possible, or when
 * the command is SYNC, a full resynchronization is set up: depending on
 * the state of the RDB child the client is put in WAIT_BGSAVE_START or
 * WAIT_BGSAVE_END state, flagged as slave and appended to server.slaves. */
void syncCommand(redisClient *c) {
    int rdb_child_running;

    /* The client is already a slave: ignore the command. */
    if (c->flags & REDIS_SLAVE) return;

    /* This instance has a master configured but is not in CONNECTED state
     * with it: refuse the request. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* The client still has pending output (reply list or static buffer):
     * SYNC and PSYNC are refused. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
        replicationGetSlaveName(c));

    if (strcasecmp(c->argv[0]->ptr,"psync") == 0) {
        /* Try a partial resynchronization first. */
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        }

        /* Increment stats for failed PSYNCs, but only if the
         * runid is not "?", as this is used by slaves to force a full
         * resync on purpose when they are not able to partially
         * resync. */
        {
            char *requested_runid = c->argv[1]->ptr;

            if (requested_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the
         * client so that we don't expect to receive REPLCONF ACK
         * feedbacks. */
        c->flags |= REDIS_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    rdb_child_running = (server.rdb_child_pid != -1);

    if (rdb_child_running &&
        server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save. */
        redisClient *donor = NULL;
        listIter iter;
        listNode *node;

        listRewind(server.slaves,&iter);
        while ((node = listNext(&iter)) != NULL) {
            redisClient *candidate = node->value;

            if (candidate->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                donor = candidate;
                break;
            }
        }

        if (donor != NULL) {
            /* Another slave is already waiting for this BGSAVE: copy its
             * output buffer into the new client. */
            copyClientOutputBuffer(c,donor);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences. */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else if (rdb_child_running &&
               server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
    {
        /* There is an RDB child process but it is writing directly to
         * children sockets. We need to wait for the next BGSAVE
         * in order to synchronize. */
        c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
        redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
    } else if (server.repl_diskless_sync) {
        /* Diskless replication RDB child is created inside
         * replicationCron() since we want to delay its start a
         * few seconds to wait for more slaves to arrive. */
        c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
        if (server.repl_diskless_sync_delay)
            redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
    } else {
        /* Start the BGSAVE right now. */
        if (startBgsaveForReplication() != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */

    /* Register the client as a slave of this instance. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);

    /* First slave attached and no backlog yet: create it. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
上面的主要过程就是:
1.假如psync则进行runid和offset的校验,并且发送新的给客户端.
2.psync最后把repl_backlog的增量数据发送给客户端
3.如果全量同步,则bgsave
4.bgsave启动后,该slave的状态(c->replstate)变为:REDIS_REPL_WAIT_BGSAVE_END, backgroundSaveDoneHandler会被serverCron调用发送rdb给客户端。
注意,如果全量同步的时候客户端连接过多,copyClientOutputBuffer可能会引起内存爆炸。