前言

公司购买的数语科技的商业服务的血缘分析系统,然而由于对产品定位的原因,导致其对简单的类似于select * 的语法并不兼容。而且其功能始终有点小问题如鲠在喉。因此起了自研的心思。然后和之前专门负责技术模块的人讨论一下,之前也确认是有这样的想法,但是由于难度过大就没有搞了。正好这边也实现一下。

目标

由于数仓的特殊性,它并不像传统java开发是可以切割得非常独立的,往往是具有上下游依赖的特点。因此梳理清楚上下游依赖便就是数仓日常运维中一个非常重要的一环。虽然说调度的本身就是一个依赖抽象,但还是很难保证依赖本身有没有问题。

方式

从血缘粒度的角度,可以做到表粒度或者字段粒度。表粒度顾名思义,就是仅解析到表跟表之间关系,从粒度来说没那么细,但好处就是解析难度没那么大,且不容易因为过于复杂的关系绘制导致前端爆炸。如果做到字段粒度的话就是到表之间的字段与字段之间关系了,好处就是你能够掌握到最底层的字段依赖,如果你需要修改那一块逻辑的可以很方便的查询到字段关系。但问题是当后期表的复杂度上来之后,容易到导致过多的关系进而导致前端的绘制的压力爆炸。另外实现的复杂度也要比基于表的血缘分析复杂很多。
从技术上来说分为两种实现方式。一种是基于动态运行的方式来执行,典型代表就是hadoop体系下的血缘分析工具atlas。优点就是基于动态方式获得的血缘一定是最准的,而且可以直接脱离作业要脚本,是最客观的。但弊病是对环境的依赖非常严格,hive版本和atlas的版本稍有对应不上可能就整体不可用,而且也并不是全部的环境都可以用这种动态分析的执行方法来执行血缘分析。比如像一些原始的gauss数仓或者doris数仓就不存在这种功能。
然后基于脚本代码的静态解析方式。有点就是不依赖执行环境,只要有脚本就可以运行。但弊端正好也就是动态方式的优点,另外基于静态解析的话由于其依旧是使用编码的方式,所以也依赖于环境版本,比如环境升级了导致脚本中支持了一些原来不兼容解析的脚本,就导致血缘关系的解析失败了。不过整体来说,静态的方式适用面还是更广,实际上这也是更多商业公司实现的策略。我也采用静态解析的方式来进行脚本分析。

组件调研

之前在做了解hadoop的时侯了解到hadoop实现sql解析的解析工具使用calcite。然后还有一个很著名的连接池druid,其中里面也实现了sql解析功能。因此主要就是围绕这个来进行设计与开发的。

开发

基于calicite的方式

加入依赖

    <dependency>
      <groupId>org.apache.calcite</groupId>
      <artifactId>calcite-core</artifactId>
      <version>1.34.0</version>
    </dependency>
    <dependency>
      <groupId>com.mysql</groupId>
      <artifactId>mysql-connector-j</artifactId>
      <version>8.0.33</version> 
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.26</version> 
    </dependency>
    <dependency>

上述依赖同时引入calcite和druid。

sqlAnalCalicate

package org.example.impl;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlConformanceEnum;

public class sqlAnalCalicate {
    public static boolean containsWord(String text, String word) {
        String pattern = "\\b" + word + "\\b";
        return text.matches("(?i).*" + pattern + ".*"); // (?i) 表示忽略大小写
    }
    public static SqlNode GetAnal(String key,String scripts){
        SqlParser.Config parserConfig = SqlParser.config()
                .withConformance(SqlConformanceEnum.DEFAULT); // 设置SQL规范
        scripts=scripts.replace("!=","<>");
        SqlParser parser = SqlParser.create(scripts, parserConfig);
        SqlNode sqlNode=null;
        try{
            sqlNode = parser.parseQuery();
        }catch (SqlParseException e) {
            System.out.println(key);
            e.printStackTrace();
        }
        return sqlNode;
    }
    public static void basechannle(String name,String[] scrips){
        for (String script :scrips){
            if(script !=null) {
                if ( containsWord(script, "truncate") || containsWord(script,"delete")){
                    //不分析清空表或删除语句
                    continue;
                }
                SqlNode sqlNode=GetAnal(name,script);
                System.out.println(sqlNode.toString());
            }
        }
    }
}

其中,代码中的SqlParser.config() .withConformance(SqlConformanceEnum.DEFAULT 就可以指定需要使用的方言了。
其解析出来的结构是一个嵌套型的树状结构,在这一点上其calcite和druid都差不多。但是calcite有个非常扯淡的地方是它并不原生支持utf-8编码,这就很扯。这也是为什么后期转用durid的原因。

druid方式

sqlAnalDruid

package org.example.impl;
import com.alibaba.druid.DbType;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUseStatement;
import com.alibaba.druid.sql.dialect.hive.parser.HiveStatementParser;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.druid.sql.parser.SQLParserUtils;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.druid.sql.visitor.SchemaStatVisitor;
import com.alibaba.druid.stat.TableStat;
import com.alibaba.druid.util.JdbcConstants;
import org.example.visitor.CustomHiveSchemaStatVisitor;

import java.util.*;

public class sqlAnalDruid {
    public static boolean containsWord(String text, String word) {
        String pattern = "\\b" + word + "\\b";
        return text.matches("(?i).*" + pattern + ".*"); // (?i) 表示忽略大小写
    }
    public static List<SQLInsertStatement> GetAnal(String key, String scripts){
        List<SQLInsertStatement> result = new ArrayList<>();
        SQLStatement selectStmt=null;
//            SQLStatementParser parser = SQLParserUtils.createSQLStatementParser(scripts, DbType.hive);
//            List<SQLStatement>  aa = parser.parseStatement();
//             selectStmt =  parser.parseStatement();
            List<SQLStatement> statementList = SQLUtils.parseStatements(scripts, DbType.hive);
            for (SQLStatement statement : statementList){
                try{
                    SQLInsertStatement temp = (SQLInsertStatement) statement;
                    result.add(temp);
                }
                catch (Exception e ){
                    continue;
                }
//                System.out.println(statement.toString());
            }

        return result;
    }

    public static void getFromTo (String sql,Map<String, Set<String>> result) throws ParserException {
        HiveStatementParser parser = new HiveStatementParser(sql);
        List<SQLStatement> stmts = parser.parseStatementList();
        TreeSet<String> fromSet = new TreeSet<>();
        TreeSet<String> toSet = new TreeSet<>();
//        if (stmts == null) {
//
//        }

        String database = "DEFAULT";
        for (SQLStatement stmt : stmts) {
            CustomHiveSchemaStatVisitor statVisitor = new CustomHiveSchemaStatVisitor();
            if (stmt instanceof SQLUseStatement) {
                database = ((SQLUseStatement) stmt).getDatabase().getSimpleName().toUpperCase();
            }
            stmt.accept(statVisitor);
            Map<TableStat.Name, TableStat> tables = statVisitor.getTables();
            if (tables != null) {
                final String db = database;
//                for(for)
                String toTable=null ;
                Set<String> fromTables= new HashSet<>();
                for(TableStat.Name key: tables.keySet()){
                    TableStat temp =tables.get(key);
                    if(temp.getCreateCount()>0||temp.getInsertCount()>0){
                        toTable= key.getName().toUpperCase();
                    }else if (temp.getSelectCount() > 0){
                        fromTables.add(key.getName().toUpperCase());
                    }

                }
                Set<String> tt = result.get(toTable);
                if(tt ==null){
                    result.put(toTable,fromTables);
                }else {
                    for(String i:fromTables){
                        tt.add(i);
                    }
                }
//                tables.forEach((tableName, stat) -> {
//                    if (stat.getCreateCount() > 0 || stat.getInsertCount() > 0) {
//                        String to = tableName.getName().toUpperCase();
//                        if (!to.contains("."))
//                            to = db + "." + to;
//                        toSet.add(to);
//
//                    } else if (stat.getSelectCount() > 0) {
//                        String from = tableName.getName().toUpperCase();
//                        if (!from.contains(".")){
//                            from = db + "." + from;}
//                        fromSet.add(from);
//                    }
//                });
            }
        }
//        System.out.println(1);

    }



        public static void basechannle(String name,String scrips){
        List<SQLInsertStatement> sqlNode=GetAnal(name,scrips);
        System.out.println(sqlNode.toString());

    }
}

CustomHiveSchemaStatVisitor

package org.example.visitor;


import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr;
import com.alibaba.druid.sql.dialect.hive.visitor.HiveSchemaStatVisitor;

public class CustomHiveSchemaStatVisitor extends HiveSchemaStatVisitor {

    public CustomHiveSchemaStatVisitor() {
        super(); // 不需要显式传入 HiveASTVisitorImpl
    }

    /**
     * 重写 visit 方法,忽略自定义 UDF/UDTF
     */
    @Override
    public boolean visit(SQLMethodInvokeExpr x) {
        // 替换为你的自定义 UDF/UDTF 名称
        if ("DAY".equals(x.getMethodName()) ||
                "YEAR".equals(x.getMethodName())||
        "MONTH".equals(x.getMethodName())) {
            return true; // 标记为已处理,跳过默认逻辑
        }
        return super.visit(x);
    }
}

对于druid来说,就灵活一些了,你会发现在主程序sqlAnalDruid 中实现了两个方法,一个是GetAnal使用这种方式生成出来的方式就是类似于calcite的那种查询体解析集合。如果需要做深度的血缘开发,用这个方法信息带出来的内容会更全面的。
另一个则是使用visitor方法来进行遍历的getFromTo方法。使用getFormTo方法的话则会更简单,但是信息相对就比较少,在这里我就只提取了表级别的信息。
另外还有一点是这里使用自定义的visitor方法,由于我们解析的不是hql,而是和hql很相近的doris sql,但毕竟还是有一些方言上的区别,因此需要无视一定方言上区别。这里将不兼容的语句视为udf来处理了。
最后项目整理后会放上git,敬请期待.jpg