单车项目

一. 项目总体架构

image-20180820094956898

二. Nginx 部署 & 负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
使用 root 安装

1.上传nginx安装包
2.解压nginx
tar -zxvf nginx-1.12.2.tar.gz -C /usr/local/src/
3.进入到nginx的源码目录
cd /usr/local/src/nginx-1.12.2/
4.预编译
./configure
5.安静gcc编译器
yum -y install gcc pcre-devel openssl openssl-devel
6.然后再执行
./configure
7.编译安装nginx
make && make install
8.启动nginx
sbin/nginx
9.查看nginx进程
ps -ef | grep nginx
netstat -anpt | grep nginx


--------------------------------------------------------------------------

#将springboot程序部署在多台服务器上,然后启动springboot
$> java -jar /home/ap/jars/sbike-0.0.1-SNAPSHOT.war 1>/home/ap/logs/sbike.log 2>/home/ap/logs/sbike.err &
--------------------------------------------------------------------------

#修改nginx的配置文件,让nginx实现负载均衡功能
vi /usr/local/nginx/conf/nginx.conf

#关于nginx 的详细配置见笔记
https://app.yinxiang.com/shard/s37/nl/7399077/03809ee1-8c47-4383-98b1-a72867450681/

#负载均衡简单配置
$> vi /usr/local/nginx/conf/nginx.conf

## 在 http 中插入如下内容
http {
========================================================================
#响应数据的来源( tomcat 的服务器集群, weigth 是权重)
upstream tomcats {
server cs1:8888 weight=1;
server cs2:8888 weight=1;
server cs3:8888 weight=1;
}

#负载均衡服务器配置
server {
listen 80;
server_name cs7;

location / {
#转发给tomcats组
proxy_pass http://tomcats;
}
}
========================================================================
}

$> nginx/sbin/nginx -h #查看帮助

$> nginx/sbin/nginx -h #测试配置是否语法正确
$> nginx/sbin/nginx -s reload # -s signal : send signal to a master process: stop, quit, reopen, reload

#启动Nginx
$> sbin/nginx

(注意: 此时打好的 war 包需要已经在 cs1-cs3上运行起来了,
否则就无法往这3台 tomcat 上转发)

#访问: http://cs7/host
#会发现, 每一次访问都会分别转发给 cs1-cs3

三. Nginx 安装 kafka 插件

直接把日志给到 kafka
注意: 从 Nginx 直推日志到 kafka 是一个亮点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
安装nginx-kafka插件

1.安装git
yum install -y git

2.切换到/usr/local/src目录,然后将kafka的c客户端源码clone到本地 (The Apache Kafka C/C++ library )
cd /usr/local/src
# 安装 kafka的 c/c++ 的客户端
git clone https://github.com/edenhill/librdkafka

3.进入到librdkafka,然后进行编译
cd librdkafka
yum install -y gcc gcc-c++ pcre-devel zlib-devel
./configure
make && make install

4.安装nginx整合kafka的插件,进入到/usr/local/src,clone nginx整合kafka的源码
cd /usr/local/src
#安装 nginx 整合 kafka 的插件
git clone https://github.com/brg-liuwei/ngx_kafka_module

5.进入到nginx的源码包目录下 (编译nginx,然后将将插件同时编译)
cd /usr/local/src/nginx-1.12.2
./configure --add-module=/usr/local/src/ngx_kafka_module/
make
make install

6.修改nginx的配置文件,详情请查看当前目录的nginx.conf
#同样是在 http{} 中 修改的用 #### 包起来了
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
################################################
kafka;
kafka_broker_list cs1:9092 cs2:9092 cs3:9092;
################################################
server {
listen 80;
################################################
server_name cs8;
#charset koi8-r;
#access_log logs/host.access.log main;
location = /kafka/track {
kafka_topic track;
}

location = /kafka/user {
kafka_topic user;
}
################################################
#error_page 404 /404.html;

# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}


7.启动zk和kafka集群(创建topic)
zkServer.sh start
kafka-server-start.sh -daemon /home/ap/apps/kafka/config/server.properties
> 创建主题 track 和 user
kafka-topics.sh --create --zookeeper cs1:2181 --replication-factor 3 --partitions 3 --topic track
kafka-topics.sh --create --zookeeper cs1:2181 --replication-factor 3 --partitions 3 --topic user
> 查看
kafka-topics.sh --zookeeper cs1:2181 --describe --topic track


8.启动nginx,报错,找不到kafka.so.1的文件
error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

9.加载so库
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig

10.测试,向nginx中写入数据,然后观察kafka的消费者能不能消费到数据
# 因为在 nginx 的配置文件 nginx.conf 中, 已经指明了 kafka_broker_list 的所有主机, 所以才可以写到 kafka 中去
curl localhost/kafka/topic -d "message send to kafka topic"
#例子:
1> 在 cs1~3上随便一台开一个消费者
$> kafka-console-consumer.sh --bootstrap-server cs1:9092,cs2:9092,cs3:9092 --topic track --from-beginning
2> 在 nginx 服务骑上, 使用 # curl localhost/kafka/topic -d "message send to kafka topic" 往配置的主题目录发送消息
/usr/local/nginx $> curl localhost/kafka/track -d "哎哟我去, 你也太厉害辣...."

接下来, 测试微信小程序, 把 生命周期函数onReady后, 采集的用户openid + 经度 + 纬度 , post 请求 nginx的服务器 http://cs8/kafka/user,会直接转发到 kafka 对应的主题中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
0> # 运行 sbike 程序

1> # wechat 小程序中
wx.request({
//用POST方式请求es可以只指定index和type,不用指定id
// url: "http://localhost:8888/log/ready",

// 注意: 这里直接请求 Nginx, 由 nginx 直接把此条user 日志, 推到 kafka 中
url: "http://cs8/kafka/user",
data: {
time: new Date(),
openid: openid,
lat: lat,
log: log
},
method: "POST"
})


2> # 在 cs1-cs3 随便开一个 kafkaconsumer 进行消费
$> kafka-console-consumer.sh --bootstrap-server cs1:9092,cs2:9092,cs3:9092 --topic top --from-beginning

3> # 刷新 wechat 小程序, 会在 onReady后, 发送一条 post 请求到 nginx, 请求中为 user 信息
在 kafka consumer 端, 会搜集到 user 信息

四. Flume 高级

1.自定义 source

方法总结

  • 寻找相对应的源码实现类, 如果是 exec source, 就参考 ExecSource
  • 继承对应类继承的类
  • 至少实现 configure, start, stop 三个方法
  • start 中创建一个 单线程的线程池 Executors.newSingleThreadExecutor()
  • 创建一个实现了 runnable接口的内部类FileRunnable, 实现主要逻辑
    • 构造方法中传入 监控文件路径, 存储 offset 文件路径, 字符集, 查看 interval
    • 检查是否有 offset 的文件, 没有就创建
    • RandomAccessFile 读取 offset, seek到offset 读取
    • run 方法中 用 RandomAccessFile对象 raf readline()读取一行
    • 如果读到的不为空, 读完后, 用channelProcessor推到 channel, 然后更新 offset, 为空, 则 sleep interval

思维导图

image-20180820223854175

代码

a1.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#bin/flume-ng agent -n a1 -f /home/hadoop/a1.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#具体定义source
a1.sources.r1.type = com.rox.flume.source.TailFileSource
a1.sources.r1.filePath = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/access.txt
a1.sources.r1.posiFile = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/posi.txt
a1.sources.r1.interval = 2000
a1.sources.r1.charset = UTF-8

#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#具体定义sink (file_roll sink)
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /Users/shixuanji/Documents/Code/Datas/flumeTest/posiFile_1
#也可以直接在控制台输出
#a1.sinks.k1.type = logger

#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

com.rox.flume.source.TailFileSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.rox.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


/**
* 参考源码类
* @class ExecSource
* flume source 的生命周期:构造器 -> configure -> start -> processor.process
* 1.读取配置文件:(配置文件的内容:读取哪个文件、编码集、偏移量写到哪个文件、多长时间检查一下文件是否有新内容)
*/
public class TailFileSource extends AbstractSource implements Configurable, EventDrivenSource {

private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

private String filePath; //监控文件路径
private String charset; //编码集
private String posiFile; //存储偏移量文件路径
private long interval; //多长时间检查一下文件是否有新内容, 后面用作: 每拉取一次, 就 sleep 多久
private ExecutorService executor; //线程池对象
private FileRunnable fileRunnable; //不断读取文件的多线程实现类


@Override
public void configure(Context context) {

filePath = context.getString("filePath");
charset = context.getString("charset", "UTF-8");
posiFile = context.getString("posiFile");
interval = context.getLong("interval", 1000L);
}

@Override
public synchronized void start() {
//创建一个单线程的线程池
executor = Executors.newSingleThreadExecutor();
//定义一个实现 Runnable 接口的类对象
fileRunnable = new FileRunnable(filePath,charset,posiFile,interval,getChannelProcessor());
//实现 Runnable 接口的类, 提交到线程池
executor.submit(fileRunnable);
//调用父类的 start 方法
super.start();
}

@Override
public synchronized void stop() {
fileRunnable.setFlag(false);
executor.shutdown();

//如果没有停止成功, 每隔0.5秒询问一次, 直到当次 task 执行完成
while (!executor.isTerminated()) {
logger.debug("Waiting for filer executor service to stop");
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for exec executor service "
+ "to stop. Just exiting.");
Thread.currentThread().interrupt();
}
}
super.stop();
}


/**
* 定义一个内部类 FileRunnable 实现 Runnable 接口
*/
private static class FileRunnable implements Runnable {

private long interval;
private String charset;
private ChannelProcessor channelProcessor;
private long offset = 0L;
private RandomAccessFile raf;
private boolean flag = true;
private File positionFile;

/**
* 构造方法
* @param filePath 监控的文件的路径
* @param charset 字符编码
* @param posiFile 存储偏移量文件的路径
* @param interval 读取一次 source 的间隔时间
* @param channelProcessor 通道处理器, 用于批量把事件写入通道
*/
public FileRunnable(String filePath, String charset, String posiFile, long interval, ChannelProcessor channelProcessor) {

this.interval = interval;
this.charset = charset; // 这里的 charset 是 FileRunnable 在初始化的时候传进来的
this.channelProcessor = channelProcessor;

//读取偏移量,如果有,就接着读,没有就从头读
positionFile = new File(posiFile);
//不存在存储偏移量的文件, 说明是第一次读, 此时创建一个
if (!positionFile.exists()) {
try {
positionFile.createNewFile();
} catch (IOException e) {
logger.error("create position file error", e);
}
}

// 存在偏移量文件, 此时读取偏移量
try {
//使用 apache.common.io 包中的工具类 FileUtils 读取
String offsetString = FileUtils.readFileToString(positionFile);
//如果不为空(记录过偏移量), 就转为 long
if (offsetString != null && !"".equals(offsetString.trim())) {
offset = Long.parseLong(offsetString);
}

//创建一个 RandomAccessFile 对象, 用来读取指定偏移量
raf = new RandomAccessFile(filePath, "r");
raf.seek(offset);

} catch (IOException e) {
logger.error("read position file error", e);
}
}

/**
* run 方法
*/
@Override
public void run() {
// 控制是否持续读取
while (flag) {
try {
String line = raf.readLine();
if (line != null) {
/**
* 读取监控文件中的数据
* @charsetName: 解码的类型
* @charset: 解码后, 转为此种编码
*/
line = new String(line.getBytes("ISO-8859-1"), charset);

// ==> 写入到 channel
channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));

//获取最新的偏移量,然后更新偏移量
offset = raf.getFilePointer();

//将偏移量写入到位置文件中, 覆盖
FileUtils.writeStringToFile(positionFile, offset + "", false);
} else {
Thread.sleep(interval);
}

} catch (IOException e) {
logger.error("read file thread error", e);
} catch (InterruptedException e) {
logger.error("sleep interval Interrupted", e);
}
}
}

public void setFlag(boolean b) {
this.flag = b;
}
}
}

2.自定义 source + kafka channel + kafka cluster

image-20180820225844564

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 自定义的 TailFileSource 为 source
# 使用 KafkaChannel 为 channel, 直接就干到了 kafka 中去了

# 1>在 kafka cluster 中 的一个节点开一个 consumer 消费者
$ kafka-console-consumer.sh --bootstrap-server cs1:9092,cs2:9092,cs3:9092 --topic usertest --from-beginning

# 2> 配置 a0.conf
===============================================================
#bin/flume-ng agent -c conf -f conf/a0.conf -n a0 -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a0.sources = r1
a0.channels = c1

#具体定义source
a0.sources.r1.type = com.rox.flume.source.TailFileSource
a0.sources.r1.filePath = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/access.txt
a0.sources.r1.posiFile = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/posi.txt
a0.sources.r1.interval = 2000
a0.sources.r1.charset = UTF-8

#a0.sources.r1.interceptors = i1
#a0.sources.r1.interceptors.i1.type = cn.rox.flume.interceptor.JsonInterceptor$Builder
#a0.sources.r1.interceptors.i1.fields = id,name,fv,age
#a0.sources.r1.interceptors.i1.separator = ,

a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a0.channels.c1.kafka.bootstrap.servers = cs1:9092,cs2:9092,cs3:9092
a0.channels.c1.kafka.topic = usertest
#Kafka Channel中, 设置写入为 flumeEvent 为 false, 默认是 true, 这里就不会写入事件(事件还包括body,header 等)
a0.channels.c1.parseAsFlumeEvent = false

a0.sources.r1.channels = c1


===============================================================

# 3>开启 flume 任务
➜ apache-flume-1.8.0-bin bin/flume-ng agent -c conf -f conf/a0.conf -n a0 -Dflume.root.logger=INFO,console

# 4>往 文件中, 写数据 & 查看 kafka consumer
echo "xxxx" >> /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/access.txt

3.Kafka cluster 为 source + kafka Channel + HDFS Sink

image-20180820231623971

4.自定义拦截器

作用: 主要是为原本输入的 String 添加了 scheme, 与原本输入字段组成 json 串, 再输入到 kafka 中, 接上面的 4.2

在2 的 基础上, 增加了拦截器

代码如下

a0.conf 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#bin/flume-ng agent -c conf -f conf/a0.conf -n a0 -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a0.sources = r1
a0.channels = c1

#具体定义source
a0.sources.r1.type = com.rox.flume.source.TailFileSource
a0.sources.r1.filePath = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/access.txt
a0.sources.r1.posiFile = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/posi.txt
a0.sources.r1.interval = 2000
a0.sources.r1.charset = UTF-8

#拦截器
a0.sources.r1.interceptors = i1
a0.sources.r1.interceptors.i1.type = com.rox.flume.interceptor.JsonInterceptor$Builder
a0.sources.r1.interceptors.i1.fields = id,name,fv,age
a0.sources.r1.interceptors.i1.separator = ,

a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a0.channels.c1.kafka.bootstrap.servers = cs1:9092,cs2:9092,cs3:9092
a0.channels.c1.kafka.topic = usertest
a0.channels.c1.parseAsFlumeEvent = false

a0.sources.r1.channels = c1

拦截器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.rox.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JsonInterceptor implements Interceptor {

private String[] schema; //id,name,fv,age

private String separator; // ,


/**
* 构造方法, 传入schema(表头), separator(分隔符)
* @param schema
* @param separator 内容的分隔符
*/
public JsonInterceptor(String schema, String separator) {
this.schema = schema.split("[,]"); // schema 的分隔符也为 ,
this.separator = separator;
}


@Override
public void initialize() {
// no-op
}

@Override
public Event intercept(Event event) {
Map<String, String> tuple = new LinkedHashMap<String, String>();
//将传入的Event中的body内容,加上schema,然后在放入到Event
String line = new String(event.getBody());
String[] fields = line.split(separator);
for(int i = 0; i < schema.length; i++) {
String key = schema[i];
String value = fields[i];
tuple.put(key, value);
}
String json = JSONObject.toJSONString(tuple);
//将转换好的json,再放入到Event中
event.setBody(json.getBytes());
return event;
}

@Override
public List<Event> intercept(List<Event> events) {

for (Event e : events) {
intercept(e);
}
return events;
}

@Override
public void close() {
// no-op
}

/**
* Interceptor.Builder的生命周期方法
* 构造器 -> configure -> build
*/
public static class Builder implements Interceptor.Builder {

private String fields;
private String separator;

@Override
public Interceptor build() {
//在build创建JsonInterceptor的实例
return new JsonInterceptor(fields, separator);
}

/**
* 配置文件中应该有哪些属性?
* 1.数据的分割符
* 2.字段名字(schema)
* 3.schema字段的分隔符
* @param context
*/
@Override
public void configure(Context context) {
fields = context.getString("fields");
separator = context.getString("separator");
}
}
}

5.TairDir

TailDir 可以监控多个文件, 使用正则表达式匹配

这里定义2个 source, channel, sink.

其中一个是tairdir

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# a2.conf

#bin/flume-ng agent -n a2 -f /home/hadoop/a2.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a2.sources = r1 r2
a2.channels = c1 c2
a2.sinks = k1 k2

#具体定义source
a2.sources.r1.type = com.rox.flume.source.TailFileSource
a2.sources.r1.filePath = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/access.txt
a2.sources.r1.posiFile = /Users/shixuanji/Documents/Code/Datas/flumeTest/logs/posi.txt
a2.sources.r1.interval = 1000
a2.sources.r1.charset = UTF-8

a2.sources.r2.type = TAILDIR
a2.sources.r2.positionFile = /Users/shixuanji/Documents/Code/Datas/flumeTest/TailDir/position.json
a2.sources.r2.filegroups = g1
a2.sources.r2.filegroups.g1 = /Users/shixuanji/Documents/Code/Datas/flumeTest/TailDir/2018/.*.txt
a2.sources.r2.fileHeader = false

#具体定义channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#具体定义sink
a2.sinks.k1.type = file_roll
a2.sinks.k1.sink.directory = /Users/shixuanji/Documents/Code/Datas/flumeTest/TailDir/k1

a2.sinks.k2.type = file_roll
a2.sinks.k2.sink.directory = /Users/shixuanji/Documents/Code/Datas/flumeTest/TailDir/k2

#组装source、channel、sink
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

但是 , 此时还是 不支持 递归监控文件, 可以用这个第三方插件

https://github.com/qwurey/flume-source-taildir-recursive

五. 腾讯云短信集成

加入 maven 依赖

1
2
3
4
5
<dependency>
<groupId>com.github.qcloudsms</groupId>
<artifactId>qcloudsms</artifactId>
<version>1.0.5</version>
</dependency>

六. 整体架构

image-20180826133001140

image-20180826133013229

整体架构图

七. 指标计算

优化:
1.将数据存储了类型转换成parquet格式(列式存储)
2.在采集数据时转换(数据已经存储在kafka中了)
2.1将数据同步到HDFS中,然后写sparksql,将json的数据转成parquet
2.2编写一段sparkSteaming程序,实时的进行数据的转换

数据处理的流程?
数据怎么来的? 各种终端设备产生的日志(用户点触发了特定的事件,然后在对应的事件进程埋点,就是讲触发事件是产生的数据发送给日志采集服务器,是一个Nginx,可以将数据写入到kafka中,如果是离线,在将数据同步到HDFS中)
有哪些关键字段? 时间、地点(省、市、区)、金额、app类型(IOS、Android、微信小程序)
怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql(离线) sparkSteaming(实时)
计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中

今天收集的用户充值的数据可以计算出哪些指标
实时指标:
1.当天截止到当前时间的充值中金额
2.各个省、市的充值金额
3.各个终端类型的充值金额
4.充值的成功率
5.充值的渠道
6.充值笔数
7.省、市的充值笔数
离线指标
1.某一天的充值金额
2.各个省、市的充值金额
3.各个终端类型的充值金额
4.充值的成功率
5.充值的渠道
6.充值笔数
7.省、市的充值笔数


收集用户的报修事件的日志
数据怎么来的?各种终端设备产生的日志,发现单车不能使用,然后点击报修按钮、智能锁上报车辆状态
有哪些关键字段?时间、地点(省、市、区)、用户id、车辆id、原因(部件)、app类型
怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql
计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中

能计算哪些指标
1.报修次数
2.报修的区域
3.损坏的部件


活动参与
1.活动的点击次数
2.活动的参与次数
3.参会活动积分兑换
4.哪些用户对哪些活动感兴趣

如果帮到你, 可以给我赞助杯咖啡☕️
0%