Common Spark Commands

I. spark-submit / spark-shell

1. Standalone mode

Note: the commands in this subsection all assume standalone mode.

# Submit a job with spark-submit, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cs1:7077 \
--executor-memory 512m \
--total-executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100


# [HA] Submit a job with spark-submit to a high-availability Spark cluster, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://cs1:7077,cs6:7077 \
--executor-memory 512m \
--total-executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100


# Start spark-shell against the Spark cluster:
~/apps/spark/bin/spark-shell \
--master spark://cs1:7077 \
--executor-memory 2G \
--total-executor-cores 12


# Start spark-shell against the HA Spark cluster:
~/apps/spark/bin/spark-shell \
--master spark://cs1:7077,cs6:7077 \
--executor-memory 512m \
--total-executor-cores 1
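
# Once the shell comes up it exposes a preconfigured SparkContext as `sc`. A quick
# sanity check such as the following (a minimal sketch; nothing cluster-specific is
# assumed) shows which master the shell attached to and that a trivial job runs:
sc.master                          // prints the master URL the shell is connected to
sc.parallelize(1 to 1000).sum()    // runs a small distributed job; expected result: 500500.0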


# WordCount printed directly to the console; the input is a local file, so this is best run from a spark-shell started in local mode
sc.textFile("file:///home/hadoop/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)


# WordCount printed directly to the console; the input is an HDFS file, so this is best run from a spark-shell connected to the Spark cluster
sc.textFile("hdfs://mycluster/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)


# WordCount with HDFS input and HDFS output
sc.textFile("hdfs://mycluster/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).saveAsTextFile("hdfs://mycluster/wc/output/")


// Print the results
sc.textFile("hdfs://mycluster/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).foreach(x => println(x._1 +"-"+ x._2))


// Collect the results to the driver
sc.textFile("hdfs://mycluster/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2).collect


// To enable high availability: add the following to spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=cs1,hadoop03,cs6 -Dspark.deploy.zookeeper.dir=/spark231"

// To enable the history server: add the following to spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://mycluster/sparklog231"
// ...and to spark-defaults.conf (the event log dir must match the history log dir above)
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/sparklog231
// Create the log directory
hadoop fs -mkdir -p hdfs://mycluster/sparklog231


// Stop / start the Spark cluster
~/apps/spark/sbin/stop-all.sh
~/apps/spark/sbin/start-all.sh


// Start the history server
$SPARK_HOME/sbin/start-history-server.sh


// Submit a self-written WordCount job with spark-submit
$SPARK_HOME/bin/spark-submit \
--class com.mazh.WordCountScala \
--master spark://cs1:7077,cs6:7077 \
--executor-memory 512m \
--total-executor-cores 1 \
/home/hadoop/sparkwc-1.0-SNAPSHOT.jar \
hdfs://mycluster/wc/input \
hdfs://mycluster/wc/output_11
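
// The com.mazh.WordCountScala class submitted above is not listed in these notes; a minimal
// sketch of what such a driver might look like (hypothetical, assuming the input path is passed
// as args(0) and the output path as args(1), matching the two arguments above):

package com.mazh

import org.apache.spark.{SparkConf, SparkContext}

object WordCountScala {
  def main(args: Array[String]): Unit = {
    // The master is supplied by spark-submit (--master), so it is not hard-coded here.
    val conf = new SparkConf().setAppName("WordCountScala")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))                    // args(0): input file/directory on HDFS
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)      // sort by count, descending
      .saveAsTextFile(args(1))              // args(1): output directory (must not exist yet)

    sc.stop()
  }
}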

2. YARN mode

local                          Run locally with a single worker thread
local[K]                       Run locally with K worker threads
local[*]                       Run locally with as many worker threads as there are available cores
spark://HOST:PORT              Connect to the given Spark standalone cluster master; the port must be specified
mesos://HOST:PORT              Connect to the given Mesos cluster; the port must be specified
yarn (--deploy-mode client)    Connect to a YARN cluster in client mode; HADOOP_CONF_DIR must be configured
yarn (--deploy-mode cluster)   Connect to a YARN cluster in cluster mode; HADOOP_CONF_DIR must be configured
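
# For reference, the same master strings can also be set in code through SparkConf, which is
# mainly useful for local testing (a minimal sketch; the cluster examples below leave the
# master to spark-submit instead):
import org.apache.spark.{SparkConf, SparkContext}

// Hard-code a master only for local experiments; any of the values listed above
// is a valid argument to setMaster.
val conf = new SparkConf()
  .setAppName("master-demo")
  .setMaster("local[*]")       // e.g. "local[4]" or "spark://hadoop02:7077"
val sc = new SparkContext(conf)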


# Submit a job to run locally:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[4] \
--driver-memory 512M \
--executor-memory 512M \
--total-executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar \
10

# Submit a job to the Spark standalone cluster:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop02:7077,hadoop04:7077 \
--driver-memory 512M \
--executor-memory 512M \
--total-executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar \
100




# Submit to the YARN cluster in client mode:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar \
10


# Submit to the YARN cluster in cluster mode:
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
$SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar \
10



# Submitting Spark jobs to a YARN cluster requires the following configuration:
In spark-env.sh:
export HADOOP_CONF_DIR=/home/hadoop/apps/hadoop-2.7.6/etc/hadoop/

In spark-defaults.conf (spark.yarn.jars must list Spark's own jars; the path below assumes the ~/apps/spark install used above):
spark.yarn.jars /home/hadoop/apps/spark/jars/*
> Alternatively, upload the Spark jars to HDFS first and point spark.yarn.jars at that HDFS location.

If this does not take effect, copy yarn-site.xml directly into $SPARK_HOME/conf.



# The jobs above may fail at startup with a virtual-memory error; if so, add the following to the Hadoop cluster's yarn-site.xml:
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
</property>
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
</property>

3. In short

// Submit a job to a plain (non-HA) Spark standalone cluster with spark-submit, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100


// Submit a job to an HA Spark standalone cluster with spark-submit, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop02:7077,hadoop04:7077 \
--executor-memory 512m \
--total-executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100



// Submit a job to the YARN cluster in client mode with spark-submit, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--executor-memory 512m \
--executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100


// Submit a job to the YARN cluster in cluster mode with spark-submit, e.g. computing Pi
~/apps/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 512m \
--executor-cores 1 \
~/apps/spark/examples/jars/spark-examples_2.11-2.3.1.jar \
100

II. Configuration when memory resources are insufficient

Modify yarn-site.xml and add the following two properties:

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
</property>
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
    <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
</property>

III. Summary of cluster startup commands

# Step 1: ZooKeeper
zkServer.sh stop
zkServer.sh start
zkServer.sh status



# Step 2: HDFS
start-dfs.sh
stop-dfs.sh
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
hadoop-daemon.sh start zkfc
hadoop-daemon.sh start journalnode
hadoop-daemon.sh start secondarynamenode
hdfs haadmin -transitionToActive --forcemanual nn1
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
hdfs dfsadmin -safemode get



# Step 3: YARN
start-yarn.sh
stop-yarn.sh
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
yarn rmadmin -transitionToActive --forcemanual rm1



# Step 4: MapReduce job history server
mr-jobhistory-daemon.sh start historyserver
mr-jobhistory-daemon.sh stop historyserver



# Step 5: HBase
stop-hbase.sh
start-hbase.sh
hbase-daemon.sh start master
hbase-daemon.sh start regionserver



# Step 6: Hive
sh start-hive.sh
nohup hiveserver2 1>/home/hadoop/log/hive_std.log 2>/home/hadoop/log/hive_err.log &



# Step 7: Spark

/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/start-all.sh
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/stop-all.sh
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/start-master.sh
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/stop-master.sh
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/start-slaves.sh
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/stop-slaves.sh

# Start the Spark history server on hadoop02
/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/sbin/start-history-server.sh

# Enter spark-shell:
$SPARK_HOME/bin/spark-shell
$SPARK_HOME/bin/spark-shell \
--master spark://hadoop02:7077,hadoop04:7077 \
--executor-memory 2G \
--total-executor-cores 2

~/apps/spark-2.3.0-bin-hadoop2.7/bin/spark-shell \
--master spark://hadoop02:7077 \
--executor-memory 512m \
--total-executor-cores 1



# WordCount from spark-shell:
hadoop fs -mkdir -p /wc/input
hadoop fs -put words.txt /wc/input
sc.textFile("hdfs://myha01/wc/input/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, false).collect




# Step 8: Storm
nohup $STORM_HOME/bin/storm nimbus 1>~/log/storm-nimbus.log 2>&1 &
nohup $STORM_HOME/bin/storm ui 1>~/log/storm-ui.log 2>&1 &
nohup $STORM_HOME/bin/storm supervisor 1>~/log/storm-supervisor.log 2>&1 &

[Figure: Spark startup-script analysis diagram]

[Figure: SparkCore tuning mind map]

IV. Other notes

1. How sc.textFile("...") splits the input into partitions

  • sc.textFile("xxx"): data read from HDFS is split into 2 partitions by default (the default minimum number of splits is 2).
  • sc.textFile("xxx", 1): with an explicit minimum of 1, splitting roughly follows the number of files. If the minimum number of splits is not given, it defaults to 2: the sizes of all input files are added up and divided by 2 to get goalSize, the target split size, and any file larger than 1.1 × goalSize is split again.
  • As a result, when the input files differ greatly in size (e.g. 3 KB, 3 KB, 300 KB), one or more extra partitions are usually produced; see the sketch below.
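
A quick way to observe this from spark-shell is to compare getNumPartitions with and without an explicit minimum number of splits (a sketch, assuming the /wc/input data used in the earlier examples):

// Default minimum of 2 splits: even a single small file ends up as 2 partitions
sc.textFile("hdfs://mycluster/wc/input/words.txt").getNumPartitions

// Explicit minimum of 1: goalSize = totalSize / 1, and only files larger than
// 1.1 * goalSize get split further, so partitions roughly follow the file count
sc.textFile("hdfs://mycluster/wc/input", 1).getNumPartitions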