Spark写入GreenPlum

2022-06-07 00:00:00 数据 初始化 方式 写入 效率

、测试方式

为了测试在Spark框架下以不同方式将数据插入GreenPlum的性能,这里需要准备足够量的数据。我采用的是gzip格式压缩的Hdfs数据,在这种压缩格式下数据量约1.2G,数据条目共34946664条,测试中会插入一个GreenPlum表中,按其中一个字段作为分区存入三个表分区中。

输出表

1、DataFrame.write()

支持:该方式使用greenplum-jdbc包,通过JDBC连接来向gp库插入数据。就是通过dataFrame.write().jdbc()的形式将数据写入gp库。

效率:但是其写入的速度非常慢,写入上述数据到GP库包括数据的初始化(并行初始化时间非常短,可忽略)的耗时为1.7h这个级别,约每秒写入6000条数据(MySql数据库也维持在这种写入效率)。在大数据量的时候实在不建议使用这种方式写入。


2、CopyManager.copyIn()

支持:该方式使用postgresql的JDBC包,利用了postgresql的特性。使用包中提供的CopyManager将字节流直接写入数据库中,这种写入的方式无法提供事务处理,当然我们也不需要事务处理。

Dataset df = getData().persist(StorageLevel.MEMORY_AND_DISK());
df.foreachPartition(new ForeachPartitionFunction<Location>() {
// URL
String url = GREENPLUM_URL;
// 数据库用户名
String username = GREENPLUM_USER;
// 数据库密码
String password = GREENPLUM_AUTH;
@Override
public void call(Iterator<Location> iterator) throws Exception {
// 加载驱动
Class.forName("org.postgresql.Driver");
// 获取连接
Connection connection = DriverManager.getConnection(url, username, password);
CopyManager copyManager = new CopyManager((BaseConnection) connection);
StringBuilder sb = new StringBuilder();
while(iterator.hasNext()){
Location row = iterator.next();
sb.append(row.toString()+"\r\n");
}
ByteArrayInputStream inputStream = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
copyManager.copyIn("COPY tb_gp_test (imsi,stime,etime,longitude,latitude,cityid,eci) FROM STDIN",inputStream);
connection.close();
}
});
}

效率:其写入GP库的效率很高,写入上述数据到GP库(包括数据初始化的耗时)为4min到6分钟这个级别。


3、Greenplum-Spark Connector

支持:该方式只支持Spark2.x以上的版本,需要导入greenplum-spark包,我在使用Maven时没发现有该包,因此需要自己下载并导入Maven私库中使用。对应不同greenplum版本的包下载地址是:

https://network.pivotal.io/products/pivotal-gpdb/releases。

Greenplum-Spark Connector版本对应


效率:但是其写入GP库的效率很高,写入上述数据到GP库包括数据的初始化的耗时为5min到6min这个级别。与copyIn的方式效率差不多,而且更适配于Spark的并行式计算方式,问题在于这个包只能支持比较高版本的Spark,老集群只能用copyIn的方式来提升效率了。

来源 https://www.modb.pro/db/330771


相关文章