i-Spark-5

一. Spark 的闭包处理

RDD, resilient distributed dataset,弹性(容错)分布式数据集。

分区列表, function,dep Option(分区类, Pair[Key,Value]),首选位置。

运行job时,spark将rdd打碎变换成task,每个task由一个executor执行

执行之前,spark会进行task的闭包(closure)计算。

闭包是指针对executor可见的变量和方法,以备在rdd的foreach中进行计算。

闭包就是串行化后并发送给每个executor.

local模式下,所有spark程序运行在同一JVM中,共享对象,counter是可以累加的

原因是所有executor指向的是同一个引用。

cluster模式下,不可以,counter是闭包处理的。

每个节点对driver上的counter是不可见的。

只能看到自己内部串行化的counter副本。

二. Spark的应用的 部署模式

1. 部署模式概述

spark-submit --class xxx xx.jar --deploy-mode (client | cluster)

--deploy-mode指定部署driver程序在client主机上还是在worker节点上。

[client]

  • driver运行在client主机上。
  • client可以不在cluster(集群)中。

[cluster]

  • driver程序提交给spark cluster的某个worker节点来执行。
  • worker是cluster中的一员。
  • 导出的jar需要放置到所有worker节点都可见的位置(如hdfs)才可以。

不论哪种方式,rdd的运算都在worker执行

2. deploy mode部署模式验证

部署模式划分

1
2
3
4
5
6
7
8
# 客户端 client 模式
$>spark-submit --class com.rox.spark.scala.DeployModeTest --master spark://cs1:7077 --deploy-mode client SparkDemo-1-deploymode.jar
结论: driver就是自己, 如果在cs1上, driver 就是cs1

# 上传jar到hdfs。
# 集群 cluster 模式
$>spark-submit --class com.it18zhang.spark.scala.DeployModeTest --master spark://cs1:7077 --deploy-mode cluster hdfs://cs1:8020/user/centos/SparkDemo-1-deploymode.jar
结论: driver是由 spark 挑选的,如果是 cs1上提交的,因为 cs1也不是 worker, 所以 driver和 worker 都跟cs1无关, 注意, cluster 模式,只要一提交完就结束了, 因为接下来跟他自己没关系了

小技巧: 导出 maven 中所有的依赖库到文件中

1
2
3
4
5
# maven 命令,下载指定job 的所有依赖的 jar 包到 当前的 ./lib 文件夹
mvn -DoutputDirectory=./lib -DgroupId=com.rox -DartifactId=SparkDemo1 -Dversion=1.0-SNAPSHOT dependency:copy-dependencies

# 把当前目录下的所有文件名写入到后面的文件中, 文件之间默认已空格隔开
ls | xargs > a.txt

配置spark on yarn执行模式

  • 使用 spark on yarn 不需要启动 spark!! job直接执行在 yarn 上
  • yarn 模式使用 yarn模式来作为 master, 而 yarn 已经配置了 HA 了

配置 jars 文件的目录

  1. 将spark的jars文件放到hdfs上.
    $>hdfs dfs -mkdir -p /user/ap/spark/jars
    [ap@cs1]~/apps/spark%> hdfs dfs -put jars/ /user/ap/spark
  2. 配置spark属性文件
    [/spark/conf/spark-default.conf]
    spark.yarn.jars hdfs://mycluster/user/ap/spark/jars/*

配置完这些之后: 提交任务的时候就会出现这个, 意思是, 找到了 jar 包, 就不复制了

1
2
> INFO Client: Source and destination file systems are the same. Not copying hdfs://mycluster/user/ap/spark/jars/JavaEWAH-0.3.2.jar
>

遇到问题: 内存不够

解决: 配置 yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
<description>
Whether virtual memory limits will be enforced for containers
是否会对容器执行虚拟内存限制
</description>
</property>

<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>4</value>
<description>
Ratio between virtua4l memory to physical memory when setting memory limits for containers
设置容器的内存限制时虚拟内存与物理内存的比率
</description>
</property>

配置机架感知

核心是自己实现一个类, 然后再 配置文件中声明 机架感知的实现类是哪个

具体使用自行查资料

1
2
3
4
5
# [core-site.xml]
<property>
<name>topology.node.switch.mapping.impl</name>
<value>com.rox.hdfs.rackaware.MyRackAware</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.rox.hdfs.rackaware.MyRackAware

import org.apache.hadoop.net.DNSToSwitchMapping;

import java.io.FileWriter;
import java.util.ArrayList;
import java.util.List;

/**
*机架感知类
*/
public class MyRackAware implements DNSToSwitchMapping {

public List<String> resolve(List<String> names) {
List<String> list = new ArrayList<String>();
try {
FileWriter fw = new FileWriter("/home/centos/rackaware.txt",true);
for(String str : names){
//输出原来的信息,ip地址(主机名)
fw.write(str + "\r\n");
//
if(str.startsWith("192")){
//192.168.231.202
String ip = str.substring(str.lastIndexOf(".") + 1);
if(Integer.parseInt(ip) <= 203) {
list.add("/rack1/" + ip);
}
else{
list.add("/rack2/" + ip);
}
}
else if(str.startsWith("s")){
String ip = str.substring(1);
if (Integer.parseInt(ip) <= 203) {
list.add("/rack1/" + ip);
} else {
list.add("/rack2/" + ip);
}
}
}
fw.close();
} catch (Exception e) {
e.printStackTrace();
}
return list;
}

public void reloadCachedMappings() {

}

public void reloadCachedMappings(List<String> names) {
}
}

代码解释

  • 通过Socket把打印的数据, 发送到指定机器cs5
  • cs5上, 开启 nc端口, 监听消息
  • standaloneyarn模式分别打成 jar包, 在spark 环境中执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.rox.spark.scala

import java.net.{InetAddress, Socket}

import org.apache.spark.{SparkConf, SparkContext}



object DeployModeTest {

/**
* write写出去
* 从运行 此程序的节点, 写到 cs5 上
* @param str
*/
def printInfo(str:String):Unit = {

val ip = InetAddress.getLocalHost.getHostAddress
val sock = new Socket("cs5",8888)
val out = sock.getOutputStream
out.write((ip + "" + str + "\r\n").getBytes())
out.flush()
sock.close()
}


def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("DeployModeTest")
// standalone 模式
//conf.setMaster("spark://cs1:7077")

// Yarn模式
conf.setMaster("yarn")

// local 模式
// conf.setMaster("local[4]")
val sc = new SparkContext(conf)
printInfo("hello guys---我就是Driver")


/**
* Distribute a local Scala collection to form an RDD.
* numSlices: the partition number of the new RDD.
*/
val rdd1 = sc.parallelize(1 to 10, 3)

// 打印第一次map
val rdd2 = rdd1.map(e => {
printInfo("直接定义3个分区, map1: "+e)
e * 2
})

// 重新分区为 2
val rdd3 = rdd2.repartition(2)

//
val rdd4 = rdd3.map(e => {
printInfo("重分区为2后, map2: " + e)
e
})

val res = rdd4.reduce((a,b)=>{
printInfo("求和, reduce: " + a + "," + b)
a + b
})

println(res)
printInfo("最后发给driver: " + res)
}
}

三. Spark 集群的运行方式

主要是cluster manager的区别。

[local]

[standalone]

  • 使用SparkMaster进程作为管理节点, 需要开启Spark 集群

[mesos]

  • 使用mesos的master作为管理节点。

[yarn]

  • 使用hadoop的ResourceManager作为master节点。

  • 不用开启 spark 集群。因为是依托yarn 集群来执行spark application, 高可用也是依托于 yarn 的高可用, 程序在执行时, 会拷贝 一些spark 依赖环境到 hdfs 上, 事后会删除

如果帮到你, 可以给我赞助杯咖啡☕️
0%