-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathhive on spark.txt
1296 lines (878 loc) · 72.7 KB
/
hive on spark.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
微信交流群里有人问浪尖hive on spark如何调优,当时浪尖时间忙没时间回答,这里就给出一篇文章详细聊聊。强调一下资源设置调优,这个强经验性质的,这里给出的数值比例仅供参考。
hive on spark 性能远比hive on mr 要好,而且提供了一样的功能。用户的sql无需修改就可以直接运行于hive on spark。 udf函数也是全部支持。
本文主要是想讲hive on spark 在运行于yarn模式的情况下如何调优。
下文举例讲解的yarn节点机器配置,假设有32核,120GB内存。
yarn配置
yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb,这两个参数决定这集群资源管理器能够有多少资源用于运行yarn上的任务。 这两个参数的值是由机器的配置及同时在机器上运行的其它进程共同决定。本文假设仅有hdfs的datanode和yarn的nodemanager运行于该节点。
1. 配置cores
基本配置是datanode和nodemanager各一个核,操作系统两个核,然后剩下28核配置作为yarn资源。也即是yarn.nodemanager.resource.cpu-vcores=28
2. 配置内存
对于内存,预留20GB给操作系统,datanode,nodemanager,剩余100GB作为yarn资源。也即是 yarn.nodemanager.resource.memory-mb=100*1024
spark配置
给yarn分配资源以后,那就要想着spark如何使用这些资源了,主要配置对象:
execurtor 和driver内存,executro配额,并行度。
1. executor内存
设置executor内存需要考虑如下因素:
executor内存越多,越能为更多的查询提供map join的优化。由于垃圾回收的压力会导致开销增加。
某些情况下hdfs的 客户端不能很好的处理并发写入,所以过多的核心可能会导致竞争。
为了最大化使用core,建议将core设置为4,5,6(多核心会导致并发问题,所以写代码的时候尤其是静态的链接等要考虑并发问题)具体分配核心数要结合yarn所提供的核心数。 由于本文中涉及到的node节点是28核,那么很明显分配为4的化可以被整除,spark.executor.cores设置为4 不会有多余的核剩下,设置为5,6都会有core剩余。 spark.executor.cores=4,由于总共有28个核,那么最大可以申请的executor数是7。总内存处以7,也即是 100/7,可以得到每个executor约14GB内存。
要知道 spark.executor.memory 和spark.executor.memoryOverhead 共同决定着 executor内存。建议 spark.executor.memoryOverhead站总内存的 15%-20%。 那么最终 spark.executor.memoryOverhead=2 G 和spark.executor.memory=12 G
根据上面的配置的化,每个主机就可以申请7个executor,每个executor可以运行4个任务,每个core一个task。那么每个task的平均内存是 14/4 = 3.5GB。在executor运行的task共享内存。 其实,executor内部是用newCachedThreadPool运行task的。
确保 spark.executor.memoryOverhead和 spark.executor.memory的和不超过yarn.scheduler.maximum-allocation-mb
2. driver内存
对于drvier的内存配置,当然也包括两个参数。
spark.driver.memoryOverhead 每个driver能从yarn申请的堆外内存的大小。
spark.driver.memory 当运行hive on spark的时候,每个spark driver能申请的最大jvm 堆内存。该参数结合 spark.driver.memoryOverhead共同决定着driver的内存大小。
driver的内存大小并不直接影响性能,但是也不要job的运行受限于driver的内存. 这里给出spark driver内存申请的方案,假设yarn.nodemanager.resource.memory-mb是 X。
driver内存申请12GB,假设 X > 50GB
driver内存申请 4GB,假设 12GB < X <50GB
driver内存申请1GB,假设 1GB < X < 12 GB
driver内存申请256MB,假设 X < 1GB
这些数值是 spark.driver.memory和 spark.driver.memoryOverhead内存的总和。对外内存站总内存的10%-15%。 假设 yarn.nodemanager.resource.memory-mb=100*1024MB,那么driver内存设置为12GB,此时 spark.driver.memory=10.5gb和spark.driver.memoryOverhead=1.5gb
注意,资源多少直接对应的是数据量的大小。所以要结合资源和数据量进行适当缩减和增加。
3. executor数
executor的数目是由每个节点运行的executor数目和集群的节点数共同决定。如果你有四十个节点,那么hive可以使用的最大executor数就是 280(40*7). 最大数目可能比这个小点,因为driver也会消耗1core和12GB。
当前假设是没有yarn应用在跑。
Hive性能与用于运行查询的executor数量直接相关。 但是,不通查询还是不通。 通常,性能与executor的数量成比例。 例如,查询使用四个executor大约需要使用两个executor的一半时间。 但是,性能在一定数量的executor中达到峰值,高于此值时,增加数量不会改善性能并且可能产生不利影响。
在大多数情况下,使用一半的集群容量(executor数量的一半)可以提供良好的性能。 为了获得最佳性能,最好使用所有可用的executor。 例如,设置spark.executor.instances = 280。 对于基准测试和性能测量,强烈建议这样做。
4. 动态executor申请
虽然将spark.executor.instances设置为最大值通常可以最大限度地提高性能,但不建议在多个用户运行Hive查询的生产环境中这样做。 避免为用户会话分配固定数量的executor,因为如果executor空闲,executor不能被其他用户查询使用。 在生产环境中,应该好好计划executor分配,以允许更多的资源共享。
Spark允许您根据工作负载动态扩展分配给Spark应用程序的集群资源集。 要启用动态分配,请按照动态分配中的步骤进行操作。 除了在某些情况下,强烈建议启用动态分配。
5. 并行度
要使可用的executor得到充分利用,必须同时运行足够的任务(并行)。在大多数情况下,Hive会自动确定并行度,但也可以在调优并发度方面有一些控制权。 在输入端,map任务的数量等于输入格式生成的split数。对于Hive on Spark,输入格式为CombineHiveInputFormat,它可以根据需要对基础输入格式生成的split进行分组。 可以更好地控制stage边界的并行度。调整hive.exec.reducers.bytes.per.reducer以控制每个reducer处理的数据量,Hive根据可用的executor,执行程序内存,以及其他因素来确定最佳分区数。 实验表明,只要生成足够的任务来保持所有可用的executor繁忙,Spark就比MapReduce对hive.exec.reducers.bytes.per.reducer指定的值敏感度低。 为获得最佳性能,请为该属性选择一个值,以便Hive生成足够的任务以完全使用所有可用的executor。
hive配置
Hive on spark 共享了很多hive性能相关的配置。可以像调优hive on mapreduce一样调优hive on spark。 然而,hive.auto.convert.join.noconditionaltask.size是基于统计信息将基础join转化为map join的阈值,可能会对性能产生重大影响。 尽管该配置可以用hive on mr和hive on spark,但是两者的解释不同。
数据的大小有两个统计指标:
totalSize- 数据在磁盘上的近似大小。
rawDataSize- 数据在内存中的近似大小。
hive on mr用的是totalSize。hive on spark使用的是rawDataSize。由于可能存在压缩和序列化,这两个值会有较大的差别。 对于hive on spark 需要将 hive.auto.convert.join.noconditionaltask.size指定为更大的值,才能将与hive on mr相同的join转化为map join。
可以增加此参数的值,以使地图连接转换更具凶猛。 将common join 转换为 map join 可以提高性能。 如果此值设置得太大,则来自小表的数据将使用过多内存,任务可能会因内存不足而失败。 根据群集环境调整此值。
通过参数 hive.stats.collect.rawdatasize 可以控制是否收集 rawDataSize 统计信息。
对于hiveserver2,建议再配置两个额外的参数: hive.stats.fetch.column.stats=true 和 hive.optimize.index.filter=true.
Hive性能调优通常建议使用以下属性:
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.merge.mapfiles=true
hive.merge.mapredfiles=false
hive.merge.smallfiles.avgsize=16000000
hive.merge.size.per.task=256000000
hive.merge.sparkfiles=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M)
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4 (MR and Spark)
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.optimize.ppd=true
预启动YARN容器
在开始新会话后提交第一个查询时,在查看查询开始之前可能会遇到稍长的延迟。还会注意到,如果再次运行相同的查询,它的完成速度比第一个快得多。
Spark执行程序需要额外的时间来启动和初始化yarn上的Spark,这会导致较长的延迟。此外,Spark不会等待所有executor在启动作业之前全部启动完成,因此在将作业提交到群集后,某些executor可能仍在启动。 但是,对于在Spark上运行的作业,作业提交时可用executor的数量部分决定了reducer的数量。当就绪executor的数量未达到最大值时,作业可能没有最大并行度。这可能会进一步影响第一个查询的性能。
在用户较长期会话中,这个额外时间不会导致任何问题,因为它只在第一次查询执行时发生。然而,诸如Oozie发起的Hive工作之类的短期绘画可能无法实现最佳性能。
为减少启动时间,可以在作业开始前启用容器预热。只有在请求的executor准备就绪时,作业才会开始运行。这样,在reduce那一侧不会减少短会话的并行性。
要启用预热功能,请在发出查询之前将hive.prewarm.enabled设置为true。还可以通过设置hive.prewarm.numcontainers来设置容器数量。默认值为10。
预热的executor的实际数量受spark.executor.instances(静态分配)或spark.dynamicAllocation.maxExecutors(动态分配)的值限制。 hive.prewarm.numcontainers的值不应超过分配给用户会话的值。
注意:预热需要几秒钟,对于短会话来说是一个很好的做法,特别是如果查询涉及reduce阶段。 但是,如果hive.prewarm.numcontainers的值高于群集中可用的值,则该过程最多可能需要30秒。 请谨慎使用预热。
1.hive执行引擎
Hive默认使用MapReduce作为执行引擎,即Hive on mr。实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中,所以总体来讲Spark比MapReduce快很多。
默认情况下,Hive on Spark 在YARN模式下支持Spark。
2.前提条件:安装JDK-1.8/hadoop-2.7.2等,参考之前的博文
3.下载hive-2.1.1.src.tar.gz源码解压后,打开pom.xml发现spark版本为1.6.0---官网介绍版本必须对应才能兼容如hive2.1.1-spark1.6.0
4.下载spark-1.6.0.tgz源码(网上都是带有集成hive的,需要重新编译)
5.上传到Linux服务器,解压
6.源码编译
#cd spark-1.6.0
#修改make-distribution.sh的MVN路径为/usr/app/maven/bin/mvn ###查看并安装pom.xml的mvn版本
#./make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.4,parquet-provided"
#等待一个多小时左右吧,保证联网环境,有可能外网访问不到下载不了依赖项,配置访问外网或配置阿里云仓库,重新编译
7.配置
#vim /etc/hosts 192.168.66.66 xinfang
#解压spark-1.6.0-bin-hadoop2-without-hive.tgz,并命名为spark
#官网下载hive-2.1.1解压 并命令为hive(关于hive详细配置,参考http://blog.csdn.net/xinfang520/article/details/77774522)
#官网下载scala2.10.5解压,并命令为scala
#chmod -R 755 /usr/app/spark /usr/app/hive /usr/app/scala
#配置环境变量-vim /etc/profile
复制代码
#set hive
export HIVE_HOME=/usr/app/hive
export PATH=$PATH:$HIVE_HOME/bin
#set spark
export SPARK_HOME=/usr/app/spark
export PATH=$SPARK_HOME/bin:$PATH
#set scala
export SCALA_HOME=/usr/app/scala
export PATH=$SCALA_HOME/bin:$PATH
复制代码
#配置/spark/conf/spark-env.sh
export JAVA_HOME=/usr/app/jdk1.8.0
export SCALA_HOME=/usr/app/scala
export HADOOP_HOME=/usr/app/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export SPARK_LAUNCH_WITH_SCALA=0
export SPARK_WORKER_MEMORY=512m
export SPARK_DRIVER_MEMORY=512m
export SPARK_MASTER_IP=192.168.66.66
#export SPARK_EXECUTOR_MEMORY=512M
export SPARK_HOME=/usr/app/spark
export SPARK_LIBRARY_PATH=/usr/app/spark/lib
export SPARK_MASTER_WEBUI_PORT=18080
export SPARK_WORKER_DIR=/usr/app/spark/work
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_PORT=7078
export SPARK_LOG_DIR=/usr/app/spark/logs
export SPARK_PID_DIR='/usr/app/spark/run'
#配置/spark/conf/spark-default.conf
1
2
3
4
5
6
7
spark.master spark://xinfang:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://xinfang:9000/spark-log
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.memory 512m
spark.driver.memory 512m
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#修改hive-site.xml(hive详细部署参考http://blog.csdn.net/xinfang520/article/details/77774522)
<configuration>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.66.66:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>1</value>
</property>
<!--<property>
<name>hive.hwi.listen.host</name>
<value>192.168.66.66</value>
</property>
<property>
<name>hive.hwi.listen.port</name>
<value>9999</value>
</property>
<property>
<name>hive.hwi.war.file</name>
<value>lib/hive-hwi-2.1.1.war</value>
</property>-->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>/user/hive/tmp</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/user/hive/log</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>192.168.66.66</value>
</property>
<property>
<name>hive.server2.webui.host</name>
<value>192.168.66.66</value>
</property>
<property>
<name>hive.server2.webui.port</name>
<value>10002</value>
</property>
<property>
<name>hive.server2.long.polling.timeout</name>
<value>5000</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>true</value>
</property>
<property>
<name>datanucleus.autoCreateSchema </name>
<value>false</value>
</property>
<property>
<name>datanucleus.fixedDatastore </name>
<value>true</value>
</property>
<!-- hive on mr-->
<!--
<property>
<name>mapred.job.tracker</name>
<value>http://192.168.66.66:9001</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
-->
<!--hive on spark or spark on yarn -->
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<property>
<name>spark.home</name>
<value>/usr/app/spark</value>
</property>
<property>
<name>spark.master</name>
<value>spark://xinfang:7077</value> 或者yarn-cluster/yarn-client
</property>
<property>
<name>spark.submit.deployMode</name>
<value>client</value>
</property>
<property>
<name>spark.eventLog.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.eventLog.dir</name>
<value>hdfs://xinfang:9000/spark-log</value>
</property>
<property>
<name>spark.serializer</name>
<value>org.apache.spark.serializer.KryoSerializer</value>
</property>
<property>
<name>spark.executor.memeory</name>
<value>512m</value>
</property>
<property>
<name>spark.driver.memeory</name>
<value>512m</value>
</property>
<property>
<name>spark.executor.extraJavaOptions</name>
<value>-XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"</value>
</property>
</configuration>
#新建目录
1
2
3
4
hadoop fs -mkdir -p /spark-log
hadoop fs -chmod 777 /spark-log
mkdir -p /usr/app/spark/work /usr/app/spark/logs /usr/app/spark/run
mkdir -p /usr/app/hive/logs
#拷贝hive-site.xml到spark/conf下(这点非常关键)
#hive进入客户端
hive>set hive.execution.engine=spark; (将执行引擎设为Spark,默认是mr,退出hive CLI后,回到默认设置。若想让引擎默认为Spark,需要在hive-site.xml里设置)
hive>create table test(ts BIGINT,line STRING); (创建表)
hive>select count(*) from test;
若整个过程没有报错,并出现正确结果,则Hive on Spark配置成功。
http://192.168.66.66:18080
8.网上转载部分解决方案
第一个坑:要想在Hive中使用Spark执行引擎,最简单的方法是把spark-assembly-1.5.0-hadoop2.4.0.jar包直接拷贝 到$HIVE_HOME/lib目录下。
第二个坑:版本不对,刚开始以为hive 能使用 spark的任何版本,结果发现错了,hive对spark版本有着严格要求,具体对应版本你可以下载hive源码里面,搜索他pom.xml文件里面的spark版本,如果版本不对,启动hive后会报错。具体错误如下:
Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create spark client.)' FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
第三个坑:./make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.4" ,开启spark报错找不到类
解决办法是在spark-env.sh里面添加 :export SPARK_DIST_CLASSPATH=$(hadoop classpath)
#如果启动包日志包重复需要删除
#根据实际修改hive/bin/hive:(根据spark2后的包分散了)
sparkAssemblyPath='ls ${SPARK_HOME}/lib/spark-assembly-*.jar'
将其修改为:sparkAssemblyPath='ls ${SPARK_HOME}/jars/*.jar'
#spark1 拷贝spark/lib/spark-* 到/usr/app/hive/lib
===================================
hive 是目前大数据领域,事实上的sql 标准。其底层默认是基于MapReduce实现的,但是由于MapReduce速度不够快。因此近几年,陆续出来了新的Sql 查询引擎。包括
Spark Sql ,hive on tez ,hive on spark.
Spark Sql 和hive on spark 是不一样的。spark sql 是Spark 自己开发出来针对各种数据源,包括hive ,json,Parquet,jdbc,rdd等都可以执行查询,一套基于spark计算的引擎的查询引擎。因此他是spark 的一个项目,只不过是提供了针对hive 执行查询的功能而已。适合在一些使用spark 技术栈的大数据应用类中使用。
而Hive on spark 是hive 的一个子项目,它是指不通过mapReduce 作为唯一的查询引擎,而是将spark 作为底层的查询引擎。hive on spark 只适用于hive 在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce 切换到Spark 了 。使用于将原来有的Hive 数据仓库以及数据统计分析替换为spark 引擎,作为全公司通用的大数据统计分析引擎。
知识背景(2)
hive 基本工作原理:
hive ql 语句=>
语法分析=>AST=>
生成逻辑执行计划=>Operator Tree=>
优化逻辑生成计划=>Optimized Operator Tree=>
生成物理执行计划=>Task Tree =>
优化物理生成计划=>Optimized Task Tree=>
执行优化后的Optimized Task Tree。
知识背景(3)
Hive on spark 计算原理
将Hive 表作为SparkRDD 来进行操作
使用hive 原语
对于一些针对于RDD的操作,比如groupByKey,softByKey等不使用Spark的transformation操作和原语。如果那样的话,那么就需要重新实现一套Hive 的原语,而且如果Hive 增加了新功能,那么又要实现新的spark 原语。因此选择将hive 的原语包装为针对于RDD的操作即可。
3.新的执行计划生成机制
使用SparkCompiler 将逻辑执行计划,即可Operator Tree ,转换为Task Tree ,提交Spark Task 给 Spark 进行执行。sparkTask 包装了DAG ,DAG 包装为SparkWork .SparkTask 根据SparkWork 表示的DAG 计算。
SparkContext生命周期
hive on Spark 会为每个用户的会话比如说执行一次Sql 创建一个SparkContext但是Spark 不允许在一个JVM 内穿概念多个SparkContext。因此需要在单独的JVM中启动每个会话的Sparkcontext 然后通过RPC 与远程JVM中的Spark Context 进行通信。
5本地和远程运行模式
Hive on spark 提供两种运行模式,本地和远程。如果将SparkMaster 这是为local ,比如set.spark.master=local 那么就是本地模式,sparkContext 与客户端运行在一个JVM 中。否则如果将sparkMaster 设置为master 的地址,那么就是远程模式,sparkcontext 会在远程jvm 中启动,远程模式下 每个用户session 都会创建一个sparkClient sparkClient 启动RemoveDriver RemoveDriver负责创建SparkContext
知识背景(4)
hive on spark提供了一些优化
1 Map join Spark Sql 默认对join 是支持使用BroatCast 机制 将小表广播到各个节点上,以进行join 但是问题是这会driver 和worker 带来很大的内存开销。因为广播的数据要一直报讯在Driver 中所以目前采取的措施是类似于MapReduce 的Distribuesd cache 机制 ,即提高Hdfs replica factor 的赋值因子,让数据在每一个计算节点上都有一个备份,从而可以在本地进行读取数据。
2.cache table
对于某些需要对一张表执行多次操作的场景,hive on spark 内部做了优化,即将要多次操作的表cache 到内存中以便于提升性能。但是这里要注意并不是所有的情况都会自动进行cache 所以说hive on spark 很有很多需要完善的地方
环境搭建
首先需要搭建一个hive
可以参考http://www.haha174.top/article/details/253250
只需要设置 set hive.execution.engine=spark 命令设置hive 的执行引擎为Saprk 即可
set spark.master=local 或者 set spark.master=127.0.0.1:7077
欢迎关注,更多福利
作者:意浅离殇
链接:https://www.jianshu.com/p/cf7f2503d469
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
背景
Hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。
简介
Hive on Spark是从Hive on MapReduce演进而来,Hive的整体解决方案很不错,但是从查询提交到结果返回需要相当长的时间,查询耗时太长,这个主要原因就是由于Hive原生是基于MapReduce的,那么如果我们不生成MapReduce Job,而是生成Spark Job,就可以充分利用Spark的快速执行能力来缩短HiveQL的响应时间。
Hive on Spark现在是Hive组件(从Hive1.1 release之后)的一部分。
与SparkSQL的区别
SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。
使用示例
大体与SparkSQL结构类似,只是SQL引擎不同。部分核心代码如下:
val hiveContext = new HiveContext(sc)
import hiveContext._
hql("CREATE TABLE IF NOT EXIST src(key INT, value STRING)")
hql("LOAD DATA LOCAL PATH '/Users/urey/data/input2.txt' INTO TABLE src")
hql("FROM src SELECT key, value").collect().foreach(println)
小结
结构上Hive On Spark和SparkSQL都是一个翻译层,把一个SQL翻译成分布式可执行的Spark程序。比如一个SQL:
SELECT item_type, sum(price)
FROM item
GROUP item_type;
上面这个SQL脚本交给Hive或者类似的SQL引擎,它会“告诉”计算引擎做如下两个步骤:读取item表,抽出item_type,price这两个字段;对price计算初始的SUM(其实就是每个单独的price作为自己的SUM)因为GROUP BY说需要根据item_type分组,所以设定shuffle的key为item_type从第一组节点分组后分发给聚合节点,让相同的item_type汇总到同一个聚合节点,然后这些节点把每个组的Partial Sum再加在一起,就得到了最后结果。不管是Hive还是SparkSQL大致上都是做了上面这样的工作。
需要理解的是,Hive和SparkSQL都不负责计算,它们只是告诉Spark,你需要这样算那样算,但是本身并不直接参与计算。
1 HiveOnSpark简介
Hive On Spark (跟hive没太大的关系,就是使用了hive的标准(HQL, 元数据库、UDF、序列化、反序列化机制))
Hive原来的计算模型是MR,有点慢(将中间结果写入到HDFS中)
Hive On Spark 使用RDD(DataFrame),然后运行在spark 集群上
真正要计算的数据是保存在HDFS中,mysql这个元数据库,保存的是hive表的描述信息,描述了有哪些database、table、以及表有多少列,每一列是什么类型,还要描述表的数据保存在hdfs的什么位置?
hive跟mysql的区别?
hive是一个数据仓库(存储数据并分析数据,分析数据仓库中的数据量很大,一般要分析很长的时间)
mysql是一个关系型数据库(关系型数据的增删改查(低延迟))
hive的元数据库中保存要计算的数据吗?
不保存,保存hive仓库的表、字段、等描述信息
真正要计算的数据保存在哪里了?
保存在HDFS中了
hive的元数据库的功能
建立了一种映射关系,执行HQL时,先到MySQL元数据库中查找描述信息,然后根据描述信息生成任务,然后将任务下发到spark集群中执行
hive on spark 使用的仅仅是hive的标准,规范,不需要有hive数据库一样可行。
hive : 元数据,是存放在mysql中,然后真正的数据是存放在hdfs中。
2 安装mysql
mysql数据库作为hive使用的元数据
3 配置HiveOnSpark
生成hive的元数据库表,根据hive的配置文件,生成对应的元数据库表。
spark-sql 是spark专门用于编写sql的交互式命令行。
当直接启动spark-sql以local模式运行时,如果报错:
是因为配置了Hadoop的配置参数导致的:
执行测试命令:
create table test (name string);
insert into test values(“xxtest”);
local模式下,默认使用derby数据库,数据存储于本地位置。
要想使用hive的标准,需要把hive的配置文件放到spark的conf目录下
cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/
vi hive-site.xml
hive-site.xml文件:
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hdp-01:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
</configuration>
把该配置文件,发送给集群中的其他节点:
cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/
for i in 2 3 ;do scp hive-site.xml hdp-0$i:`pwd` ;done
重新停止并重启spark: start-all.sh
启动spark-sql时,
出现如下错误是因为操作mysql时缺少mysql的驱动jar包,
解决方案1:--jars 或者 --driver-class-path 引入msyql的jar包
解决方案2: 把mysql的jar包添加到$spark_home/jars目录下
启动时指定集群:(如果不指定master,默认就是local模式)
spark-sql --master spark://hdp-01:7077 --jars /root/mysql-connector-java-5.1.38.jar
sparkSQL会在mysql上创建一个database,需要手动改一下DBS表中的DB_LOCATION_UIR改成hdfs的地址
hdfs://hdp-01:9000/user/hive/spark-warehouse
也需要查看一下,自己创建的数据库表的存储路径是否是hdfs的目录。
执行spark-sql任务之后:可以在集群的监控界面查看
同样 ,会有SparkSubmit进程存在。
4 IDEA编程
要先开启spark对hive的支持
//如果想让hive运行在spark上,一定要开启spark对hive的支持
val session = SparkSession.builder()
.master("local")
.appName("xx")
.enableHiveSupport() // 启动对hive的支持, 还需添加支持jar包
.getOrCreate()
要添加spark对hive的兼容jar包
<!--sparksql对hive的支持-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
在本地运行,还需把hive-site.xml文件拷贝到resource目录下。
resources目录,存放着当前项目的配置文件
编写代码,local模式下测试:
// 执行查询
val query = session.sql("select * from t_access_times")
query.show()
// 释放资源
session.close()
创建表的时候,需要伪装客户端身份
System.setProperty("HADOOP_USER_NAME", "root") // 伪装客户端的用户身份为root
// 或者添加运行参数 –DHADOOP_USER_NAME=root
基本操作
// 求每个用户的每月总金额
// session.sql("select username,month,sum(salary) as salary from t_access_times group by username,month")
// 创建表
// session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")
// 删除表
// session.sql("drop table t_access1")
// 插入数据
// session.sql("insert into t_access1 select * from t_access_times")
// .show()
// 覆盖写数据
// session.sql("insert overwrite table t_access1 select * from t_access_times where username='A'")
// 覆盖load新数据
// C,2015-01,10
// C,2015-01,20
// session.sql("load data local inpath 't_access_time_log' overwrite into table t_access1")
// 清空数据
// session.sql("truncate table t_access1")
// .show()
// 写入自定义数据
val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))
val accessdf = access.map({
t =>
val lines = t.split(",")
(lines(0), lines(1), lines(2).toInt)
}).toDF("username", "month", "salary")
// .show()
accessdf.createTempView("t_ac")
// session.sql("insert into t_access1 select * from t_ac")
// overwrite模式会重新创建新的表 根据指定schema信息 SaveMode.Overwrite
// 本地模式只支持 overwrite,必须在sparksession上添加配置参数:
// .config("spark.sql.warehouse.dir", "hdfs://hdp-01:9000/user/hive/warehouse")
accessdf
.write.mode("overwrite").saveAsTable("t_access1")
集群运行:
需要把hive-site.xml配置文件,添加到$SPARK_HOME/conf目录中去,重启spark
上传一个mysql连接驱动(sparkSubmit也要连接MySQL,获取元数据信息)
spark-sql --master spark://hdp-01:7077 --driver-class-path /root/mysql-connector-java-5.1.38.jar
--class xx.jar
然后执行代码的编写:
// 执行查询 hive的数据表
// session.sql("select * from t_access_times")
// .show()
// 创建表
// session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")
// session.sql("insert into t_access1 select * from t_access_times")
// .show()
// 写数据
val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))
val accessdf = access.map({
t =>
val lines = t.split(",")
(lines(0), lines(1), lines(2).toInt)
}).toDF("username", "month", "salary")
accessdf.createTempView("v_tmp")
// 插入数据
// session.sql("insert overwrite table t_access1 select * from v_tmp")
session.sql("insert into t_access1 select * from v_tmp")
// .show()
// insertInto的api 入库
accessdf.write.insertInto("databaseName.tableName")
session.close()
Hive的由来
以下部分摘自Hadoop definite guide中的Hive一章
“Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询。
Hive大大简化了对大规模数据集的分析门槛(不再要求分析人员具有很强的编程能力),迅速流行起来,成为Hadoop生成圈上的Killer Application. 目前已经有很多组织把Hive作为一个通用的,可伸缩数据处理平台。”
数据模型(Data Model)
Hive所有的数据都存在HDFS中,在Hive中有以下几种数据模型
Tables(表) table和关系型数据库中的表是相对应的,每个表都有一个对应的hdfs目录,表中的数据经序列化后存储在该目录,Hive同时支持表中的数据存储在其它类型的文件系统中,如NFS或本地文件系统
分区(Partitions) Hive中的分区起到的作用有点类似于RDBMS中的索引功能,每个Partition都有一个对应的目录,这样在查询的时候,可以减少数据规模
桶(buckets) 即使将数据按分区之后,每个分区的规模有可能还是很大,这个时候,按照关键字的hash结果将数据分成多个buckets,每个bucket对应于一个文件
Query Language
HiveQL是Hive支持的类似于SQL的查询语言。HiveQL大体可以分成下面两种类型
DDL(data definition language) 比如创建数据库(create database),创建表(create table),数据库和表的删除
DML(data manipulation language) 数据的添加,查询
UDF(user defined function) Hive还支持用户自定义查询函数
Hive architecture
hive的整体框架图如下图所示
由上图可以看出,Hive的整体架构可以分成以下几大部分
用户接口 支持CLI, JDBC和Web UI
Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎
HiveQL执行过程
HiveQL的执行过程如下所述
parser 将HiveQL解析为相应的语法树
Semantic Analyser 语义分析
Logical Plan Generating 生成相应的LogicalPlan
Query Plan Generating
Optimizer
最终生成MapReduce的Job,交付给Hadoop的MapReduce计算框架具体运行。
Hive实例
最好的学习就是实战,Hive这一小节还是以一个具体的例子来结束吧。
前提条件是已经安装好hadoop,具体安装可以参考源码走读11或走读9
step 1: 创建warehouse
warehouse用来存储raw data
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse
step 2: 启动hive cli
$ export HIVE_HOME=<hive-install-dir>
$ $HIVE_HOME/bin/hive
step 3: 创建表
创建表,首先将schema数据写入到metastore,另一件事情就是在warehouse目录下创建相应的子目录,该子目录以表的名称命名
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
step 4: 导入数据
导入的数据会存储在step 3中创建的表目录下
LOAD DATA LOCAL INPATH '/u.data'
OVERWRITE INTO TABLE u_data;
step 5: 查询
SELECT COUNT(*) FROM u_data;
hiveql on Spark
Q: 上一章节花了大量的篇幅介绍了hive由来,框架及hiveql执行过程。那这些东西跟我们标题中所称的hive on spark有什么关系呢?
Ans: Hive的整体解决方案很不错,但有一些地方还值得改进,其中之一就是“从查询提交到结果返回需要相当长的时间,查询耗时太长”。之所以查询时间很长,一个主要的原因就是因为Hive原生是基于MapReduce的,哪有没有办法提高呢。您一定想到了,“不是生成MapReduce
Job,而是生成Spark Job”, 充分利用Spark的快速执行能力来缩短HiveQl的响应时间。
下图是Spark 1.0中所支持的lib库,SQL是其唯一新添加的lib库,可见SQL在Spark 1.0中的地位之重要。
HiveContext
HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
让我们回顾一下,SqlContext中牵涉到的类及其间的关系如下图所示,具体分析过程参见本系列中的源码走读之11。
既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。
有了上述的比较,就能抓住源码分析时需要把握的几个关键点
Entrypoint HiveContext.scala
QueryExecution HiveContext.scala
parser HiveQl.scala
optimizer
数据
使用到的数据有两种
Schema Data 像数据库的定义和表的结构,这些都存储在MetaStore中
Raw data 即要分析的文件本身
Entrypoint
hiveql是整个的入口点,而hql是hiveql的缩写形式。
def hiveql(hqlQuery: String): SchemaRDD = {
val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but does not perform any execution.
result.queryExecution.toRdd
result
}
上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参
HiveQL, parser
parseSql的函数定义如代码所示,解析过程中将指令分成两大类
nativecommand 非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成
非nativecommand 主要是select语句
def parseSql(sql: String): LogicalPlan = {
try {
if (sql.toLowerCase.startsWith("set")) {
NativeCommand(sql)
} else if (sql.toLowerCase.startsWith("add jar")) {
AddJar(sql.drop(8))
} else if (sql.toLowerCase.startsWith("add file")) {
AddFile(sql.drop(9))
} else if (sql.startsWith("dfs")) {
DfsCommand(sql)
} else if (sql.startsWith("source")) {
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
} else if (sql.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
NativeCommand(sql)
} else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other
}
}
}
} catch {
case e: Exception => throw new ParseException(sql, e)
case e: NotImplementedError => sys.error(
s"""
|Unsupported language features in query: $sql
|${dumpTree(getAst(sql))}
""".stripMargin)
}
}
哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量,列表很长,代码就不一一列出。
对于非nativeCommand,最重要的解析函数就是nodeToPlan
toRdd
Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。
在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素
override lazy val toRdd: RDD[Row] =
analyzed match {
case NativeCommand(cmd) =>
val output = runSqlHive(cmd)
if (output.size == 0) {
emptyResult
} else {
val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
sparkContext.parallelize(asRows, 1)
}
case _ =>
executedPlan.execute().map(_.copy())
}
native command的执行流程
由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下
analyzer
HiveTypeCoercion
val typeCoercionRules =
List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
StringToIntegralCasts, FunctionArgumentConversion)
optimizer
PreInsertionCasts存在的目的就是确保在数据插入执行之前,相应的表已经存在。
override lazy val optimizedPlan =
optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
此处要注意的是catalog的用途,catalog是HiveMetastoreCatalog的实例。
HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。
HiveMetastoreCatalog
HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api。其中包括了广为人知的deSer library。
以CreateTable函数为例说明对Hive Library的依赖。
def createTable(
databaseName: String,
tableName: String,
schema: Seq[Attribute],