1、Real-time order statistics

Order data processing flow:

1. A shell replay script reads the order data and appends it to a designated order file.

2. Flume monitors the order file and forwards the order data to Kafka in real time.

3. Spark Streaming consumes and aggregates the order data and passenger counts, saving the results to Redis.

4. The page calls the corresponding RESTful interface of the Java middle tier; that interface queries the data in Redis and returns it to the page, which then renders it (a sketch of such an endpoint follows the figure below).

(Figure: order data processing flow)
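Step 4 is a thin read layer over Redis. The sketch below illustrates what such a middle-tier endpoint might look like, assuming Spring Web plus the project's JedisUtil and Constants helpers; the URL path and parameter names are hypothetical, and the hash field format cityCode_day matches what the Spark job in section 1.3 writes:

package com.cartravel.ordermonitor;

import com.cartravel.common.Constants;
import com.cartravel.util.JedisUtil;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;

@RestController
public class OrderCountController {

    /**
     * Returns the accumulated order count for one city and day.
     * The Spark Streaming job stores counts in a Redis hash keyed by "cityCode_day".
     */
    @GetMapping("/orderCount")  // hypothetical path
    public String orderCount(@RequestParam String cityCode, @RequestParam String day) {
        JedisUtil jedisUtil = JedisUtil.getInstance();
        Jedis jedis = jedisUtil.getJedis();
        try {
            String count = jedis.hget(Constants.ORDER_COUNT, cityCode + "_" + day);
            return count == null ? "0" : count;
        } finally {
            jedisUtil.returnJedis(jedis);
        }
    }
}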

1.1 Order data replay

1. Install and configure Flume

Flume agent configuration:

Agent name: a1 (any name that reflects the business function will do), configured as shown below:

(Figure: Flume agent configuration)

The configuration file content is as follows:

a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=exec
# Start with tail -F; this will be optimized later (see section 1.2)
a1.sources.r1.command=tail -F /root/order/order
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=hai_kou_order_topic
a1.sinks.k1.brokerList=cdh-node01:9092,cdh-node02:9092,cdh-node03:9092
a1.sinks.k1.batchSize=20
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.producer.linger.ms=1
a1.sinks.k1.producer.compression.type=snappy
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
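With this saved as a configuration file (the file name below is illustrative), the agent can be started with the standard launcher, e.g. flume-ng agent --conf-file order-kafka.conf --name a1 -Dflume.root.logger=INFO,console. Note that the memory channel favors throughput over durability: events still buffered in memory are lost if the agent dies, which is acceptable here since the replay script can simply be re-run; section 1.2 deals with the related problem of resuming the tail itself.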

2. Install the Kafka Manager tool

This tool is mainly used to monitor messages on Kafka topics and to perform operations such as creating and deleting topics.

3. Code implementation for consuming the order data from Kafka (a minimal smoke-test sketch follows; the full streaming implementation is in section 1.3).
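Before wiring up Spark, it helps to verify the Flume-to-Kafka leg with a plain consumer that just prints whatever arrives on the topic. This is a minimal smoke-test sketch, assuming the broker list from the Flume sink configuration above; it is not the project's production consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class OrderTopicSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh-node01:9092,cdh-node02:9092,cdh-node03:9092");
        props.put("group.id", "order_smoke_test");   // throwaway consumer group
        props.put("auto.offset.reset", "earliest");  // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("hai_kou_order_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}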

1.2 Resuming data replay after an interruption

Problem background:

When integrating Flume with Kafka, we usually have Flume monitor a file by configuring the source with a command such as tail -F <file>. This approach still has a problem: if the Flume agent process dies for a while for whatever reason, tail -F picks up again at the end of the file once the agent restarts, so every line appended during the downtime is silently skipped.

Solutions:

1. Option one: replace the plain tail -F command with a pipeline that records how many lines it has already shipped:

a1.sources.r2.command=tail -n +$(tail -n1 /root/log) -F /root/data/nginx.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/root/log";fflush("")}' /root/log -
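How this works: the inner tail -n1 /root/log fetches the line count saved by the previous run, so the outer tail -n +N -F starts shipping from that line instead of from the end of the file. In the awk program, the ARGIND==1 branch first loads that saved count into i while reading /root/log; the trailing - then makes awk read stdin, where every arriving line is printed through to Flume, the counter is incremented, and the new value is appended to /root/log with fflush so the bookkeeping survives a crash. This is an at-least-once scheme, so the line in flight during a crash may be shipped twice.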

2. Option two: newer Flume versions (1.7+) ship the Taildir Source, which solves this natively: it records the inode and read offset of each tailed file in a JSON position file, so after a restart the source resumes exactly where it left off.

a1.sources.s1.type = TAILDIR
a1.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
a1.sources.s1.headers.f1.headerKey1 = aaa
a1.sources.s1.fileHeader = true

1.3 Real-time order statistics (order counts and passenger counts)

The OrderStreamingProcessor class:

package com.cartravel.spark

import com.cartravel.common.{Constants, TopicName}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.log4j.Level
import org.apache.log4j.Logger

import scala.collection.JavaConversions._

/**
 * Order data stream processor.
 */
class OrderStreamingProcessor {
}

case class Order(orderId: String)

object OrderStreamingProcessor {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    import org.apache.spark.streaming._
    // Log level for Spark output on the console
    Logger.getLogger("org").setLevel(Level.WARN)
    // local[*] runs in local mode and derives the number of worker threads
    // from the CPU core count; a fixed thread count can also be given,
    // e.g. local[2] simulates a Spark cluster with two threads.
    val conf = new SparkConf().setAppName("OrderMonitor").setMaster("local[1]")

    // Initialize the Spark Streaming context with 1-second batches
    val streamingContext = new StreamingContext(conf, Seconds(1))

    // Checkpoint directory (required by updateStateByKey)
    streamingContext.checkpoint("/sparkapp/tmp")

    //"auto.offset.reset" -> "earliest"
    val kafkaParams = Map[String, Object](
      // "bootstrap.servers" -> "192.168.21.173:6667,192.168.21.174:6667,192.168.21.175:6667",
      "bootstrap.servers" -> Constants.KAFKA_BOOTSTRAP_SERVERS,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test0001",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array(
      TopicName.HAI_KOU_ORDER_TOPIC.getTopicName,
      TopicName.CHENG_DU_ORDER_TOPIC.getTopicName,
      TopicName.XI_AN_ORDER_TOPIC.getTopicName
    )

    topics.foreach(println(_))
    println("topics:" + topics.mkString(","))

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.count().print()

    // Real-time order total: parse each record into an order object
    val ordersDs = stream.map(record => {
      // Topic name
      val topicName = record.topic()
      val orderInfo = record.value()

      // Order parser
      var orderParser: OrderParser = null

      // Orders from different topics are parsed differently
      topicName match {
        case "hai_kou_order_topic" =>
          orderParser = new HaiKouOrderParser()
        case "cheng_du_order_topic" =>
          orderParser = new ChengDuOrderParser()
        case "xi_an_order_topic" =>
          orderParser = new XiAnOrderParser()
        case _ =>
          orderParser = null
      }

      println("orderParser:" + orderParser)
      if (null != orderParser) {
        val order = orderParser.parser(orderInfo)
        println("parser order:" + order)
        order
      } else {
        null
      }
    })

    // Order counting: each order is mapped to ("cityCode_day", 1)
    val orderCountRest = ordersDs.map(order => {
      if (null == order) {
        ("", 0)
      } else if (order.getClass == classOf[ChengDuTravelOrder]) {
        (Constants.CITY_CODE_CHENG_DU + "_" + order.createDay, 1)
      } else if (order.getClass == classOf[XiAnTravelOrder]) {
        (Constants.CITY_CODE_XI_AN + "_" + order.createDay, 1)
      } else if (order.getClass == classOf[HaiKouTravelOrder]) {
        (Constants.CITY_CODE_HAI_KOU + "_" + order.createDay, 1)
      } else {
        ("", 0)
      }
    }).updateStateByKey((currValues: Seq[Int], state: Option[Int]) => {
      val count = currValues.sum + state.getOrElse(0)
      Some(count)
    })

    /**
     * Passenger count statistics.
     * Chengdu and Xi'an orders carry no passenger-count field, so they are
     * counted as one passenger per order by default.
     * Haikou orders do carry a passenger-count field, so the actual value is used.
     */
    val passengerCountRest = ordersDs.map(order => {
      if (null == order) {
        ("", 0)
      } else if (order.getClass == classOf[ChengDuTravelOrder]) {
        (Constants.CITY_CODE_CHENG_DU + "_" + order.createDay, 1)
      } else if (order.getClass == classOf[XiAnTravelOrder]) {
        (Constants.CITY_CODE_XI_AN + "_" + order.createDay, 1)
      } else if (order.getClass == classOf[HaiKouTravelOrder]) {
        var passengerCount = order.asInstanceOf[HaiKouTravelOrder].passengerCount.toInt
        // Scala has no Java-style ternary operator; an if-expression is used instead
        passengerCount = if (passengerCount > 0) passengerCount else 1
        (Constants.CITY_CODE_HAI_KOU + "_" + order.createDay, passengerCount)
      } else {
        ("", 0)
      }
    }).updateStateByKey((currValues: Seq[Int], state: Option[Int]) => {
      val count = currValues.sum + state.getOrElse(0)
      Some(count)
    })

    orderCountRest.foreachRDD(orderCountRDD => {
      import com.cartravel.util.JedisUtil
      val jedisUtil = JedisUtil.getInstance()
      val jedis = jedisUtil.getJedis
      val orderCountRest = orderCountRDD.collect()
      println("orderCountRest:" + orderCountRest.mkString(","))
      orderCountRest.foreach(countrest => {
        println("countrest:" + countrest._1 + "," + countrest._2)
        if (null != countrest) {
          jedis.hset(Constants.ORDER_COUNT, countrest._1, countrest._2 + "")
        }
      })
      jedisUtil.returnJedis(jedis)
    })

    passengerCountRest.foreachRDD(passengerCountRdd => {
      import com.cartravel.util.JedisUtil
      val jedisUtil = JedisUtil.getInstance()
      val jedis = jedisUtil.getJedis

      val passengerCountRest = passengerCountRdd.collect()
      passengerCountRest.foreach(countrest => {
        jedis.hset(Constants.PASSENGER_COUNT, countrest._1, countrest._2 + "")
      })
      jedisUtil.returnJedis(jedis)
    })

    // Start the Spark Streaming application
    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
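A note on how the two statistics accumulate: updateStateByKey folds each batch's (cityCode_day, n) pairs into a running total per key, which is why the checkpoint directory set above is mandatory. On every batch, foreachRDD then collects the complete accumulated state to the driver and rewrites it into the Redis hashes Constants.ORDER_COUNT and Constants.PASSENGER_COUNT; these hashes are exactly what the middle-tier REST interface from step 4 of the overview reads.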

2、Region-wide order track monitoring

2.1 Real-time order track monitoring

The GAIA open dataset (盖亚数据计划) contains order tracks that were already fully generated, so the data itself gives no indication of when an order ends. In a real business scenario there are explicit start and end flags; here we can insert such markers into the data ourselves, writing a start string at the beginning of each order's track and an end string at its close, and using these two markers to delimit the order's trajectory (a small sketch of the idea follows).
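The replay side is a shell script, but the marker idea is easy to sketch. The Java snippet below is purely illustrative (class name, file layout and per-line format are assumptions); it writes a bare start line, then the order's GPS lines, then a bare end line, matching what GpsConsumer later keys on (a line containing a comma is a GPS point, a line containing end closes the track):

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

public class TrackReplayWriter {
    /**
     * Appends one order's track to the replay file, wrapped in marker lines.
     * The markers contain no comma, so the downstream consumer can tell them
     * apart from GPS lines of the form "driverId,orderId,timestamp,lng,lat".
     */
    public static void replayOrder(String file, List<String> gpsLines) throws IOException {
        try (PrintWriter out = new PrintWriter(new FileWriter(file, true))) {
            out.println("start");            // start-of-track marker
            for (String line : gpsLines) {
                out.println(line);           // one GPS point per line
            }
            out.println("end");              // end-of-track marker
        }
    }
}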

Implementation flow:

(Figure: real-time track monitoring flow)

Consuming the track data:

package com.cartravel.kafka;

import com.cartravel.common.Constants;
import com.cartravel.common.Order;
import com.cartravel.common.TopicName;
import com.cartravel.util.HBaseUtil;
import com.cartravel.util.JedisUtil;
import com.cartravel.util.ObjUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.*;

public class GpsConsumer implements Runnable {
    private static Logger log = Logger.getLogger(GpsConsumer.class);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Number of messages consumed so far
    private static int count = 0;
    private FileOutputStream file = null;
    private BufferedOutputStream out = null;
    private PrintWriter printWriter = null;
    private String lineSeparator = null;
    private int batchNum = 0;
    JedisUtil instance = null;
    Jedis jedis = null;

    private String cityCode = "";
    private Map<String, String> gpsMap = new HashMap<String, String>();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public GpsConsumer(String topic, String groupId) {
        if (topic.equalsIgnoreCase(TopicName.CHENG_DU_GPS_TOPIC.getTopicName())) {
            cityCode = Constants.CITY_CODE_CHENG_DU;
        } else if (topic.equalsIgnoreCase(TopicName.XI_AN_GPS_TOPIC.getTopicName())) {
            cityCode = Constants.CITY_CODE_XI_AN;
        } else if (topic.equalsIgnoreCase(TopicName.HAI_KOU_ORDER_TOPIC.getTopicName())) {
            cityCode = Constants.CITY_CODE_HAI_KOU;
        } else {
            throw new IllegalArgumentException(topic + ": illegal topic name!");
        }

        Properties props = new Properties();

        //dev-hdp
        // props.put("bootstrap.servers", "192.168.21.173:6667,192.168.21.174:6667,192.168.21.175:6667");
        //dev-cdh
        // props.put("bootstrap.servers", "192.168.21.177:9092,192.168.21.178:9092,192.168.21.179:9092");
        //pro-cdh
        props.put("bootstrap.servers", Constants.KAFKA_BOOTSTRAP_SERVERS);

        // props.put("bootstrap.servers", "192.168.21.178:9092");

        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        // props.put("auto.offset.reset", "latest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String, String>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        while (true) {
            try {
                doWork();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void doWork() throws Exception {
        batchNum++;
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<String, String> records = consumer.poll(1000);
        System.out.println("Batch " + batchNum + ": " + records.count() + " records");
        // Driver ID
        String driverId = "";
        // Order ID
        String orderId = "";
        // Longitude
        String lng = "";
        // Latitude
        String lat = "";
        // Timestamp (epoch seconds)
        String timestamp = "";
        Order order = null;
        Order startEndTimeOrder = null;
        Object tmpOrderObj = null;
        if (records.count() > 0) {
            Table table = HBaseUtil.getTable(Constants.HTAB_GPS);
            JedisUtil instance = JedisUtil.getInstance();
            jedis = instance.getJedis();
            List<Put> puts = new ArrayList<>();
            String rowkey = "";

            if (gpsMap.size() > 0) {
                gpsMap.clear();
            }

            // Create the table if it does not exist yet
            if (!HBaseUtil.tableExists(Constants.HTAB_GPS)) {
                HBaseUtil.createTable(HBaseUtil.getConnection(), Constants.HTAB_GPS, Constants.DEFAULT_FAMILY);
            }

            for (ConsumerRecord<String, String> record : records) {
                count++;
                log.warn("Received message: (" + record.key() + ", " + record.value() + ") at offset " +
                        record.offset() + ",count:" + count);
                String value = record.value();
                if (value.contains(",")) {
                    // GPS line: driverId,orderId,timestamp,lng,lat
                    order = new Order();
                    String[] split = value.split(",");
                    driverId = split[0];
                    orderId = split[1];
                    timestamp = split[2];
                    lng = split[3];
                    lat = split[4];

                    rowkey = orderId + "_" + timestamp;
                    gpsMap.put("CITYCODE", cityCode);
                    gpsMap.put("DRIVERID", driverId);
                    gpsMap.put("ORDERID", orderId);
                    gpsMap.put("TIMESTAMP", timestamp + "");
                    // timestamp is in seconds; append "000" to get milliseconds for Date
                    gpsMap.put("TIME", sdf.format(new Date(Long.parseLong(timestamp + "000"))));
                    gpsMap.put("LNG", lng);
                    gpsMap.put("LAT", lat);

                    order.setOrderId(orderId);

                    puts.add(HBaseUtil.createPut(rowkey, Constants.DEFAULT_FAMILY.getBytes(), gpsMap));

                    // 1. Record the ID of this live order
                    jedis.sadd(Constants.REALTIME_ORDERS, cityCode + "_" + orderId);
                    // 2. Record the live order's longitude/latitude points
                    jedis.lpush(cityCode + "_" + orderId, lng + "," + lat);
                    // 3. Record the order's start and end time

                    byte[] orderBytes = jedis.hget(Constants.ORDER_START_ENT_TIME.getBytes()
                            , orderId.getBytes());

                    if (orderBytes != null) {
                        tmpOrderObj = ObjUtil.deserialize(orderBytes);
                    }

                    if (null != tmpOrderObj) {
                        // Seen before: push the end time forward. Keep seconds here so it
                        // stays consistent with the "orderId_timestamp" rowkey used for
                        // the later HBase range scan.
                        startEndTimeOrder = (Order) tmpOrderObj;
                        startEndTimeOrder.setEndTime(Long.parseLong(timestamp));
                        jedis.hset(Constants.ORDER_START_ENT_TIME.getBytes(), orderId.getBytes(),
                                ObjUtil.serialize(startEndTimeOrder));
                    } else {
                        // First sighting of the order: start time and end time are identical
                        order.setStartTime(Long.parseLong(timestamp));
                        order.setEndTime(Long.parseLong(timestamp));
                        jedis.hset(Constants.ORDER_START_ENT_TIME.getBytes(), orderId.getBytes(),
                                ObjUtil.serialize(order));
                    }
                    hourOrderInfoGather(jedis, gpsMap);
                } else if (value.contains("end")) {
                    // End marker line: append it to the order's track list
                    jedis.lpush(cityCode + "_" + orderId, value);
                }
            }
            table.put(puts);
            instance.returnJedis(jedis);
        }
        log.warn("Batch finished normally...");
    }

    /**
     * Aggregates per-city hourly order info and order counts.
     * @throws Exception
     */
    public void hourOrderInfoGather(Jedis jedis, Map<String, String> gpsMap) throws Exception {
        String time = gpsMap.get("TIME");
        String orderId = gpsMap.get("ORDERID");
        String day = time.substring(0, time.indexOf(" "));
        String hour = time.split(" ")[1].substring(0, 2);
        // Redis key: hourly order counts per city and day
        String hourOrderCountTab = cityCode + "_" + day + "_hour_order_count";

        // Redis hash field: city, day and hour
        String hourOrderField = cityCode + "_" + day + "_" + hour;
        String hourOrder = cityCode + "_order";

        int hourOrderCount = 0;
        // A Redis set holds all order IDs seen, so each order is counted only once
        if (!jedis.sismember(hourOrder, orderId)) {
            // Remember the order ID
            jedis.sadd(hourOrder, orderId);
            String hourOrdernum = jedis.hget(hourOrderCountTab, hourOrderField);
            if (StringUtils.isEmpty(hourOrdernum)) {
                hourOrderCount = 1;
            } else {
                hourOrderCount = Integer.parseInt(hourOrdernum) + 1;
            }

            // A Redis hash holds the total order count for each hour
            jedis.hset(hourOrderCountTab, hourOrderField, hourOrderCount + "");
        }
    }

    public static void main(String[] args) {

        Logger.getLogger("org.apache.kafka").setLevel(Level.INFO);
        // Kafka topic
        String topic = "cheng_du_gps_topic";
        // Consumer group ID
        String groupId = "cheng_du_gps_consumer_01";

        GpsConsumer gpsConsumer = new GpsConsumer(topic, groupId);
        Thread start = new Thread(gpsConsumer);
        start.start();
    }
}
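Everything the consumer writes to Redis is meant to be read back by the monitoring page's middle tier. As a hedged sketch (the class is hypothetical, but the keys match what GpsConsumer writes above), the live track of every active order can be pulled like this:

import com.cartravel.common.Constants;
import com.cartravel.util.JedisUtil;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RealtimeTrackReader {
    /**
     * Reads every live order's track points from Redis.
     * GpsConsumer stores active order IDs in the set Constants.REALTIME_ORDERS
     * (as "cityCode_orderId") and each order's points in a list under that
     * same "cityCode_orderId" key, newest point first (lpush).
     */
    public static Map<String, List<String>> readLiveTracks() {
        JedisUtil jedisUtil = JedisUtil.getInstance();
        Jedis jedis = jedisUtil.getJedis();
        Map<String, List<String>> tracks = new HashMap<>();
        try {
            Set<String> liveOrders = jedis.smembers(Constants.REALTIME_ORDERS);
            for (String cityAndOrderId : liveOrders) {
                // 0..-1 fetches the whole list of "lng,lat" strings
                tracks.put(cityAndOrderId, jedis.lrange(cityAndOrderId, 0, -1));
            }
        } finally {
            jedisUtil.returnJedis(jedis);
        }
        return tracks;
    }
}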

2.2 Historical order track playback

AMap (高德) track playback demo

Feature implementation flow:

(Figure: historical track playback flow)

com.cartravel.ordermonitor.TrackMonitorController

/**
 * Query the historical track points of an order.
 *
 * @param wrapper
 * @return
 */
@PostMapping("/historyTrackPoints")
public ResultModel<List<TrackPoint>> historyTrackPoints(@RequestBody QueryWrapper wrapper) {
    long startTime = System.currentTimeMillis();
    logger.info("[query order history track points]");
    ResultModel<List<TrackPoint>> result = new ResultModel<List<TrackPoint>>();
    Object tmpOrderObj = null;
    Order startEndTimeOrder = null;
    List<TrackPoint> list = null;

    try {
        String orderId = wrapper.getOrderId();
        JedisUtil instance = JedisUtil.getInstance();
        Jedis jedis = instance.getJedis();

        // The order's start/end time was stored in Redis by GpsConsumer
        byte[] orderBytes = jedis.hget(Constants.ORDER_START_ENT_TIME.getBytes()
                , orderId.getBytes());

        if (orderBytes != null) {
            tmpOrderObj = ObjUtil.deserialize(orderBytes);
        }

        if (null != tmpOrderObj) {
            startEndTimeOrder = (Order) tmpOrderObj;
            String startTimeStr = startEndTimeOrder.getStartTime() + "";
            String endTimeStr = startEndTimeOrder.getEndTime() + "";

            // Scan HBase between the order's first and last rowkey
            String tableName = Constants.HTAB_GPS;
            list = HBaseUtil.getRest(tableName, wrapper.getOrderId(),
                    startTimeStr, endTimeStr, TrackPoint.class);
        }

        instance.returnJedis(jedis);
        result.setSuccess(true);
        result.setData(list);
    } catch (Exception e) {
        result.setMsg(e.getMessage());
    }
    logger.info("[query order history track points] msg:{},time:{}", result.getMsg(),
            System.currentTimeMillis() - startTime);
    return result;
}
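The helper HBaseUtil.getRest is not shown in this section; given the rowkey scheme orderId_timestamp established in GpsConsumer, all it fundamentally needs is a range scan between the order's start and end time. The sketch below shows that core idea under stated assumptions: HBase 2.x client API, and a literal family name "f1" standing in for Constants.DEFAULT_FAMILY; class and method names are illustrative, not the project's actual helper:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TrackScanSketch {
    /**
     * Scans one order's GPS rows between its start and end time.
     * Rowkeys are "orderId_timestamp", so a start row of orderId_startTime
     * and a stop row just past orderId_endTime covers exactly the order's track.
     */
    public static List<String> scanTrack(Connection conn, String tableName,
                                         String orderId, String startTime, String endTime)
            throws IOException {
        List<String> points = new ArrayList<>();
        try (Table table = conn.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(orderId + "_" + startTime));
            // "~" sorts after any digit, making the stop row inclusive of endTime
            scan.withStopRow(Bytes.toBytes(orderId + "_" + endTime + "~"));
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    String lng = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("LNG")));
                    String lat = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("LAT")));
                    points.add(lng + "," + lat);
                }
            }
        }
        return points;
    }
}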