本篇是 Redis 6
剖析的第二篇,主要探讨 Redis
是怎么做主从同步的,对代码会有所删减。
SLAVE
通常启用主从同步,只要在从服务器执行 SLAVEOF HOST PORT
即可,这个时候就会执行到 replicaofCommand
。由于主从同步是从服务器发起的,因此我们先从 Slave
开始进行剖析。
repl_state
Redis
的主从同步,是通过状态机驱动的,因此有必要在本篇一开始前,就先看看有哪些状态。
1 | typedef enum { |
REPL_STATE_NONE
,未启动同步。REPL_STATE_CONNECT
,需要连接到Master
。REPL_STATE_RECEIVE_PING_REPLY
,等待PING
的回包。REPL_STATE_SEND_HANDSHAKE
,验证密码。REPL_STATE_RECEIVE_AUTH_REPLY
,等待AUTH
的回包。REPL_STATE_RECEIVE_PORT_REPLY
,等待REPLCONF
针对端口的回包。REPL_STATE_RECEIVE_IP_REPLY
,等待REPLCONF
针对IP的回包。REPL_STATE_RECEIVE_CAPA_REPLY
,等待REPLCONF
针对”能力”(即支持的功能)的回包。REPL_STATE_SEND_PSYNC
,发送PSYNC
。REPL_STATE_RECEIVE_PSYNC_REPLY
,等待PSYNC
的回包。REPL_STATE_TRANSFER
,传送快照。REPL_STATE_CONNECTED
,主从同步完成。
replicaofCommand
拿到 Master
的 IP
和 Port
。
1 | void replicaofCommand(client *c) { |
replicationSetMaster
断连所有的 Slave
,然后取消掉原先的主从连接(如果有),设置 Cache Master
为了复用 PSYNC
(保存当前进度,不进行全量同步)。
- 设置状态
REPL_STATE_CONNECT
,表示需要连接Master
。
1 | void replicationSetMaster(char *ip, int port) { |
connectWithMaster
Redis 6
支持 TLS
,为了简化剖析过程,此处默认不采用 TLS 连接
。
server.repl_transfer_lastio
,最后一次 IO 时间,用于超时处理。- 设置状态
REPL_STATE_CONNECTING
,表示已连接到Master
。
1 | int connectWithMaster(void) { |
syncWithMaster
Slave → Master
连接完成后,会进入到 syncWithMaster
回调。这个函数共有 300多行
,因此分为多个部分讲解。
- 若当前状态机状态为
REPL_STATE_NONE
,直接返回。 - 检查链接是否正常。
这种情况主要是出现在 Slave
连接上 Master
之后,Client 后悔了。
1 | void syncWithMaster(connection *conn) { |
REPL_STATE_CONNECTING
,设置Read Handler
为当前函数。发送命令
PING
到Master
。设置状态
REPL_STATE_RECEIVE_PING_REPLY
,表示等待Master
返回PONG
。
主要是因为 Connect Handler
只会执行一次,后面的状态机的处理流程都在本函数,因此需要再次进入该函数。
1 | /* Send a PING to check the master is able to reply without errors. */ |
同步读
Master
对PING
的回包,正常情况只要有回包都是没错误的,除非对方是旧版本。设置状态
REPL_STATE_SEND_HANDSHAKE
,表示需要进行握手。
1 | /* Receive the PONG command. */ |
握手阶段主要是进行密码验证,将
Slave
的IP
和PORT
传给Master
方便查询,同时告诉Master
我当前的能力,比如EOF
为我支持无盘传输
,psync2
表示支持部分同步。设置状态
REPL_STATE_RECEIVE_AUTH_REPLY
,表示等待认证回包。
1 | if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { |
检测认证情况。
设置状态
REPL_STATE_RECEIVE_PORT_REPLY
,表示等待Master
确认端口配置是否正常。
1 | if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth) |
检测端口配置情况。
设置状态
REPL_STATE_RECEIVE_CAPA_REPLY
,表示Master
确认能力回包。
1 | if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip) |
检测能力设置是否正常。
设置状态
REPL_STATE_SEND_PSYNC
,表示 开始进行同步。
1 | /* Receive CAPA reply. */ |
slaveTryPartialResynchronization(conn, 0)
表示给Master
发送PSYNC ? -1
? 为Master RunID
,-1
为进度。设置状态
REPL_STATE_RECEIVE_PSYNC_REPLY
,表示等待Master
对PSYNC
回包。
1 | /* Try a partial resynchonization. If we don't have a cached master |
slaveTryPartialResynchronization(conn,1)
表示同步读Master
针对PSYNC
的回包,看是要全量同步,还是要增量同步。不支持PSYNC
则进行全量同步。
1 | psync_result = slaveTryPartialResynchronization(conn,1); |
- 能够增量同步,在
slaveTryPartialResynchronization
中设置状态REPL_STATE_CONNECTED
,表示已连接成功,直接返回。
1 | /* If the master is in an transient error, we should try to PSYNC |
- 不支持增量同步,与所有 Slaves 端口,清空
backLog
,毕竟要重头开始了,通过SYNC
进行同步。
1 | /* PSYNC failed or is not supported: we want our slaves to resync with us |
- 通过
RDB
文件传输,则先创建临时文件。
1 | /* Prepare a suitable temp file for bulk transfer */ |
- 设置
Read Handler
,读文件,同时 设置状态REPL_STATE_TRANSFER
,表示文件传送中。
1 | /* Setup the non blocking download of the bulk file. */ |
slaveTryPartialResynchronization
slaveTryPartialResynchronization
主要是和 Master
通信获取是否可以增量同步的信息。
前半部分,则是通过发送命令 PSYNC
来进行对接, cached_master
是之前意外断开的 Master
节点信息。
1 |
|
后半部分则是读到 Master
的回包,并确认其是 全量同步 +FULLRESYNC
还是 增量同步 +CONTINUE
。
其中 RUN_ID
为一个40字符的随机值,每次启动实例随机生成, offset
相当于一个偏移量,用于之后同步完 RDB
后进行增量同步。
replid2
的出现主要是因为若从服务器被提拔为主服务器,其他的从服务器连到现在新的主服务器时,若直接校验 replid
则必然失败,因此出现了这个变量来保存上次同步的主服务器ID。
1 | /* Reading half */ |
readSyncBulkPayload
readSyncBulkPayload
主要负责读取 Master
的 RDB
文件(也可以是无盘传输)。
- 如果刚开始传输(通过
server.repl_transfer_size == 1
判断),则先检查协议,同时查看是通过文件传输还是无盘传输,如果是文件,则可以提前获取文件大小,否则通过EOF
标记代表无盘传输,以eofmark
作为结尾的标记。
1 |
|
- 非无盘加载 则无论文件传输还是无盘传输都先写入文件再读取。
若是无盘传输,通过 eofmark
与 lastbytes
对比得到是否传输完成。
Redis
源码将 无盘加载和有盘加载的代码进行拆分,为了方便剖析,此处进行合并。
1 | if (!use_diskless_load) { |
- 无盘加载
删除 socket
的 Read Handler
,因为后续的加载操作通过 RIO
去加载,一边读取 TCP流
,一边进行加载。
1 | if (use_diskless_load && |
replicationCron
replicationCron
在 Master
和 Slave
都会走到, Master
给 Slave
发心跳,而 Slave
给 Master
发当前的进度,用于展示时使用。
1 | void replicationCron(void) { |
Master
Master
在收到 PSYNC
或者 SYNC
后,会调用 syncCommand
。
syncCommand
- 若是
PSYNC
则会调用masterTryPartialResynchronization
来判断是否可以增量同步(从repl_backlog
缓冲区中查找),否则全量同步。 - 若为
SYNC
则 设置Client→flags
为CLIENT_PRE_PSYNC
,表示Slave
不会发送ACK
,不能因为其不发就认为其宕机。
1 | void syncCommand(client *c) { |
- 往下走就全是全量同步了,若已有
BGSAVE
命令再执行,则尝试复用 生成出来的RDB
,将其他Slave
的输出缓冲区拷给当前Slave
来达到同步的目的。
1 | c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; |
- 若在执行无盘传送,说明启用了子进程进行序列化,再通过匿名管道传给父进程,父进程再通过
Socket
发给Slave
,因此我们在这个时候应该等待。
1 | /* CASE 2: BGSAVE is in progress, with socket target. */ |
- 没有后台进程再运行,则无论是
无盘同步
还是RDB同步
都会走到startBgsaveForReplication
这个函数。
1 | /* CASE 3: There is no BGSAVE is progress. */ |
startBgsaveForReplication
决定无盘同步还是RDB同步, rdbSaveToSlavesSocket
和 rdbSaveBackground
名字已经很清晰了。
1 | int startBgsaveForReplication(int mincapa) { |
特别注意的是,无盘传输也是采用子进程的形式完成,但是绝不是通过子进程进行发送,而是子进程序列化好后通过匿名管道发给父进程,父进程再读取将其发往 Slave
。
rdbSaveToSlavesSockets
创建 匿名管道
,通过 RIO
将内存序列化后写入 管道
中,父进程通过管道取出发到 Slave
。
1 | /* Spawn an RDB child that writes the RDB to the sockets of the slaves |
父进程注册管道的可读事件,从 rdbPipeReadHandler
读取。
1 | } else { |
至此主从同步就已剖析完了,之后的命令传送则通过 propagate
函数进行传递。
主从同步的坑
主从数据不一致
- 主从同步本来就是异步过程,应从部署方面考虑。
读到过期数据
- 带有相对时间过期的命令发送到从服务器后,已经滞后了,最好使用绝对时间。
- Redis 旧版本的从库就算读过期数据,也会原样返回(新版本返回空值)。