i-Spark-1

Tips:

并行

集群计算。

并行计算。

硬件方面的概念

并发

并发执行。

线程方面的概念

一. Spark 简介

1.概念

Lightning-fast cluster computing。

快如闪电的集群计算。

大规模快速通用的计算引擎。

速度: 比hadoop 100x,磁盘计算快10x

使用: java / Scala /R /python

​ 提供80+算子(操作符),容易构建并行应用。

通用: 组合SQL ,流计算 + 复杂分析。

DAG: direct acycle graph,有向无环图。

Spark 与 MR 关系: 基于hadoop的mr,扩展MR模型高效使用MR模型,内存型集群计算,提高app处理速度。

2.Spark 模块

Spark core //核心模块 ,通用执行引擎,提供内存计算和对外部数据集的引用。RDD

Spark SQL //SQL ,构建在core之上,引入新的抽象SchemaRDD,提供了结构化和半结构化支持。

Spark Streaming //流计算 DStream

Spark MLlib //机器学习

Spark graph //图计算

3.RDD

是spark的基本数据结构,是不可变数据集。

RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点进行计算。

可以包含任何java,scala,python和自定义类型。

RDD是只读的记录分区集合。RDD具有容错机制。

创建RDD方式: 1)并行化一个现有集合. 2) 由另一个 RDD 转换

hadoop 花费90%时间 read, write。

内存处理计算。在job间进行数据共享。内存的IO速率高于网络和disk的10 ~ 100之间。

4. Spark RDD 内部包含5个主要属性

  1. 分区列表
  2. 针对每个split的计算函数。
  3. 对其他rdd的依赖列表
  4. 可选,如果是KeyValueRDD的话,可以带分区类。
  5. 可选,首选块位置列表(hdfs block location);

二. Spark 安装

1. local模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1).下载spark-2.1.3-bin-hadoop2.7.tgz

2).解压到/home/ap/apps
创建符号连接 : ln -s spark-2.1.3-bin-hadoop2.7 spark

3).环境变量
[.zshrc]
SPARK_HOME=/home/ap/apps/spark
PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

[source]
$>source ~/.zshrc

4).验证spark
$>cd apps/spark/sbin
$>./spark-shell

5).webui
http://cs1:4040/

# PS : 启动本地模式,登录4040 webUI端口, 只需要启动spark-shell. 要把master 或者 worker 关掉

2. Stand alone 分布式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1). 配置其他主机的所有环境变量, 发送cs1上的.zshrc 到其它机器
--------------------
# 发送
<sub>> xsync.sh </sub>/.zshrc
# source
<sub>> xcall.sh "source </sub>/.zshrc"

2). 配置master节点的slaves
--------------------
[/soft/spark/conf/slaves]
cs2
cs3
cs4
cs5

3). 发送文件到其它节点
--------------------
~> xsync.sh apps/spark

4). 启动spark集群
--------------------
~> apps/spark/sbin/start-all.sh

5). 查看进程
--------------------
$>xcall.jps jps
master //cs1
worker //cs2
worker //cs3
worker //cs4

6). 查看 webUI
--------------------
http://cs1:8080

3. Stand alone HA 模式

规划:

cs1, cs6 为 master

cs2~cs5 为 worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1). 配置 HADOOP_CONF_DIR, 目的是找到 hdfs-site.xml 和 core-site.xml
--------------# conf/spark-env.sh 中----------------
# 配 HADOOP_CONF_DIR, 这样就不用拷贝hdfs-site.xml 和 core-site.xml 到 conf 目录了
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

ps: 也可以直接复制 hdfs-site.xml 和 core-site.xml 到 spark/conf 下, 可能还得在 conf/defaults.conf中指定
spark.files file:///home/ap/apps/spark/conf/hdfs-site.xml,file:///home/ap/apps/spark/conf/core-site.xml ,但是不推荐这样

2). 配置 JAVA_HOME & zookeeper
--------------# conf/spark-env.sh 中----------------
export JAVA_HOME=/usr/local/jdk1.8.0_73
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=cs1,cs2,cs3 -Dspark.deploy.zookeeper.dir=/spark"

3) 分发整个 conf 目录文件到集群
------------------------------------------------
$cs1 <sub>> xsync.sh </sub>/apps/spark/conf

4) 启动集群
------------------------------------------------
cs1> ~/apps/spark/sbin/start-all.sh
cs6> ~/apps/spark/sbin/start-master.sh
查看>
http://cs1:8080
http://cs6:8080

5) 测试: 启动spark-shell,连接spark集群上
------------------------------------------------
$> spark-shell --master spark://cs1:7077
$> sc.textFile("hdfs://mycluster/user/ap/a.txt").collect();

4. Spark 历史服务器(可选)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
第一步:
------------------------------------------------
cd /home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/conf
cp spark-defaults.conf.template spark-defaults.conf

在文件里面添加如下内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/sparklog

第二步:
------------------------------------------------
在 spark-env.sh 的文件里面添加如下内容:
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://mycluster/sparklog"

第三步:
------------------------------------------------
在启动 HistorServer 服务之前 hdfs://mycluster/sparklog 目录要提前创建
hadoop fs -mkdir -p hdfs://mycluster/sparklog

第四步:启动 Spark HistoryServer
------------------------------------------------
$> SPARK_HOME/sbin/start-history-server.sh

第五步:访问 Spark History WebUI
------------------------------------------------
http://cs1:18080/

5. yarn 模式


三. Spark 简单体验

1. API

[SparkContext]

  • Spark功能的主要入口点。代表到Spark集群的连接,可以创建RDD、累加器和广播变量.
  • 每个JVM只能激活一个SparkContext对象,在创建sc之前需要stop掉active的sc。

[RDD]

  • resilient distributed dataset,弹性分布式数据集。等价于集合。

[SparkConf]

  • spark配置对象,设置Spark应用各种参数,kv形式。

2. Spark 实现 WordCount

2.1 spark-shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 登录 shell 
spark-shell --master spark://cs1:7077

//加载文本文件,以换行符方式切割文本.Array(hello world2,hello world2 ,...)
val rdd1 = sc.textFile("/home/ap/a.txt");

//单词统计1 => 分步骤
$scala>val rdd1 = sc.textFile("/home/centos/test.txt")
$scala>val rdd2 = rdd1.flatMap(line=>line.split(" "))
$scala>val rdd3 = rdd2.map(word = > (word,1))
$scala>val rdd4 = rdd3.reduceByKey(_ + _)
$scala>rdd4.collect

// 一句话
scala> sc.textFile("/home/ap/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res6: Array[(String, Int)] = Array((world2,1), (world1,1), (world4,1), ("",1), (hello,4), (world3,1))

2.2 idea 编程

[pom依赖文件]

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>

Scala 版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.{SparkConf, SparkContext}
object WordCountDemo {
def main(args: Array[String]): Unit = {
//创建Spark配置对象
val conf = new SparkConf();
conf.setAppName("WordCountSpark")
//设置master属性
conf.setMaster("local") ;

//通过conf创建sc
val sc = new SparkContext(conf);

//加载文本文件
val rdd1 = sc.textFile("d:/scala/test.txt");
//压扁
val rdd2 = rdd1.flatMap(line => line.split(" ")) ;
//映射w => (w,1)
val rdd3 = rdd2.map((_,1))
val rdd4 = rdd3.reduceByKey(_ + _)
val r = rdd4.collect()
r.foreach(println)
}
}

Java 版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
* java版
*/
public class WordCountJava2 {
public static void main(String[] args) {
//创建SparkConf对象
SparkConf conf = new SparkConf();
conf.setAppName("WordCountJava2");
conf.setMaster("local");

//创建java sc
JavaSparkContext sc = new JavaSparkContext(conf);
//加载文本文件
JavaRDD<String> rdd1 = sc.textFile("d:/scala//test.txt");

//压扁
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> list = new ArrayList<String>();
String[] arr = s.split(" ");
for(String ss :arr){
list.add(ss);
}
return list.iterator();
}
});

//映射,word -> (word,1)
JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s,1);
}
});

//reduce化简
JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

//
List<Tuple2<String,Integer>> list = rdd4.collect();
for(Tuple2<String, Integer> t : list){
System.out.println(t._1() + " : " + t._2());
}
}
}

提交作业到 Spark 集群运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
======================================提交到本机
1) 导出jar包
-------------------------------
2) spark-submit提交命令运行job
-------------------------------
# scala 版 wordcount
spark-submit --master local --name MyWordCount --class MyWordCount_Scala SparkDemo-1-1.0-SNAPSHOT.jar /home/ap/a.txt

# java 版 wordcount
spark-submit --master local --name MyWordCountJava --class com.rox.spark.WordCountJava SparkDemo-1-1.0-SNAPSHOT.jar /home/ap/a.txt

======================================提交到Spark集群
1) 代码中把textFile改为 args[0]

2) 将源文件传到 hdfs 上

3) 运行spark-submit
$> spark-submit --master spark://cs1:7077 --name MyWordCount2 --class MyWordCount_Scala SparkDemo-1-1.0-SNAPSHOT.jar hdfs://cs1:8020/user/ap/a.txt

3. spark/sbin 下脚本分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[start-all.sh]
-------------------------------
sbin/spark-config.sh
sbin/spark-master.sh //启动master进程
sbin/spark-slaves.sh //启动worker进程

[start-master.sh]
-------------------------------
sbin/spark-config.sh
org.apache.spark.deploy.master.Master
spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...

[spark-slaves.sh]
-------------------------------
sbin/spark-config.sh
slaves.sh //conf/slaves

[slaves.sh]
-------------------------------
for conf/slaves {
ssh host start-slave.sh ...
}

[start-slave.sh]
-------------------------------
CLASS="org.apache.spark.deploy.worker.Worker"
sbin/spark-config.sh
for (( .. )) ; do
start_instance $(( 1 + $i )) "$@"
done

$>cd /soft/spark/sbin
$>./stop-all.sh //停掉整个spark集群.
$>./start-master.sh //停掉整个spark集群.
$>./start-master.sh //启动master节点
$>./start-slaves.sh //启动所有worker节点

$>./stop-slave.sh --help // 查看帮助, 其它的查看帮助也是这样, 命令后面加上 --help

4. 解决数据倾斜问题 (多次 map-reduce)

解决数据倾斜:

1> 先对每个 word 添加 个数 (word,1)

2> 取出 word, 在 word后随机拼接 _num, 然后再与 v 组成 (newK, v)元祖

3> 对新的 (k,v)进行 reduceByKey, 统计出每个 k 的个数 (newK, count)

4> 对结果继续进行 map, 去掉 _num, 把原本的 k 取出来, 再跟 count 组成新的元祖 (k,count)

5> 继续进行 reduceByKey, 此时并发量最多也只是随机数的个数了, 不会产生严重的数据倾斜, 计算出最终的结果

6> 存到文件中 saveAsTextFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 执行防止数据倾斜的 MR, 最后存到 hdfs 上
sc.textFile("hdfs://mycluster/user/ap/a.txt",4).flatMap(_.split(" ")).map((_,1)).map(t => {
val w1 = t._1;
import scala.util.Random;
val n = Random.nextInt(100);
(w1 + "_" + n, t._2);
}).reduceByKey(_ + _,4).map(t => {
val w2 = t._1;
val count = t._2;
val w3 = w2.split("_")(0);
(w3, count);
}).reduceByKey(_ + _,4).saveAsTextFile("/user/ap/scala/DataSkew")

// 查看 hdfs 上的数据
[ap@cs2]~/apps/spark/conf% hdfs dfs -text /user/ap/scala/DataSkew/part-00000
(world2,1)
(,1)

执行的 DAG(有向无环图)


四. RDD 的 transform & action

详细的见官方文档

我的github代码

1. 变换 (transform )

注意: 直到遇到一个不是返回 RDD 对象的方法/函数, 才会开始计算

Spark is lazy, so nothing will be executed unless you call some transformation or action that will trigger job creation and execution. Look at the following snippet of the word-count example.

变换: 返回指向新rdd的指针,在rdd之间创建依赖关系。每个rdd都有计算函数和指向父RDD的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
map()                                    //对每个元素进行变换,应用变换函数
//(T)=>V
--------------------------------------------------------------------------------------------------------------
filter() //过滤器,(T)=>Boolean
--------------------------------------------------------------------------------------------------------------
flatMap() //压扁,T => TraversableOnce[U]
--------------------------------------------------------------------------------------------------------------

mapPartitions() //对每个分区进行应用变换,输入的Iterator,返回新的迭代器,可以对分区进行函数处理。
//Iterator<T> => Iterator<U>
--------------------------------------------------------------------------------------------------------------

mapPartitionsWithIndex(func) //同上,(Int, Iterator<T>) => Iterator<U>
--------------------------------------------------------------------------------------------------------------

sample(withReplacement, fraction, seed) //采样返回采样的RDD子集。
//withReplacement 元素是否可以多次采样.
//fraction : 期望采样数量.[0,1]
--------------------------------------------------------------------------------------------------------------

union() //类似于mysql union操作。
//select * from persons where id < 10
//union select * from id persons where id > 29 ;
--------------------------------------------------------------------------------------------------------------

intersection //交集,提取两个rdd中都含有的元素。
--------------------------------------------------------------------------------------------------------------

distinct([numTasks])) //去重,去除重复的元素。
--------------------------------------------------------------------------------------------------------------

groupByKey() //(K,V) => (K,Iterable<V>)
--------------------------------------------------------------------------------------------------------------

reduceByKey(*) //按key聚合。
--------------------------------------------------------------------------------------------------------------

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) //按照key进行聚合 :TODO 没完全清楚
key:String U:Int = 0

其实reduceBykey就是aggregateByKey的简化版。
就是aggregateByKey多提供了一个函数 seqOp
类似于Mapreduce的combine操作(就在map端执行reduce的操作)

--------------------------------------------------------------------------------------------------------------

sortByKey //排序
--------------------------------------------------------------------------------------------------------------

join(otherDataset, [numTasks]) //连接,(K,V).join(K,W) =>(K,(V,W))
--------------------------------------------------------------------------------------------------------------

cogroup //协分组
//(K,V).cogroup(K,W) =>(K,(Iterable<V>,Iterable<!-- <W> -->))
--------------------------------------------------------------------------------------------------------------

cartesian(otherDataset) //笛卡尔积,RR[T] RDD[U] => RDD[(T,U)]
--------------------------------------------------------------------------------------------------------------
pipe //将rdd的元素传递给脚本或者命令,执行结果返回形成新的RDD
//例子: 在 spark-shell 下
scala> sc.parallelize(Array("/home/ap")).pipe("ls").collect
res0: Array[String] = Array(apps, a.txt, calllog, dump.rdb, flumedata, hadoopdata, ihivedata, jars, kafka, logs, python, softs, SparkDemo-1-1.0-SNAPSHOT.jar, spool, zookeeper, zookeeper.out)
--------------------------------------------------------------------------------------------------------------

coalesce(numPartitions) //减少分区
--------------------------------------------------------------------------------------------------------------

repartition //可增可减
--------------------------------------------------------------------------------------------------------------
repartitionAndSortWithinPartitions(partitioner) //再分区并在分区内进行排序

2. 产生作业 Actions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
collect()                                //收集rdd元素形成数组.

count() //统计rdd元素的个数

reduce() //聚合,返回一个值。

first //取出第一个元素take(1)

take // take(n) , 是一个集合, 需要迭代打印

takeSample (withReplacement,num, [seed])

takeOrdered(n, [ordering]) // 返回前 n 个排序的元素(可默认, 可自定义排序)

saveAsTextFile(path) //保存到文件 生产中使用

saveAsSequenceFile(path) //保存成序列文件

saveAsObjectFile(path) (Java and Scala)

countByKey() //按照key,统计每个key下value的个数

reduceByKey() //按照 key, 计算 key 后的 value

注意 : rdd2.map((_, 1)) 和 rdd2.map((_, 2)) 针对于 前两者的计算结果是不一样的
如果帮到你, 可以给我赞助杯咖啡☕️
0%