A Quick Hands-On Guide to a Real-Time Processing Architecture Based on Kafka + Spark + SequoiaDB
01 Background
02 Architecture Design
03 Environment Setup
3.1 Deployment environment
Server layout
Software configuration
3.2 ZooKeeper cluster installation
1. Extract the downloaded ZooKeeper package into /opt/soft
tar -zxvf zookeeper-3.4.8.tar.gz -C /opt/soft
2. Enter ZooKeeper's conf directory
cd /opt/soft/zookeeper-3.4.8/conf
3. Copy zoo_sample.cfg to zoo.cfg
cp zoo_sample.cfg zoo.cfg
4. Create the data directory and edit zoo.cfg
mkdir /opt/soft/zookeeper-3.4.8/data
vi zoo.cfg
Modify:
dataDir=/opt/soft/zookeeper-3.4.8/data
Add:
server.0=server1:2888:3888
server.1=server2:2888:3888
server.2=server3:2888:3888
Write this node's id into a myid file under the data directory; after copying the installation to the other machines below, change myid to 1 on server2 and 2 on server3 so that it matches the server.N entries above.
echo '0' > /opt/soft/zookeeper-3.4.8/data/myid
scp -r zookeeper-3.4.8 root@server2:/opt/soft/
scp -r zookeeper-3.4.8 root@server3:/opt/soft/
Run zkServer.sh start on each of the three machines.
Check the status with zkServer.sh status: there should be one leader and two followers.
Run jps on each node and confirm that the QuorumPeerMain process is present.
3.3 Kafka cluster installation
1. Extract the downloaded Kafka package into /opt/soft
tar -zxvf kafka_2.11-0.10.2.1.tgz -C /opt/soft
2. Enter Kafka's libs directory
cd /opt/soft/kafka_2.11-0.10.2.1/libs
3. Copy zookeeper-3.4.8.jar from the ZooKeeper installation
cp /opt/soft/zookeeper-3.4.8/zookeeper-3.4.8.jar ./
4. Delete the bundled zookeeper-3.4.9.jar
rm zookeeper-3.4.9.jar
5. Enter the config directory
cd ../config
6. Create the kafka-logs directory
mkdir /opt/soft/kafka_2.11-0.10.2.1/kafka-logs
7. Edit server.properties
vi server.properties
broker.id: a unique integer per broker; set it to 0, 1, and 2 on server1, server2, and server3 respectively
log.dirs=/opt/soft/kafka_2.11-0.10.2.1/kafka-logs
zookeeper.connect=server1:2181,server2:2181,server3:2181
Add (using the current machine's own IP):
listeners=PLAINTEXT://192.168.106.189:9092
advertised.listeners=PLAINTEXT://192.168.106.189:9092
8. Copy the Kafka directory to the other machines with scp and adjust server.properties on each one (broker.id, listeners, advertised.listeners)
scp -r /opt/soft/kafka_2.11-0.10.2.1/ server2:/opt/soft
scp -r /opt/soft/kafka_2.11-0.10.2.1/ server3:/opt/soft
9. Start the brokers and verify the cluster
Start Kafka on each machine:
bin/kafka-server-start.sh -daemon config/server.properties
Create a topic
bin/kafka-topics.sh --zookeeper server2:2181,server3:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
Start a console producer
bin/kafka-console-producer.sh --broker-list server1:9092,server2:9092,server3:9092 --topic TestTopic
Start a console consumer
bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic TestTopic --from-beginning
3.4 Spark cluster installation
tar -zxvf spark-2.0.1-bin-hadoop2.7.tgz -C /opt/soft
cd /opt/soft/spark-2.0.1-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
cp slaves.template slaves
vi spark-env.sh
SPARK_WORKER_INSTANCES=2
SPARK_DAEMON_MEMORY=2g
SPARK_MASTER_IP=10.196.9.21
JAVA_HOME=/usr/local/java/jdk1.8.0_121
vi slaves
server1
server2
server3
scp -r /opt/soft/spark-2.0.1-bin-hadoop2.7/ server2:/opt/soft
scp -r /opt/soft/spark-2.0.1-bin-hadoop2.7/ server3:/opt/soft
Start the cluster from the Spark home directory on the master node:
./sbin/start-all.sh
Open the Spark web UI in a browser to confirm the cluster started successfully:
http://192.168.106.187:8080
Download the connector that matches the Scala, Kafka, and Spark versions from https://mvnrepository.com/ (here spark-streaming-kafka-0-10_2.11-2.0.1.jar) and copy it into Spark's jars directory.
Also copy the following jars from the Kafka libs directory into Spark's jars directory:
cp /opt/soft/kafka_2.11-0.10.2.1/libs/kafka-clients-0.10.2.1.jar /opt/soft/spark-2.0.1-bin-hadoop2.7/jars
cp /opt/soft/kafka_2.11-0.10.2.1/libs/zkclient-0.10.jar /opt/soft/spark-2.0.1-bin-hadoop2.7/jars
cp /opt/soft/kafka_2.11-0.10.2.1/libs/metrics-core-2.2.0.jar /opt/soft/spark-2.0.1-bin-hadoop2.7/jars
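To confirm that Spark picks up these jars, one option is to start spark-shell on the master and check that the Kafka integration classes resolve. This is only a quick sanity-check sketch; the class names below come from the spark-streaming-kafka-0-10 and kafka-clients packages copied above.
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
// if both imports succeed, the integration and client jars are on the classpath
println(classOf[ConsumerRecord[String, String]].getName)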
3.5 SequoiaDB installation
tar -zxvf sequoiadb-3.2.3-linux_x86_64-enterprise-installer.tar.gz
./sequoiadb-3.2.3-linux_x86_64-enterprise-installer.run --mode text --installer-language en --prefix /opt/sequoiadb --upgrade false --force false --username sdbadmin --groupname sdbadmin_group --userpasswd sdbadmin --port 11790 --processAutoStart false --SMS true
Please choose an option [1] : 1
Do you want to continue? [Y/n]: Y
su - sdbadmin
cd /opt/sequoiadb/bin
Check whether the nodes have started:
./sdblist -t all
./sdblist -l
Set permissions on the /opt directory:
chown -R sdbadmin:sdbadmin_group /opt
Set up passwordless SSH for sdbadmin between all machines (including the local machine):
ssh-keygen -t rsa
ssh-copy-id <server-ip>
1. In the SequoiaDB shell (started with ./sdb under /opt/sequoiadb/bin), create a temporary coordination node:
var oma = new Oma("localhost", 11790)
oma.createCoord(18800, "/opt/sequoiadb/database/coord/18800")
oma.startNode(18800)
2. Connect through the temporary coordination node and create the catalog node group:
var db = new Sdb("localhost",18800)
db.createCataRG("server1", 11800, "/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
var cataRG = db.getRG("SYSCatalogGroup");
var node1 = cataRG.createNode("server2", 11800,"/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
node1.start()
var node1 = cataRG.createNode("server3", 11800,"/opt/sequoiadb/database/cata/11800",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
node1.start()
3. Create the data groups. Data group rg1:
var dataRG = db.createRG("rg1")
dataRG.createNode("server1", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
dataRG.createNode("server2", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG.createNode("server3", 11820, "/opt/sequoiadb/database/data/11820",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG.start()
Data group rg2:
var dataRG1 = db.createRG("rg2")
dataRG1.createNode("server1", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
dataRG1.createNode("server2", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG1.createNode("server3", 11830, "/sdbdata/data02/sequoiadb/database/data/11830",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG1.start()
Data group rg3:
var dataRG2 = db.createRG("rg3")
dataRG2.createNode("server1", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:80,sortbuf:512,sharingbreak:180000})
dataRG2.createNode("server2", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG2.createNode("server3", 11840, "/sdbdata/data03/sequoiadb/database/data/11840",{logfilesz:1024,weight:30,sortbuf:512,sharingbreak:180000})
dataRG2.start()
4. Create the coordination node group:
var rg = db.createCoordRG()
rg.createNode("server1", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
rg.createNode("server2", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
rg.createNode("server3", 11810, "/opt/sequoiadb/database/coord/11810",{logfilesz:1024,sortbuf:512,sharingbreak:180000})
rg.start()
5. Remove the temporary coordination node:
var oma = new Oma("localhost", 11790)
oma.removeCoord(18800)
1. First, check that all nodes have started:
sdblist -l
sdblist -t all
2. Reconnect through the new coordination node, then create a test collection space and collection and insert some data:
db = new Sdb("localhost",11810)
db.createCS("cstest1");
db.cstest1.createCL("cl",{ShardingKey:{"a":1},ShardingType:"hash",AutoSplit:false,AutoIndexId:false,Partition:1024,Compressed:true,CompressionType:"lzw",EnsureShardingIndex:false});
for(var i=0;i<10000;i++){db.cstest1.cl.insert({"a":i,age:i})}
Check the data:
db.cstest1.cl.count()
db.cstest1.cl.findOne()
Drop the collection space:
db.dropCS("cstest1")
Repeat the test with AutoSplit enabled:
db.createCS("cstest2");
db.cstest2.createCL("cl",{ShardingKey:{"a":1},ShardingType:"hash",AutoSplit:true,AutoIndexId:false,Partition:1024,Compressed:true,CompressionType:"lzw",EnsureShardingIndex:false});
for(var i=0;i<10000;i++){db.cstest2.cl.insert({"a":i,age:i})}
Check the data:
db.cstest2.cl.count()
db.cstest2.cl.findOne()
Drop the collection space:
db.dropCS("cstest2")
Connect through a coordination node and trigger a master re-election in data group rg1:
db = new Sdb("localhost",11810)
var rg = db.getRG("rg1")
rg.reelect()
Check whether the master has been reassigned and verify the node configuration:
sdblist -l
bin/sdblist -p <port> --detail --expand
Check that weight is 80 (for the server1 nodes; 30 on server2 and server3)
Check that logfilesz is 1024
Check that sortbuf is 512
Check that sharingbreak is 180000
04 Real-Time Data Processing: Scenario Analysis and Implementation
4.1 Creating the collection space and collection in SequoiaDB
db.createDomain('mydomain',['rg1','rg2','rg3'],{AutoSplit:true})
db.createCS( "foo", { PageSize: 4096, Domain: "mydomain" } )
The streaming job in section 4.3 writes to collection bar in collection space foo, so create that collection as well:
db.foo.createCL("bar")
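Before wiring up the streaming job, connectivity from the application side can be checked with the SequoiaDB Java driver. This is only a sketch: it assumes the driver jar is on the classpath and that a coordination node is reachable at 192.168.106.188:11810 as configured above, and the object name CheckSdb is just for illustration.
import com.sequoiadb.base.Sequoiadb

object CheckSdb {
  def main(args: Array[String]): Unit = {
    // connect to a coordination node (empty user name and password, as in the installation above)
    val db = new Sequoiadb("192.168.106.188:11810", "", "")
    val bar = db.getCollectionSpace("foo").getCollection("bar")
    println("records currently in foo.bar: " + bar.getCount())
    db.releaseResource()
  }
}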
4.2 Generating data
import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.codehaus.jettison.json.JSONObject
import java.util.UUID
import scala.util.Random
object KafkaProducer {
  private val random = new Random()
  private var pointer = -1

  def getID(): String = UUID.randomUUID().toString

  def account(): Double = {
    random.nextInt(1000000)
  }

  def main(args: Array[String]): Unit = {
    val topic = "test"
    val brokers = "192.168.106.187:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("id", getID)
        .put("account", account())
      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      Thread.sleep(200)
    }
  }
}
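The producer above uses the older kafka.javaapi.producer API. For reference, the same kind of message could also be sent with the newer producer API from the kafka-clients jar that ships with Kafka 0.10.x; the following is only a sketch (the object name and the sample payload are made up for illustration):
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object NewApiProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.106.187:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // send one event in the same JSON shape that the producer above emits
    producer.send(new ProducerRecord[String, String]("test", """{"id":"demo-id","account":123456}"""))
    producer.close()
  }
}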
4.3 Processing data in real time
import com.sequoiadb.base.Sequoiadb
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._
object AccountAnalytics {
  // connection to SequoiaDB through a coordination node (default service port 11810)
  val db = new Sequoiadb("192.168.106.188", "", "")
  val bar = db.getCollectionSpace("foo").getCollection("bar")
  implicit val formats = DefaultFormats

  def main(args: Array[String]) {
    val log = LoggerFactory.getLogger(this.getClass)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.106.187:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val masterUrl = if (args.length > 0) args(0) else "local[2]"
    val conf = new SparkConf().setMaster(masterUrl).setAppName("AccountAnalytics")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    val topics = Array("test")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // parse each Kafka message (a JSON string) into a Flow record
    val events = kafkaStream.flatMap(line => {
      val data = parse(line.value(), true)
      val event = data.extract[Flow]
      Some(event)
    })
    log.info(s"$events")
    val userClicks = events.map { event => (event.id.getOrElse("null"), event.account) }
    // write every record of every micro-batch into SequoiaDB
    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val account = pair._2
          val time = System.currentTimeMillis()
          log.info(s"$uid:$time")
          bar.insert("{'id':'" + uid + "','account':'" + account + "','time':" + time + "}")
        })
      })
    }
    streamingContext.start()
    streamingContext.awaitTermination()
    db.releaseResource()
  }
}
case class Flow(id: Option[String], account: String)
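The producer and the streaming job are only coupled through the JSON shape of each message, which the Flow case class mirrors. That mapping can be checked locally with json4s before the job is submitted; the following is a minimal, self-contained sketch (the sample values and the object name FlowMappingCheck are made up for illustration):
import org.json4s._
import org.json4s.jackson.JsonMethods._

case class Flow(id: Option[String], account: String)

object FlowMappingCheck {
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // a message in the same shape that the producer emits
    val json = """{"id":"9f8e7d6c-demo","account":123456}"""
    println(parse(json).extract[Flow]) // prints: Flow(Some(9f8e7d6c-demo),123456)
  }
}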
Package the job and submit it to the Spark cluster:
spark-submit --master spark://server1:7077 --class com.simple.AccountAnalytics --deploy-mode client ./test.jar
Summary
After data is generated, it is first published to the Kafka message queue and then processed in real time by Spark Streaming; combined with the high-performance SequoiaDB distributed database, the processed results are finally stored in SequoiaDB.