hive vs spark
Hive 、Hive On SparK、Spark
一、框架
1.1 Hive:
1> 把HQL翻译长map-reduce的代码,并且有可能产生很多mapreduce的job
2> 把生产的Mapreduce代码及相关资源打包成jar并发布到Hadoop的集群当中并进行运行
计算靠MapReduce,存储靠HDFS,驱动靠Driver,运行靠Yarn。将其看作Hadoop的数据仓库工具。
1.2 Hive On Spark
使用Spark作为计算引擎,将Hive的查询作为Spark的任务来执行。
用户的每个Session会创建一个SparkClient,SparkClient会启动RemoteDriver进程,并由RemoteDriver创 建SparkContext。SparkTask执行时,通过Session提交任务,任务的主体就是对应的SparkWork。SparkClient 将任务提交给RemoteDriver,并返回一个SparkJobRef,通过该SparkJobRef,客户端可以监控任务执行进度,进行错误处理以及采集统计信息等。
1.3 Spark
spark是一个实现快速通用的集群计算平台。用来构建大型的、低延迟的数据分析应用程序。它扩展了广泛使用的MapReduce计算模型。高效的支撑更多计算模式,包括交互式查询和流处理。spark的一个主要特点是能够在内存中进行计算,及时依赖磁盘进行复杂的运算,Spark因此比MapReduce更加高效。
二、Spark与Hive对比
2.1 读写流程
2.1.1 Spark读写
⑴读取:Driver根据文件Block数量,创建多个Task并行读取HDFS。读取文件信息构建RDD,等到Active RDD执行,根据RecordReader正式读取Block文件内容,并存储到内存Block中。
⑵写出:Driver根据RDD分区分块情况,计算出需要的Task数,下发任务到Executor,Executor执行这些Task,将具体RDD数据写入到HDFS对应目录下。
2.1.2 Hive读写
⑴读取:①执行MapReduce时:通过对SQl语句的解析,将其划分为多个MapReduce作业,根据map阶段读取数据时,对split的分片数量,启动对应数量的Map进行读取。
②导入表数据时:通过物理计划中的安排,Driver中对应执行MapRedTask,将本地文件复制到HDFS对应目录下。
hive> load data local inpath 'wyp.txt' into table wyp;
Copying data from file:/home/wyp/wyp.txt
Copying file: file:/home/wyp/wyp.txt
Loading data to table default.wyp
Table default.wyp stats:
[num_partitions: 0, num_files: 1, num_rows: 0, total_size: 67]
OK
Time taken: 5.967 seconds
⑵写出:①执行MapReduce时:MapReduce作业中,对应的ReduceTask将结果写入到临时文件夹,然后Driver执行MoveTask,再将结果转移到wareHouse对应目录下。
②其他情况:1.MapredLocalTask:将表内数据从HDFS上复制下来,写入到本地
2.FetchTask:Hive中少数可以不通过MapReduce作业而获取数据的方式,直接读取HDFS上的数据。
2.1.3 Spark与Hive就读写HDFS 对比
Hive由于使用MapReduce运行计算,自然继承了MapReduce计算过程中大型的中间磁盘落地过程,会导致大量的IO消耗,此外多个MapReduce任务也会频繁的读取HDFS,且部分MapReduce顺序执行,因此对HDFS的读写要求更大。
而Spark则相反,仅仅在任务开始前执行一次IO操作,将任务需要的数据从HDFS上读出,放在本地内存进行计算(持久化RDD),计算完成后将结果写回HDFS同样如此。因此Spark对HDFS要求就不如Hive大。
2.2性能及调优
2.2.1 Spark
Spark优化策略:
(1)针对RDD调优:①避免创建重复RDD。
②尽可能复用RDD。
③对多次使用RDD进行持久化(保存在内存或磁盘中)。
(2)针对计算过程进行优化
①尽量避免使用Shuffle算子(多节点上同一个key拉到一个节点)
②当不可避免使用Shuffle算子时,用map-side预聚合的shuffle操作。Map-side类似于MapReduce中的本地combine,即在每个节点本地对相同的key进行一次聚合操作,这样在后续拉取所有节点相同key操作时,就会大大减少需要拉取的数据量。如使用reduceKey和aggregateByKey代替groupKey算子。
③使用高性能的算子。
(3)广播大变量,Kryo优化序列化、优化数据结构等等
(4)解决数据倾斜问题。
(5)资源参数调优:
①num-executors:设置Spark作业总共要多少个Executor进程来执行。
②executor-memory:设置每个Executor进程的内存。
③executor-cores:设置每个Executor进程的CPU core数量,决定了每个Executor进程并行执行task进程的能力。
④spark.default.parallelism:设置每个Stage的默认task数量。即多少个task来读取对应的HDFS文件。默认为一个HDFS block对应一个task。佳设置为num-executors *execute-cores的2到3倍。
总的来说,Spark的性能优化主要针对RDD的使用和计算过程以及spark集群本身性能上面,与HDFS IO相关的主要就是资源参数调优和RDD持久化复用,即读的时候提高并行读取能力,而且尽量减少读的次数,提高数据复用等等。
2.2.2 Hive
Hive优化策略:
(1) 使用Fetch抓取:针对Hive中对某些情况的查询可以不必使用MapReduce计算。这样会大大减少工作量。
(2) 利用本地模式:在本地执行MapReduce Job,只在一台机器执行,大大减少了分布式集群之间的协调与传输消耗。
(3) 使用压缩存储:创建表时,尽量使用Orc、parquet等列式存储格式。由于列存储,每一列的数据在物理上是存储在一起的,Hive查询只会遍历需要的列数据,大大减少处理数据量。且Hive终转为MapReduce程序来执行,而MapReduce的性能瓶颈在于网络和磁盘IO,要解决性能瓶颈,主要就是减少数据量,因此对数据进行压缩是一个好方式。
(4) 针对表的优化:比如合并MapReduce,动态分区,大表jion小表等等。
(5) 针对数据倾斜:
① Map数量:
1.MapTask数量由输入分片inputsplits决定,而输入分片由三个参数决定:HDFS默认数据块大小、小分片大小、大分片大小。
2.计算方式:long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
3.影响:map数太少导致一个map处理文件逻辑过于复杂,耗时,map数太多导致一个map任务的启动和初始化时间远大于逻辑处理时间,造成资源浪费。
②小文件合并:map执行前合并小文件,减少Map数,使用combineHiveInputFormat对小文件进行合并。
③复杂文件增加Map数:当Input文件很大,map执行非常慢时,可以增加map数据,使得每个map处理的数据量减少。方法为调整maxsize大值,即让maxsize大值低于blcoksize就可以增加map个数,实现多个map读取一个block。
④调整Reduce数量。
⑤并行执行:Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。如果有更多的阶段可以并行执行,那么job可能就越快完成。
总的来说,针对Hive的优化,除开使用特殊的读取或者存储方式来减少任务量外,大的实际上就是对MapReduce过程的优化,因为Hive程序底部就是由MapReduce程序支持的。所以对MapTask和ReduceTask的调优就成为了重中之重。
2.3 数据处理
2.3.1 Spark数据处理
Spark对数据的处理必然离不开RDD这个Spark基本的数据抽象集合。由于RDD是只读的分区记录的集合,因此RDD中采取Partition对数据进行管理。当spark从HDFS中将文件的信息读取出来并构建为RDD后,那么就等待stage中的active RDD触发读取操作,将个RDD对应的数据读入内存,并进行计算。
因此spark的中间过程都不涉及对HDFS的读取,只会在后如果有需要,那么就将结果存回HDFS。
2.3.2 Hive数据处理
Hive 主要可以分为两部分服务端组件和客户端组件,服务端组件:Driver组件(Complier、Optimizer、Executor),Metastore,Thrift服务(支持可扩展且跨语言的开发)。客户端组件:CLI(命令行接口),Thrift客户端,WEBGUI。
其中Metastore,也就是元数据服务组件,这个组件存储hive元数据,包括各种表和分区的所有结构信息,hive的元数据存储在关系数据库中。而Hive所有的数据都存储在HDFS上,以表、分区、桶的形式存在管理。而表数据的导入分为四种方式:
①从本地文件系统导入数据到Hive 表
②从HDFS 导入数据到Hive表
③从别的表查询出相应的数据并导入到Hive表中
④在创建表的时候通过从别的表中查询出数据导入到新建的Hive表中
当Hive将终的物理计划(MapReduce)提交JobTracker后,任务就会直接读取HDFS中文件进行相应操作,也就是说之后的数据处理流程基本就是MapReduce的流程,当数据处理完后,会将结果输出到HDFS上的临时目录/tmp上,然后在move到warehouse下的对应表的存储目录。
2.3.3 对比
Spark是将数据先转为RDD,然后根据RDD读入本地内存之中,在根据RDD算子,对数据进行计算。而Hive是根据SQL语句,解析出物理执行计划,分解为多个MapReduce任务,由MapReduce任务读取HDFS上的对应数据,进行计算,再将结果存回HDFS。总而言之,spark本身是集群计算平台,只是将HDFS作为数据源,读取数据进内存计算,而Hive本身只是一个数据仓库管理工具,hive服务端只负责解析、优化SQL语句,生成MapReduce任务,所有的数据都存在HDFS,基本计算都由MapReduce承担。
2.4文件切片
2.4.1 spark文件切片
计算公式:Splitsize =Math.max(minSize, Math.min(goalSize, blockSize))
Minsize为设置的小处理块大小,默认为1 B。
Goalsize为平均切分文件大小,goalsize=totalsize/numsplits,其中goalSize为待处理文件大小,numsplits就是指定的numPartitions。
BlockSize就是HDFS上block的大小,也就是128M。
也就是说Splitsize为goalSize和blockSize中的较小值,在和minsize比较,取较大值。其中Minsize决定文件切片数量的下限,而goalsize决定文件切片数量的上限,也就是说减少goalsize可以提高一个block切片的个数,而切片的个数对应spark启动的Task个数。
因此,在某种程度上说,提高切片个数可以提高资源的利用率,但是切片个数太多必然会造成类似小文件问题,造成资源的堵塞,序列化和传输时间大大增加。因此设计Task数目为num-executors * executor-cores
的2~3倍较为合适。
2.4.2 Hive文件切片
计算方式:long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
MapTask数量由输入分片inputsplits决定,而输入分片由三个参数决定:HDFS默认数据块大小、小分片大小、大分片大小。
影响:map数太少导致一个map处理文件逻辑过于复杂,耗时,map数太多导致一个map任务的启动和初始化时间远大于逻辑处理时间,造成资源浪费。
2.4.3 对比
Spark与Hive都是对每一个切片起一个任务,spark启动一个task读取一个split切片,hive对应的MapReduce使用一个map去读取一个split切片,默认都是对一个block起一个任务。但是由于Spark 单次IO的特点,建议将用多个task读取一个blcok,已达到资源的高效利用,也就是将Task数设置为高并行度的2到3倍。而对于Hive来说,由于多MapReduce任务,多IO的特性,单个优远远比不上整体优,因此对于Map个数的设置也没有一个明确的值,多数情况下,还是按照一个block起一个Map。
2.5 支持文件格式
2.5.1 Spark
2.5.2 Hive
格式名
存储方式
说明
Textfile
行存储
存储空间消耗比较大,并且压缩的text 无法分割和合并,查询的效率低,可以直接存储,加载数据的速度高
SequenceFile
行存储
存储空间消耗大,压缩的文件可以分割和合并,查询效率高
RCFile
列存储
存储空间小,查询的效率高 ,需要通过text文件转化来加载,加载的速度低
ORCFile
行存储+列存储
存储空间小,查询的高 ,需要通过text文件转化来加载,加载的速度低
2.6使用场景
2.6.1 Hive场景
l Hive的出现可以让那些精通SQL技能、但是不熟悉MapReduce 、编程能力较弱与不擅长Java语言的用户能够在HDFS大规模数据集上很方便地利用SQL 语言查询、汇总、分析数据,毕竟精通SQL语言的人要比精通Java语言的多得多。
l Hive适合处理离线非实时数据
相关文章