一. 项目总体架构
二. Nginx 部署 & 负载均衡
1 | 使用 root 安装 |
三. Nginx 安装 kafka 插件
直接把日志给到 kafka
注意: 从 Nginx 直推日志到 kafka 是一个亮点
1 | 安装nginx-kafka插件 |
接下来, 测试微信小程序, 把 生命周期函数
onReady
后, 采集的用户openid
+经度
+纬度
, post 请求nginx
的服务器 http://cs8/kafka/user,会直接转发到kafka
对应的主题中
1 | 0> # 运行 sbike 程序 |
四. Flume 高级
1.自定义 source
方法总结
- 寻找相对应的源码实现类, 如果是
exec source
, 就参考ExecSource
类 - 继承对应类继承的类
- 至少实现
configure, start, stop 三个方法
- start 中创建一个 单线程的线程池
Executors.newSingleThreadExecutor()
- 创建一个实现了
runnable
接口的内部类FileRunnable
, 实现主要逻辑- 构造方法中传入 监控文件路径, 存储
offset
文件路径, 字符集, 查看interval
- 检查是否有 offset 的文件, 没有就创建
- 用
RandomAccessFile
读取 offset,seek
到offset 读取 - 在 run 方法中 用
RandomAccessFile
对象raf readline()
读取一行 - 如果读到的不为空, 读完后, 用
channelProcessor
推到 channel, 然后更新 offset, 为空, 则 sleepinterval
- 构造方法中传入 监控文件路径, 存储
思维导图
代码
a1.conf
1 | #bin/flume-ng agent -n a1 -f /home/hadoop/a1.conf -c conf -Dflume.root.logger=INFO,console |
com.rox.flume.source.TailFileSource
1 | package com.rox.flume.source; |
2.自定义 source + kafka channel + kafka cluster
1 | # 自定义的 TailFileSource 为 source |
3.Kafka cluster 为 source + kafka Channel + HDFS Sink
4.自定义拦截器
作用: 主要是为原本输入的 String 添加了 scheme, 与原本输入字段组成 json 串, 再输入到 kafka 中, 接上面的 4.2
在2 的 基础上, 增加了拦截器
代码如下
a0.conf
文件
1 | #bin/flume-ng agent -c conf -f conf/a0.conf -n a0 -Dflume.root.logger=INFO,console |
拦截器代码
1 | package com.rox.flume.interceptor; |
5.TairDir
TailDir 可以监控多个文件, 使用正则表达式匹配
这里定义2个 source, channel, sink.
其中一个是tairdir
1 | # a2.conf |
但是 , 此时还是 不支持 递归监控文件, 可以用这个第三方插件
https://github.com/qwurey/flume-source-taildir-recursive
五. 腾讯云短信集成
加入 maven 依赖
1 | <dependency> |
六. 整体架构
整体架构图
七. 指标计算
优化:
1.将数据存储了类型转换成parquet格式(列式存储)
2.在采集数据时转换(数据已经存储在kafka中了)
2.1将数据同步到HDFS中,然后写sparksql,将json的数据转成parquet
2.2编写一段sparkSteaming程序,实时的进行数据的转换
数据处理的流程?
数据怎么来的? 各种终端设备产生的日志(用户点触发了特定的事件,然后在对应的事件进程埋点,就是讲触发事件是产生的数据发送给日志采集服务器,是一个Nginx,可以将数据写入到kafka中,如果是离线,在将数据同步到HDFS中)
有哪些关键字段? 时间、地点(省、市、区)、金额、app类型(IOS、Android、微信小程序)
怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql(离线) sparkSteaming(实时)
计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中
今天收集的用户充值的数据可以计算出哪些指标
实时指标:
1.当天截止到当前时间的充值中金额
2.各个省、市的充值金额
3.各个终端类型的充值金额
4.充值的成功率
5.充值的渠道
6.充值笔数
7.省、市的充值笔数
离线指标
1.某一天的充值金额
2.各个省、市的充值金额
3.各个终端类型的充值金额
4.充值的成功率
5.充值的渠道
6.充值笔数
7.省、市的充值笔数
收集用户的报修事件的日志
数据怎么来的?各种终端设备产生的日志,发现单车不能使用,然后点击报修按钮、智能锁上报车辆状态
有哪些关键字段?时间、地点(省、市、区)、用户id、车辆id、原因(部件)、app类型
怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql
计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中
能计算哪些指标
1.报修次数
2.报修的区域
3.损坏的部件
活动参与
1.活动的点击次数
2.活动的参与次数
3.参会活动积分兑换
4.哪些用户对哪些活动感兴趣