Redis 2.8版部分同步功能源碼淺析-Replication Partial Resynchronization


前面的2篇文章分別介紹了Redis主從同步源碼淺析-Master端 以及 Redis主從同步源碼淺析-Slave端 相關的代碼實現,從中我們可以看出redis主從同步的一個最大的缺點,也是阻礙大數據應用的地方便是其每次連接端開都需要重連master進行全量數據的重新同步,這個代價是可想而知的。

長連接斷開在線上環境中出現得很頻繁,如果需要重新同步所有RDB文件,幾十G的文件,從建立RDB快照,發送文件內容到slave,然后slave執行命令一一加載進內存中,這個時間開銷估計也得好幾個小時,更別說樹形結構的master->slave->slave, 對網卡的壓力,對服務器的壓力都是很恐怖的。從這方面來說,動輒幾個小時甚至一天的修復時間,沒人敢用Redis主從同步在生產環境中使用。

但是福音來了:即將(2013年第三季度)發布的2.8版本會解決這個問題,通過:Replication partial resynchronization 的方式,也就是部分重新同步,這里就說部分同步吧,注意不是常規情況下的新寫入指令同步。

具體的增量同步功能請看作者在剛開始的想法(Designing Redis replication partial resync) 和 中間的(Partial resyncs and synchronous replication.) 以及最后的想法(PSYNC),從這里可以知道redis的部分同步功能很詳細的解說。所以就不多說了,只是下面簡單總結一下方便后面分析代碼。

注意本文列出的代碼是目前的最新代碼,不是2.8版本的代碼·https://github.com/antirez/redis

零、Partial Resynchronization介紹

為了避免每次重連都需要重新全量同步RDB文件,redis采用類似mysql的backlog的方式,允許slave在一定的時間內進行部分同步,只同步自己需要的部分回去,已經有的不需要同步了。注意如果重啟了,那還是得重新同步,這個其實也有點悲劇,不知道后續會不會加入這個功能,實現也不太難的。

簡單來講,用口語就是:

  1. 對於slave :master兄,我剛剛走神了斷了連接,得重新找你同步一下。如果你還是昔日的那個replrunid, 我剛才同步到的位置是這里reploff,如果還有機會請把我落下的數據馬上發給我一下;否則請給我全部RDB文件;
  2. 對於master: slave們,如果你斷了連接,請最好給我你剛才記着的runid和你算的同步到的位置發送給我,我看看是不是可以只讓你同步你落下的部分;否則你得全量同步RDB文件。

根據這個設計,可想而知,master必須記住一定數目的backlog,也就是記住一段時間內的發送給slave們的命令列表,以及其起始,結束為止。slave必須在連接端開的時候記着自己同步到了什么位置,重連的時候用這位置去問master,自己是否還有機會趕上來。

一、SLAVE發起部分同步請求

大部分跟之前的2.6版本同步差不多:

  1. 標記server.repl_state為 REDIS_REPL_CONNECT狀態;
  2. replicationCron定時任務檢測到調用connectWithMaster函數連接master;
  3. slave連接成功調用syncWithMaster,發送PING指令;
  4. slave發送SYNC指令通知master做RDB快照;
  5. 接收master的RDB快照文件;
  6. 加載新數據;

在2.8版本部分同步的時候,將上面的第4步修改了,加入了發送PSYNC指令嘗試部分同步的功能。調用slaveTryPartialResynchronization函數嘗試部分同步,如果發現master不認識這個指令,那就沒辦法了,再次發送SYNC進行全量同步。

1 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
2 //·····
3     /* Try a partial resynchonization. If we don't have a cached master
4      * slaveTryPartialResynchronization() will at least try to use PSYNC
5      * to start a full resynchronization so that we get the master run id
6      * and the global offset, to try a partial resync at the next
7      * reconnection attempt. */
8     psync_result = slaveTryPartialResynchronization(fd);
9     if (psync_result == PSYNC_CONTINUE) {
10         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
11         return;
12     }
13  
14     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
15      * and the server.repl_master_runid and repl_master_initial_offset are
16      * already populated. */
17     if (psync_result == PSYNC_NOT_SUPPORTED) {
18         redisLog(REDIS_NOTICE,"Retrying with SYNC...");
19         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
20             redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
21                 strerror(errno));
22             goto error;
23         }
24     }

slaveTryPartialResynchronization是像master發送PSYNC指令的地方。PSYNC指令的語法為:PSYNC runid psync_offset 。下面解釋一下2個參數的含義。

runid就是master告訴slave的一串字符串,用來記錄master的實例,避免master重啟后,同步錯誤的情況,這個值是master在slave第一次同步的時候告訴他的,且一直不變直到master重啟;

psync_offset這個參數就是slave當前同步到的數據位置,實際上是同步了多少數據,以字節為單位。master根據這個來決定是否可以增量同步以及發送哪些數據給slave。第一次同步的時候master會告訴他的。以后slave每次收到從master過來的連接后,都會增加讀取的數據長度 到這個值,保存在c->reploff上面。
下面是發送PSYNC指令的代碼。

1 int slaveTryPartialResynchronization(int fd) {
2     char *psync_runid;
3     char psync_offset[32];
4     sds reply;
5  
6     if (server.cached_master) {
7         psync_runid = server.cached_master->replrunid;
8         snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
9         redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
10     else {
11         redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
12         psync_runid = "?";
13         memcpy(psync_offset,"-1",3);
14     }
15  
16     /* Issue the PSYNC command */
17     reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

收到PSYNC指令后,master如果覺得可以進行增量同步,則會返回”+CONTINUE”, 如果必須進行全量同步,會返回”+FULLRESYNC”, 否則ERROR,這里具體待會介紹master的時候介紹。

1.只能進行全量同步

來看看如果必須進行全量同步的情況,這種情況下master會返回”+FULLRESYNC  runid offset” 給slave。雖然得全量,但是還會告訴slave runid是多少,以及當前master的backlog   offset位置,這樣讓slave下回來同步的時候能夠進行部分同步。也算是互相溝通一下狀態。

slave收到”+FULLRESYNC”結果后,會將runid保存到server.repl_master_runid上面,backlog offset位置放在server.repl_master_initial_offset里面。以便后面使用部分同步功能。讀取完RDB文件后會設置到server.master->reploff上的。

注意PSYNC如果只能進行全量同步,master自己會做RDB快照的,不需要再次發送SYNC。看下面的代碼:

1 int slaveTryPartialResynchronization(int fd) {
2 //`````
3     if (!strncmp(reply,"+FULLRESYNC",11)) {
4         char *runid = NULL, *offset = NULL;
5  
6         /* FULL RESYNC, parse the reply in order to extract the run id
7          * and the replication offset. */
8         runid = strchr(reply,' ');
9         if (runid) {
10             runid++;
11             offset = strchr(runid,' ');
12             if (offset) offset++;
13         }
14         if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
15             redisLog(REDIS_WARNING,
16                 "Master replied with wrong +FULLRESYNC syntax.");
17             /* This is an unexpected condition, actually the +FULLRESYNC
18              * reply means that the master supports PSYNC, but the reply
19              * format seems wrong. To stay safe we blank the master
20              * runid to make sure next PSYNCs will fail. */
21             memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
22         else {
23             memcpy(server.repl_master_runid, runid, offset-runid-1);
24             server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
25             server.repl_master_initial_offset = strtoll(offset,NULL,10);
26             redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
27                 server.repl_master_runid,
28                 server.repl_master_initial_offset);
29         }
30         /* We are going to full resync, discard the cached master structure. */
31         replicationDiscardCachedMaster();
32         sdsfree(reply);
33         return PSYNC_FULLRESYNC;
34     }
35 //····
36 }
37  
38 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
39 //````
40         server.master->reploff = server.repl_master_initial_offset;
41         memcpy(server.master->replrunid, server.repl_master_runid,
42             sizeof(server.repl_master_runid));
43 //·····
44 }

2.可以進行部分同步

如果master返回”+CONTINUE”,那就可以進行部分同步。這個比較簡單,繼續接收后面的數據 就行了。

3.發生錯誤

這個時候可能是master是老版本,不認識PSYNC,或者發生其他錯誤了,那就重新發送SYNC指令進行全量同步就行。

到這里還剩下幾個問題,第一個是如果連接斷開了,slave怎么記住master的runid和reploff位置的呢?

這個可以參考replicationCacheMaster,freeClient在斷開一個連接的時候,會判斷這個是不是master的連接,如果是,會調用replicationCacheMaster,將當前的狀態cache住,並且斷開跟本slave的下一級slave的連接。

1 void replicationCacheMaster(redisClient *c) {
2 //····
3     /* Save the master. Server.master will be set to null later by
4      * replicationHandleMasterDisconnection(). */
5     server.cached_master = server.master;
6 //···
7     replicationHandleMasterDisconnection();
8 }

下一個問題是slave的c->reploff如何保持跟master同步,因為他們必須絕對一致才行。

這個是通過在2端完成,雙方只要是發送給對方的指令,都會講指令的總長度加在offset上面,slave在readQueryFromClient讀取連接數據的時候增加這個值。master在replicationFeedSlaves函數里面會調用feedReplicationBacklogWithObject,后者最終調用feedReplicationBacklog,進而調整offset和backlog,這個待會介紹。

1 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2 //····
3     if (nread) {
4         sdsIncrLen(c->querybuf,nread);
5         c->lastinteraction = server.unixtime;
6         if (c->flags & REDIS_MASTER) c->reploff += nread;
7     }
8 //····
9 }

到這里slave部分介紹完畢。下一部分master端。

二、MASTER接收處理PSYNC指令

在master端,SYNC和PSYNC的處理函數都是syncCommand。只是增量了一段代碼檢測PSYNC指令,如果是,就會調用masterTryPartialResynchronization嘗試部分同步,如果不能進行部分同步,那就按照SYNC的方式處理,也就是進行全量同步,這個請參考“Redis主從同步源碼淺析-Slave端”。

1 void syncCommand(redisClient *c) {
2 //````
3     /* Try a partial resynchronization if this is a PSYNC command.
4      * If it fails, we continue with usual full resynchronization, however
5      * when this happens masterTryPartialResynchronization() already
6      * replied with:
7      *
8      * +FULLRESYNC <runid> <offset>
9      *
10      * So the slave knows the new runid and offset to try a PSYNC later
11      * if the connection with the master is lost. */
12     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
13         if (masterTryPartialResynchronization(c) == REDIS_OK) {
14             server.stat_sync_partial_ok++;
15             return/* No full resync needed, return. */
16         else {
17             char *master_runid = c->argv[1]->ptr;
18  
19             /* Increment stats for failed PSYNCs, but only if the
20              * runid is not "?", as this is used by slaves to force a full
21              * resync on purpose when they are not albe to partially
22              * resync. */
23             if (master_runid[0] != '?') server.stat_sync_partial_err++;
24         }
25     else {
26         /* If a slave uses SYNC, we are dealing with an old implementation
27          * of the replication protocol (like redis-cli --slave). Flag the client
28          * so that we don't expect to receive REPLCONF ACK feedbacks. */
29         c->flags |= REDIS_PRE_PSYNC_SLAVE;
30     }
31 //````
32 }

masterTryPartialResynchronization函數處理部分同步的檢查。

首先檢查runid是否匹配,如果不匹配那說明master重啟過了,必須全量,調轉到goto need_full_resync;

如果psync_offset 介於server.repl_backlog_off 和server.repl_backlog_off + server.repl_backlog_size 之間的話,那說明slave已經同步到的位置正好在我么的backlog之間,那說明他落下的東西master是記錄在backlog里面的!good,可以進行增量同步。

1 int masterTryPartialResynchronization(redisClient *c) {
2     long long psync_offset, psync_len;
3     char *master_runid = c->argv[1]->ptr;
4     char buf[128];
5     int buflen;
6  
7     /* Is the runid of this master the same advertised by the wannabe slave
8      * via PSYNC? If runid changed this master is a different instance and
9      * there is no way to continue. */
10     if (strcasecmp(master_runid, server.runid)) {
11         /* Run id "?" is used by slaves that want to force a full resync. */
12         if (master_runid[0] != '?') {
13             redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
14                 "Runid mismatch (Client asked for '%s', I'm '%s')",
15                 master_runid, server.runid);
16         else {
17             redisLog(REDIS_NOTICE,"Full resync requested by slave.");
18         }
19         goto need_full_resync;
20     }
21  
22     /* We still have the data our slave is asking for? */
23     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
24        REDIS_OK) goto need_full_resync;
25     if (!server.repl_backlog ||
26         psync_offset < server.repl_backlog_off ||
27         psync_offset >= (server.repl_backlog_off + server.repl_backlog_size))
28 //上面這一行的計算我看有點問題,應該用將repl_backlog_size替換為repl_backlog_histlen,因為后者才是代表實際數據長度。
29  {
30  redisLog(REDIS_NOTICE,
31  "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
32  goto need_full_resync;
33  }

下面進行增量同步的工作包括:將這個連接加到server.slaves里面,然后給slave發送”+CONTINUE\r\n”告訴他“沒事,你還可以趕得上”,然后使用addReplyReplicationBacklog把他落下的部分數據放到他的發送緩沖區中。

1 /*
If we reached this point, we are able to perform a partial resync:
2  * 1) Set client state to make it a slave.
3  * 2) Inform the client we can continue with +CONTINUE
4  * 3) Send the backlog data (from the offset to the end) to the slave. */
5 c->flags
|= REDIS_SLAVE;
6 c->replstate
= REDIS_REPL_ONLINE;
7 c->repl_ack_time
= server.unixtime;
8 listAddNodeTail(server.slaves,c);
9 /*
We can't use the connection buffers since they are used to accumulate
10  * new commands at this stage. But we are sure the socket send buffer is
11  * emtpy so this write will never fail actually. */
12 buflen
= snprintf(buf,
sizeof(buf),"+CONTINUE\r\n");
13 if (write(c->fd,buf,buflen) != buflen) {
14     freeClientAsync(c);
15     return REDIS_OK;
16 }
17 psync_len
= addReplyReplicationBacklog(c,psync_offset);

這樣slave收到”+CONTINUE\r\n”后就會像正常情況一樣接收master發送過來的數據,並且移動其c->reploff指針,部分同步開始。其實部分同步就是將落下的部分放到發送緩沖區發送給slave的事情。

關於addReplyReplicationBacklog函數就不多介紹了,里面是關於循環的backlog的處理,找出slave落下的數據,用addReplySds放到其緩沖區中准備發送。

如果不能進行部分同步,只能全部同步的話,master會附帶將當前master的狀態發送給slave。如下代碼,用”+FULLRESYNC %s %lld\r\n”指令發送過去。

1 int masterTryPartialResynchronization(redisClient *c) {
2 //·····