背景:熟悉MR执行的步骤后,可以往3个点继续分析:
1. code:MR的执行code,根据执行的步骤产出流程图。
2.引擎:了解TEZ/SPARK sql执行的步骤,产出如MR一样的流程图,清楚MR,TEZ,SPARK SQL的区分
3.sql编译过程:熟悉hsql提交到执行计划,到MR执行的过程,输出文档。
目前从第三点入手,主要还是跟工作息息相关。
美团文章:https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
看懂每个细节,那么hive-sql编译这块就查不到了。
一、HSQL提交后的基本转化步骤:
1. hue提交HiveSQL
2. Parser,根据Antlr定义的规则,完成语法解析,将SQL抽象为AST Tree语法树。
3. Semantic Analyzer,抽象除查询的基本组成单元QueryBlock
4. Logical plan,遍历QueryBlock,转化为执行操作树OperatporTree
5. Logical plan optimizer,优化器对OperatorTree进行变换操作,合并不必要的ReduceSinkOperator,减少shuffle数据量
6. Physical plan,遍历OperatorTree,转换为MR任务
7. Logical plan optimizer,物理层优化器对MR任务进行变换操作,生成最终的执行计划
二、每个步骤作用或者目的(简单版本)
1. Antlr是个开源语言识别工具,可以完成词法分析、语法分析、中间代码生成,最终还可以转换为antlr的抽象语法树。【其实就是使用下开源工具来做sql解析,自己做麻烦的很】
2. AST 仍然复杂,不够结构化,转化为queryblock将sql进一步抽象和结构化。QB是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。【1解析的结果太复杂,使用QB简化,不然3步骤处理起来麻烦】
3. 将QB转化为operator tree。map和reduce阶段均是由operatortree组成的。逻辑操作符,就是在Map阶段或者Reduce阶段完成单一特定的操作。【2结果转化MR认识的结构和形状】
4. 就对3结果进行优化:合并操作符,达到减少MapReduce Job,减少shuffle数据量的目的【重点再详解】
5. 就是把operator tree变化为MR任务。【重点再详解】
6. 就是对5输出MR任务进行优化。【重点再详解】
三、每个步骤重点详解:
1. antlr:
a. 词法规则和语法规则:词法规则HiveLexer.g和语法规则的4个文件SelectClauseParser.g,FromClauseParser.g,IdentifiersParser.g,HiveParser.g
b. 注意一下内层子查询也会生成一个TOK_DESTINATION节点。请看上面SelectStatement的语法规则,这个节点是在语法改写中特意增加了的一个节点。原因是Hive中所有查询的数据均会保存在 HDFS临时的文件中,无论是中间的子查询还是查询最终的结果,Insert语句最终会将数据写入表所在的HDFS目录下。
2. queryblock:
a. AST Tree生成QueryBlock的过程是一个递归的过程,先序遍历AST Tree,遇到不同的Token节点,保存到相应的属性中,主要包含以下几个过程:
1. TOK_QUERY => 创建QB对象,循环递归子节点
2. TOK_FROM => 将表名语法部分保存到QB对象的aliasToTabs
等属性中
3. TOK_INSERT => 循环递归子节点
4. TOK_DESTINATION => 将输出目标的语法部分保存在QBParseInfo对象的nameToDest属性中
5. TOK_SELECT => 分别将查询表达式的语法部分保存在destToSelExpr
、destToAggregationExprs
、destToDistinctFuncExprs
三个属性中
6. TOK_WHERE => 将Where部分的语法保存在QBParseInfo对象的destToWhereExpr属性中
b. QB中重要的属性:
1. aliasToSubq(表示QB类的aliasToSubq属性)保存子查询的QB对象,aliasToSubq key值是子查询的别名
2. nameToDest,key的形式是inclause-i,value是对应的ASTNode节点,即TOK_DESTINATION节点
3. JoinExpr保存TOK_JOIN节点
4. QBExpr这个对象是为了表示Union操作
5. qbm保存每个输入表的元信息,比如表在HDFS上的路径,保存表数据的文件格式等
3. Operator:
a. 基本操作符:TableScanOperator,SelectOperator,FilterOperator,JoinOperator,GroupByOperator,ReduceSinkOperator
1. TableScanOperator从MapReduce框架的Map接口原始输入表的数据,控制扫描表的数据行数,标记是从原表中取数据
2. JoinOperator完成Join操作
3. FilterOperator完成过滤操作
4. ReduceSinkOperator将Map端的字段组合序列化为Reduce Key/value, Partition Key,只可能出现在Map阶段,同时也标志着Hive生成的MapReduce程序中Map阶段的结束
5. Operator在Map Reduce阶段之间的数据传递都是一个流式的过程。每一个Operator对一行数据完成操作后之后将数据传递给childOperator计算【????】
b. operator主要的属性和方法:
1. RowSchema表示Operator的输出字段
2. InputObjInspector outputObjInspector解析输入和输出字段
3. processOp接收父Operator传递的数据,forward将处理好的数据传递给子Operator处理
4. Hive每一行数据经过一个Operator处理之后,会对字段重新编号,colExprMap记录每个表达式经过当前Operator处理前后的名称对应关系,在下一个阶段逻辑优化阶段用来回溯字段名
5. 由于Hive的MapReduce程序是一个动态的程序,即不确定一个MapReduce Job会进行什么运算,可能是Join,也可能是GroupBy,所以Operator将所有运行时需要的参数保存在OperatorDesc 中,OperatorDesc在提交任务前序列化到HDFS上,在MapReduce任务执行前从HDFS读取并反序列化
c. QB转化为Operator tree步骤:
1. aliasToSubq => 有子查询,递归调用
2. aliasToTabs => TableScanOperator
3. joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
4. destToWhereExpr => FilterOperator
5. destToGroupby => ReduceSinkOperator + GroupByOperator
6. destToOrderby => ReduceSinkOperator + ExtractOperator
备注:1. 由于Join/GroupBy/OrderBy均需要在Reduce阶段完成,所以在生成相应操作的Operator之前都会先生成一个ReduceSinkOperator,将字段组合并序列化为Reduce Key/value, Partition Key
2. 美团那片文章有具体的例子和详细的解释,可以阅读下非常变化理解转化过程
4. 逻辑层优化器
a. SimpleFetchOptimizer:优化没有GroupBy表达式的聚合查询 【减少shuffle数据量】
b. MapJoinProcessor:MapJoin,需要SQL中提供hint,0.11版本已不用 【减少shuffle数据量】
c. BucketMapJoinOptimizer:BucketMapJoin 【减少shuffle数据量】 d.GroupByOptimizer:Map端聚合 【减少shuffle数据量】 e. ReduceSinkDeDuplication:合并线性的OperatorTree中partition/sort key相同的reduce【一个Job干尽可能多的事情/合并】 f.PredicatePushDown:谓词前置 【一个Job干尽可能多的事情/合并】 g. CorrelationOptimizer:利用查询中的相关性,合并有相关性的Job,HIVE-2206 【一个Job干尽可能多的事情/合并】 h. ColumnPruner:字段剪枝 【一个Job干尽可能多的事情/合并】5. OperatorTree生成MapReduce Job的过程
a. rule1:找出树种符合 "TS%"的跟节点,生成mapreducetask[stage-1]对象,确定mapwork
b. rule2:规则"TS%.*RS%",在继续遍历ts[p]的子operator种。
c. rule3:规则"RS%.*RS%",此时会在第二个RS之前将树剪开,并为join生成一个FS,RS会生成一个TS【其实就是临时中间表】
d. rule4:FS%,这时候将movetask与mapreducetask连接起来,并且生成一个statstask。
e.合并stage:情况opstack,将towalk第二个元素加入栈。遍历其他节点,重复上述步骤。
f.切分map reduce: 以RS为界,切分map和reducework。
6. 物理层优化器
a. map_join:简单说是将小表读入内存,顺序扫描大表完成join。
1. mapreduce local task,将小表读入内存,生成hashtablefiles上传到distributed cache,并且对hashtablefiles进行压缩。
2. mapreduce的map阶段,会从distributed cache读取hash tablefules到内存,顺序扫描大表,在map阶段直接进行join,将数据传递给下一个mapreduce
3. 若join的两张表一张是临时表,就会生成一个conditionaltask,在运行期间判断是否使用mapjoin 的 commonjoinresolver优化器,commonjoinresolver优化器就是将commonjoin转化为mapjoin,
4. 转化过程: 1. 深度优先遍历task tree。2. 找到joinoperator,判断左右表数据量大小 。 3.对与小表+大表=>mapjointask,对于小/大表 + 中间表=>conditionaltask