使用Aerospike的 Spark 连接器(Spark Connect)

2022-06-21 00:00:00 数据 集群 示例 运行 加载

前言

Aerospike是一个高度可扩展的键值数据库,可提供同类产品中佳的性能。 它在实时业务环境中通常部署管理TB到PB数据量。

Aerospike通常与其他可扩展的分布式软件(例如,用于系统耦合的Kafka或用于分析的Spark)一起运行。 Aerospike提供的 Aerospike Connect 附件使集成变得很容易。

本文通过使用 aerospike-ansible 讨论了Aerospike Spark Connect在实际中的工作方式,并提供一个全面且易于复制的端到端示例。

一、数据库集群设置

首先看一下Ansible for Aerospike,它解释了如何使用 aerospike-ansible。

在此示例中,我在vars/cluster-config.yml 中将 cluster_instance_type 设置为 c5d.18xlarge。

按照说明进行操作,直到并包括一键式设置,后我们会运行到

ansible-playbook aws-setup-plus-aerospike-install.yml
ansible-playbook aerospike-java-client-setup.yml
  • 1
  • 2

这会产生一个3个节点的群集,以及一个安装了相关软件的客户端实例。

Spark 集群设置

这是通过

ansible-playbook spark-cluster-setup.yml
  • 1

对于此示例,在运行之前,我在 vars / cluster-config.yml 中将 spark_instance_type 设置为 c5d.4xlarge。

该腳本创建了一个3节点的给定实例类型的Spark集群,在其中已安装并运行了Spark,它还安装了Aerospike Spark Connect。

请注意,您需要设置 enterprise:true,并通过 vars/cluster-config.yml 中的 feature_key:/your/path/to/key 提供有效的Aerospike功能密钥的路径。 因此,您必须是Aerospike的授权客户,或者必须正在Aerospike企业版试用期。

在过程即将结束时,您会看到

TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************
ok: [localhost] => {
    "msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077."
}
  • 1
  • 2
  • 3
  • 4

记下Spark master内部网址,稍后需要。

加载数据

我们的示例利用了来自10亿纽约出租车司机库的2000万条记录,这些记录以压缩形式提供,网址为 https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz。 我们使用安装在上面设置的客户端计算机上的Aerospike loader加载程序把数据加载到Aerospike。

首先,我们获得Aerospike集群的节点的地址,稍后需要这些地址。

source ./scripts/cluster-ip-address-list.sh
  • 1

样品输出

Adds cluster ips to this array- AERO_CLUSTER_IPS
Use as ${ AERO_CLUSTER_IPS[index]}
There are 3 entries

##########################################################

cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

aerospike loader 加载器需要一个配置文件才能将数据加载到Aerospike中。 这会将 csv 列位置映射到命名和类型化的bin。 样本条目看起来像

{
    "name": "pkup_datetime",
    "value": {
        "column_position": 3,
        "type": "timestamp",
        "encoding": "yyyy-MM-dd hh:mm:ss",
        "dst_type": "integer"
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在 repos/aerospike-spark-demo/nyc-taxi- style="box-sizing: border-box; outline: 0px; margin: 0px 0px 24px; padding: 8px 16px 6px 56px; font-weight: 400; position: relative; font-family: "Source Code Pro", "DejaVu Sans Mono", "Ubuntu Mono", "Anonymous Pro", "Droid Sans Mono", Menlo, Monaco, Consolas, Inconsolata, Courier, monospace, "PingFang SC", "Microsoft YaHei", sans-serif; font-size: 14px; overflow: auto hidden; border: none; line-height: 22px; color: rgb(0, 0, 0); overflow-wrap: break-word; background-color: rgb(40, 44, 52); font-style: normal; font-variant-ligatures: no-common-ligatures; font-variant-caps: normal; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">source ./scripts/client-ip-address-list.sh scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[]}:~

  • 2

接下来,将数据放入客户端计算机。 有多种方法可以执行此操作,但是您需要进行规划,因为未压缩时数据集为7.6GB。 我使用了以下命令,但是具体情况取决于您的闪存和文件系统的具体情况

./scripts/client-quick-ssh.sh # to log in, followed by

sudo mkfs.ext4 /dev/nvme1n1
sudo mkdir /data
sudo mount -t ext4 /dev/nvme1n1 /data
sudo chmod 777 /data

wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz
gunzip /data/trips_xaa.csv.gz
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

后,我们使用上传的配置文件加载数据。

cd ~/aerospike-loader
./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv 
  • 1
  • 2

请注意,我们使用的是我们先前记录的群集IP地址之一。

使用Spark

登录到Spark的一个节点,通过aerospike-ansible中的一个工具脚本

./scripts/spark-quick-ssh.sh 
  • 1

使用我们在运行Spark集群安装手册时看到的Spark主URL启动Spark Shell。

/spark/bin/spark-shell --master spark://10.0.2.122:7077
  • 1

导入相关库

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import java.util.Date
import java.text.SimpleDateFormat
  • 1
  • 2
  • 3
  • 4

提供Aerospike配置。请注意,我们在这里使用了之前的集群ip:

spark.conf.set("aerospike.seedhost", "10.0.0.234")
spark.conf.set("aerospike.namespace", "test")
  • 1
  • 2

定义一个视图,以及我们将要使用的功能

val sqlContext = spark.sqlContext
sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))
val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load
taxi.createOrReplaceTempView("taxi")
  • 1
  • 2
  • 3
  • 4

后,运行我们的查询语句

// Journeys grouped by cab type
val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")
result.show()

+--------+--------+                                                             
|cab_type|   count|
+--------+--------+
|   green|20000000|
+--------+--------+

// Average fare based on different passenger count
val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")
result.show()

+-------------+----------+                                                      
|passenger_cnt|avg_amount|
+-------------+----------+
|            |     10.86|
|            1|     14.63|
|            2|     15.75|
|            3|     15.87|
|            4|     15.85|
|            5|     14.76|
|            6|     15.42|
|            7|     23.74|
|            8|     19.52|
|            9|      34.9|
+-------------+----------+

// No of journeys for different numbers of passengers
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) order by passenger_cnt");
result.show()

+-------------+---------+--------+                                              
|passenger_cnt|trip_year|   count|
+-------------+---------+--------+
|            |     2014|    4106|
|            1|     2014|16557518|
|            2|     2014| 1473578|
|            3|     2014|  507862|
|            4|     2014|  160714|
|            5|     2014|  939276|
|            6|     2014|  355846|
|            7|     2014|     492|
|            8|     2014|     494|
|            9|     2014|     114|
+-------------+---------+--------+

// Number of trips for each passenger count/distance combination
// Ordered by trip count, descending
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")
result.show()

+-------------+---------+--------+-------+                                      
|passenger_cnt|trip_year|distance|  trips|
+-------------+---------+--------+-------+
|            1|     2014|     1.0|5321230|
|            1|     2014|     2.0|3500458|
|            1|     2014|     3.0|2166462|
|            1|     2014|     4.0|1418494|
|            1|     2014|     5.0| 918460|
|            1|     2014|     0.0| 868210|
|            1|     2014|     6.0| 653646|
|            1|     2014|     7.0| 488416|
|            2|     2014|     1.0| 433746|
|            1|     2014|     8.0| 345728|
|            2|     2014|     2.0| 305578|
|            5|     2014|     1.0| 302120|
|            1|     2014|     9.0| 226278|
|            5|     2014|     2.0| 199968|
|            2|     2014|     3.0| 199522|
|            1|     2014|    10.0| 163928|
|            3|     2014|     1.0| 145580|
|            2|     2014|     4.0| 137152|
|            5|     2014|     3.0| 122714|
|            1|     2014|    11.0| 117570|
+-------------+---------+--------+-------+
only showing top 20 rows
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

结论

这篇文章向您展示了可以很快的启动并运行一个大型数据集。 该示例处理了二千万行数据,并很容易扩展到整个数据集。 我们还可以看到您可以快速启动并运行 Aerospike-ansible 工具。

相关文章