所有文章是按github的markdown格式书写的,如此页面显示不正常,请跳转到github版

Citus大量数据导出优化

1. 问题

业务需要将Citus中存储的分片表数据定期导出(或归档)到HIVE等其他存储。由于需要导出的数据量特别大,一批需要导出几亿甚至几十亿条记录,会给业务库造成很大的冲击。

用户经常会用下面几种方式导出大批数据

方式1:一次全部导出

如果一次全部导出上亿条记录,会存在以下问题

  1. 导出时间长达数小时,会影响垃圾回收,触发长事务告警,并且如果中断失败,需要重新开始。
  2. 每个worker同时在源表的所有shard上并发读取数据,引起worker的CPU和IO飙升。
  3. CN上同时接受来自worker的大量数据,引起CN的网络,CPU和IO飙升。
  4. CN在导出大量数据的过程中需要将数据暂存在临时文件中,极端情况下可能撑爆磁盘。
    • CN上接受到worker数据后先暂存在base/pgsql_job_cache中的临时文件中
    • CN构造的返回给客户端的过大的结果集需要暂存在base/pgsql_tmp中的临时文件中

方式2:使用分页方式分批导出

使用order by xxx offset x limit y对结果进行分页后,分批导出。

这种方式极不可取,应避免,因为它存在下面的问题

  1. 排序会带来大量的数据库资源消耗(如果不排序不能保证分批数据的连续性)
  2. CN需要把远大于一批的数据从worker上捞到CN本地后再进行分页,不在分页范围内的数据会被丢失,下次分页有会重复这个过程,越往后的分页读取消耗的资源越多。多次分页严重放大了实际从worker读取的数据量。仅获取最后一个分页数据的消耗的资源就已经大于【方式1:一次全部导出】

方式3:使用条件过滤的方式分批导出

使用WHERE条件将匹配的记录切割到适当大小,再分批获取。前提条件是WHERE条件列上必须有索引,否则如果每次获取都全表扫描实际是放大了worker的负载。

和方式1相比,这种方式可以在一定程度上缓解一次导出时间过长的问题。但是依然存在以下问题,仍不能从根本上解决问题。

  1. 全表数据导出时索引扫描没有全表扫描高效

  2. 仍然是同时读取worker上的所有shard数据,worker负载比较高

2. 解决方案

理想的方式是一次只读取一个shard,并且中间的临时数据不在CN落盘。

在使用的Citus 9.3以上的版本时,可以通过COPY命令达到这个目的。Citus 9.3以上版本在执行COPY导出数据时,依次顺序导出所有shard上的数据,并且中间结果不会落盘,可以极大的缓解业务库的压力。详细参考:https://github.com/citusdata/citus/pull/3361 。

但是,使用Citus 9.3的COPY导出数据时仍然存在以下问题

  1. COPY不能对数据进行过滤
  2. 应用程序必须解析COPY的输出,没有处理标准的SELECT结果集方便
  3. 只能一次性导出所有shard上的全部数据,不能按shard分批导出。如果中途失败,必须从头开始重试。
  4. Citus 9.3以前的版本不支持

因为COPY存在以上限制,我们只能通过其他方式解决这个问题。

考虑到数据库管理上的规范,当前我们不予许应用直连Citus的Worker节点。为了达到一次导出一个shard的目的,我们可以通过dblink函数进行中转(使用postgres_fdw也可以达到相同目的,但是需要执行额外的创建FDW的DDL操作)。示例如下:

原始导出SQL

select * from tb1 where c1 > 0;

优化后的SQL

WITH t AS
(
  SELECT logicalrelid,shardid
    FROM pg_dist_shard
    WHERE logicalrelid='tb1'::regclass  --设置导出源表
    ORDER BY shardid
    offset 0 --设置导出第几个shard
    limit 1
),
t2 AS
(
  SELECT logicalrelid,a.shardid,nodename
    FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
)
SELECT a.* FROM t2,dblink('host=' || nodename || ' dbname=' || current_database(), $$select * from $$ || shard_name(logicalrelid,shardid) || $$ where c1 > 0$$) --设置过滤条件
    a(id integer, c1 integer, c2 text); --填入源表的定义

以上SQL导出一个shard的数据,需要修改上面的offset值依次导出所有shard数据。

注意事项

  1. 使用dblink的方式导出数据时,dblink内部也需要将worker的返回结果在CN本地落盘,因此当单个shard的数据过大时,需要注意CN上数据目录空间是否充足。

    但是,和普通的数据导出相比,dblink导数据时临时文件占用的空间要小得多,一是dblink一次只导出一个shard,二次普通数据导出方式,临时数据需要落2次盘,dblink方式只要落一次盘。

  2. 如果需要分批导出数据,不建议用offset limit分页,而是使用WHERE条件分批过滤。比如

    SELECT ... WHERE etl_time >= '时间点1' AND etl_time < '时间点2';
    SELECT ... WHERE etl_time >= '时间点2' AND etl_time < '时间点3';
    SELECT ... WHERE etl_time >= '时间点3' AND etl_time < '时间点4';
    ...
    

3. 实测对比

下面通过一个例子看下效果

3.1 表定义

create table tb1(id integer, c1 integer, c2 text);
select create_distributed_table('tb1','c1');

3.2 数据准备

插入2000w记录到测试表中的同一个shard中(为了便于对比,其他shard不放数据)。

insert into tb1 select id,1,'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' from generate_series(1,20000000)id;

3.3 性能对比

分别用普通方式和dblink方式导出数据,观察导出耗费的时间和消耗的资源。测试时,在SQL前面加上explain analyze避免大量数据输出到控制台。

3.3.1 普通方式的数据导出SQL

select * from tb1

3.3.2 dblink方式的数据导出SQL

WITH t AS
(
  SELECT logicalrelid,shardid
    FROM pg_dist_shard
    WHERE logicalrelid='tb1'::regclass
    ORDER BY shardid
    offset 0
    limit 1
),
t2 AS
(
  SELECT logicalrelid,a.shardid,nodename
    FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
)
SELECT a.* FROM t2,dblink('host=' || nodename || ' dbname=' || current_database(),
  $$select * from $$ || shard_name(logicalrelid,shardid))
    a(id integer, c1 integer, c2 text);

3.3.3 测试结果

项目 普通方式 dblink方式 备注
导出时间 83s 51s  
CN CPU峰值 95% 95% PG进程的CPU利用率
CN 私有内存峰值 15MB 12MB top的RES - SHR的值
CN临时文件占用(base/pgsql_job_cache) 2185MB 0 存放CN从Worker拉取的中间数据
CN临时文件占用(base/pgsql_tmp) 2406MB 2406MB CN处理大结果集时落盘的临时文件
Worker CPU峰值 59% 44%  
Worker 私有内存峰值 2MB 2MB  

由上面的测试结果过可以看出

  1. dblink方式可以一次导出一个shard
  2. dblink方式导出数据和Citus的普通方式相比,导出单个shard的临时文件占用空间减少1半

这个方案存在一个问题,dblink()不允许普通账号免密调用。如果希望允许普通账号访问,可以给dblink()函数加上属性SECURITY DEFINER进行回避,但是这样做需要防止SQL注入。

如果需要防范SQL注入,可以在dblink前面再包一层函数,由这个封装函数生成SQL或调用dblink前对用户传入的SQL做一些检查。实现的示例参考【附录:dblink安全封装】。但是,这样做也要明显的弊端,会导致临时结果集存储2次,一次函数调用一次。以下是测试数据。

项目 dblink方式 dblink封装方式 备注
导出时间 51s 63s  
CN CPU峰值 95% 94% PG进程的CPU利用率
CN 私有内存峰值 12MB 25MB top的RES - SHR的值
CN临时文件占用(base/pgsql_job_cache) 0 0 存放CN从Worker拉取的中间数据
CN临时文件占用(base/pgsql_tmp) 2406MB 4662MB CN处理大结果集时落盘的临时文件
Worker CPU峰值 44% 50%  
Worker 私有内存峰值 2MB 2MB  

4. 附录:dblink安全封装

--selectsql:the query to generate query result type
-- e.g: 
-- postgres=# select generate_query_result_type($$select 1 c1 ,now(), 'aa'::varchar(100)$$);
--         generate_query_result_type         
-- -------------------------------------------
--  (c1 integer,now timestamptz,varchar text)
-- (1 row)
CREATE OR REPLACE FUNCTION public.generate_query_result_type(selectsql text) 
RETURNS text 
AS $$
    DECLARE
        result_type text;
    BEGIN
        EXECUTE 'DROP VIEW IF EXISTS generate_query_result_type_tmpview';
        EXECUTE format('CREATE TEMPORARY VIEW generate_query_result_type_tmpview AS %s', selectsql);

        SELECT 
            '(' || 
                (SELECT string_agg(attname|| ' ' || tmptypname, ',')
                 FROM
                    (SELECT a.attnum,
                            a.attname, 
                            case b.typname  
                                when 'bit'      then b.typname || '(' || d.character_maximum_length || ')'  
                                else b.typname
                            end as tmptypname
                    FROM    pg_class AS c, pg_attribute AS a, information_schema.columns d, pg_type AS b
                    WHERE   c.relname = 'generate_query_result_type_tmpview'    AND 
                            a.attrelid = c.oid                                  AND
                            a.attnum>0                                          AND
                            d.table_name = c.relname                            AND
                            d.column_name = a.attname                           AND
                            b.oid = a.atttypid                                  AND
                            a.attisdropped=false ORDER BY a.attnum) AS tb)
            || ')'
        INTO result_type;

        RETURN result_type;
    END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION public.export_citus_shard(tablename text, 
                                                     index int,
                                                     select_sql text default '') -- 表名用%代替,暂未实现
RETURNS SETOF record
AS $$
    DECLARE
        sql text;
    BEGIN
        sql=format($sql$
        WITH t AS
        (
         SELECT logicalrelid,shardid
           FROM pg_dist_shard
           WHERE logicalrelid='%s'::regclass
           ORDER BY shardid
           offset %s
           limit 1
        ),
        t2 AS
        (
         SELECT logicalrelid,a.shardid,nodename
           FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
        )
        SELECT a.* FROM t2,public.dblink('host=' || nodename || ' dbname=' || current_database(), 
          'select * from ' || shard_name(logicalrelid,shardid))a%s
        $sql$,
        tablename,
        index,
        public.generate_query_result_type('select * from ' || tablename));

        RAISE NOTICE  'sql=%', sql;
        RETURN QUERY EXECUTE sql;
    END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

select * from export_citus_shard('tb1',1) tb1(id integer,c1 integer,c2 text) limit 3;
November 11, 2020