Greenplum【部署 04】GPSS扩展安装并使用GPKafka实现Kafka数据导入

2023-03-21 00:00:00 数据 数据库 字段 执行 脚本

链接:https://pan.baidu.com/s/1MO-qL0Pxe6PojfZKsw3_qA
提取码:o7fl

Greenplum Stream Server (GPSS)是一个ETL(提取、转换、加载)工具。GPSS服务器的一个实例从一个或多个客户机接收流数据,使用Greenplum数据库可读的外部表将数据转换并插入到目标Greenplum表中。数据源和数据格式是特定于客户机的。数据源和数据格式由客户端指定。

1. GPSS安装(gpadmin用户操作)
安装文件为gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg。

# 安装
gppkg -i gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg

# 安装信息
==========================================================================
GPSS installation is complete! To proceed, create gpss extension in the
target database with:
"CREATE EXTENSION gpss;"
==========================================================================
# 特别注意:扩展的安装只在当前数据库生效。

2.GPSS配置(主节点)
# 1.切换 gpadmin 用户并进入数据库命令行工具
su gpadmin
psql
# 2.启用扩展 GPSS
CREATE EXTENSION gpss;

# 执行实例
[gpadmin@tcloud ~]$ psql
psql (9.4.24)
Type "help" for help.

gp_sydb=# CREATE EXTENSION gpss;
CREATE EXTENSION
# 扩展仅对数据库 gp_sydb 有效

3.数据导入实例
3.1 YAML配置文件
GPKafka支持多种数据格式的导入,包括avro、csv、delimited、json等。这里给出的例子是json格式的,如果需要其他格式,可以从网络上查询配置文件的编写方式,这里使用json格式主要是为了和其他业务公共Topic下的数据。在gpadmin用户的任意目录创建YAML文件,进行编辑,粘贴进去按格式整理好的配置信息,保存退出。举例 gpkafkatest.yaml 文件内容如下:

DATABASE: gp_sydb # 数据库
USER: gpadmin # 用户名
PASSWORD: gpadmin # 密码
HOST: tcloud # master节点IP或hostname
PORT: 5432 # GP数据库端口号
KAFKA:
INPUT:
SOURCE:
BROKERS: tcloud:9092 # Kafka 连接地址端口
TOPIC: gp-test # kafka 数据Topic
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10 # 遇到错误退出的次数
OUTPUT:
SCHEMA: public
TABLE: gp_table # 写入数据表
MAPPING: # 字段配置(NAME 是数据库字典名 EXPRESSION 对应的是 Kafka 字段名)
- NAME: id
EXPRESSION: (jdata->>'ID')::varchar
- NAME: name
EXPRESSION: (jdata->>'NAME')::varchar
- NAME: gender
EXPRESSION: (jdata->>'GENDER')::varchar
- NAME: phone
EXPRESSION: (jdata->>'PHONE')::varchar
- NAME: age
EXPRESSION: (jdata->>'AGE')::int
COMMIT:
MAX_ROW: 10000 # 一次多提交数量
MINIMAL_INTERVAL: 20 # 等待多少时间一提交(毫秒)



3.2 执行脚本
进入对应的yaml文件目录

# 使用以下命令测试脚本执行
gpkafka load gpkafkatest.yaml

# 后台运行脚本
nohup gpkafka load gpkafkatest.yaml 1>/dev/null 2>&1 &

# root用户下执行
su - gpadmin -c " nohup gpkafka load /path/to/gpkafkatest.yaml 1>/dev/null 2>&1 & "

# 添加到开机启动

# 查询已执行脚本
ps -aux | grep gpkafka

进入对应的yaml文件目录,执行 gpkafka load gsmdata.yaml 命令进行测试,也可以执行 nohup gpkafka load gsmdata.yaml 1>/dev/null 2>&1 &让脚本保持在后台运行。如果在root用户下,需要执行 su - gpadmin -c " nohup gpkafka load gsmdata.yaml 1>/dev/null 2>&1 &" 来指定导入脚本在gpadmin(数据库用户下)下运行,也可以把该指令添加到开机启动服务里(打开/etc/rc.d/rc.local或/etc/rc.local文件,在末尾增加执行的指令内容),确保开机即开启脚本。

使用ps -aux | grep gpkafka查询已执行的脚本

4. 注意事项
配置文件中大部分内容都根据实际情况进行配置即可,需要注意的事项时数据表的字段类型配置问题,例如JSON中是字符串,数据表对应的字段是整形就会异常;例如JSON中式空字符串,数据表中对应的字段是时间戳,也会异常。为了防止在生产环境中出现异常导致数据入库停止,需要对入库的数据进行进一步清洗:

对数据字段进行严格验证;
删除值为空的属性;
验证字符串的长度不能超过字段的长度(需要考虑编码);
不能随意更改数据类型和字段长度。


本文来源:https://blog.csdn.net/weixin_39168541/article/details/123922314

相关文章