How to Quickly Import Local Data into Nebula Graph

2022-06-17 00:00:00 Tags: path, data, parameters, file, import

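With the rapid growth of industries such as social networking, e-commerce, finance, retail, and the Internet of Things, the real world has woven a vast and complex web of relationships. This creates an urgent need for a database that supports computation over massive, complex data relationships: the graph database. This series of articles collects and summarizes what I have learned about knowledge graphs and graph databases.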

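This article covers the following:

How to use Exchange to quickly import local data into Nebula Graph
Intended audience: architects, technical experts, and engineers interested in knowledge graphs and graph databases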

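1. What is Nebula Exchange
Nebula Exchange (Exchange for short) is an Apache Spark™ application for bulk-migrating data from a cluster into Nebula Graph in a distributed environment. It supports migrating both batch data and streaming data in a variety of formats.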

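Exchange consists of three parts: Reader, Processor, and Writer. The Reader reads data from different sources and returns a DataFrame. The Processor then iterates over each row of the DataFrame and fetches the corresponding values by column name, following the fields mapping in the configuration file. Once the configured batch size of rows has been traversed, the Writer writes the collected data to Nebula Graph in one go. The original post illustrates this conversion and migration process with a diagram.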
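The Reader, Processor, and Writer flow described above can be sketched in plain Scala without Spark. This is only a minimal illustration under stated assumptions, not Exchange's actual code: the names BatchPipelineSketch, mapRow, and process are hypothetical, rows are modelled as simple maps instead of DataFrame rows, and the write callback stands in for the real Nebula Graph client.

```scala
object BatchPipelineSketch {
  // A row is modelled as column name -> value (Exchange itself works on Spark DataFrame rows).
  type Row = Map[String, String]

  // Processor step: read the configured source columns by name and
  // rename them to the target property names at the same positions.
  def mapRow(row: Row, fields: Seq[String], nebulaFields: Seq[String]): Map[String, String] =
    fields.zip(nebulaFields).map { case (src, dst) => dst -> row(src) }.toMap

  // Writer step: hand the mapped rows to `write` one batch at a time.
  // Returns the number of batches that were written.
  def process(rows: Iterator[Row],
              fields: Seq[String],
              nebulaFields: Seq[String],
              batch: Int)(write: Seq[Map[String, String]] => Unit): Int = {
    var batches = 0
    rows.map(mapRow(_, fields, nebulaFields)).grouped(batch).foreach { group =>
      write(group)
      batches += 1
    }
    batches
  }
}
```

With a batch size of 2 and three input rows, the callback would be invoked twice: once with two rows and once with the remaining row.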

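However, the official documentation says that for the file sources outlined in red in that diagram, the files must reside on the distributed file system HDFS, and local files are not supported. This article gives an example of reading local files.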

Spark itself can read files from different kinds of file systems; for details, see the article "Reading local Windows files in Spark".

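[file://] denotes a local file path. On Windows, the path can be written as file:///E:/aa/bb/cc.txt or file:///E:\\aa\\bb\\cc.txt

[hdfs://] denotes an HDFS file path

If a path has no scheme prefix, Spark resolves it against the default file system of the Hadoop configuration (fs.defaultFS), so on a cluster configured for HDFS it is treated as an "hdfs://" path.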
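To avoid depending on the default file system, a local path can be turned into an explicit file:// URI before it goes into the configuration. The helper below is a hypothetical sketch (LocalPathSketch, schemeOf, and toFileUri are not part of Spark or Exchange) that relies only on string handling, so it behaves the same on any operating system.

```scala
object LocalPathSketch {
  // Returns the URI scheme of a path ("file", "hdfs", ...),
  // or None when the path carries no "scheme://" prefix.
  def schemeOf(path: String): Option[String] = {
    val idx = path.indexOf("://")
    if (idx > 0) Some(path.substring(0, idx).toLowerCase) else None
  }

  // Converts a local path (Windows or Unix style) to a file:// URI string.
  // Paths that already have a scheme are returned with only the slashes normalized.
  def toFileUri(localPath: String): String = {
    val normalized = localPath.replace('\\', '/')
    if (schemeOf(normalized).isDefined) normalized
    else if (normalized.startsWith("/")) "file://" + normalized
    else "file:///" + normalized
  }
}
```

For example, toFileUri("E:\\aa\\bb\\cc.txt") yields file:///E:/aa/bb/cc.txt, while a path that already starts with hdfs:// is left unchanged.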




2. Importing a CSV file
For the official example, see https://docs.nebula-graph.com.cn/nebula-exchange/use-exchange/ex-ug-import-from-csv/

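2.1. Modifying the source code
In the source code, Spark's master is passed in through the --master option of spark-submit. To make it easy to run in IDEA, we adjust the source so that a master argument can be supplied when the method is called (note: it is a default parameter whose default value is an empty string).

Create an ExchangeUtil class whose content is the Exchange source code, adjusted so that the code in main is wrapped into two methods, initParam and importData, for easy calling.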

package com.vesoft.nebula.exchange

import java.io.File

import com.vesoft.nebula.exchange.config._
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, ReloadProcessor, VerticesProcessor}
import com.vesoft.nebula.exchange.reader._
import org.apache.commons.lang3.StringUtils
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * ExchangeUtil is a simple spark job used to write data into Nebula Graph parallel.
  */
object ExchangeUtil {
  private[this] val LOG = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val PROGRAM_NAME = "Nebula Graph Exchange"
    val (c, configs, spark) = initParam(args, PROGRAM_NAME)
    importData(c, configs, spark)
    spark.close()
    sys.exit(0)
  }

  def initParam(args: Array[String], programName: String, masterStr: String = ""): (Argument, Configs, SparkSession) = {
    val options = Configs.parser(args, programName)
    val c: Argument = options match {
      case Some(config) => config
      case _ =>
        LOG.error("Argument parse failed")
        sys.exit(-1)
    }

    val configs = Configs.parse(new File(c.config))
    LOG.info(s"Config ${configs}")

    val session = SparkSession
      .builder()
      .appName(programName)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", "1")
    // set master only when one was passed in; otherwise the --master option of spark-submit applies
    if (StringUtils.isNoneBlank(masterStr)) {
      session.master(masterStr)
    }

    for (key <- configs.sparkConfigEntry.map.keySet) {
      session.config(key, configs.sparkConfigEntry.map(key))
    }

    val sparkConf = new SparkConf()
    sparkConf.registerKryoClasses(Array(classOf[com.facebook.thrift.async.TAsyncClientManager]))

    // config hive for sparkSession
    if (c.hive) {
      if (configs.hiveConfigEntry.isEmpty) {
        LOG.info("you don't config hive source, so using hive tied with spark.")
      } else {
        val hiveConfig = configs.hiveConfigEntry.get
        sparkConf.set("spark.sql.warehouse.dir", hiveConfig.warehouse)
        sparkConf
          .set("javax.jdo.option.ConnectionURL", hiveConfig.connectionURL)
          .set("javax.jdo.option.ConnectionDriverName", hiveConfig.connectionDriverName)
          .set("javax.jdo.option.ConnectionUserName", hiveConfig.connectionUserName)
          .set("javax.jdo.option.ConnectionPassword", hiveConfig.connectionPassWord)
      }
    }

    session.config(sparkConf)

    if (c.hive) {
      session.enableHiveSupport()
    }

    val spark = session.getOrCreate()

    // reload for failed import tasks
    if (!c.reload.isEmpty) {
      val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
      val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")

      val data = spark.read.text(c.reload)
      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
      processor.process()
      LOG.info(s"batchSuccess.reload: ${batchSuccess.value}")
      LOG.info(s"batchFailure.reload: ${batchFailure.value}")
      sys.exit(0)
    }
    (c, configs, spark)
  }

  def importData(c: Argument, configs: Configs, spark: SparkSession): Unit = {
    // import tags
    if (configs.tagsConfig.nonEmpty) {
      for (tagConfig <- configs.tagsConfig) {
        LOG.info(s"Processing Tag ${tagConfig.name}")

        val fieldKeys = tagConfig.fields
        LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
        val nebulaKeys = tagConfig.nebulaFields
        LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")

        val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
        if (data.isDefined && !c.dry) {
          val batchSuccess =
            spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
          val batchFailure =
            spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}")

          val processor = new VerticesProcessor(
            repartition(data.get, tagConfig.partition, tagConfig.dataSourceConfigEntry.category),
            tagConfig,
            fieldKeys,
            nebulaKeys,
            configs,
            batchSuccess,
            batchFailure)
          processor.process()
          if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
            LOG.info(s"batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
            LOG.info(s"batchFailure.${tagConfig.name}: ${batchFailure.value}")
          }
        }
      }
    } else {
      LOG.warn("Tag is not defined")
    }

    // import edges
    if (configs.edgesConfig.nonEmpty) {
      for (edgeConfig <- configs.edgesConfig) {
        LOG.info(s"Processing Edge ${edgeConfig.name}")

        val fieldKeys = edgeConfig.fields
        LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
        val nebulaKeys = edgeConfig.nebulaFields
        LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
        val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry)
        if (data.isDefined && !c.dry) {
          val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
          val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")

          val processor = new EdgeProcessor(
            repartition(data.get, edgeConfig.partition, edgeConfig.dataSourceConfigEntry.category),
            edgeConfig,
            fieldKeys,
            nebulaKeys,
            configs,
            batchSuccess,
            batchFailure
          )
          processor.process()
          if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
            LOG.info(s"batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
            LOG.info(s"batchFailure.${edgeConfig.name}: ${batchFailure.value}")
          }
        }
      }
    } else {
      LOG.warn("Edge is not defined")
    }

    // reimport for failed tags and edges
    if (ErrorHandler.existError(configs.errorConfig.errorPath)) {
      val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
      val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
      val data = spark.read.text(configs.errorConfig.errorPath)
      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
      processor.process()
      LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
      LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
    }
  }

  /**
    * Create data source for different data type.
    *
    * @param session The Spark Session.
    * @param config  The config.
    * @return
    */
  private[this] def createDataSource(
      session: SparkSession,
      config: DataSourceConfigEntry
  ): Option[DataFrame] = {
    config.category match {
      case SourceCategory.PARQUET =>
        val parquetConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
        LOG.info(s"""Loading Parquet files from ${parquetConfig.path}""")
        val reader = new ParquetReader(session, parquetConfig)
        Some(reader.read())
      case SourceCategory.ORC =>
        val orcConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
        LOG.info(s"""Loading ORC files from ${orcConfig.path}""")
        val reader = new ORCReader(session, orcConfig)
        Some(reader.read())
      case SourceCategory.JSON =>
        val jsonConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
        LOG.info(s"""Loading JSON files from ${jsonConfig.path}""")
        val reader = new JSONReader(session, jsonConfig)
        Some(reader.read())
      case SourceCategory.CSV =>
        val csvConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
        LOG.info(s"""Loading CSV files from ${csvConfig.path}""")
        val reader =
          new CSVReader(session, csvConfig)
        Some(reader.read())
      case SourceCategory.HIVE =>
        val hiveConfig = config.asInstanceOf[HiveSourceConfigEntry]
        LOG.info(s"""Loading from Hive and exec ${hiveConfig.sentence}""")
        val reader = new HiveReader(session, hiveConfig)
        Some(reader.read())
      case SourceCategory.KAFKA => {
        val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
        LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
        val reader = new KafkaReader(session, kafkaConfig)
        Some(reader.read())
      }
      case SourceCategory.NEO4J =>
        val neo4jConfig = config.asInstanceOf[Neo4JSourceConfigEntry]
        LOG.info(s"Loading from neo4j config: ${neo4jConfig}")
        val reader = new Neo4JReader(session, neo4jConfig)
        Some(reader.read())
      case SourceCategory.MYSQL =>
        val mysqlConfig = config.asInstanceOf[MySQLSourceConfigEntry]
        LOG.info(s"Loading from mysql config: ${mysqlConfig}")
        val reader = new MySQLReader(session, mysqlConfig)
        Some(reader.read())
      case SourceCategory.PULSAR =>
        val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry]
        LOG.info(s"Loading from pulsar config: ${pulsarConfig}")
        val reader = new PulsarReader(session, pulsarConfig)
        Some(reader.read())
      case SourceCategory.JANUS_GRAPH =>
        val janusGraphSourceConfigEntry = config.asInstanceOf[JanusGraphSourceConfigEntry]
        val reader = new JanusGraphReader(session, janusGraphSourceConfigEntry)
        Some(reader.read())
      case SourceCategory.HBASE =>
        val hbaseSourceConfigEntry = config.asInstanceOf[HBaseSourceConfigEntry]
        val reader = new HBaseReader(session, hbaseSourceConfigEntry)
        Some(reader.read())
      case _ => {
        LOG.error(s"Data source ${config.category} not supported")
        None
      }
    }
  }

  /**
    * Repartition the data frame using the specified partition number.
    *
    * @param frame
    * @param partition
    * @param sourceCategory
    * @return
    */
  private[this] def repartition(frame: DataFrame,
                                partition: Int,
                                sourceCategory: SourceCategory.Value): DataFrame = {
    if (partition > 0 && !CheckPointHandler.checkSupportResume(sourceCategory)) {
      frame.repartition(partition).toDF
    } else {
      frame
    }
  }
}
2.2. Configuration file
Replace F:/nebula/nebula-web-docker-master/example/ with the actual path of your files.

{
  spark: {
    app: {
      name: Nebula Exchange 2.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    executor: {
      memory: 1G
    }

    cores {
      max: 16
    }
  }
  nebula: {
    address: {
      graph: ["172.25.21.22:9669"]
      meta: ["172.25.21.22:9559"]
    }
    user: user
    pswd: password
    space: mooc
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  tags: [
    {
      name: course
      type: {
        source: csv
        sink: client
      }
      path: "file:///F:/nebula/nebula-web-docker-master/example/mooc-actions/course.csv"
      fields: [_c0, _c1]
      nebula.fields: [courseId, courseName]
      vertex: _c1
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
    {
      name: user
      type: {
        source: csv
        sink: client
      }
      path: "file:///F:/nebula/nebula-web-docker-master/example/mooc-actions/user.csv"
      fields: [_c0]
      nebula.fields: [userId]
      vertex: _c0
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
  edges: [
    {
      name: action
      type: {
        source: csv
        sink: client
      }
      path: "file:///F:/nebula/nebula-web-docker-master/example/mooc-actions/actions.csv"
      fields: [_c0, _c3, _c4, _c5, _c6, _c7, _c8]
      nebula.fields: [actionId, duration, feature0, feature1, feature2, feature3, label]
      source: _c1
      target: _c2
      separator: ","
      header: false
      batch: 256
      partition: 32
    }
  ]
}
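Because header is false, Spark names the CSV columns _c0, _c1, and so on by position. The sketch below shows how the fields and nebula.fields lists of the action edge line up, assuming the two lists are paired positionally. FieldMappingSketch and pair are hypothetical names used only for illustration.

```scala
object FieldMappingSketch {
  // Pair each source column with the Nebula property at the same position.
  // Assumption: both lists are positional and must have equal length.
  def pair(fields: Seq[String], nebulaFields: Seq[String]): Seq[(String, String)] = {
    require(fields.length == nebulaFields.length,
            "fields and nebula.fields must have the same length")
    fields.zip(nebulaFields)
  }

  def main(args: Array[String]): Unit = {
    // Values copied from the action edge in the configuration above.
    val fields       = Seq("_c0", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8")
    val nebulaFields = Seq("actionId", "duration", "feature0", "feature1", "feature2", "feature3", "label")
    pair(fields, nebulaFields).foreach { case (column, property) =>
      println(s"$column -> $property")
    }
  }
}
```

Columns _c1 and _c2 do not appear in fields because the configuration uses them as the source and target vertices of the edge.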
2.3. Entry point of the import program
import com.vesoft.nebula.exchange.ExchangeUtil
import org.apache.log4j.Logger

object ExchangeTest {
  private[this] val LOG = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    testDataImport()
  }

  def testDataImport(): Unit = {
    // path of the Exchange configuration file
    val conf = "E:/gitcodes/nebula/nebula-spark-utils/nebula-exchange/src/main/resources/mooc_sst_application.conf"
    val params: Array[String] = Array("-c", conf)
    val PROGRAM_NAME = "Nebula Graph Exchange"
    // "local[4]" runs Spark inside the IDE with 4 worker threads
    val (c, configs, spark) = ExchangeUtil.initParam(params, PROGRAM_NAME, "local[4]")
    ExchangeUtil.importData(c, configs, spark)
    spark.close()
    sys.exit(0)
  }
}
2.4. Import log
2021-04-20 10:16:15,792 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:15,799 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:15,805 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:17,930 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:2-2124
2021-04-20 10:16:17,951 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:17,953 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:17,956 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:17,960 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:4-4
2021-04-20 10:16:17,977 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:17,978 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:17,980 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:17,985 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:5-5
2021-04-20 10:16:18,012 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:18,014 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:18,319 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:18,325 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:6-6
2021-04-20 10:16:18,341 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:18,343 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:18,346 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:18,351 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:7-5
2021-04-20 10:16:18,677 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:18,679 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:18,984 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:21,495 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:21,690 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:8-2706
2021-04-20 10:16:21,718 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:21,720 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:21,722 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:21,735 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:9-13
2021-04-20 10:16:21,797 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:21,800 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:21,805 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:0-5
2021-04-20 10:16:24,753 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:24,823 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
2021-04-20 10:16:24,825 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:24,828 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:24,833 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:11-5
2021-04-20 10:16:25,656 INFO [com.vesoft.nebula.exchange.GraphProvider:48] - switch space mooc
2021-04-20 10:16:25,658 INFO [com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter:147] - Connection to List(172.25.21.22:9559)
2021-04-20 10:16:25,668 INFO [com.vesoft.nebula.exchange.processor.VerticesProcessor:81] - spark partition for vertex cost time:10-10
2021-04-20 10:16:25,681 INFO [com.vesoft.nebula.client.graph.net.NebulaPool:105] - Get connection to 172.25.21.22:9669
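The "cost time:A-B" entries in the log above can be pulled out with a small regular expression to see which partitions took longest. This is a hypothetical sketch: CostTimeSketch and parse are illustrative names, and reading the first number as the Spark partition id and the second as elapsed milliseconds is an assumption based on the log wording, not something the log itself states.

```scala
object CostTimeSketch {
  // Matches the trailing "cost time:<first>-<second>" part of a log line.
  private val CostTime = """cost time:(\d+)-(\d+)""".r

  // Returns (first number, second number) if the line contains a cost-time entry.
  // Assumed meaning: (partition id, elapsed milliseconds).
  def parse(line: String): Option[(Int, Long)] =
    CostTime.findFirstMatchIn(line).map(m => (m.group(1).toInt, m.group(2).toLong))
}
```

Applied to the log lines above, lines without a cost-time entry (for example the "switch space mooc" lines) simply yield None.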

————————————————
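Copyright notice: This is an original article by CSDN blogger "java编程艺术", licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/penriver/article/details/115893851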
