Citus分布式方案(七)- 集群管理

2022-05-07 00:00:00 数据 节点 分片 复制 租户

目录
(七)集群管理
选择集群的大小
1. 分片的数量
2. 多租户SaaS
3. 实时分析
初始硬件大小
1. 多租户SaaS
2. 实时分析
集群扩容
1. 添加一个数据节点
2. 在不停机的情况下重新平衡分片
3. 工作原理
4. 增加一个协调节点(开发中)
节点故障处理
1. 数据节点故障
2. 协调节点故障
租户隔离(企业版特性)
查看统计信息(企业版特性)
合理利用资源
1. 限制长时间运行的查询
PostgreSQL扩展
1. 插件使用
2. 加入新的实例
(七)集群管理
在本节内容:从Citus集群中添加或删除节点,以及如何处理节点故障。

选择集群的大小
为了更容易跨节点移动分片或在失败节点上重新复制分片,Citus企业版提供了一个分片重均衡的扩展。在此之前,我们将了解下在生产环境中运行集群的配置。

1. 分片的数量
集群中的节点数量很容易更改,但是在集群创建之后,更改在这些节点之间分布的分片数量就变得复杂了。为每个分布式表选择分片初始数量需要在更多分片的灵活性与跨分片查询的开销之间的做好平衡。

2. 多租户SaaS
在多租户数据库用例中,官方建议在32 - 128个碎片之间进行选择。对于较小的工作负载(比如<100GB),可以从32个碎片开始,对于较大的工作负载,可以选择64或128,但这意味着需要将数据节点的数量从32台扩展到128台。

3. 实时分析
在实时分析用例中,分片数量应该与工作程序上核心的总数有关。 为了确保大的并行度,应该在每个节点上创建足够的分片,以使每个CPU内核至少有一个分片。 我们通常建议创建大量初始分片,如当前CPU内核数量的2倍或4倍,以便在将来进行扩展。

不过,对于每个查询,Citus会为每个分片打开一个数据库连接,并且这些连接受到连接总数的限制。注意分片数量不能过大,以避免带来分布式查询不必的连接等待。 换句话说,所需的连接(大并发查询数×分片数)通常不应超过系统中可能的总连接数(数据节点数×每个数据节点的max_connections)。

初始硬件大小
集群的大小,就节点数及其硬件容量而言,很容易更改。但是,仍然需要为新集群选择初始大小。

1. 多租户SaaS
对于从现有的单节点数据库实例迁移到Citus的用户,建议选择一个群集,其中数据节点的核心和RAM的总数等于原始实例的数目。由于分片提高了资源利用率,允许使用更小的索引,因此很容易得到2-3倍的性能提升。

协调节点比数据节点需要更少的内存,因此可以选择偏重于CPU的计算机来运行协调节点。 所需的内核数量取决于现有的工作负载(读/写吞吐量)。

2. 实时分析
核心总数:Citus的性能提升与工作核心数成正比。确定适合的核心数量,需要考虑单节点数据库中当前查询的延迟以及Citus中所需的延迟,可以使用当前等待时间除以所需等待时间四舍五入后的结果。

数据节点RAM:好的情况是提供足够的内存,使大部分工作集都能存放在内存中。应用程序使用的查询类型会影响内存需求,通过对查询运行EXPLAIN (ANALYZE, BUFFERS)以确定它需要多少内存。

集群扩容
Citus的基于逻辑分片的体系结构可以扩展集群而无需停机。

1. 添加一个数据节点
Citus将所有数据存储在数据节点上的分布式表中,因此,如果通过增加更多的计算能力来扩展集群,可以通过增加一个数据节点来实现。

要将新节点添加到集群,首先需要在pg_dist_node目录表中添加该节点和端口(运行PostgreSQL的端口),包括DNS名称或IP地址,可以使用master_add_node()函数执行:

SELECT * from master_add_node('node-name', 5432);
1
新节点可用于新的分布式表的分片。如果不进行重新分配,那么现有分片将保留在原处,因此,如果不采取进一步措施,添加新的数据节点可能对性能没有任何帮助。

如果群集里有非常大的参考表,那它们可能会减慢节点的添加速度。在这种情况下,可以考虑设置citus.replicate_reference_tables_on_activate(boolean)延迟引用表复制。

从Citus 8.1开始,数据节点默认使用加密通信。运行8.1或更高版本的新节点将拒绝与未启用SSL的其他数据节点通信,因此,将节点添加到没有加密通信的群集中时,必须在创建Citus扩展之前重新配置新节点。

首先,从协调节点检查其他工作人员是否使用SSL:

SELECT run_command_on_workers('show ssl');
1
如果没有,那么连接到新节点,并允许它在必要时通过明文通信:

ALTER SYSTEM SET citus.node_conninfo TO 'sslmode=prefer';
SELECT pg_reload_conf();
1
2
2. 在不停机的情况下重新平衡分片
如果要将现有的分片移动到新添加的工作器中,Citus企业版提供了rebalance_table_shards()函数来简化此工作。此功能将移动给定表的分片以在数据节点之间平均分配它们。

该功能可配置为根据多种策略重新平衡分片,示例使用默认策略重新分配分片:

SELECT rebalance_table_shards();
1
许多用户场景,例如多租户SaaS应用程序,都无法忍受停机时间,Citus(PostgreSQL 10及以上版本)重新平衡功能可以满足这一要求。在移动数据时,Citus能够以小的中断来执行应用程序的读取和写入。

3. 工作原理
Citus的分片重新平衡使用PostgreSQL逻辑复制将数据从旧分片(在复制方面称为发布方)移动到新分片(订阅方)。逻辑复制允许应用程序读写在复制分片数据时可以连续不间断地进行。Citus仅在更新元数据以将订阅方分片提升为活动状态时才对分片进行简短的写锁定。

如果分布式表已定义了主键,则无需进行任何额外的工作即可进行分片重新平衡。但是,如果它没有主键或没有明确定义的副本标识,则尝试重新平衡它会导致错误。例如:

-- creating the following table without REPLICA IDENTITY or PRIMARY KEY
CREATE TABLE test_table (key int not null, value text not null);
SELECT create_distributed_table('test_table', 'key');

-- running shard rebalancer with default behavior
SELECT rebalance_table_shards('test_table');

/*
NOTICE: Moving shard 102040 from localhost:9701 to localhost:9700 ...
ERROR: cannot use logical replication to transfer shards of the
relation test_table since it doesn't have a REPLICA IDENTITY or
PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the shard will error out during
logical replication unless there is a REPLICA IDENTIY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the
shard_transfer_mode to 'force_logical' or 'block_writes'.
*/


如何应对这些情况?

① 如果要复制的表有包含分发列的索引,则可以选择该索引作为副本标识:

-- supposing my_table has unique index my_table_idx
-- which includes distribution column
ALTER TABLE my_table REPLICA IDENTITY USING INDEX my_table_idx;
1
2
3
虽然使用索引代替REPLICA IDENTITY可以解决上述问题,但并不建议将REPLICA IDENTITY FULL添加到表中。此设置将导致每个更新或删除都在订阅方进行全表扫描,以查找具有这些行的元组,性能影响较大。

② 将主键添加到表中。如果所需的键恰好是分布列,则非常简单,只需添加约束即可。否则,具有非分布列的主键必须是复合键,并且也必须包含分布列。

③ 如果分布式表没有主键或副本标识,并且不清楚选择什么列进行添加,仍可以在强制使用逻辑复制。该方式可以在只有读取和插入(不删除或更新)的表上执行:

SELECT rebalance_table_shards(
'test_table',
shard_transfer_mode => 'force_logical'
);
1
2
3
4
在这种情况下,如果应用程序确实需要在复制期间尝试更新或删除,则请求将返回错误,直至复制完成。

④ 在PostgreSQL 9.x以及更低版本上,Citus并不支持逻辑复制。在这种情况下,只能改用效率较低的方案:将分片复制到新位置时,将其锁定后进行写入,因此会导致写入语句被阻塞(读取查询不受影响)。

阻止写入并使用COPY命令进行复制的方法:

SELECT rebalance_table_shards(
'test_table',
shard_transfer_mode => 'block_writes'
);

4. 增加一个协调节点(开发中)
Citus协调节点只存储有关表分片的元数据,不存储任何业务数据。因此,所有计算都将下推到数据节点进行,而协调节点仅对数据节点的结果进行终汇总。因此,协调节点不太可能成为读取性能的瓶颈。同样,通过转移到硬件更强大的机器上来提升协调节点的性能也很方便。

但是,在一些高并发写的情况下,协调节点就容易成为性能瓶颈。 由于元数据表很小(通常只有几MB),因此可以将元数据复制到另一个节点上并定期进行同步,即,添加另一个协调器。用户可以将其查询发送给任何协调员,并扩展性能。

节点故障处理
在Citus群集不停机的情况下处理节点故障。首先介绍Citus如何通过维护数据的多个副本自动处理数据节点的故障,还要简要介绍在节点长时间关闭的情况下用户如何复制其分片,以及讨论如何为协调器设置冗余和故障处理机制。

1. 数据节点故障
Citus支持两种复制模式来处理数据节点的故障。其一,使用PostgreSQL的流复制来按原样复制整个数据节点。其二,Citus可以复制数据修改语句,从而在不同的数据节点之间复制分片。根据不同的工作负载和用例,它们具有的优势,如下所述:

① PostgreSQL流复制:此模式适合繁重的OLTP。它通过连续将其WAL记录流传输到备用数据库来复制整个数据节点,通过查询PostgreSQL复制文档自行配置内部流复制。

② Citus分片复制:此模式适合追加数据。Citus通过自动复制DML语句并管理一致性,在不同的节点之间复制分片。如果某个节点发生故障,则协调节点将无缝路由到副本来继续为查询提供服务。要启用分片复制,只需在创建集群前设置SET citus.shard_replication_factor = 2。

2. 协调节点故障
Citus协调器维护元数据表,以跟踪所有群集节点以及这些节点上数据库分片的位置。元数据表很小(通常只有几MB),并且不会经常更改。这意味着如果节点出现故障,可以对其进行复制并快速还原。

① 使用PostgreSQL流复制:PostgreSQL的流复制功能创建协调节点的热备。如果主协调节点发生故障,则备用数据库可以自动升级为主数据库,以向集群提供查询。

② 由于元数据表很小,因此用户可以使用EBS卷或PostgreSQL备份工具来备份元数据。如果主协调节点发生故障,可以轻松地将该元数据还原到新节点以恢复操作。

租户隔离(企业版特性)
Citus根据行分布列的哈希值将表行放入数据节点分片中。在Citus多租户场景中,多个分布列值通常属于同一分片,即,租户经常共享分片。

但是,当租户的大小差异很大的时候,共享分片会导致资源竞争。对于拥有大量租户的系统,这是一种常见情况,随着租户数量的增加,租户数据的大小倾向于遵循Zipfian分布。这意味着有一些非常大的租户,还有许多较小的租户。为了改善资源分配并确保租户QoS,有效的方法是将大型租户移至专用节点。

Citus企业版提供了在特定节点上隔离租户的功能,分为两个阶段:

① 将租户的数据传输到新的专用分片上;
② 将分片移动到新的节点上。

在Citus元数据中,每个分片都使用其包含的哈希值范围进行标记。Citus中isolate_tenant_to_new_shard(table_name,tenant_id)函数通过三个步骤将租户移入专用分片:

① 为table_name创建一个新的分片,这个分片将包括其分布列的值为tenant_id的行,并排除所有其他行。
② 将相关行从其当前分片移动到新分片。
③ 将旧分片分成两个散列范围,这些散列范围与上下左右的隔离块相邻。

此外,isolate_tenant_to_new_shard()函数可使用CASCADE选项,该选项不仅隔离了table_name的租户行,而且隔离了与之共存的所有表的租户行。

-- This query creates an isolated shard for the given tenant_id and returns the new shard id.
-- General form:
SELECT isolate_tenant_to_new_shard('table_name', tenant_id);
-- Specific example:
SELECT isolate_tenant_to_new_shard('lineitem', 135);
-- If the given table has co-located tables, the query above errors out and advises to use the CASCADE option
SELECT isolate_tenant_to_new_shard('lineitem', 135, 'CASCADE');

输出:

┌─────────────────────────────┐
│ isolate_tenant_to_new_shard │
├─────────────────────────────┤
│ 102240 │
└─────────────────────────────┘

新分片将在与删除原租户分片相同的节点上创建。为了实现真正的硬件隔离,可以将它们移至Citus群集中其他的单独节点。isolate_tenant_to_new_shard()函数返回新创建的分片ID,该ID可用于移动分片:

-- find the node currently holding the new shard
SELECT nodename, nodeport
FROM pg_dist_placement AS placement,
pg_dist_node AS node
WHERE placement.groupid = node.groupid
AND node.noderole = 'primary'
AND shardid = 102240;

-- list the available worker nodes that could hold the shard
SELECT * FROM master_get_active_worker_nodes();

-- move the shard to your choice of worker
-- (it will also move any shards created with the CASCADE option)
SELECT master_move_shard_placement(
102240,
'source_host', source_port,
'dest_host', dest_port);

master_move_shard_placement()函数还将移动与指定的分片共存的所有数据,以保证原来的协调定位能力。

查看统计信息(企业版特性)
在管理Citus群集时,了解应用程序正在运行哪些查询,涉及哪些节点以及Citus对每个查询使用哪种执行方法很有必要。Citus通过查询citus_stat_statements元数据视图中的记录来获取统计信息,该视图的名称类似于Postgres的pg_stat_statments。pg_stat_statements存储有关查询持续时间和I/O的信息,而citus_stat_statements存储有关Citus执行方法和分片分区键的信息。

Citus要求安装pg_stat_statements扩展以跟踪查询统计信息。因此,在Postgres实例上,需要通过shared_preload_libraries将扩展名加载到postgresql.conf中,然后在SQL中创建扩展:

CREATE EXTENSION pg_stat_statements;
1
假设我们有一个名为foo的表,它是按id列哈希分布的:

-- create and populate distributed table
create table foo ( id int );
select create_distributed_table('foo', 'id');

insert into foo select generate_series(1,100);

运行两个查询,citus_stat_statements将显示Citus如何选择执行它们:

-- counting all rows executes on all nodes, and sums the results on the coordinator
SELECT count(*) FROM foo;

-- specifying a row by the distribution column routes execution to an individual node
SELECT * FROM foo WHERE id = 42;

SELECT * FROM citus_stat_statements;


输出:

-[ RECORD 1 ]-+----------------------------------------------
queryid | -6844578505338488014
userid | 10
dbid | 13340
query | SELECT count(*) FROM foo;
executor | adaptive
partition_key |
calls | 1
-[ RECORD 2 ]-+----------------------------------------------
queryid | 185453597994293667
userid | 10
dbid | 13340
query | insert into foo select generate_series($1,$2)
executor | insert-select
partition_key |
calls | 1
-[ RECORD 3 ]-+----------------------------------------------
queryid | 1301170733886649828
userid | 10
dbid | 13340
query | SELECT * FROM foo WHERE id = $1
executor | adaptive
partition_key | 42
calls | 1

我们可以看到Citus常使用自适应执行器adaptive来运行查询。该执行程序将查询分组以在不同节点上运行,并将结果合并到协调器节点上。比如,按分发列id过滤的查询,Citus确定它仅需要访问一个节点,再如,insert into foo select…语句与insert-select执行程序一起运行。

通过对给定查询运行EXPLAIN命令,同样可以获得上述信息。

在多租户数据库中,我们希望绝大多数查询都是单租户,如果过多的多租户查询可能表明查询没有合适的过滤器来匹配租户,并且正在使用不必要的资源。

SELECT sum(calls),
partition_key IS NOT NULL AS single_tenant
FROM citus_stat_statements
GROUP BY 2;

sum | single_tenant
-----+---------------
2 | f
1 | t
1
2
3
4
同样,我们还可以找到哪些partition_id是频繁的目标,即,繁忙的租户:

SELECT partition_key, sum(calls) as total_queries
FROM citus_stat_statements
WHERE coalesce(partition_key, '') <> ''
GROUP BY partition_key
ORDER BY total_queries desc
LIMIT 10;

┌───────────────┬───────────────┐
│ partition_key │ total_queries │
├───────────────┼───────────────┤
│ 42 │ 1 │
└───────────────┴───────────────┘

pg_stat_statements视图的相关配置,限制了它跟踪的语句的数量及其记录的持续时间。因为citus_stat_statements使用的是pg_stat_statements中的查询子集,所以,如果两个视图选择相等的限制配置将导致它们保留的数据不匹配。

有三种方法可以帮助同步视图,并且这三种方法可以一起使用:

① 让守护程序定期同步citus和pg stats:citus.stats_statements_purge_interval以秒为单位设置同步时间,值为0将禁用定期同步。

② 调整citus_stat_statements中的条目数:当新条目超过阈值(默认值为50K,大允许值为10M)时,citus.stats_statements_max会删除旧条目。每个条目在共享内存中的开销约为140字节,需要适宜的设置。

③ 增加pg_stat_statements.max:它的默认值为5000,并且可以在没有太多开销的情况下增加到10K,20K甚至50K。

更改pg_stat_statements.max或citus.stat_statements_max需要重启PostgreSQL服务,更改citus.stats_statements_purge_interval通过调用pg_reload_conf()实现。

合理利用资源
1. 限制长时间运行的查询
长时间运行的查询可能会导致Lock、WAL排队或消耗大量系统资源,因此在生产环境中,好防止它们运行太长时间。在协调节点和工作程序上设置statement_timeout参数(单位毫秒),可以取消运行时间过长的查询。

-- limit queries to five minutes
ALTER DATABASE citus
SET statement_timeout TO 300000;

SELECT run_command_on_workers($cmd$
ALTER DATABASE citus
SET statement_timeout TO 300000;
$cmd$);


指定每个查询的超时时间,在事务中使用SET LOCAL即可:

BEGIN;
-- this limit applies to just the current transaction
SET LOCAL statement_timeout TO 300000;
...
COMMIT;

PostgreSQL扩展
作为PostgreSQL的一个扩展插件,Citus受益于丰富的PostgreSQL生态系统附带的功能。这些功能包括支持多种数据类型(JSONB和HSTORE等非结构化数据类型)、运算符和函数、全文搜索以及其他扩展,例如PostGIS和HyperLogLog等。此外,扩展API可以与标准PostgreSQL工具(例如pgAdmin和pg_upgrade等)兼容。

1. 插件使用
得益于Citus是可以安装在任何PostgreSQL实例上的扩展,因此,可以在Citus中直接使用其他扩展,例如hstore、hll或PostGIS。只需确保,在shared_preload_libraries中包括其他扩展名时,Citus是个扩展名即可。

除了核心的Citus扩展之外,时常用到的还有其他几项:

① cstore_fdw:用于分析的列式存储。通过从磁盘读取相关数据来提供性能,并且可以将数据压缩6x-10x以减少数据归档的空间需求。

② pg_cron:在数据库上运行定期作业。

③ postgresql-topn:根据某些条件返回数据库中的高值,使用近似算法,结合计算和内存资源快速提供结果。

④ postgresql-hll:HyperLogLog数据结构作为数据类型,是一个固定大小的类似集合的结构,用于以可调的精度进行不同的值计数。

2. 加入新的实例
每个PostgreSQL服务器可以容纳多个数据库,但是,新数据库并不会继承原数据库的扩展,所需的扩展名必须重新添加。要在新数据库上运行Citus,需要在协调节点和数据节点上创建数据库,在该数据库中创建Citus扩展,然后在协调节点数据库中注册数据节点:

-- create the new db on coordinator and workers
CREATE DATABASE newbie;
SELECT run_command_on_workers('CREATE DATABASE newbie;');

-- review the worker nodes registered in current db
SELECT * FROM master_get_active_worker_nodes();

-- switch to new db on coordinator
\c newbie

-- create citus extension in new db
CREATE EXTENSION citus;

-- register workers in new db
SELECT * from master_add_node('node-name', 5432);
SELECT * from master_add_node('node-name2', 5432);
...
-- for each of them


在每个数据节点的新数据库实例上,手动运行:

CREATE EXTENSION citus;
1
至此,新的数据库将作为另一个Citus集群运行。
————————————————
版权声明:本文为CSDN博主「子段知秋」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ivan_yaya/article/details/108470792

相关文章