Winse Blog

走走停停, 熙熙攘攘, 忙忙碌碌, 不知何畏.

Hive on Spark

先看官网的资源Hive on Spark: Getting Started 。文档是值得信任和有保证的,但是有前提:Spark版本得是hive/pom.xml中指定的。

重新编译spark(assembly包中去掉hive、hadoop)

这里hive-1.2.1用的是spark-1.3.1 !!!

1
[hadoop@cu2 spark-1.3.1]$ ./make-distribution.sh --name "hadoop2.6.3-without-hive" --tgz --mvn "$(which mvn)" -Pyarn,hadoop-provided,hadoop-2.6,parquet-provided -Dhadoop.version=2.6.3 -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -DskipTests

拷贝打包好的 spark-1.3.1-bin-hadoop2.6.3-without-hive.tgz 到服务器。解压并做一个软链接到spark(或者指定 SPARK_HOME 环境变量 ),Hive不遗余力啊,把所有想的jar通过各种办法拿到 ( sparkHome=$(readlink -f $bin/../../spark) )。

1
2
3
4
5
[hadoop@hadoop-master2 ~]$ ln -s spark-1.3.1-bin-hadoop2.6.3-without-hive spark

把压缩包传到hdfs,这样每次启动任务就少传几百M的数据。后面spark.yarn.jar配置会用到
[hadoop@hadoop-master2 ~]$ cd spark/lib/
[hadoop@hadoop-master2 lib]$ hadoop fs -put spark-assembly-1.3.1-hadoop2.6.3.jar /spark/

做好软链接后效果:

1
2
3
4
5
6
7
[hadoop@hadoop-master2 ~]$ ll | grep -E "hive|spark"
drwxrwxr-x   9 hadoop hadoop 4096 1月  14 08:08 apache-hive-1.2.1-bin
lrwxrwxrwx   1 hadoop hadoop   21 1月  14 08:07 hive -> apache-hive-1.2.1-bin
lrwxrwxrwx   1 hadoop hadoop   40 3月  28 16:38 spark -> spark-1.3.1-bin-hadoop2.6.3-without-hive
drwxrwxr-x  10 hadoop hadoop 4096 3月  28 16:31 spark-1.3.1-bin-hadoop2.6.3-without-hive
drwxrwxr-x  12 hadoop hadoop 4096 3月  25 16:18 spark-1.6.0-bin-2.6.3
drwxrwxr-x  11 hadoop hadoop 4096 3月  28 11:15 spark-1.6.0-bin-hadoop2-without-hive

这里的spark-1.6.0是教训啊!记住最好最好用hive/pom.xml中spark的版本!!!

修改hive配置

由于spark会加载很多的class,需要把permsize调大。

1
2
[hadoop@hadoop-master2 ~]$ less ~/hive/conf/hive-env.sh
export HADOOP_OPTS="$HADOOP_OPTS -XX:MaxPermSize=256m -Dhive.home=${HIVE_HOME} "

在conf目录下增加spark-defaults.conf文件,指定spark的配置。动态资源分配查看:dynamic-resource-allocation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[hadoop@hadoop-master2 conf]$ cat spark-defaults.conf 
spark.yarn.jar    hdfs:///spark/spark-assembly-1.3.1-hadoop2.6.3.jar

spark.dynamicAllocation.enabled    true
spark.shuffle.service.enabled      true
spark.dynamicAllocation.executorIdleTimeout    600
spark.dynamicAllocation.minExecutors    160 
spark.dynamicAllocation.maxExecutors    1800
spark.dynamicAllocation.schedulerBacklogTimeout   5

spark.driver.memory    10g
spark.driver.maxResultSize   0

spark.eventLog.enabled  true
spark.eventLog.compress  true
spark.eventLog.dir    hdfs:///spark-eventlogs
spark.yarn.historyServer.address hadoop-master2:18080


spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max    512m
  • minExecutors 最好应该是和datanode机器数量差不多,每台一个executor才能本地计算嘛!
  • dynamicAllocation需要yarn的配合,具体查看前一篇文章,或者直接看官网的资料。
  • eventlog查看历史记录需要,配置好后每个任务的信息会存储到eventlog.dir的路径。通过18080端口可以看到历史记录。

跑起来

spark.master 默认是 yarn-cluster, 这里先本地(local)跑一下看下效果。然后再改成yarn-cluster/yarn-client就可以了(推荐使用yarn-client,如果yarn-cluster模式AppMaster同时也是Driver,内存比较难控制,日志看起来也麻烦)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[hadoop@hadoop-master2 hive]$ hive --hiveconf hive.execution.engine=spark 

hive> set spark.master=local;
hive> select count(*) from t_house_info ;
Query ID = hadoop_20160328163952_93dafddc-c8b1-4bc9-b851-5e51f6d26fa8
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = 0

Query Hive on Spark job[0] stages:
0
1

Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-28 16:40:02,077 Stage-0_0: 0(+1)/1      Stage-1_0: 0/1
2016-03-28 16:40:03,078 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
Status: Finished successfully in 2.01 seconds
OK
1
Time taken: 10.169 seconds, Fetched: 1 row(s)
hive> 

再回过头看其实挺简单,和官方文档中的差不多。

注意:hive的日志级别可以通过 hive-log4j.properties 来配置。

有一个问题,不管yarn-cluser还是yarn-client(hive1.2.1-on-spark1.3.1),application强制kill掉以后,再查询会失败,应该是application杀了但是session还在!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[hadoop@file1 ~]$ yarn application -kill application_1460379750886_0012
16/04/13 08:47:17 INFO client.RMProxy: Connecting to ResourceManager at file1/192.168.102.6:8032
Killing application application_1460379750886_0012
16/04/13 08:47:18 INFO impl.YarnClientImpl: Killed application application_1460379750886_0012

    > select count(*) from t_info where edate=20160413;
Query ID = hadoop_20160413084736_ac8f88bb-5ee1-4941-9745-f4a8a504f2f3
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = eb7e038a-2db0-45d7-9b0d-1e55d354e5e9
Status: Failed
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask

坑坑坑

刚开始弄的时刻,没管spark的版本的。直接上spark-1.6.0,然后完全跑不通,看hive.log的日志,啥都看不出来。最后查看http://markmail.org/message/reingwn556e7e37yHive on Spark的老大邮件列表的回复,把 spark.master=local 设置成本地跑才看到一点点有用的错误信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
hive> set hive.execution.engine=spark;
hive> select count(*) from t_ods_access_log2 where day=20160327;
Query ID = hadoop_20160328083028_a9fb9860-38dc-4288-8415-b5b2b88f920a
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create spark client.)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask

日志里面’毛’有用信息都没有!

把日志级别调成debug(hive-log4j.properties),并把 set spark.master=local; 设置成本地。再跑日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2016-03-28 15:13:52,549 DEBUG internal.PlatformDependent (Slf4JLogger.java:debug(71)) - Javassist: unavailable
2016-03-28 15:13:52,549 DEBUG internal.PlatformDependent (Slf4JLogger.java:debug(71)) - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes.  Please check the configuration for better performance.

2016-03-28 15:14:56,594 DEBUG storage.BlockManager (Logging.scala:logDebug(62)) - Putting block broadcast_0_piece0 without replication took  8 ms
2016-03-28 15:14:56,597 ERROR util.Utils (Logging.scala:logError(95)) - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.AbstractMethodError
        at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:62)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
        at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
        at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

调用抽象方法的错误。然后查看了hive-1.2.1中 SparkListener实现类JobMetricsListener 确实没有(spark-1.6.0)62行错误的onBlockUpdated方法实现。然后把spark换成1.3.1一切就好了,其他就是文章前面写的。

心得: 刚刚开始用一个新东西的时刻,还是安装官网指定的版本来用省心。等到自己熟悉后,在玩其他的。

hive on spark VS SparkSQL VS hive on tez

前一篇已经弄好了SparkSQL,SparkSQL也有thriftserver服务,这里说说为啥还选择搞hive-on-spark:

  • SparkSQL-Thriftserver所有结果全部内存,快是快,但是不能满足查询大量数据的需求。如果查询几千万的数据,SparkSQL是搞不定的。而hive-on-spark除了计算用spark其他逻辑都是hive的,返回的结果会先写hdfs,再慢慢返回给客户端。
  • SparkSQL-Thriftserver代码的是全部用scala重写的,和已有hive业务不一定兼容!!
  • SparkSQL-Thriftserver有一个最大的优势就是整个server相当于hive-on-spark的一个session,网页监控漂亮清晰。而hive-on-spark不同的session那就相当于不同的application!!(2016-4-13 20:57:23)用了动态分配,没感觉SparkSQLThriftserver快很多。
  • SparkSQL由于基于内存,再一些调度方面做了优化。如[limit]: hive是死算,sparksql递增数据量的一次次的试。sparksql可以这么做的,毕竟算好的数据在内存里面放着。

hive和sparksql的理念不同,hive的存储是HDFS,而sparksql只是把HDFS作为持久化工具,它的数据基本都放内存。

查看hive的日志,可以看到返回结果后有写HDFS的动作体现,会有类似日志:

1
2
3
2016-03-28 19:39:25,687 INFO  exec.FileSinkOperator (Utilities.java:mvFileToFinalPath(1882))
 - Moving tmp dir: hdfs://zfcluster/hive/scratchdir/hadoop/de2b263e-9601-4df7-bc38-ba932ae83f42/hive_2016-03-28_19-38-08_834_7914607982986605890-1/-mr-10000/.hive-staging_hive_2016-03-28_19-38-08_834_7914607982986605890-1/_tmp.-ext-10001 
 to: hdfs://zfcluster/hive/scratchdir/hadoop/de2b263e-9601-4df7-bc38-ba932ae83f42/hive_2016-03-28_19-38-08_834_7914607982986605890-1/-mr-10000/.hive-staging_hive_2016-03-28_19-38-08_834_7914607982986605890-1/-ext-10001
  • tez的优势spark都有,并且tez其实缓冲优势并不大。而spark的缓冲效果更明显,而且可以快速返回。例如:你查3万条数据,tez是要全部查询然后再返回的,而sparksql取到3万条其他就不算了(效果看起来是这样子,具体没看源码实现;md hive-on-spark还是会全部跑)。
  • tez任务缓冲不能共享,spark更加细化,可以有process级别缓冲(就是用上次计算过的结果,加载过的缓冲)!例如,你查数据记录同时又要返回count,这时有些操作是prcess_local级别的,这个tez是不能比的!
  • spark的日志UI看起来更便捷,呵呵

单就从用的角度,spark全面取胜啊。

参考

–END

Comments