一、背景
hive、spark、flink是hadoop最常用的,三个计算入口。hive最古老,同时有metastore,spark算的快,flink流技术支持最好。目前hive和spark融合度高,flink较为独行。
1.1 spark、hive关系:
hive和spark相互依存,如hive使用spark引擎,进行计算(当然也能使用tez引擎),spark连接hive metastore,获取表的元数据关系。本文不讨论tez引擎的问题。仅讨论hive使用spark引擎,和spark连接spark元数据情形。
hive使用spark引擎:
虽然hive使用spark引擎,但是不会触发spark的钩子函数的。仅作为引擎使用。
spark连接hive metastore。
spark内执行ddl的时候,spark监听ddl可以捕捉到。同时由于修改hive metastore,hive metastore的钩子也能监听到。所以spark 和 hive都能监听到,如果都监听务必仅向数据库写入一份,或者直接都用merge方式写入。
1.2 目前已经有框架
1.2.1、datahub
datahub
领英,创建,完全开源。社区较为活跃,依赖图数据库,文档详尽,支持很多数据源。定义为数据发现平台(Data Discovery Platform ),数据管理平台,集成了元数据管理和数据血缘功能。拥有UI界面。由python和java编写完成,元数据导入等使用python脚本完成。
数据血缘的表示如下:
source
1.2.2、atlas
atlas
阿特拉斯,由apache主导,社区活跃略低,较重,依赖hbase。集成数据发现功能,数据定义提较为灵活。
支持
1.3、血缘粒度
hive在不太老的版本上可以实现column级血缘,下文有介绍。
spark目前较难实现column级血缘。只能实现table级。
1.4、项目架构
其中有些是比较特别的,比如SQL形式的create table as
,create view
,和SPARK的save into datasource
,都是又是ddl也是dml的,会被执行两次。但由于项目中数据模型的一致性且存储模型均使用merge方式,所以最终存储到图数据的数据不会受到影响。
二、hive数据血缘实现思路
2.1 hive数据血缘捕捉实现
实现org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext
可以获取到任意类型的sql,比如insert,create table as,explain,use database等等。
需要注意的是:
(1)此类监测的hiveserver2,因此spark、flink的ddl等都不会被此类监测到。
(2)此类能检测到sql文本内容。
此类只有一个方法
void run(HookContext var1) throws Exception;
从HookContext中可以获取到
// 查询计划
val plan: QueryPlan = hookContext.getQueryPlan
// 查询的operation,即查询类型
// 例如 org.apache.hadoop.hive.ql.plan.HiveOperation#QUERY 通过 getOperationName方法可以和op进行匹配
val op = plan.getOperationName
// 通过SessionState可以获取到当前的SessionId和当前数据库名,还有当前Session内的所有临时temporary表名称,还有当前的HiveConf。
val ss: SessionState = SessionState.get
// 获取当前Session的用户名称
val userName = hookContext.getUgi.getUserName
// 执行时间
val queryTime = getQueryTime(plan)
// 执行的Sql语句
val sql = plan.getQueryStr.trim
// 通过HiveMetaStoreClient 可以实时再查询库表的元数据。
// 使用ms需要注意:如果此表已经drop或者alter就不要使用ms查询了,否则程序报异常。
val ms: HiveMetaStoreClient = getMetastore(ss.getConf)
和血缘有关系的操作有一些内容:
本例中view相关的也纳入血缘范畴。
op match {
case dml if (Set(HiveOperation.QUERY.getOperationName // insert into
, HiveOperation.CREATETABLE_AS_SELECT.getOperationName// create table as
, HiveOperation.CREATEVIEW.getOperationName // create view
, HiveOperation.ALTERVIEW_RENAME.getOperationName // alter view as
).contains(op)) => {
// 下文将详细讲解
}
case load if Set(HiveOperation.LOAD.getOperationName).contains(load) => {
LOG.info(s"lineage event: ${op}!")
// 封装对象
}
case truncate if Set(HiveOperation.TRUNCATETABLE.getOperationName).contains(truncate) => {
// 封装对象
}
case other=> LOG.info(s"lineage event: ${op} passed!")
}
这里着重说明case dml
,情形较为复杂:
主要分为finalSelOps是否为空两种情况。
def toScalaLinkedHashMap[K, V](input: java.util.Map[K, V]): LinkedHashMap[K, V] = {
val output: LinkedHashMap[K, V] = LinkedHashMap.empty
output.putAll(input)
output
}
// index是指每一列都对应已改索引,血缘信息需要通过index查找到
val index: LineageCtx.Index = hookContext.getIndex
// finalSelOps就是select最终获取的列的信息
// LinkedHashMap[列名称,[SelectOperator,sink表]]
val finalSelOps: mutable.LinkedHashMap[String, ObjectPair[SelectOperator, Table]] = toScalaLinkedHashMap(index.getFinalSelectOps)
// hive2.3.9以后修复了finalSelOps为空的bug,能获取列的血缘信息。
// 具体参考:https://issues.apache.org/jira/browse/HIVE-14706
if (finalSelOps.values.isEmpty) {
// 无法获取最终选取的列的信息,就只能获取表的血缘了。
// plan.getInputs获取数据的source,可能是表,视图,临时表,也可能是 insert into values(我是临时表) 生成的临时表 ,另外注意:如insert into table from some view时候,plan.getInputs会将view和其关联的table名都获取到,此时无法区分到底是从视图+表而来,还是只从视图来。
// plan.getOutputs获取数据的sink,可能含有database需要过滤,比如create table实际也修改database信息。需要注意:如果师表必须判断是否为temporary表,如果是临时表需要在程序里创建一个临时cache,存储此临时表的血缘信息。之后可能在plan.getInputs中使用到此临时表的血缘信息,将临时表替换为实体表或视图的血缘信息。
}else{
// 可以获取列级别血缘
// 获取到最终的insert into表的列信息(不一定是全部列)
var tgtTblCurrSchemas: Seq[FieldSchema] = plan.getResultSchema.getFieldSchemas.toList
for (pair <- finalSelOps.values) {
// 获取每一列的学院信息finalSelOp就是一列的血缘信息。
val finalSelOp: SelectOperator = pair.getFirst
// 每一列的血缘可能来自多个表的多个列。
val tblDeps: Seq[LineageInfo.Dependency]=index.getDependencies(finalSelOp).values().toSeq
// sink表可以通过
// (1) pair.getSecond
// (2) val tblOutputs: mutable.Set[WriteEntity] = plan.getOutputs.filter(out => Set(Entity.Type.TABLE, Entity.Type.PARTITION).contains(out.getType)) // WriteEntity中含有表信息。
}
LineageInfo.Dependency
追溯内容如下:
即能获取到表的列信息。
public static class Dependency implements Serializable {
private static final long serialVersionUID = 1L;
private DependencyType type;
private String expr;
private Set<BaseColumnInfo> baseCols;
}
public static class BaseColumnInfo implements Serializable {
private static final long serialVersionUID = 1L;
private TableAliasInfo tabAlias;
private FieldSchema column;
}
2.2 hive数据血缘debug方式
具体参考
远程remote debug hive的方法,用于hive监听器/钩子编写
只能远程debug,不能本地debug。
三、hive元数据捕捉思路
3.1 全量
使用HiveMetaStoreClient
的方法直接遍历即可。
// 实际使用中更简单,只需要配置HIVE_CONF_DIR环境变量,HiveConf类就会读取hive-site.xml等xml文件内容。
val metastore=new HiveMetaStoreClient(new HiveConf())
3.2 增量
实现org.apache.hadoop.hive.metastore.MetaStoreEventListener
即可
需要注意的是
(1)此类是监听的hivemetastore,即spark、flink等对Metastore的修改都会被监听到。
(2)此类获取的信息较少,不含sql。
(3)hive启动会自动创建实现类不用担心config: Configuration
参数。
class HiveMetastoreHook(config: Configuration) extends MetaStoreEventListener(config) {
...
}
MetaStoreEventListener
类常用的方法如下:
public abstract class MetaStoreEventListener implements Configurable {
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
// contain create view
}
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
// contain drop view
}
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
// contain alter view
}
public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
}
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
}
}
3.3 debug
同样参考参考:
远程remote debug hive的方法,用于hive监听器/钩子编写
四、spark数据血缘实现思路
4.1 批处理血缘实现
实现org.apache.spark.sql.util.QueryExecutionListener
的onSuccess
方法即可。
/**
* A callback function that will be called when a query executed successfully.
*
* @param funcName name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
* @param durationNs the execution time for this query in nanoseconds.
*
* @note This can be invoked by multiple different threads.
*/
@DeveloperApi
def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
需要注意的是:
(1)此方法一般使用post hook,即程序执行完毕后调用此方法,对于以spark-sql、spark-shell作为入口执行spark,则所有sql或者代码都会触发此钩子函数,但是对于spark-submit jar方法提交的任务,由于用户main程序执行完毕后会触发spark session close,程序实际会直接退出,连带这血缘的thread也会退出,导致血缘无法完成,不管是idea本地debug或者yarn执行都如此。
当然也可以使用(pre hook,但产生的血缘关系也是不准确的,因为任务可以取消)
(2)此方法可以加监测到batch,micro batch(stream),ml三种任务的血缘事件。本例忽略了ml机器学习相关的事件。
(3)当在spark中对hive表执行load操作也会被监测到。
(4)spark数据源分为两类:1、hive metastore表;2、datasource,例如:
当然,parquet,csv也都可以是datasource,它的信息是不存储在hive metastore中的。
(5)继续细分spark表或datasource数据模型,可以为:hive metastore表、hdfs 文件、本地文件、jdbc表、kafka topics、hbase表,local(程序内写死的数据),console print等。
(6)spark中表实体类:
// table id 只有表名+数据库名
case class TableIdentifier(table: String, database: Option[String]) extends IdentifierWithDatabase
// catalog table 即从metastore或者内存临时创建的table
case class CatalogTable(
identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: StructType, // 存储列信息
provider: Option[String] = None,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
createVersion: String = "",
properties: Map[String, String] = Map.empty,
stats: Option[CatalogStatistics] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty)
(6)获取sparkSession方式:
有时候执行获取到表名,还需列信息,此时就需要再使用sparkSession从metastore获取。
val sessionOption: Option[SparkSession] = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
(7)spark博大精深,很难弄明白。
4.2 从QueryExecution收集执行过程
// qe:QueryExecution几乎可以获取到所有信息。
var outNodes: Seq[SparkPlan] = qe.sparkPlan.collect {
case p: UnionExec => p.children
case p: DataWritingCommandExec => Seq(p)
case p: WriteToDataSourceV2Exec => Seq(p)
case p: LeafExecNode => Seq(p)
}.flatten
if (qd.sink.isDefined && !outNodes.exists(_.isInstanceOf[WriteToDataSourceV2Exec])) {
val sink = qd.sink.get
outNodes ++= Seq(
WriteToDataSourceV2Exec(
new MicroBatchWriter(0, // MicroBatchWriter就是stream
new SinkDataSourceWriter(sink)), qd.qe.sparkPlan))
}
// 以下内容实际提取模型的代码量较大。省略。
outNodes.flatMap {
case r: ExecutedCommandExec =>{
// 主要和ddl相关的,LoadDataCommand、SaveIntoDataSourceCommand除外
}
case r: DataWritingCommandExec =>{
// 数据写入相关,例如: insert into,create table as,save into datasource
}
case r: WriteToDataSourceV2Exec => {
// stream相关,比如kafka。建议此处忽略`MicroBatchWriter`流相关的血缘。
}
case ignore => {}
以下具体分析:
4.3 执行过程之RunnableCommand子类解析
分析下ExecutedCommandExec
的成员的RunnableCommand
case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode
// 举例子
case class SaveIntoDataSourceCommand(
query: LogicalPlan, // source信息从LogicalPlan获取
dataSource: CreatableRelationProvider, // 判断类型
options: Map[String, String], // sink信息从此map获取
mode: SaveMode) extends RunnableCommand
case class LoadDataCommand(
table: TableIdentifier, // 表
path: String, // 路径
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends RunnableCommand
case class TruncateTableCommand(
tableName: TableIdentifier,
partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand
// 剩下的都的大同小异。
4.4 执行过程之DataWritingCommand子类解析
分析下DataWritingCommandExec
成员DataWritingCommand
case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan) extends SparkPlan
trait DataWritingCommand extends Command
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable, // sink表
query: LogicalPlan, // source信息从LogicalPlan获取
outputColumnNames: Seq[String],
mode: SaveMode)
extends DataWritingCommand
case class CreateDataSourceTableAsSelectCommand(
table: CatalogTable, // sink表
mode: SaveMode,
query: LogicalPlan, // source信息从LogicalPlan获取
outputColumnNames: Seq[String])
extends DataWritingCommand
case class InsertIntoHiveDirCommand(
isLocal: Boolean,
storage: CatalogStorageFormat, // location 信息
query: LogicalPlan, // source信息从LogicalPlan获取
overwrite: Boolean,
outputColumnNames: Seq[String]) extends SaveAsHiveFile
// 剩下的都的大同小异。
可以看到血缘信息都藏在query: LogicalPlan
中。
重点来了,LogicalPlan分析:
4.4.1 执行过程之LogicalPlan解析血缘
随意找个LogicalPlan
的子类InsertIntoTable
看看。
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean)
extends LogicalPlan
子类太多了分析不过来。
解决:分析血缘从logicalPlan入手采用collectLeaves
方法即可,收集最末端的叶子节点。
val children = logicalPlan.collectLeaves()
val res: Seq[BaseEntityElement] = children.flatMap {
case r: HiveTableRelation => {
// hive metastore table
// 只有一个单表
}
case v: View => {
// hive metastore view
// 只有一个单视图
}
case UnresolvedRelation(tblId) => {
// case class UnresolvedRelation(tableIdentifier: TableIdentifier) extends LeafNode extends IdentifierWithDatabase
// 就是表信息,但是不含列信息等 case class TableIdentifier(table: String, database: Option[String]),含表名称,可能含数据库名称
// 如果需要表详细信息可从SparkSession获取。
}
case JDBCParser(jdbcs) => {
// jdbc source
// 可能多个jdbc表或者模糊匹配
}
case KafkaParser(kafkas) => {
// kafka source
// 可能多个topic或者模糊匹配
}
case LogicalRelation(relation, _, catalogTable, _) =>
// must be at last!!!
if (catalogTable.isDefined) {
// metastore 表信息
} else relation match {
case fileRelation: FileRelation => // hdfs路径:文件或者文件夹信息
case _ => Seq.empty
}
case l: LocalRelation =>{
// 无法被识别到的认定是本地输入 ,即写死的数据。
}
case e =>
LOG.warn(s"Missing unknown leaf node: $e")
Seq.empty
}
res
}
解析jdbc和kafka信息比较复杂,spark实体类可能是包级private,无法直接访问需要使用反射获取成员内容,具体参考spark-atlas-connector项目。com.hortonworks.spark.atlas.sql.CommandsHarvester.JDBCEntities
com.hortonworks.spark.atlas.sql.CommandsHarvester.KafkaEntities
本例只是利用了scala的match case - unapply的模式匹配特性使代码更好看,unapply方法返回Some(实体类)则匹配成功,返回None则匹配失败,会继续下一个case情形。
object JDBCParser {
private val JDBC_RELATION_CLASS_NAME =
"org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation"
private val JDBC_PROVIDER_CLASS_NAME =
"org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider"
def unapply(plan: LogicalPlan): Option[自定义Table实体类] = {
// 具体参考altas
}
4.5 执行过程之DataSourceWriter子类解析
分析下WriteToDataSourceV2Exec
的成员DataSourceWriter
case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan
public interface DataSourceWriter{ ... }
// stream的写入类信息,
class MicroBatchWriter(batchId: Long, val writer: StreamWriter) extends DataSourceWriter
// StreamWriter的实现之一就是KafkaWriterCommitMessage,能获取到topic和boostrapServers
class KafkaStreamWriter(topic: Option[String], producerParams: Map[String, String], schema: StructType) StreamWriter
// 可以获取到topic和boostrapServers。
class KafkaStreamWriter(topic: Option[String], producerParams: Map[String, String], schema: StructType) StreamWriter
}
// 控制台输出,也是以stream方式
class ConsoleWriter(schema: StructType, options: DataSourceOptions)
extends StreamWriter
4.6 流处理血缘实现
实现org.apache.spark.sql.streaming.StreamingQueryListener
即可。
需要注意的是:
流执行过程中每隔一段时间onQueryProgress
就会触发一次,所以需增加设计,记录首次触发,忽略后续所有触发。
// 只能使用onQueryProgress方法,onQueryStarted和onQueryTerminated获取不到血缘信息
def onQueryProgress(event: QueryProgressEvent): Unit={
val query: StreamingQuery = SparkSession.active.streams.get(event.progress.id)
if (query != null) {
val qd = query match {
case query: StreamingQueryWrapper =>
...
case query: StreamExecution =>
...
case _ =>
LOG.warn(s"Unexpected type of streaming query: ${query.getClass}")
None
}}
4.7 spark血缘的debug方式
十分简单,本地debug即可,以idea为例:
因为spark driver运行在本地debug的jvm
如下内容即可:
val builder = SparkSession.builder
builder
.appName("lk-spark-local")
.master("local[*]")
.config("spark.sql.queryExecutionListeners", "com.test.MySparkEventTracker")
// .config("spark.extraListeners", "com.test.MySparkEventTracker")
.config("spark.sql.streaming.streamingQueryListeners", "com.test.MySparkStreamingEventTracker")
.config("spark.sql.streaming.checkpointLocation","hdfs:///tmp/spark/chkp") // 本地debug需要设置此项否则会出现checkpoint找不到情况。
.enableHiveSupport() //开启hive metastore支持
val spark = builder.getOrCreate()
spark.sql("your sql")
// sql or dsl are all ok to run!
val jdbcDataframe = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true")
.option("dbtable", "test.sometbl")
.option("user", "root")
.option("password", "123456")
.load()
// 注意以下内容!!!
spark.sql("some big query should follow your lineage test sql or code!!!")
需要注意的是:
因为本地调用spark在spark执行后会直接退出spark session主程序,为了延缓spark session退出,需要在测试血缘的sql或者code后,跟一个大sql,运行时间最好控制在2s以上。
4.8 集群部署
所有节点添加jar包,并修改spark-defaults.conf
添加配置项
# spark.extraListeners com.test.MySparkEventTracker
spark.sql.queryExecutionListeners com.test.MySparkEventTracker
spark.sql.streaming.streamingQueryListeners com.test.MySparkStreamingEventTracker
4.8 spark血缘后语
类org.apache.spark.scheduler.SparkListener
也是监听器,但其主要监听任务信息,其onOtherEvent
也能监听query等事件,稍麻烦,本例不采用。
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
五、参考文章
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 【hive和spark】hive和spark数据lineage血缘实现思路
发表评论 取消回复