Citus大量数据导出优化
1. 问题
业务需要将Citus中存储的分片表数据定期导出(或归档)到HIVE等其他存储。由于需要导出的数据量特别大,一批需要导出几亿甚至几十亿条记录,会给业务库造成很大的冲击。
用户经常会用下面几种方式导出大批数据
方式1:一次全部导出
如果一次全部导出上亿条记录,会存在以下问题
- 导出时间长达数小时,会影响垃圾回收,触发长事务告警,并且如果中断失败,需要重新开始。
- 每个worker同时在源表的所有shard上并发读取数据,引起worker的CPU和IO飙升。
- CN上同时接受来自worker的大量数据,引起CN的网络,CPU和IO飙升。
- CN在导出大量数据的过程中需要将数据暂存在临时文件中,极端情况下可能撑爆磁盘。
- CN上接受到worker数据后先暂存在
base/pgsql_job_cache
中的临时文件中 - CN构造的返回给客户端的过大的结果集需要暂存在
base/pgsql_tmp
中的临时文件中
- CN上接受到worker数据后先暂存在
方式2:使用分页方式分批导出
使用order by xxx offset x limit y
对结果进行分页后,分批导出。
这种方式极不可取,应避免,因为它存在下面的问题
- 排序会带来大量的数据库资源消耗(如果不排序不能保证分批数据的连续性)
- CN需要把远大于一批的数据从worker上捞到CN本地后再进行分页,不在分页范围内的数据会被丢失,下次分页有会重复这个过程,越往后的分页读取消耗的资源越多。多次分页严重放大了实际从worker读取的数据量。仅获取最后一个分页数据的消耗的资源就已经大于【方式1:一次全部导出】
方式3:使用条件过滤的方式分批导出
使用WHERE条件将匹配的记录切割到适当大小,再分批获取。前提条件是WHERE条件列上必须有索引,否则如果每次获取都全表扫描实际是放大了worker的负载。
和方式1相比,这种方式可以在一定程度上缓解一次导出时间过长的问题。但是依然存在以下问题,仍不能从根本上解决问题。
-
全表数据导出时索引扫描没有全表扫描高效
-
仍然是同时读取worker上的所有shard数据,worker负载比较高
2. 解决方案
理想的方式是一次只读取一个shard,并且中间的临时数据不在CN落盘。
在使用的Citus 9.3以上的版本时,可以通过COPY命令达到这个目的。Citus 9.3以上版本在执行COPY导出数据时,依次顺序导出所有shard上的数据,并且中间结果不会落盘,可以极大的缓解业务库的压力。详细参考:https://github.com/citusdata/citus/pull/3361 。
但是,使用Citus 9.3的COPY导出数据时仍然存在以下问题
- COPY不能对数据进行过滤
- 应用程序必须解析COPY的输出,没有处理标准的SELECT结果集方便
- 只能一次性导出所有shard上的全部数据,不能按shard分批导出。如果中途失败,必须从头开始重试。
- Citus 9.3以前的版本不支持
因为COPY存在以上限制,我们只能通过其他方式解决这个问题。
考虑到数据库管理上的规范,当前我们不予许应用直连Citus的Worker节点。为了达到一次导出一个shard的目的,我们可以通过dblink函数进行中转(使用postgres_fdw
也可以达到相同目的,但是需要执行额外的创建FDW的DDL操作)。示例如下:
原始导出SQL
select * from tb1 where c1 > 0;
优化后的SQL
WITH t AS
(
SELECT logicalrelid,shardid
FROM pg_dist_shard
WHERE logicalrelid='tb1'::regclass --设置导出源表
ORDER BY shardid
offset 0 --设置导出第几个shard
limit 1
),
t2 AS
(
SELECT logicalrelid,a.shardid,nodename
FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
)
SELECT a.* FROM t2,dblink('host=' || nodename || ' dbname=' || current_database(), $$select * from $$ || shard_name(logicalrelid,shardid) || $$ where c1 > 0$$) --设置过滤条件
a(id integer, c1 integer, c2 text); --填入源表的定义
以上SQL导出一个shard的数据,需要修改上面的offset
值依次导出所有shard数据。
注意事项
-
使用dblink的方式导出数据时,dblink内部也需要将worker的返回结果在CN本地落盘,因此当单个shard的数据过大时,需要注意CN上数据目录空间是否充足。
但是,和普通的数据导出相比,dblink导数据时临时文件占用的空间要小得多,一是dblink一次只导出一个shard,二次普通数据导出方式,临时数据需要落2次盘,dblink方式只要落一次盘。
-
如果需要分批导出数据,不建议用offset limit分页,而是使用WHERE条件分批过滤。比如
SELECT ... WHERE etl_time >= '时间点1' AND etl_time < '时间点2'; SELECT ... WHERE etl_time >= '时间点2' AND etl_time < '时间点3'; SELECT ... WHERE etl_time >= '时间点3' AND etl_time < '时间点4'; ...
3. 实测对比
下面通过一个例子看下效果
3.1 表定义
create table tb1(id integer, c1 integer, c2 text);
select create_distributed_table('tb1','c1');
3.2 数据准备
插入2000w记录到测试表中的同一个shard中(为了便于对比,其他shard不放数据)。
insert into tb1 select id,1,'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' from generate_series(1,20000000)id;
3.3 性能对比
分别用普通方式和dblink方式导出数据,观察导出耗费的时间和消耗的资源。测试时,在SQL前面加上explain analyze
避免大量数据输出到控制台。
3.3.1 普通方式的数据导出SQL
select * from tb1
3.3.2 dblink方式的数据导出SQL
WITH t AS
(
SELECT logicalrelid,shardid
FROM pg_dist_shard
WHERE logicalrelid='tb1'::regclass
ORDER BY shardid
offset 0
limit 1
),
t2 AS
(
SELECT logicalrelid,a.shardid,nodename
FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
)
SELECT a.* FROM t2,dblink('host=' || nodename || ' dbname=' || current_database(),
$$select * from $$ || shard_name(logicalrelid,shardid))
a(id integer, c1 integer, c2 text);
3.3.3 测试结果
项目 | 普通方式 | dblink方式 | 备注 |
---|---|---|---|
导出时间 | 83s | 51s | |
CN CPU峰值 | 95% | 95% | PG进程的CPU利用率 |
CN 私有内存峰值 | 15MB | 12MB | top的RES - SHR 的值 |
CN临时文件占用(base/pgsql_job_cache) | 2185MB | 0 | 存放CN从Worker拉取的中间数据 |
CN临时文件占用(base/pgsql_tmp) | 2406MB | 2406MB | CN处理大结果集时落盘的临时文件 |
Worker CPU峰值 | 59% | 44% | |
Worker 私有内存峰值 | 2MB | 2MB |
由上面的测试结果过可以看出
- dblink方式可以一次导出一个shard
- dblink方式导出数据和Citus的普通方式相比,导出单个shard的临时文件占用空间减少1半
这个方案存在一个问题,dblink()不允许普通账号免密调用。如果希望允许普通账号访问,可以给dblink()函数加上属性SECURITY DEFINER
进行回避,但是这样做需要防止SQL注入。
如果需要防范SQL注入,可以在dblink前面再包一层函数,由这个封装函数生成SQL或调用dblink前对用户传入的SQL做一些检查。实现的示例参考【附录:dblink安全封装】。但是,这样做也要明显的弊端,会导致临时结果集存储2次,一次函数调用一次。以下是测试数据。
项目 | dblink方式 | dblink封装方式 | 备注 |
---|---|---|---|
导出时间 | 51s | 63s | |
CN CPU峰值 | 95% | 94% | PG进程的CPU利用率 |
CN 私有内存峰值 | 12MB | 25MB | top的RES - SHR 的值 |
CN临时文件占用(base/pgsql_job_cache) | 0 | 0 | 存放CN从Worker拉取的中间数据 |
CN临时文件占用(base/pgsql_tmp) | 2406MB | 4662MB | CN处理大结果集时落盘的临时文件 |
Worker CPU峰值 | 44% | 50% | |
Worker 私有内存峰值 | 2MB | 2MB |
4. 附录:dblink安全封装
--selectsql:the query to generate query result type
-- e.g:
-- postgres=# select generate_query_result_type($$select 1 c1 ,now(), 'aa'::varchar(100)$$);
-- generate_query_result_type
-- -------------------------------------------
-- (c1 integer,now timestamptz,varchar text)
-- (1 row)
CREATE OR REPLACE FUNCTION public.generate_query_result_type(selectsql text)
RETURNS text
AS $$
DECLARE
result_type text;
BEGIN
EXECUTE 'DROP VIEW IF EXISTS generate_query_result_type_tmpview';
EXECUTE format('CREATE TEMPORARY VIEW generate_query_result_type_tmpview AS %s', selectsql);
SELECT
'(' ||
(SELECT string_agg(attname|| ' ' || tmptypname, ',')
FROM
(SELECT a.attnum,
a.attname,
case b.typname
when 'bit' then b.typname || '(' || d.character_maximum_length || ')'
else b.typname
end as tmptypname
FROM pg_class AS c, pg_attribute AS a, information_schema.columns d, pg_type AS b
WHERE c.relname = 'generate_query_result_type_tmpview' AND
a.attrelid = c.oid AND
a.attnum>0 AND
d.table_name = c.relname AND
d.column_name = a.attname AND
b.oid = a.atttypid AND
a.attisdropped=false ORDER BY a.attnum) AS tb)
|| ')'
INTO result_type;
RETURN result_type;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION public.export_citus_shard(tablename text,
index int,
select_sql text default '') -- 表名用%代替,暂未实现
RETURNS SETOF record
AS $$
DECLARE
sql text;
BEGIN
sql=format($sql$
WITH t AS
(
SELECT logicalrelid,shardid
FROM pg_dist_shard
WHERE logicalrelid='%s'::regclass
ORDER BY shardid
offset %s
limit 1
),
t2 AS
(
SELECT logicalrelid,a.shardid,nodename
FROM t a join pg_dist_shard_placement b on (a.shardid = b.shardid)
)
SELECT a.* FROM t2,public.dblink('host=' || nodename || ' dbname=' || current_database(),
'select * from ' || shard_name(logicalrelid,shardid))a%s
$sql$,
tablename,
index,
public.generate_query_result_type('select * from ' || tablename));
RAISE NOTICE 'sql=%', sql;
RETURN QUERY EXECUTE sql;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
select * from export_citus_shard('tb1',1) tb1(id integer,c1 integer,c2 text) limit 3;