关于PG逻辑订阅判断数据是否同步的方法
PG逻辑订阅过程中,怎么判断订阅端已经同步到哪儿了?
考虑过2种方案,哪个更合适
- 订阅端的
pg_stat_subscription
中latest_end_lsn
- 发布端的
pg_stat_replication
中的replay_lsn
1. 关于pg_stat_subscription
中的latest_end_lsn
pg_stat_subscription
中的received_lsn
和latest_end_lsn
比较像,它们的区别如下
received_lsn
:最后一次接收到的预写日志位置latest_end_lsn
:报告给原始WAL发送程序的最后的预写日志位置
1.1 pg_stat_subscription
中latest_end_lsn
的来源
来源是全局数组LogicalRepCtx->workers[]
select * from pg_stat_subscription
pg_stat_get_subscription()
memcpy(&worker, &LogicalRepCtx->workers[i],sizeof(LogicalRepWorker));
values[6] = LSNGetDatum(worker.reply_lsn);
1.2 LogicalRepWorker的分配
Launcher ApplyWorker时分配slot,通过bgw_main_arg参数传给ApplyWorker
ApplyLauncherMain(Datum main_arg)
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
/* Find unused worker slot. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (!w->in_use)
{
worker = w;
slot = i;
break;
}
}
bgw.bgw_main_arg = Int32GetDatum(slot);
RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)
1.3 latest_end_lsn
的更新
订阅端只有收到发布端的keepalive消息,才会更新pg_stat_subscription.latest_end_lsn
。
由于不是每次send_feedback()后都会更新latest_end_lsn
,所以latest_end_lsn
可能比实际反馈给发布端的lsn要滞后。实测时也经常能看到10秒以上的延迟。
为防止wal send超时,当超过wal_sender_timeout / 2
还没有收到接受端反馈时,发送端会主动发送keepalive消息。
LogicalRepApplyLoop(XLogRecPtr last_received)
for (;;)
{
...
len = walrcv_receive(wrconn, &buf, &fd);
if (len != 0)
{
if (c == 'w')
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
TimestampTz send_time;
start_lsn = pq_getmsgint64(&s);
end_lsn = pq_getmsgint64(&s);
send_time = pq_getmsgint64(&s);
if (last_received < start_lsn)
last_received = start_lsn;
if (last_received < end_lsn)
last_received = end_lsn;
UpdateWorkerStats(last_received, send_time, false);//更新pg_stat_subscription.received_lsn
apply_dispatch(&s);
}
else if (c == 'k')
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
bool reply_requested;
end_lsn = pq_getmsgint64(&s);
timestamp = pq_getmsgint64(&s);
reply_requested = pq_getmsgbyte(&s);
if (last_received < end_lsn)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);//反馈订阅端的write/flush/reply lsn
UpdateWorkerStats(last_received, timestamp, true);//更新pg_stat_subscription.received_lsn和pg_stat_subscription.latest_end_lsn
}
}
send_feedback(last_received, false, false);//反馈订阅端的write/flush/reply lsn
2. 如何跟踪订阅端实际apply到哪里?
latest_end_lsn
也能在一定程度上反映订阅端的apply位点,但是这和它本身的功能其实不是特别契合,而且它出现滞后的概率比较高,不是特别理想。
我们可以通过发布端的pg_stat_replication
统计视图跟踪订阅端的apply位置。
同样参考上面LogicalRepApplyLoop()的代码,订阅端反馈自己复制位置的逻辑如下:
- 如果没有pending的事务(所有和订阅相关的写事务已经在订阅端刷盘) 反馈给sender:write=flush=apply=接受到最新wal位置
- 如果有pending的事务 反馈给sender: write=接受到最新wal位置 flush=属于订阅范围的写事务已经在订阅端刷盘的位置 apply=属于订阅范围的写事务已经在订阅端写盘的位置
由上面可以看出,逻辑订阅和物理复制不一样,物理复制是先写wal再apply这个WAL;逻辑订阅是先apply事务,再反馈这个事务产生的wal的flush位置
相关代码如下:
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
get_flush_position(&writepos, &flushpos, &have_pending_txes);
/*
* No outstanding transactions to flush, we can report the latest received
* position. This is important for synchronous replication.
*/
if (!have_pending_txes)
flushpos = writepos = recvpos;
...
pq_sendbyte(reply_message, 'r');
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
bool *have_pending_txes)
{
dlist_mutable_iter iter;
XLogRecPtr local_flush = GetFlushRecPtr();
*write = InvalidXLogRecPtr;
*flush = InvalidXLogRecPtr;
dlist_foreach_modify(iter, &lsn_mapping)//lsn_mapping 在应用commit日志时更新
{
FlushPosition *pos =
dlist_container(FlushPosition, node, iter.cur);
*write = pos->remote_end;
if (pos->local_end <= local_flush)
{
*flush = pos->remote_end;
dlist_delete(iter.cur);//从lsn_mapping中移除已经本地刷盘的记录
pfree(pos);
}
else
{
/*
* Don't want to uselessly iterate over the rest of the list which
* could potentially be long. Instead get the last element and
* grab the write position from there.
*/
pos = dlist_tail_element(FlushPosition, node,
&lsn_mapping);
*write = pos->remote_end;
*have_pending_txes = true;
return;
}
}
*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
应用commit日志时,会将commit对应的远程lsn和本地lsn添加到lsn_mapping末尾
ApplyWorkerMain
LogicalRepApplyLoop(origin_startpos);
apply_dispatch(&s);
apply_handle_commit(StringInfo s)
replorigin_session_origin_lsn = commit_data.end_lsn; //更新pg_replication_origin_status
replorigin_session_origin_timestamp = commit_data.committime;
CommitTransactionCommand();
store_flush_position(commit_data.end_lsn);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
flushpos->local_end = XactLastCommitEnd;
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
3. 发布端pg_stat_replication
中的apply位点能否保证正确性?
首先,需要明确,只有出现以下情况时,拿到的apply位置才认为有误的
- 发布端更新了订阅表的表
- 更新这个表的事务已提交
- 订阅端还没有应用这个事务
pg_stat_replication
中看到的apply位点已经大于等于3的事务结束位置
当所有表都是r或s状态时,订阅端的apply worker顺序接受和应用WAL日志。
在订阅端本地提交完成前,不会实施后续的send_feedback(),所以不会产生超过实际提交位置的apply位点(甚至碰巧pg_stat_subscription
中的latest_end_lsn
也可以认为是对的)。
4. 发布端pg_stat_replication
中的apply位点是否可能反馈不及时?
有可能。但是pg_stat_replication.replay_lsn
滞后的概率低于pg_stat_subscription.latest_end_lsn
当订阅端已处于同步状态时,下面的情况下pg_stat_replication
中的apply位点可能反馈不及时,比发布端的当前lsn滞后。
- 订阅端处于sleep状态,最多sleep 1秒
- 发布端发送非订阅表更新的消息(含keepalive)不及时
发送端为了防止sender超时,会及时发送keepalive保活,因此我们可以在发布端停止更新订阅表后,可以最多等待wal_sender_timeout
一样大的时间。