所有文章是按github的markdown格式书写的,如此页面显示不正常,请跳转到github版

关于etcd事件捕获的断点重续

1. 问题

通过etcd的watch接口捕获事件变更时,不得不面对的连接中断等异常场景。在连接中断后如果全量拉取数据,会给etcd数据库带来较大的负载冲击。因此有必要实现断点重续。

2. 方案概述

etcd中有一个全局的revision,每次PUT或DELETE值都会触发etcd中的revision加1。

因此我们可以在watch到每一个事件时,同时记录其对应的revision。当中途断连后,从上次已处理事件的revision+1开始继续watch就可以实现断点重续。

另外需要考虑重续位置的revision已被compacted的场景,此时必须要进行一次全量数据同步。

我们部署的etcd设置的自动compact间隔是2小时。因此,除了etcd连接长时间中断外,如果超过2个小时没有watch到新的事件也会导致上次记录的revision失效。为了避免这个情况,需要定期的主动获取etcd当前revision,作为长时间没有捕获到事件时的补充。

注意事项

  1. 一次watch到多个事件后,只有所有事件都处理完后才能更新记录的revision位点
  2. 定时通过get获取最新revision。一是可以作为etcd长连接的检活,二是可以在长时间没有获取新事件时的补充。当超过x秒没有watch到新事件,且连接未中断,将y秒前通过get获取到的revision作为下次断点重续的位点。
    • y必须小于x。比如y为60秒,x为600秒。设计延迟y秒的目的为了确保所设定的revision附近没有满足watch条件的事件发生。
    • 记录get方式获取revision和记录从watch的事件中获取的revision之间需要确保互斥
  3. 必须使用0.11以上的python-etcd3模块

3. 测试

以下测试程序主要演示etcd如何获取revision以及从指定revision位置watch事件。

3.1 测试程序

import etcd3
import etcd3.exceptions
import etcd3.utils as utils
import logging
import sys


logging.getLogger().setLevel(logging.DEBUG)

def connect():
    client = etcd3.client(host='10.37.163.43',
                      port=2379,
                      user='admin',
                      password='etcd@snds',
                      timeout=5)
    return client

def prn_obj(obj):
    return  '\n'.join(['  --%s:%s' % item for item in obj.__dict__.items()])

def my_callback(response_or_err):
    logging.info('---------------------my_callback():')
    logging.info('got response:%s' % response_or_err)
    logging.info('got response member:%s' % prn_obj(response_or_err))

    logging.info('---------------------content of response_or_err:')
    if type(response_or_err) == etcd3.watch.WatchResponse:
        logging.info('got response header.revision:%s' % response_or_err.header.revision)
        logging.info('got response events:')
        for e in response_or_err.events:
            logging.info('  :%s' % e)
    elif type(response_or_err) == etcd3.exceptions.RevisionCompactedError:
        logging.info('got RevisionCompactedError:%s' % response_or_err.compacted_revision)
    else:
        logging.info('got other Error:%s' % response_or_err)


def main():
    etcd=connect()

    ## 获取当前revision
    etcd.put('/test1','0')
    value,meta = etcd.get('/test1')
    logging.info('value of key /test1:%s' % value)
    logging.info('KVMeta of key /test1:%s' % prn_obj(meta))
    logging.info('mod_revision of key /test1:%s' % meta.mod_revision)
    logging.info('current revision:%s' % meta.response_header.revision)

    ## 从指定revision开始捕获变更
    etcd.add_watch_callback('/test1', my_callback, start_revision=meta.mod_revision)

    logging.info('press any key to exit')
    sys.stdin.readline()

if __name__ == "__main__":
    main()

3.2 正常场景

以下是正常场景输出(收到普通的PUT/DELETE事件)

[postgres@sndsdevdb18 python-etcd3-master]$ python3 mytest.py 
INFO:root:value of key /test1:b'0'
INFO:root:KVMeta of key /test1:  --key:b'/test1'
  --create_revision:3386233
  --mod_revision:3386253
  --version:20
  --lease_id:0
  --response_header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386253
raft_term: 447849

INFO:root:mod_revision of key /test1:3386253
INFO:root:current revision:3386253
INFO:root:press any key to exit
INFO:root:---------------------my_callback():
INFO:root:got response:<etcd3.watch.WatchResponse object at 0x7fad82de0048>
INFO:root:got response member:  --header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386253
raft_term: 447849

  --events:[<etcd3.events.PutEvent object at 0x7fad82de00b8>]
INFO:root:---------------------content of response_or_err:
INFO:root:got response header.revision:3386253
INFO:root:got response events:
INFO:root:  :<class 'etcd3.events.PutEvent'> key=b'/test1' value=b'0'

3.2 compacted场景

回收所有revision小于3386255的历史key

[root@sndsdevapp21 ~]# etcdctl --user 'admin:etcd@snds' compaction 3386255
compacted revision 3386255

修改测试程序,watch一个已被compact的revision

etcd.add_watch_callback('/test1', my_callback, start_revision=3386254)

再次执行测试程序,输出如下(收到一个RevisionCompactedError异常)。

[postgres@sndsdevdb18 python-etcd3-master]$ python3 mytest.py 
INFO:root:value of key /test1:b'0'
INFO:root:KVMeta of key /test1:  --key:b'/test1'
  --create_revision:3386233
  --mod_revision:3386260
  --version:25
  --lease_id:0
  --response_header:cluster_id: 12908842124456103319
member_id: 16003904845432636150
revision: 3386260
raft_term: 447849

INFO:root:mod_revision of key /test1:3386260
INFO:root:current revision:3386260
INFO:root:press any key to exit
INFO:root:---------------------my_callback():
INFO:root:got response:
INFO:root:got response member:  --compacted_revision:3386255
INFO:root:---------------------content of response_or_err:
INFO:root:got RevisionCompactedError:3386255

3.3 连接中断场景

在测试程序启动后,重启etcd进程。测试程序输出如下(收到一个其他error)

INFO:root:---------------------my_callback():
INFO:root:got response:<_Rendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "{"created":"@1605106219.660601735","description":"Error received from peer ipv4:10.37.163.43:2379","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Socket closed","grpc_status":14}"
>
INFO:root:got response member:  --_state:<grpc._channel._RPCState object at 0x7f2ec03ab0b8>
  --_call:<grpc._cython.cygrpc.IntegratedCall object at 0x7f2ec03ab1d0>
  --_response_deserializer:<built-in method FromString of GeneratedProtocolMessageType object at 0x298cb08>
  --_deadline:None
INFO:root:---------------------content of response_or_err:
INFO:root:got other Error:<_Rendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "{"created":"@1605106219.660601735","description":"Error received from peer ipv4:10.37.163.43:2379","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Socket closed","grpc_status":14}"
>
November 11, 2020