基于Kafka+Spark+SequoiaDB实时处理架构快速实战

2022-03-23 00:00:00 创建 数据 集群 节点 检查

背景


随着系统数据量逐渐增大,数据实时处理成为许多公司需要面对的首要问题。为了应对这些问题,许多公司使用分布式开源消息系统 Kafka 作为数据管道和消息系统,并使用 Spark Streaming 消费处理数据。


Spark Streaming 是核心 Spark API 的一个扩展,它吞吐量高,能够处理实时数据流,并具有很好的可拓展性和容错性。Kafka 是一种基于发布/订阅的分布式消息系统,吞吐率高,提供消息持久化能力、消息分区、分布式消费、在线水平拓展、离线数据处理和实时数据处理等功能,同时能保证每个分区内的消息按顺序传输。


巨杉数据库 SequoiaDB 支持海量分布式数据存储,并且支持垂直分区和水平分区。利用这些特性,可以将 Kafka 中的消息存储到 SequoiaDB 中,方便业务系统后续进行数据分析和数据应用。


本文主要讲解 Spark Streaming 是如何消费并处理 Kafka 中的数据,并存储在巨杉数据库 SequoiaDB 中的。


02

架构设计



其中,数据的实时处理流程主要分为数据采集、数据加工和数据存储三大模块。数据产生系统生产数据并实时发送到 Kafka 队列,并由 Kafka 保证数据在传输中的顺序。之后 Kafka 队列会对接 Spark Streaming,Spark Streaming 定时将接收到的数据批量执行。处理完成后,数据将会以 JSON 的形式写入到 SequoiaDB 当中。

03

环境搭建


3.1 部署环境

  • 服务器分布

  • 软件配置

3.2 zookeeper 集群安装

1. 将下载的 zookeeper 解压到 opt/soft 目录下

    tar -zxvf zookeeper-3.4.8.tar.gz -C opt/soft

    2. 进入 zookeeper 的 conf 目录

      cd opt/soft/zookeeper-3.4.8/conf

      3. 复制一份 zoo_sample.cfg  zoo.cfg

        cp zoo_sample.cfg zoo.cfg

        4. 创建 data 目录
          mkdir opt/soft/zookeeper-3.4.8/data
          5. 修改 zoo.cfg
            vi zoo.cfg
            • 修改

              dataDir=/opt/soft/zookeeper-3.4.8/data
              • 新增

                server.0=server1:2888:3888
                server.1=server2:2888:3888
                server.2=server3:2888:3888
                6. 在 data 目录中创建 myid 并输入0
                  echo '0'>myid
                  7. scp zookeeper 到其他机器
                    scp -r zookeeper-3.4.8 root@server2:/opt/soft/
                    scp -r zookeeper-3.4.8 root@server3:/opt/soft/
                    8. 其他机器的 data 目录下的 myid 也相应更改
                    9. 检查集群
                    • 分别在三台机器上执行:zkServer.sh start

                    • 检查ZooKeeper状态:zkServer.sh status,应该是一个 leader,两个 follower

                    • jps:检查三个节点是否都有 QuromPeerMain 进程


                    3.3 kafka 安装

                    1. 将下载的 Kafka 解压到 opt/soft 目录下

                      tar -zxvf kafka_2.11-0.10.2.1.tgz -C opt/soft

                      2. 进入 Kafka/libs 目录

                        cd opt/soft/kafka_2.11-0.10.2.1/libs

                        3. 从 zookeepe r中复制 zookeeper-3.4.8.jar

                          cp opt/soft/zookeeper-3.4.8/zookeeper-3.4.8.jar ./

                          4. 删除 zookeeper-3.4.9.jar

                            rm zookeeper-3.4.9.jar

                            5. 进入 config 目录

                              cd config/

                              6. 创建 Kafka-logs 目录

                                /opt/soft/kafka_2.11-0.10.2.1/kafka-logs

                                7. 编辑 server.properties

                                  vi server.properties

                                  • broker.id:每台机器依次增长的整数,0、1、2,集群中 Broker 的id更改

                                    log.dirs=/opt/soft/kafka_2.11-0.10.2.1/kafka-logszookeeper.connect=server1:2181,server2:2181,server3:2181
                                    • 添加(当前机器的 ip)

                                      listeners=PLAINTEXT://192.168.106.189:9092
                                      advertised.listeners=PLAINTEXT://192.168.106.189:9092

                                      8. scp 到其他机器并按照规则更改 server.properties

                                        scp -r opt/soft/zookeeper-3.4.8/ server2:/opt/soft
                                        scp -r opt/soft/zookeeper-3.4.8/ server3:/opt/soft

                                        9. 检查集群

                                        • 创建 topic

                                          bin/kafka-topics.sh --zookeeper server2:2181,server3:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create

                                          • 创建 producer

                                            bin/kafka-console-producer.sh --broker-list server1:9092,server2:9092,server3:9092 --topic TestTopic

                                            • 创建 consumer

                                              bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic TestTopic --from-beginning


                                              3.4 Spark 集群安装

                                              1. 将下载的 Spark 解压到 opt/soft 目录下
                                                tar -zxvf spark-2..1-bin-hadoop2.7.tgz  -C opt/soft
                                                2. 进入 spark-2.0.1-bin-hadoop2.7/conf 目录
                                                  cd opt/soft/ spark-2.0.1-bin-hadoop2.7/conf
                                                  3. 复制一份 spark-env.sh, slaves
                                                    cp spark-env.sh.template spark-env.sh
                                                    cp slaves.template slaves
                                                    4. 更改 spark-env.sh
                                                      vi spark-env.sh
                                                      SPARK_WORKER_INSTANCES=2
                                                      SPARK_DAEMON_MEMORY=2g
                                                      SPARK_MASTER_IP=10.196.9.21
                                                      JAVA_HOME=/usr/local/java/jdk1.8.0_121
                                                      5. 编辑 slaves
                                                        vi slaves
                                                        server1
                                                        server2
                                                        server3
                                                        6. scp 到其他机器
                                                          scp -r opt/soft/ spark-2.0.1-bin-hadoop2.7/ server2:/opt/soft
                                                          scp -r opt/soft/ spark-2.0.1-bin-hadoop2.7/ server3:/opt/soft
                                                          7. 进入 sbin
                                                            ./start-all.sh
                                                            • 在浏览器打开 Spark 的监控页面,查看集群是否启动成功

                                                              http://192.168.106.187:8080
                                                              8. Spark 与 Kafka 集成

                                                              https://mvnrepository.com/按照scala, kafka, spark的版本下载驱动 spark-streaming-kafka-0-10_2.11-2.0.1.jar 复制到 jars 目录下。

                                                              • 在 kafka 目录下复制以下 jar 包到 jars 目录下

                                                                cp /opt/soft/kafka/libs/ kafka-clients-0.10.2.1.jar  opt/soft/spark/jars
                                                                cp /opt/soft/kafka/libs/ zkclient-0.10.jar opt/soft/spark/jars
                                                                cp /opt/soft/kafka/libs/ metrics-core-2.2.0.jar opt/soft/spark/jars


                                                                3.5 安装 SequoiaDB

                                                                1. 解压安装包
                                                                  tar -zxvf sequoiadb-3.2.3-linux_x86_64-enterprise-installer.tar.gz
                                                                  2. 安装命令
                                                                    ./sequoiadb-3.2.3-linux_x86_64-enterprise-installer.run --mode text --installer-language en --prefix opt/sequoiadb --upgrade false --force false --username sdbadmin --groupname sdbadmin_group --userpasswd sdbadmin --port 11790 --processAutoStart false --SMS true 
                                                                    Please choose an option [1] : 1
                                                                    Do you want to continue? [Y/n]: Y
                                                                    3. 切换到 sdbadmin 用户
                                                                      su - sdbadmin
                                                                      4. 查看 sdb 服务 
                                                                        cd opt/sequoiadb/bin
                                                                        • 检查节点是否已经启动

                                                                          ./sdblist -t all
                                                                          ./sdblist -l
                                                                          • 配置 opt 文件权限

                                                                            chown -R sdbadmin:sdbadmin_group /opt
                                                                            • 配置 sdbadmin 互相免密(包括本机)

                                                                              ssh-keygen -t rsa
                                                                              ssh-copy-id ip
                                                                              同时,server2 和 server3 也安装 SequoiaDB。

                                                                              3.6 配置集群

                                                                              一、数据库部署
                                                                              1. 创建临时节点
                                                                                var oma = new Oma("localhost", 11790)
                                                                                oma.createCoord(18800, "/opt/sequoiadb/database/coord/18800")
                                                                                oma.startNode(18800)
                                                                                2. 创建编目节点
                                                                                  var db = new Sdb("localhost",18800)
                                                                                  db.createCataRG("server1", 11800, "/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                  var cataRG = db.getRG("SYSCatalogGroup");
                                                                                  var node1 = cataRG.createNode("server2", 11800,"/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                  node1.start()
                                                                                  var node1 = cataRG.createNode("server3", 11800,"/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                  node1.start()
                                                                                  3. 创建数据节点
                                                                                  • 创建数据组 rg1

                                                                                    var dataRG = db.createRG("rg1")
                                                                                    dataRG.createNode("server1", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
                                                                                    dataRG.createNode("server2", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                    dataRG.createNode("server3", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                    dataRG.start()
                                                                                    • 创建数据组 rg2

                                                                                      var dataRG1 = db.createRG("rg2")
                                                                                      dataRG1.createNode("server1", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
                                                                                      dataRG1.createNode("server2", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                      dataRG1.createNode("server3", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                      dataRG1.start()
                                                                                      • 创建数据组 rg3

                                                                                        var dataRG2 = db.createRG("rg3")
                                                                                        dataRG2.createNode("server1", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
                                                                                        dataRG2.createNode("server2", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                        dataRG2.createNode("server3", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
                                                                                        dataRG2.start()

                                                                                        4. 创建协调节点

                                                                                          var rg = db.createCoordRG()
                                                                                          rg.createNode("server1", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                          rg.createNode("server2", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                          rg.createNode("server3", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
                                                                                          rg.start()
                                                                                          5. 删除临时协调节点
                                                                                            var oma = new Oma("localhost", 11790)
                                                                                            oma.removeCoord(18800)

                                                                                            二、数据库部署检查

                                                                                            1. 首先检查节点是否都已经启动

                                                                                              sdblist -l
                                                                                              sdblist -t all
                                                                                              2. 创建 cs 和 cl
                                                                                                db.createCS("cstest1");
                                                                                                db.cstest1.createCL("cl",{ShardingKey:{"a":1},ShardingType:"hash",AutoSplit:false,AutoIndexId:false,Partition:1024,Compressed:true,CompressionType:"lzw",EnsureShardingIndex:false});
                                                                                                for(var i=;i<10000;i++){db.cstest1.cl.insert({"a":i,age:i})}
                                                                                                • 查看数据

                                                                                                  db.cstest1.cl.count()
                                                                                                  db.cstest1.cl.findOne()
                                                                                                  • 清除 cs

                                                                                                    db.dropCS("cstest1")
                                                                                                    db.createCS("cstest2");
                                                                                                    db.cstest2.createCL("cl",{ShardingKey:{"a":1},ShardingType:"hash",AutoSplit:true,AutoIndexId:false,Partition:1024,Compressed:true,CompressionType:"lzw",EnsureShardingIndex:false});
                                                                                                    for(var i=;i<10000;i++){db.cstest2.cl.insert({"a":i,age:i})}
                                                                                                    • 查看数据

                                                                                                      db.cstest2.cl.count()
                                                                                                      db.cstest2.cl.findOne()
                                                                                                      • 清除 cs

                                                                                                        db.dropCS("cstest2")
                                                                                                        3. 是否可以重新选主
                                                                                                        • 连接数据组进行节点重启

                                                                                                          db = new Sdb("localhost",11810)
                                                                                                          var rg = db.getRG("rg1")
                                                                                                          rg.reelect()
                                                                                                          • 检查节点是否重新分配

                                                                                                            sdblist -l
                                                                                                            4. 节点运行时配置项的查看
                                                                                                            方法:
                                                                                                              bin/sdblist -p xxx端口 --detail –expand
                                                                                                              • 检查 weight 是否为 80

                                                                                                              • 检查 logfilesz 是否为 1024

                                                                                                              • 检查 sortbuf 是否为 512

                                                                                                              • 检查 shardingbreak 是否为 180000


                                                                                                              04

                                                                                                              数据实时处理场景分析与实现


                                                                                                              数据加工层实际上是 Spark Streaming 处理层。在该层中,系统使用可靠读取的方式从 Spark Streaming 中获得相应的数据,然后在 JSON 数据中插入操作时间戳写入数据记录本身,然后插入数据库集合内:
                                                                                                              { TS: 135789023,
                                                                                                                ACCOUNTID: xxx,
                                                                                                                FIELD2: yyy,
                                                                                                                FIELD3: zzz,
                                                                                                                …
                                                                                                              }

                                                                                                              4.1 SequoiaDB 建立集合和集合空间

                                                                                                              1. 创建域
                                                                                                                db.createDomain('mydomain',['rg1','rg2','rg3'],{AutoSplit:true})
                                                                                                                2. 创建集合空间
                                                                                                                  db.createCS( "foo", { PageSize: 4096, Domain: "mydomain" } )
                                                                                                                  3.创建集合
                                                                                                                    db.createCS( "foo", { PageSize: 4096, Domain: "mydomain" } )


                                                                                                                    4.2 产生数据

                                                                                                                      import java.util.Properties
                                                                                                                      import kafka.javaapi.producer.Producer
                                                                                                                      import kafka.producer.{KeyedMessage, ProducerConfig}
                                                                                                                      import org.codehaus.jettison.json.JSONObject
                                                                                                                      import java.util.UUID
                                                                                                                      import scala.util.Random
                                                                                                                      object KafkaProducer {
                                                                                                                      private val random = new Random()
                                                                                                                      private var pointer = -1
                                                                                                                      def getID() : String =UUID.randomUUID().toString
                                                                                                                      def account() : Double = {
                                                                                                                      random.nextInt(1000000)
                                                                                                                      }
                                                                                                                      def main(args: Array[String]): Unit = {
                                                                                                                      val topic = "test"
                                                                                                                      val brokers = "192.168.106.187:9092"
                                                                                                                      val props = new Properties()
                                                                                                                      props.put("metadata.broker.list", brokers)
                                                                                                                      props.put("serializer.class", "kafka.serializer.StringEncoder")
                                                                                                                      val kafkaConfig = new ProducerConfig(props)
                                                                                                                      val producer = new Producer[String, String](kafkaConfig)
                                                                                                                      while(true) {
                                                                                                                      // prepare event data
                                                                                                                      val event = new JSONObject()
                                                                                                                      event
                                                                                                                      .put("id", getID)
                                                                                                                      .put("account", account())
                                                                                                                      // produce event message
                                                                                                                      producer.send(new KeyedMessage[String, String](topic, event.toString))
                                                                                                                      println("Message sent: " + event)


                                                                                                                      Thread.sleep(200)
                                                                                                                      }
                                                                                                                      }


                                                                                                                      }


                                                                                                                      4.3 实时处理数据

                                                                                                                        import com.sequoiadb.base.Sequoiadb
                                                                                                                        import org.apache.kafka.common.serialization.StringDeserializer
                                                                                                                        import org.slf4j.LoggerFactory
                                                                                                                        import org.apache.spark.SparkConf
                                                                                                                        import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
                                                                                                                        import org.apache.spark.streaming.kafka010.KafkaUtils
                                                                                                                        import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
                                                                                                                        import org.apache.spark.streaming.{Seconds, StreamingContext}
                                                                                                                        import org.json4s._
                                                                                                                        import org.json4s.jackson.JsonMethods._


                                                                                                                        object AccountAnalytics {


                                                                                                                        val db= new Sequoiadb("192.168.106.188","","")
                                                                                                                        val bar=db.getCollectionSpace("foo").getCollection("bar")


                                                                                                                        implicit val formats = DefaultFormats
                                                                                                                        def main(args: Array[String]) {
                                                                                                                        val log = LoggerFactory.getLogger(this.getClass)


                                                                                                                        val kafkaParams = Map[String, Object](
                                                                                                                        "bootstrap.servers" -> "192.168.106.187:9092",
                                                                                                                        "key.deserializer" -> classOf[StringDeserializer],
                                                                                                                        "value.deserializer" -> classOf[StringDeserializer],
                                                                                                                        "group.id" -> "example",
                                                                                                                        "auto.offset.reset" -> "latest",
                                                                                                                        "enable.auto.commit" -> (false: java.lang.Boolean)
                                                                                                                        )
                                                                                                                        val masterUrl = if (args.length > ) args() else "local[2]"
                                                                                                                        val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCount")
                                                                                                                        val streamingContext = new StreamingContext(conf, Seconds(5))
                                                                                                                        val topics = Array("test")
                                                                                                                        val kafkaStream = KafkaUtils.createDirectStream[String, String](
                                                                                                                        streamingContext,
                                                                                                                        PreferConsistent,
                                                                                                                        Subscribe[String, String](topics, kafkaParams)
                                                                                                                        )
                                                                                                                        val events = kafkaStream.flatMap(line => {
                                                                                                                        val data = parse(line.value(),true)
                                                                                                                        // mapper.readValue(line., classOf[SdkBean])
                                                                                                                        val event = data.extract[Flow]
                                                                                                                        Some(event)
                                                                                                                        })
                                                                                                                        log.info(s"$events")
                                                                                                                        val userClicks = events.map { event => (event.id.getOrElse("null"), event.account)
                                                                                                                        }
                                                                                                                        userClicks.foreachRDD { rdd => {
                                                                                                                        rdd.foreachPartition(partitionOfRecords => {
                                                                                                                        partitionOfRecords.foreach(pair => {
                                                                                                                        val uid = pair._1
                                                                                                                        val account = pair._2
                                                                                                                        val time=System.currentTimeMillis();
                                                                                                                        log.info(s"$id:$time")
                                                                                                                        bar.insert("{'id':'"+id+"','account':'"+account+"','time':"+time+ "}")
                                                                                                                        }
                                                                                                                        )


                                                                                                                        })
                                                                                                                        }
                                                                                                                        }
                                                                                                                        streamingContext.start()
                                                                                                                        streamingContext.awaitTermination()
                                                                                                                        db.releaseResource()
                                                                                                                        }
                                                                                                                        }
                                                                                                                        case class Flow(id: Option[String], account: String)
                                                                                                                        4.4 打包运行
                                                                                                                        打包后用 saprk-submit 提交应用

                                                                                                                        spark-submit --master spark://server1:7077 --class com.simple.AccountAnalytics --deploy-mode client ./test.jar

                                                                                                                        总结


                                                                                                                        数据产生之后,首先会提交到 Kafka 消息队列,并交由 Spark Streaming 进行实时处理,结合高性能的巨杉数据库 SequoiaDB,处理后的数据后会存储在 SequoiaDB 当中。

                                                                                                                        相关文章