离线项目 DMP
专有名词:
- DSP(Demand Side Platform)
DSP 原理图
整体流程图
有相同字段值, 出现顺序不一样的用户使用图计算, 再进行合并
DMP => 此部分的用户画像是大数据工程师做的
项目整体业务流程图
项目技术点
- 主要做离线这块, 这个项目实时的也知道
- Flume 自定义 source
字段 :
计算逻辑
问题:
- 有 Spark 的 jvm调优经验吗?
- 有 shuffle 就一定慢吗
Spark 任务涉及到的调优:
序列化— Java 原生的序列化传输比较慢, 使用
KryoSerializer
垃圾回收时间( gc )长, 基本上都是在1s 内, 用了 gsm 的配置方式 : TODO, 效果不是很明显, 因为集群的资源还是比较丰富的
–num-executors
本机总内存32G
默认一个 block 对应一个 executors
可以 repartition, 之后的分区数就是这个 executors 的数量, 一个分区对应一个 executors
一般分区数量为集群总核数的2-3倍
如果总虚拟内存 32G, 总核数 32核的话, 可以分区为 60, 4台节点的话, 每台的
num-executors
为15个但是还要看总的内存数量, 总内存数为 (executor-memory +1) * num-executors
因此 32 / 3 = 10, 所以设置
num-executors
为10个, 32个核减去一个系统的占用, 一个 executor 得到2个核数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33nohup spark-submit --class com.rox.dmp.tools.Bzip2Parquet \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 15 \
dmp-1.0-SNAPSHOT.jar \
/adlogs/biz2/mycus snappy 60 /adlogs/out/test56 &
#############################
nohup spark-submit --class com.rox.dmp.tools.Bzip2Parquet \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 8 \
/home/ap/jars/dmp-1.0-SNAPSHOT.jar \
/adlogs/biz2/mycus snappy 20 /adlogs/out/test57 &
# 目前来看这样调整是比较快的
nohup spark-submit --class com.rox.dmp.tools.Bzip2Parquet \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 8 \
/home/ap/jars/dmp-1.0-SNAPSHOT.jar \
/adlogs/biz2/mycus snappy 16 /adlogs/out/test58 &
开启 spark 的 history 服务器: start-history-server.sh
查看历史数据:
http://cs1:18080/history/application_1536038023803_0008/1/jobs/
32核是虚的, 4台机器总共才8核
增加任务的并行度总结
- executor-memory:
- 大小和 num-executors 有关系, 他们的乘积不能大于集群 总的 内存 大小
- (executor-memory + 1) * num-executors <= 集群总内存容量
- 注意: 做乘积的时候, executor-memory 得多加一个 G
- executor-cores:
- executor-cores * num-executors <= 集群中的 总的核数 容量
- 一个 executor 如果只分配了一个核, 在这个 executor 中的线程数量同一时刻只能有一个 task, 并且是串行
- 如果 executor 分配了 n 核, 在这个 executor 中的 task 是并行的, 并行的最大数量是 n
- num-executors
- 申请的总的 executor数量, executors 数量最好和 分区数量 成倍数关系
- partitionNumber
- Spark 官网建议我们分区的数量最好是机器核数的2-3倍
其它的参考 spark 官网的调优指南
使用 RDD 计算地域分布
Graphx 共同好友示例
使用图计算, 合并相同的用户
实时项目
中国移动充值业务项目
kafka偏移量的计算
redis 的字段
介绍 DMP 项目
- DMP精准广告推送
- 两大块
- 报表
- 维度, 指标, 意义..
- 技术结构
- spark-sql, spark-core
- 技术亮点
- udf
- parquet 文件
- 其它的优化–并行度,,….等
- 存到 mysql集群
- 报表做出来后, 给j2ee组的人, 做查询和可视化
- 用户画像
- 标签体系, 知识库
- 原始日志拿过来之后, 与知识库进行匹配, 如果匹配上了, 给此人打上相应的标签
- 紧接着举个例子: 每个标签有自己唯一的编码, 类似于A00001, 这是一个大类, 大类下面又分为 00001, 这是一个子类, 后面会记录一个1, 这个数字是累加的, 数字越大, 就代表这个标签权重在这个人身上越大, 比如一个人比较爱看武侠, 权重是10, 一个人武侠是0, 言情是10
- 当时做一个商圈标签, 当时日志中只有经纬度坐标嘛, 当时就用了百度的 拟地理位置编码 服务, 结合每一次请求的经纬度, 会生成 sn签名, 然后再携带这个 sn 签名, 还有自己的 ak,sk, 然后通过 HttpClient 发送请求, 百度返回一个 json 字符串, 然后解析其中的 business 节点, 这个节点就是 纬度 和 经度 对应的 商圈信息; 把所有的日志中的经纬度拿出来, 把经纬度转成 geoHash 编码, 作为 key, 然后用经纬度请求百度接口拿到的 json字符串中的business 字段作为 value, 存到 redis, 这样会形成自己的商圈知识库, 然后基于此库, 打上相应的商圈标签
- 如果某个经纬度查询不到商圈信息, 也就是说从百度的 api 中, 这个地点附近没有商圈, 就会用7位的 geoHash 的 key 做模糊查询, 原本8位经度是 19米, 7位是 76米, 7位直接在数据库中使用 like 进行模糊匹配就行了 keys xxxxxxx*
- geoHash 编码: 简单来讲吗, 首先把纬度经度分别编码成2进制, 然后组合成新串, 再使用 0-9, b-z(去掉 a, i, l o), 进行 base32编码, 得到的一组字符串; 这个字符串有什么特征呢, 如果把2个经纬度坐标进行编码后, 得到的字符串前8位都一样的画, 那他们的距离就在19米之内, 如果是9位一样的话, 就是在2米之内, 7位一样的话, 就是在 73米之内
- 说难点的时候, 要说: 我觉得最有意思的是…
- 再比如: 问一个其它的: 比如男女标签, 首先是有这个字段的, 如果字段没有值的时候, 会根据使用的某类 app, 或者某些关键属性来打, 我们公司有这些知识库
- 还有一个标签衰减: 因为人的兴趣是随着时间变化而变化的嘛, 比如一段时间喜欢什么, 另一端时间又不喜欢了, 具体做法是: 把昨天的标签系数 * 一个权重值 + 今天的系数
- 标签数据存到 hbase 中, rowkey 采用用户的 imei 号, 或者 mac 地址 , 只会把用户当天的标签存储到 hbase 中, 列族只有一个, 列是以日期命名的, 列以日期为 key, 一天一个列; 问: rowkey 为什么这么设计, 说便于查询把, 规定是要存到 hbase 中, 并且只存1个月, 我觉得存到 hdfs也可以;
- 把最终的用户标签存到 ES 中, ES和 spark 有个整和的 jar 包
- 报表
geoHash 的精确度表
- 9的话是2米左右
Hive: 问题, 原理, 优化, 窗口函数(udf, udaf…区别)
hbase: 原理 (hmaster, hregionserve…), 优化(至少3点…), 问题…
spark: 源码