所有文章是按github的markdown格式书写的,如此页面显示不正常,请跳转到github版

citus平滑扩容和缩容

前言

对一个分布式数据库来说,动态扩缩容是不可回避的需求。但是citus的动态扩缩容功能只在企业版中才有。好消息是,citus的分片信息是存储在元数据表里的,通过修改元数据表,我们完全可以在citus社区版上实现动态的平滑扩缩容。

环境

软件

  • CentOS 7.4
  • PostgreSQL 10
  • citus 7.2

集群架构(扩容前)

  • cituscn
    • cituswk1
    • cituswk2

集群架构(扩容后)

  • cituscn
    • cituswk1
    • cituswk2
    • cituswk3

实验环境可参考《使用Docker搭建citus实验环境》搭建。

原理概述

citus提供了现成的管理函数可以添加新的worker节点,但现有的分片表和参考表却不会自动分布到新加的worker上。 我们需要手动移动这些分片,并且要保证分片移动过程中不中断业务。主要过程可以分为以下几个步骤

  1. 表复制
    • 在移动目标分片的源端和目的端建立复制
  2. 元数据切换
    • 加锁,阻塞相关的分片表的数据变更
    • 修改pg_dist_shard_placement元数据表,变更分片位置信息。
  3. 清理
    • DROP切换前的旧的分片

表复制采用PostgreSQL的逻辑复制实现,因此所有worker节点必须预先打开逻辑复制开关。

wal_level = logical

注1:citus在添加新worker节点时已经在新worker上拷贝了参考表,不需要再人工处理。

注2:扩容时,如果把worker数翻倍,也可以用物理复制实现。使用物理复制时,如果有参考表不能调用master_add_node添加节点,必须手动修改元数据表。逻辑复制不支持复制DDL,物理复制没有这个限制,但物理复制没有逻辑复制灵活,只支持worker粒度的扩容,而且不能实现缩容。

分片表扩容操作步骤

创建测试分片表

创建以下测试分片表

create table tb1(id int primary key, c1 int);
set citus.shard_count=8;
select create_distributed_table('tb1','id');
insert into tb1 select id,random()*1000 from generate_series(1,100)id;

检查分片位置

postgres=# select * from pg_dist_placement where shardid in (select shardid from pg_dist_shard where logicalrelid='tb1'::regclass);
 placementid | shardid | shardstate | shardlength | groupid 
-------------+---------+------------+-------------+---------
          33 |  102040 |          1 |           0 |       1
          34 |  102041 |          1 |           0 |       2
          35 |  102042 |          1 |           0 |       1
          36 |  102043 |          1 |           0 |       2
          37 |  102044 |          1 |           0 |       1
          38 |  102045 |          1 |           0 |       2
          39 |  102046 |          1 |           0 |       1
          40 |  102047 |          1 |           0 |       2
(8 rows)

上面的groupid代表了对应哪个worker

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster 
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
      1 |       1 | cituswk1 |     5432 | default  | f           | t        | primary  | default
      2 |       2 | cituswk2 |     5432 | default  | f           | t        | primary  | default
(2 rows)

添加新的worker

在CN节点上执行以下SQL,将新的worker节点cituswk3加入到集群中

SELECT * from master_add_node('cituswk3', 5432);

检查pg_dist_node元数据表。新的worker节点的groupid为4

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster 
--------+---------+----------+----------+----------+-------------+----------+----------+-------------
      1 |       1 | cituswk1 |     5432 | default  | f           | t        | primary  | default
      2 |       2 | cituswk2 |     5432 | default  | f           | t        | primary  | default
      4 |       4 | cituswk3 |     5432 | default  | f           | t        | primary  | default
(3 rows)

复制分片

目前cituswk1和cituswk2上各有4个分片,cituswk3上没有分片,为了保持数据分布均匀可以移动部分分片到cituswk3上。

下面移动cituswk1上的分片102046到cituswk3上。

在cituswk1上创建PUBLICATION

CREATE PUBLICATION pub_shard FOR TABLE tb1_102046;

在cituswk3上创建分片表和SUBSCRIPTION

create table tb1_102046(id int primary key, c1 int);
CREATE SUBSCRIPTION sub_shard
	CONNECTION 'host=cituswk1'
	PUBLICATION pub_shard;

切换元数据

begin;
lock table tb1;
update pg_dist_placement set groupid=4 where shardid=102046 and groupid=1;
commit;

清理

在cituswk1上删除分片表和PUBLICATION

DROP PUBLICATION pub_shard;
drop table tb1_102046;

在cituswk3上删除SUBSCRIPTION

DROP SUBSCRIPTION sub_shard;

分片表缩容操作步骤

参考分片表扩容的步骤,将要删除的worker(cituswk3)上的分片(102046)移到其它worker(cituswk1)上,然后删除worker(cituswk3)。

select master_remove_node('cituswk3',5432);

亲和性表的处理

citus的分片表之间存在亲和性关系,具有亲和性(即colocationid相同)的所有分片表的同一范围的分片其所在位置必须相同。 移动某个分片时,必须将这些亲和分片捆绑移动。可以通过以下SQL查出某个分片的所有亲和分片。

postgres=# select * from pg_dist_shard where logicalrelid in(select logicalrelid from pg_dist_partition where colocationid=(select colocationid from pg_dist_partition where partmethod='h' and logicalrelid='tb1'::regclass)) and (shardminvalue,shardmaxvalue)=(select shardminvalue,shardmaxvalue from pg_dist_shard where shardid=102046);
 logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue 
--------------+---------+--------------+---------------+---------------
 tb1          |  102046 | t            | 1073741824    | 1610612735
 tb2          |  102055 | t            | 1073741824    | 1610612735
(2 rows)

对应的分片表元数据如下:

postgres=# select logicalrelid,partmethod,colocationid from pg_dist_partition;
 logicalrelid | partmethod | colocationid 
--------------+------------+--------------
 tb1          | h          |            2
 tb2          | h          |            2
 tb3          | h          |            4
(3 rows)
March 4, 2018