Developing Spark Jobs for Greenplum with the Greenplum-Spark Connector, and the Pitfalls Encountered

2023-03-09

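Reference blog posts:

Introduction to the Greenplum-Spark Connector (Greenplum Chinese Community blog, CSDN)

Greenplum-Spark Connector: a faster and more convenient way than pgload to write large data volumes into Greenplum (秣码一盏's blog, CSDN)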

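1. Background
The official documentation recommends several ways to load external data into Greenplum: JDBC, COPY (pgcopy), gpfdist, and the Pivotal Greenplum-Spark Connector. According to the official documentation:

JDBC: slow for large data volumes, and not recommended for bulk writes.
COPY (pgcopy): faster than JDBC, but it consumes resources on the master node.
gpfdist: does not consume master resources; it writes directly to the segments and can write in parallel. The drawback is that client-side tooling, including gpfdist and its dependencies, has to be installed.
Greenplum-Spark Connector: builds on Spark's parallel processing to write to Greenplum in parallel, and also provides an interface for parallel reads. The write test below is based on this component. Download: VMware Tanzu Greenplum, on VMware Tanzu Network.
2. Test code
2.1 Core test class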
package com.greenplum.spark.gsc

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * Reads a Greenplum table through the Greenplum-Spark Connector
  * and appends its rows to a second Greenplum table.
  *
  * @author chenweifeng
  * @since 2022-08-16 16:00
  */
object GreenplumSparkTest {
  // Global logger
  val LOGGER: Logger = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // Fall back to local mode only when no master is configured, so that
    // the --master passed to spark-submit is not overridden by the code.
    val conf = new SparkConf()
      .setAppName("gsc-greenplum-test")
      .setIfMissing("spark.master", "local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    println("spark-version:" + spark.version)

    // Read from Greenplum with Spark
    val gscReadOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark"
    )

    val gpdf: DataFrame = spark.read.format("greenplum")
      .options(gscReadOptionMap)
      .load()

    gpdf.show()

    // Write to Greenplum with Spark
    val gscWriteOptionMap = Map(
      "url" -> "jdbc:postgresql://10.***.**.3:54432/pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "******",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark_w"
    )

    // Append the rows that were read to the target table
    gpdf.write.format("greenplum")
      .mode(SaveMode.Append)
      .options(gscWriteOptionMap)
      .save()

    spark.stop()
  }
}
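
The test class passes only connection settings to the connector. As a minimal sketch of how the option map could be built once and extended for a partitioned read, the helper below assumes that the connector's `partitionColumn` and `partitions` read options are available in the connector version in use (check the documentation for your version). The object `GscOptions`, its method names, and the column name `id` in the usage note are hypothetical and do not come from the original post.

```scala
object GscOptions {
  // Hypothetical helper: builds the connection options used for both read and write.
  def connection(host: String, port: Int, database: String,
                 user: String, password: String,
                 schema: String, table: String): Map[String, String] =
    Map(
      "url" -> s"jdbc:postgresql://$host:$port/$database",
      "user" -> user,
      "password" -> password,
      "dbschema" -> schema,
      "dbtable" -> table
    )

  // Hypothetical helper: adds read partitioning on top of the base options.
  // Assumes the connector accepts "partitionColumn" and "partitions".
  def withPartitioning(base: Map[String, String],
                       column: String,
                       partitions: Int): Map[String, String] = {
    require(partitions > 0, "partitions must be positive")
    base ++ Map("partitionColumn" -> column, "partitions" -> partitions.toString)
  }
}
```

In a job, the result would be passed straight to the reader, for example `spark.read.format("greenplum").options(GscOptions.withPartitioning(base, "id", 8)).load()`, where `base` comes from `GscOptions.connection(...)`.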

2.2 Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.greenplum.spark</groupId>
    <artifactId>gsc-scala-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.5</spark.version>
        <scala.version>2.12</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>-->
        <!--     <groupId>org.apache.spark</groupId>-->
        <!--     <artifactId>spark-hive_${scala.version}</artifactId>-->
        <!--     <version>${spark.version}</version>-->
        <!-- </dependency>-->
        <!-- <dependency>-->
        <!--     <groupId>org.apache.spark</groupId>-->
        <!--     <artifactId>spark-mllib_${scala.version}</artifactId>-->
        <!--     <version>${spark.version}</version>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>io.pivotal.greenplum.spark</groupId>
            <artifactId>greenplum-spark_${scala.version}</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.pivotal</groupId>
            <artifactId>greenplum-jdbc</artifactId>
            <version>5.1.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.3-1102-jdbc4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>${scala.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3. Packaging and deployment
3.1 Build the package
mvn clean package
3.2 Upload the JARs
Upload both greenplum-spark_2.12-2.1.0.jar and gsc-scala-test-1.0-SNAPSHOT.jar to Spark's bin directory.

3.3 Submit the Spark job
spark-submit \
--class com.greenplum.spark.gsc.GreenplumSparkTest \
--master spark://localhost:7078 \
--jars greenplum-spark_2.12-2.1.0.jar \
gsc-scala-test-1.0-SNAPSHOT.jar
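Pitfalls encountered
1. Version compatibility between the Greenplum-Spark Connector driver and Spark

At the time of writing, greenplum-spark_2.12-2.1.2.jar only works in a Spark 2.x environment; running it on Spark 3.x fails with an incompatibility error. Whether a later driver release will support Spark 3.x remains to be seen.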
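
One way to surface this incompatibility early, rather than as a failure deep inside the job, is to check the Spark version before touching the connector. The following is a minimal sketch that takes the observation above (Spark 2.x only) as its assumption; `SparkVersionGuard` and its methods are hypothetical names, not part of Spark or the connector.

```scala
object SparkVersionGuard {
  // Hypothetical helper: true only when the major version is 2, e.g. "2.4.5".
  def isSupportedSparkVersion(version: String): Boolean =
    version.trim.split("\\.").headOption.exists(_ == "2")

  // Fails fast with a readable message when the running Spark is not 2.x.
  def requireSupported(version: String): Unit =
    require(
      isSupportedSparkVersion(version),
      s"Spark $version is not supported by this connector build; expected Spark 2.x"
    )
}
```

In the test class this could be called as `SparkVersionGuard.requireSupported(spark.version)` right after the session is created. If a newer connector release turns out to support Spark 3.x, the check would need to be relaxed accordingly.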
Source: https://blog.csdn.net/Carson073/article/details/126500498