[TOC]
1、项目需求
数据类型:csv, orc, excel, word, txt, log
数据存储位置
不是在一台机器上
第三方
公司内部多个系统,
因为历史积累原因。数据集存储在了不同的引擎:redis、mysql、hbase、hdfs、elasticsearch、kafka等
因为早前没有统一规划数据源,导致数据源多达十几种;所以不好规划统一,导致查询关联及其麻烦(要各种预先抽取多个数据源到同一个地方,然后在做统一处理,最后出报表 ,而且查询及其缓慢)
2、涉及知识点
1、前后端任务交换指令:Akka
2、计算引擎:sparkSQL
3、二次定义sparkSQL语法:Antlr
4、服务自动发现:zookeeper
3、项目架构说明
3.1.逻辑架构
3.2.项目架构
4、最终目标
.1、项目的实现细节
4、Actor入门
通讯原理图:
4.1、java和scala在并发编程模型对比:
Java内置线程模型 | Scala Actor模型 |
---|---|
“共享数据锁模型” | share nothing |
每个object有一个monitor,监视多线程对共享数据的访问【线程内部】 | 不共享数据,actor之间通过message传递(基于事件驱动) |
加锁的代码通过synchronized标志 | |
死锁的问题 | |
每个线程内部是顺序执行的 | 每个actor内部是顺序执行的 |
4.2、Actor的执行顺序
1、调用start()方法启动Actor
2、调用start()方法后其act()方法会被执行
3、向Actor发送消息
4.3、发送消息的方式
! | 发送异步消息,没有返回值。 |
---|---|
!? | 发送同步消息,等待返回值。 |
!! | 发送异步消息,返回值是 Future[Any]。 |
4.4、Actor例子
1 | <dependency> |
4.1.1、Actor可以不断的接收消息
驱动程序
说明:在act()方法中加入了while (true) 循环,就可以不停的接收消息
注意:发送start消息和stop的消息是异步的,但是Actor接收到消息执行的过程是同步的按顺序执行
4.1.2:结合case class发送消息
1):写3个case class
2):接收消息的actor
3):驱动
5:Akka入门
akka,一款高性能,高容错,分布式的并行框架
特点:
1.并行与并发
2.异步非阻塞
3.容错
4.持久化
5.轻量级,每个actor占用内存比较小 300byte,1G 内存容纳300w个actor
场景:
分布式计算中的分布式通讯,解决的是高并发场景的问题,(消息体比较小),吞吐量不是很高,零拷贝()
密集型计算场景
总结:对高并发和密集型的计算场景,akka都可以使用
5.1、使用Akka来进行消息传递
驱动:
pom里面需要引入akka的配置:
1 | <dependency> |
6、工程搭建
6.1、工程模块的创建
1):创建模块名称和工程包名
6.2、给工程添加依赖关系
各个模块配置工程以来pom.xml文件
6.3:编写驱动程序
本节的目的:
把驱动程序编写好,并启动起来;
但是让驱动能够顺利启动,我们需要完成如下操作:
6.3.1、编写参数的获取和校验操作
6.3.2、构建解释器基类
然后写任务状态的基类:
在提供对外的解释器接口
6.3.3、创建文本解析标识的代码包
6.3.4、对解释器基类的增强
第一步:在伴生类中提供换行符匹配操作(就是个正则表达式)
第二步:对接口进行功能增强
问题:对spark-shell绑定变量的作用是什么?
第三步:spark-shell绑定变量
spark-shell绑定变量,第一个要绑定的就是sparkSession
以及:sparkContext下面的内容、隐士转换、sparkSQL、udf函数等内容
但是现在我们还没有sparkSession , 所以我们先实现一个sparkSession的构建
【com.kkb.engine下面构建EngineSession】
因为执行了enableHiveSupport,所以需要加入服务器的hive-site.xml文件
然后启动metastore服务
因为我们参照的是livy代码,本身就是一个rest 服务,用来做spark和web端的一种交互;
所以livy很好的帮我们解决了,spark-shell从初始化的绑定、到绑定变量错误的处理;
都已经帮我们解决好了
6.3.5、实现spark解释器
这样,我们构建好了spark的解释器,实际就是为了构建一个属于自己的spark-shell;
好在其他框架实现了,我们只需要把其他框架的源码拿来即可
所以我们在回到 驱动程序App类:
【App类】
6.3.6:获取zk的客户端
这样,我们继续按照流程往下走 , 那么此时就要构建zk的客户端了
因为,我们后面会把 引擎注册到zk里面,并且依赖于zk进行服务的自动发现
【在common工程下,创建zk的工具类】
第一步:导包
我们采用第三方的zk工具,尽量帮我们封装代码
1 | <dependency> |
然后创建ZKUtils工具类
第二步:编写获取zk客户端代码
1):
2):
3):
4):
把之前写的加载配置文件的工具类,放入common工程下
然后把common工程打入engine工程里面
就是在engine的pom文件里面添加common工程包
1 | <dependency> |
5):
6.3.7、注册Akka并发变成模型
第一步:把基本信息类导入
第二步:考虑引擎在zk中的情况
那么,我们如果注册引擎,其实就是把PlatEngine这个类注册到zk里面去;
那么问题来了:
最开始运行的时候 ,zk里面肯定没有引擎信息(所谓引擎信息 , 我们认为其实就是id –> ip:port);
1 –> node1:3001
2—>node2:3002
但是如果不是最初启动时候,那么引擎肯定是存在的,那么我们就要确保,再次注册引擎的时候 ,
id—>ip:port
这里面的id和端口,绝对不能重复
所以我们需要让id和端口绝对不一致,那么最好的方式就是用zk来维护状态 , 如果这个状态存在,那么就将id和端口递增
第三步:编写引擎注册到zk的代码
注册引擎,大概分成4个步骤:
1、准备好目录信息
2、创建引擎的父目录
3、创建引擎的临时节点(数据写入节点信息)
4、把前3步合并
那么接下来,我们来实现这个功能:
1、准备好目录信息
2、创建引擎的父目录
如果上面这个路径在zk里面是不存在的,那么直接创建一个永久节点路径;作为引擎的存储路径
3、创建引擎的临时节点(数据写入节点信息)
这样,有了父目录之后,我们就可以把数据信息写入;
所以健壮的写法是,必须考虑,如果父目录不存在的情况:
4、合并前3步,完成引擎的注册
‘
第四步:封装注册引擎代码,并返回akka配置信息
1、创建包和类
2、
3、利用我们刚刚写的zk方法,进行注册
第五步:需要考虑,如果后续需要注册多个引擎,怎么办
1、去zk里面查看是否有已经存在的引擎
1.1):在zk里面添加查询子节点的功能
获取完子节点后,把里面的数据拿出来,就是IP:port
1.2):获取子节点里面的数据
1.3):将返回的元组:(Option[String , Stat]) , 封装成引擎信息
【将1.2代码利用到1.3】
1.4):合并前三步
我们第一步获取子节点名称(id:1、2、3)
然后根据子节点的名称,来通过第第三步,来获取具体的引擎信息
所以,我们这一步做个合并处理
这样我们封装好了,如何获取zookeeper里面的引擎信息,那么按照步骤,我们接下来要顺序注册引擎
比如:
最开始
id = 1 , port = 3000
顺序增长
id = 2 , port = 3001
2、顺序增长id和port
那么顺序增长的前提是,当前zookeeper里面已经存在这个引擎了,所以才会顺序增长
最后在把我们刚刚写好的,注册引擎拿到最下面,顺序增长完port和id以后,开始注册
3、把注册代码拿下来
6.3.8、回到驱动类,注册Akka信息
这样,我们在回到驱动类,注册Akka信息
6.3.9:测试上面的代码
然后我们在测试下,上面写的代码是不是达到预期的效果
然后去zookeeper里面查看
6.3.10、获取当前akka的参数
经过测试,我们上面写的代码没有任何问题;
那么接下来,我们要获取当前akka的地址(也就是引擎的地址)
6.3.11、把引擎信息,维护到EngineSession里面
我们在构建spark-shell功能时候,把sparkSession维护到了EngineSession里面了
那么干脆 , 我们就把这个类作为任务状态的统计类;
那么我们把引擎信息,也维护进来
然后我们回到驱动程序,实例化一个EngineSession
6.3.12、设置并行度
那么接下来,我们就要启动Akka了 ;
这样,我们有了内部参数的维护,那么我们在返回驱动程序 , 把并行度设置上
6.3.13、创建Akka模型
我们之所以设置并行度,最终目标就是要并行的启动Akka任务模型
所以接下来,我们要创建 一个Akka的任务模型
6.3.14:启动Akka模型
现在我们有了akka模型 , 而且我们也拿到了并行度
接下来启动所有模型
这样我们就正常的启动了;
但是有这样一种可能,就是很可能主线程提前结束了,子线程还在继续运行;
就是可能会出现僵尸进程!
6.3.15:避免出现僵尸进程
所以我们需要让主线程等待子线程结束后,在执行关闭回收操作
所以我们在EngineSession里面,添加一些功能,让主线程等待子线程
然后让JobActor继承这个日志功能
7、编写JobActor
7.1、编写jobActor的初始化preStart阶段
7.1.1、将jobActor注册到zookeeper中
第一步,在ZKUtiils里面添加引擎路径
第二步:在jobActor里拼接引擎路径
第三步:初始化zk客户端
因为我们来初始化zk的客户端
第四步:将actor的引擎,注册到zk
首先我们在ZKUtils里面封装个方法,专门来对接jobActor的注册
然后在jobActor的初始化里面,进行注册jobActor的引擎
然后启动测试,查看zk里面是否注册进去
测试结果
7.2、编写jobActor的结束阶段代码
首先我们把后续需要的一些变量提前初始化好:
1、spark的解释器
2、sparkSession
上面这俩变量,会在actor的生命周期结束时候进行回收
第一步:定义成员变量
第二步:初始化阶段给上面两个变量赋值
第三步:在postStop,actor的生命周期结束阶段对这俩变量进行回收
7.3 、编写jobActor的receive
首先我们要开始编写一个actor的钩子,目的很简单,万一出现了错误,我们可以 catch住这个错误,然后把错误回显给客户端(web端)
7.3.1、编写一个 actor的钩子
7.3.2、在receive里匹配指令
1):匹配指令 , 添加actorHook,并初始化变量
2):编写一个获取全局唯一的任务组ID , 因为后续会基于任务组ID来获取引擎信息
3):将job信息(包含任务组ID) ,返回给前端
4):更新线程副本里面的作业描述
之所以这样做:
1、可以对任务作出描述,方便任务的web端定位
2、有了任务的描述,那么后续是可以取消已经提交的任务
5):基于commandMode进行匹配具体操作
接下来,我们要匹配具体的操作,就是看传递过来的指令是代码还是SQL;
然后根据指令的不同,选择不同的操作方式
7.3.3:接收CODE,然后处理
定义一个变量assemble_instruction,来组装命令
接受命令:
最后处理完后,把结果回显给客户端
上面处理后,会返回一个response的结果 , 我们需要把结果回显给客户端
因此,我们需要接收这个response,然后解析他 , 然后收集他
第一步:去EngineSession添加一个记录任务的map集合
去EngineSession添加一个记录任务的map集合 , 主要是为了保存批处理的作业信息
第二步:根据执行的结果,存储job的状态
第三步:响应任务状态
7.3.4:接收SQL,然后处理
SQL:select name from person where age > 18
sql执行流程(sql生命周期):
不管解析被划分为几步,在Spark 执行环境中,都要转化成RDD的调用代码,才能被spark core所执行
那么这里面有个关键的点,就是查询的SQL , 怎么转化成Unresolved LogicalPlan;
Unresolved LogicalPlan 这个阶段接收的是抽象的语法树,所以我们需要知道的就是,这个SQL语句是怎么转成抽象语法树的;
答案就是:antlr4(spark是在2.0以后,开始使用antlr4解析的sql语法)
spark通过antlr4去解析SQL语句,形成抽象语法树AST;
也就是说,详细的流程是这样的:
7.3.4.1:antlr的入门
1)语法
grammar
名称和文件名要一致- Parser 规则(即 non-terminal)以小写字母开始
- Lexer 规则(即 terminal)以大写字母开始
- 用
'string'
单引号引出字符串
2):配置antlr的环境变量
首先你要有配置antlr的环境变量
1 | export CLASSPATH=".:/usr/local/lib/antlr-4.5.3-complete.jar:$CLASSPATH" |
然后我们按照语法格式来写一个hello word的代码;
【随便打开 一个maven工程,然后导包】
1 | <dependency> |
第一步:编写antlr文件
通过上面,我们大概了解,antlr其实主要就是写正则
第二步:生成java代码
接下来,通过命令生成java代码
1 | antlr4 learnAntlr.g4 |
第三步:重写learnAntlrBaseListener
第四步:词法和语法解析
第五步:运行测试
7.3.4.2:在代码中编写antlr文件
7.3.4.3:讲解antlr文件里面的意思
1):load的列子
假如我想实现加载文件操作,形成一个表,然后在基于这个表做查询操作;
以上的动作是:数据源—>load(加载)——>select这个表
比如:
那么我们在Engine.g4文件中就是:
他们的对应关系就是:
2):save的列子
比如我们从kafka记载了数据,形成tb表 , 然后将数据写入mysql
简单说就是: kafka –>load –>save —>mysql
1 | load kafka.veche4 |
那么对应我们antlr文件就是:
load操作,我们刚刚讲过了,接下来在说下,怎么save的
【看图做对比】
7.3.4.4:最后把剩余的功能统一说下
7.3.4.5:生成antlr的代码
命令:
1 | antlr4 Engine.g4 |
7.3.5:重写EngineBaseListener
就像我们自己写的例子一样,此时我们要重写一下EngineBaseListener,对这个类的功能做增量,来满足我们的需求
第一步:创建EngineSQLExecListener类【不带参数】
第二步:重写EngineSQLExecListener里面的exitSql方法
重写这个方法的依据:
antlr4在离开sql的时候,会触发exitSql
所以我们要重写这块儿,来触发我们的业务逻辑
第三步:测试上面的步骤
1):首先我们要先确保当前的流程一定是通的,所以我们要测试下,确保当前流程绝对没问题
所以我们写个方法,专门来执行词法 和语法的解析
2):然后我们测试下,看看操作是否通过
3):然后集成一下接收到命令后,怎么对接这块儿
4):启动App驱动,然后在test里面发送命令
第四步:编写模式匹配的load操作
接下来,我们来完善load操作:
1):创建包
2):规范化sql里面的方法
编写trait来规范化操作,方便后期维护,也是一种面向接口的编程思想
3):编写load操作,LoadAdaptor
第一步:编写整体的大框
第二步:确定要解析的内容和成员变量
在这个里面,我们要解析语法:
1 | LOAD format POINT? path WHERE? expression? booleanExpression* AS tablename |
第三步:遍历语法树的孩子节点
1 | spark.job.mode = stream 就代表是流处理 , 否则就是离线处理 |
4):EngineSQLExecListener这个类的模式匹配,将LoadAdaptor做关联
5):测试当前的LoadAdaptor
测试LoadAdaptor是否对当前节点树做了解析,那么直接打印一下这个option信息
在测试类,编写测试命令
离线:
1 | val instruction = "load jdbc.testTableName " + |
流:
1 | val instruction = "load kafka.`veche4`" + |
第五步:处理匹配离线处理的数据源
这一步,我们要根据刚刚load操作出来的结果,然后去匹配操作。然后去查询出数据
也就是;
load —>匹配到离线 —>select结果
1):构建离线匹配数据源的类(BatchJobLoadAdaptor)
因为我们对接数据源是通过spark sql去处理的,所以我们先在EngineSQLExecListener这个监听者实现类里面添加sparkSession
这样我们的BatchJobLoadAdaptor(匹配离线数据源)就可以使用sparkSQL,去查询数据源的数据了
2):监听者实现类(EngineSQLExecListener),添加sparkSession
3):离线匹配数据源的类(BatchJobLoadAdaptor)开始根据sparkSession查询数据
匹配数据源
最后回到JobActor , 给EngineSQLExecListener添加sparkSession的参数
然后在LoadAdaptor里面添加上BatchJobLoadAdaptor匹配数据源的操作
4):接下来,做个简单的测试,看看能不能查出来数据
在BatchJobLoadAdaptor里面打印一下结果
最后在测试类执行如下代码
结果:
出现如上结果,此时说明流程已经走通
第六步:完成EngineSQLExecListener里面的select操作
sparkSession.sql(查询sql)的操作
所以 我们创建一个selectAdaptor类,来匹配select操作
1):然后编写selectAdaptor里面的内容:
2):执行sql语句,创建临时表,然后在把处理结果进行保存
加载这个结果的原因说明:
我们后面要将处理的结果返回给客户端 , 并且还要将处理的结果保存起来;
比如说,后续公司需要开发一些功能,将处理后的数据结果,可以下载下来,保存成csv文件;
那么此时对处理结果进行保存,显然是必要的
最后:
我们在JobActor里面封装一下解析SQL的操作,让我们解析并执行完SQL以后,可以拿到结果
第七步:封装处理结果操作
我们返回的处理结果是dataFrame , 所以我们需要将dataFrame转成json
这样在将结果返回给客户端(因为客户端没法直接解析dataFrame)
1):将dataFrame结果,解析成json
在adaptor工程下,写个方法,专门来将dataFrame转成json
方法:
使用SparkSQL内置函数接口开发StructType/Row转Json函数,我们以前肯定使用过:
【列子】
1 | def main(args: Array[String]): Unit = { |
跟踪上述,发现最终都会调用Spark源码中的org.apache.spark.sql.execution.datasources.json.JacksonGenerator类,使用Jackson,根据传入的StructType、JsonGenerator和InternalRow,生成Json字符串。
JacksonGenerator类
【我们在adaptor工程下使用】
2):编写结果的报告处理和结果落地保存
3):把解析SQL操作和汇报结果操作进行封装
4):最后在匹配SQL处理处 , 使用parseSQL方法
5):测试上面的所有操作
在汇报结果的方法reportResult里面,对处理的结果展示一下
【最后结果展示】
sql的结果
hdfs上的数据
7.3.6、编写存储数据SaveAdaptor
在存储数据时,我们需要考虑两个场景:
1 | 1:考虑场景问题:离线、流式 |
7.3.6.1、创建SaveAdaptor类
7.3.6.2、分析antlr里面的save操作
1 | ('save'|'SAVE') (overwrite | append | errorIfExists | ignore | update)* tableName 'as' format '.' path ('where' | 'WHERE')? expression? booleanExpression* ('partitionBy' col)? ('coalesce' numPartition)? |
上面的语法解析,实际就是解析类似如下的SQL
7.3.6.3、写好局部变量,对应解析后的值
接下来要提供一些局部变量,我们解析出来值后,赋值给这些局部变量
7.3.6.4、遍历节点树,解析每一个节点
接下来遍历节点,按照我们自己写的antlr语法规则,开始遍历语法规则