Determining whether data is in sync for a PostgreSQL logical replication subscription

With PostgreSQL logical replication, how can we tell how far the subscriber has caught up?

Two approaches were considered; which is more suitable?

  • latest_end_lsn in pg_stat_subscription on the subscriber
  • replay_lsn in pg_stat_replication on the publisher
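
As a quick orientation, the two candidates can be inspected like this (a minimal sketch; the column names are those of PostgreSQL 10+, and the subscription name mysub is hypothetical):

-- On the subscriber: progress as seen by the apply worker
SELECT subname, received_lsn, latest_end_lsn, latest_end_time
  FROM pg_stat_subscription
 WHERE subname = 'mysub';

-- On the publisher: progress as reported back by the subscriber
-- (the apply worker normally reports the subscription name as application_name)
SELECT application_name, sent_lsn, write_lsn, flush_lsn, replay_lsn
  FROM pg_stat_replication
 WHERE application_name = 'mysub';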

1. About latest_end_lsn in pg_stat_subscription

received_lsn and latest_end_lsn in pg_stat_subscription look similar; they differ as follows:

  • received_lsn: last write-ahead log location received
  • latest_end_lsn: last write-ahead log location reported back to the origin WAL sender

1.1 Where latest_end_lsn in pg_stat_subscription comes from

Its source is the global array LogicalRepCtx->workers[]:

select * from pg_stat_subscription
  pg_stat_get_subscription()
    memcpy(&worker, &LogicalRepCtx->workers[i],sizeof(LogicalRepWorker));
    values[6] = LSNGetDatum(worker.reply_lsn);

1.2 Allocation of the LogicalRepWorker slot

When the launcher starts an apply worker it allocates a worker slot and passes the slot index to the apply worker through the bgw_main_arg parameter:

ApplyLauncherMain(Datum main_arg)
  logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid);
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}
	bgw.bgw_main_arg = Int32GetDatum(slot);
	RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)

1.3 When latest_end_lsn is updated

The subscriber updates pg_stat_subscription.latest_end_lsn only when it receives a keepalive message from the publisher. Because latest_end_lsn is not updated after every send_feedback(), it can lag behind the LSN that was actually reported back to the publisher; in practice a delay of more than 10 seconds is often visible. To keep the WAL sender from timing out, the sender proactively sends a keepalive message when it has not received feedback from the receiver for more than wal_sender_timeout / 2.

LogicalRepApplyLoop(XLogRecPtr last_received)
  
	for (;;)
	{
	...
		len = walrcv_receive(wrconn, &buf, &fd);
		if (len != 0)
		{
		
			if (c == 'w')
			{
				XLogRecPtr	start_lsn;
				XLogRecPtr	end_lsn;
				TimestampTz send_time;

				start_lsn = pq_getmsgint64(&s);
				end_lsn = pq_getmsgint64(&s);
				send_time = pq_getmsgint64(&s);

				if (last_received < start_lsn)
					last_received = start_lsn;

				if (last_received < end_lsn)
					last_received = end_lsn;

				UpdateWorkerStats(last_received, send_time, false); // updates pg_stat_subscription.received_lsn

				apply_dispatch(&s);
			}
			else if (c == 'k')
			{
				XLogRecPtr	end_lsn;
				TimestampTz timestamp;
				bool		reply_requested;

				end_lsn = pq_getmsgint64(&s);
				timestamp = pq_getmsgint64(&s);
				reply_requested = pq_getmsgbyte(&s);

				if (last_received < end_lsn)
					last_received = end_lsn;

				send_feedback(last_received, reply_requested, false); // report the subscriber's write/flush/apply positions
				UpdateWorkerStats(last_received, timestamp, true); // updates pg_stat_subscription.received_lsn and pg_stat_subscription.latest_end_lsn
			}
		}
		send_feedback(last_received, false, false); // report the subscriber's write/flush/apply positions
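
The reporting lag described in 1.3 can be observed directly on the subscriber. A sketch, assuming a hypothetical subscription named mysub; pg_wal_lsn_diff() gives the distance in bytes:

-- On the subscriber: how far latest_end_lsn trails what has actually been received
SELECT subname,
       received_lsn,
       latest_end_lsn,
       pg_wal_lsn_diff(received_lsn, latest_end_lsn) AS report_lag_bytes,
       now() - latest_end_time                       AS report_lag_time
  FROM pg_stat_subscription
 WHERE subname = 'mysub';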

2. How do we track how far the subscriber has actually applied?

latest_end_lsn does reflect the subscriber's apply position to some extent, but that is not really what it is intended for, and it lags fairly often, so it is not ideal.

We can instead track the subscriber's apply position through the pg_stat_replication statistics view on the publisher.
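
A sketch of that check, assuming a hypothetical subscription named mysub (the apply worker normally reports the subscription name as application_name):

-- On the publisher: how far the subscriber's reported apply position trails the publisher
SELECT application_name,
       replay_lsn,
       pg_current_wal_lsn() AS publisher_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS apply_lag_bytes
  FROM pg_stat_replication
 WHERE application_name = 'mysub';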

Referring again to the LogicalRepApplyLoop() code above, the logic by which the subscriber reports its replication position is as follows:

  • If there are no pending transactions (every write transaction relevant to the subscription has already been flushed to disk on the subscriber), report to the sender: write = flush = apply = latest WAL position received
  • If there are pending transactions, report to the sender: write = latest WAL position received; flush = position up to which write transactions in the subscription's scope have been flushed to disk on the subscriber; apply = position up to which write transactions in the subscription's scope have been committed (written) on the subscriber

From the above we can see that logical replication differs from physical replication: physical replication writes the WAL first and then applies it, whereas logical replication applies the transaction first and only then reports the flush position for the WAL generated by that transaction.

The relevant code is as follows:

send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
  	get_flush_position(&writepos, &flushpos, &have_pending_txes);
	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;
	...
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */


static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	dlist_foreach_modify(iter, &lsn_mapping) // lsn_mapping is updated when a commit record is applied
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			*flush = pos->remote_end;
			dlist_delete(iter.cur); // drop entries whose local commit has already been flushed from lsn_mapping
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

When a commit record is applied, the remote LSN and the local LSN of that commit are appended to the tail of lsn_mapping:

ApplyWorkerMain
  LogicalRepApplyLoop(origin_startpos);
    apply_dispatch(&s);
      apply_handle_commit(StringInfo s)
        replorigin_session_origin_lsn = commit_data.end_lsn; // updates pg_replication_origin_status
        replorigin_session_origin_timestamp = commit_data.committime;
        CommitTransactionCommand();
        store_flush_position(commit_data.end_lsn);
            /* Track commit lsn  */
            flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
            flushpos->local_end = XactLastCommitEnd;
            flushpos->remote_end = remote_lsn;
            dlist_push_tail(&lsn_mapping, &flushpos->node);
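
Since apply_handle_commit also advances the replication origin, the subscriber itself records the last remote commit LSN it has applied, which gives yet another view of apply progress (a sketch; remote_lsn corresponds to commit_data.end_lsn of the last applied commit):

-- On the subscriber: apply progress as tracked by the replication origin
SELECT external_id, remote_lsn, local_lsn
  FROM pg_replication_origin_status;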

3. Can the apply position in pg_stat_replication on the publisher be trusted to be correct?

First, to be precise, the apply position we obtain is considered wrong only when all of the following hold:

  1. The publisher has updated a table covered by the subscription
  2. The transaction that updated the table has committed
  3. The subscriber has not yet applied that transaction
  4. The apply position shown in pg_stat_replication is already greater than or equal to the end LSN of the transaction in point 3

When all tables are in state r or s, the subscriber's apply worker receives and applies the WAL sequentially. The next send_feedback() is not executed until the local commit on the subscriber has completed, so the reported apply position can never exceed the actual commit position (and, as it happens, latest_end_lsn in pg_stat_subscription can also be considered correct in this case).
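
Whether every table really is in state r or s can be verified on the subscriber through the pg_subscription_rel catalog (a sketch; an empty result means all tables are ready or synchronized):

-- On the subscriber: tables that are still being copied or synchronized
SELECT srrelid::regclass AS tablename, srsubstate
  FROM pg_subscription_rel
 WHERE srsubstate NOT IN ('r', 's');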

4. Can the apply position in pg_stat_replication on the publisher be reported late?

Yes, it can. However, pg_stat_replication.replay_lsn is less likely to lag than pg_stat_subscription.latest_end_lsn.

Even when the subscriber is already in sync, the apply position in pg_stat_replication may be reported late, lagging behind the publisher's current LSN, in the following situations:

  1. The subscriber is sleeping (it sleeps for at most 1 second at a time)
  2. The publisher is slow to send messages that do not touch the subscribed tables (including keepalives)

Because the sender sends keepalives promptly to avoid a sender timeout, after updates to the subscribed tables stop on the publisher we need to wait at most roughly wal_sender_timeout before the reported apply position catches up.
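
Putting this together, a minimal waiting sketch could look like the following. Assumptions: the subscription is named mysub (hypothetical), all tables are already in state r/s, writes to the published tables have stopped, and the 60-second cap is a stand-in for wal_sender_timeout:

-- On the publisher: wait until the subscriber's reported apply position
-- reaches the WAL position captured at the start.
DO $$
DECLARE
    target  pg_lsn := pg_current_wal_lsn();
    applied pg_lsn;
BEGIN
    FOR i IN 1 .. 60 LOOP                     -- poll for roughly wal_sender_timeout
        PERFORM pg_stat_clear_snapshot();     -- re-read the statistics views
        SELECT replay_lsn INTO applied
          FROM pg_stat_replication
         WHERE application_name = 'mysub';
        EXIT WHEN applied IS NOT NULL AND applied >= target;
        PERFORM pg_sleep(1);
    END LOOP;
    RAISE NOTICE 'publisher lsn %, subscriber applied %', target, applied;
END $$;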

May 8, 2020