Просмотр исходного кода

添加kettle作业及转换的调用功能

JammeyJiang 8 месяцев назад
Родитель
Сommit
b442c7a1e3

+ 21 - 1
pom.xml

@@ -314,7 +314,27 @@
             <artifactId>logstash-logback-encoder</artifactId>
             <version>5.3</version>
         </dependency>
-
+        <!--集成kettle-->
+        <dependency>
+            <groupId>com.kcim.third-party-libs</groupId>
+            <artifactId>kettle-core</artifactId>
+            <version>8.3.0.0-0.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.kcim.third-party-libs</groupId>
+            <artifactId>kettle-engine</artifactId>
+            <version>8.3.0.0-0.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.abashev</groupId>
+            <artifactId>commons-vfs2</artifactId>
+            <version>2.4.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.kcim.third-party-libs</groupId>
+            <artifactId>metastore</artifactId>
+            <version>8.3.0.0-371</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 9 - 4
src/main/java/com/kcim/dao/model/Sql.java

@@ -4,16 +4,15 @@ import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableLogic;
 import com.baomidou.mybatisplus.annotation.TableName;
-
-import java.io.Serializable;
-import java.util.Date;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
 
+import java.io.Serializable;
+import java.util.Date;
+
 /**
  * sql定义表
  * 
@@ -46,6 +45,12 @@ public class Sql implements Serializable {
 	 * sql代码(新增时定义修改不变)
 	 */
 	private String sqlCode;
+
+	/**
+	 * 取数类型 1自定义SQL 2Kettle转换 3Kettle作业
+	 */
+	private Integer dataSourceType;
+
 	/**
 	 * 语句
 	 */

+ 146 - 20
src/main/java/com/kcim/service/impl/SqlServiceImpl.java

@@ -12,11 +12,21 @@ import com.kcim.service.SqlService;
 import com.kcim.vo.DictDataVo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.ibatis.jdbc.SqlRunner;
+import org.pentaho.di.core.KettleEnvironment;
+import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.core.logging.LogLevel;
+import org.pentaho.di.core.parameters.UnknownParamException;
+import org.pentaho.di.job.Job;
+import org.pentaho.di.job.JobMeta;
+import org.pentaho.di.trans.Trans;
+import org.pentaho.di.trans.TransMeta;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;
 
+import java.io.File;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -155,36 +165,152 @@ public class SqlServiceImpl implements SqlService {
         return repository.getSqlBySqlType(sqlType);
     }
 
+    /**
+     * 调用自定义SQL
+     * @param sqlType
+     * @param parameter
+     */
     @Override
     public void autoExecuteSql(String sqlType, Map<String, String> parameter) {
-        List<Sql> sqlList = getSqlBySqlType(sqlType);
-        //取出需要执行的sql
-        if(!CollectionUtils.isEmpty(sqlList)){
-            sqlList.sort(Comparator.comparing(Sql::getSort,Comparator.nullsLast(Integer::compareTo)));
-            for(Sql sql:sqlList){
-                String executeSql = sql.getSql();
-                executeSql = MatchSystemParameter(executeSql);
-                //替换传参
-                if(!CollectionUtils.isEmpty(parameter)){
-                    for(String s:parameter.keySet()){
-                        //拼接 #
-                        String sqlFilter = "#" + s;
-                        if (executeSql.contains(sqlFilter)) {
-                            executeSql = executeSql.replace(sqlFilter, parameter.get(s));
-                        }
+        try{
+            List<Sql> sqlList = getSqlBySqlType(sqlType);
+            //取出需要执行的sql
+            if(!CollectionUtils.isEmpty(sqlList)){
+                sqlList.sort(Comparator.comparing(Sql::getSort,Comparator.nullsLast(Integer::compareTo)));
+                for(Sql sql:sqlList){
+                    if(sql.getDataSourceType().equals(NumberConstant.TWO)) {
+                        //kettle转换
+                        execKettleTrans(sql, sqlType, parameter);
+                    }else if(sql.getDataSourceType().equals(NumberConstant.THREE)){
+                        //kettle作业
+                        execKettleJobs(sql, sqlType, parameter);
+                    }else{
+                        //自定义SQL脚本
+                        execSqlScript(sql, sqlType, parameter);
                     }
                 }
+            }
+        } catch (KettleException e) {
+            throw new CostException(String.format("%s执行失败:%s",sqlType,e.getMessage()));
+        }
+
+    }
+
+    /**
+     * 执行kettle转换
+     * @param sql
+     * @param sqlType
+     * @param parameter
+     */
+    public void execKettleTrans(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
+            // 初始化Kettle环境
+            KettleEnvironment.init();
+            // 指定作业文件的路径
+            String filePath = sql.getSql();
+            File transFile = new File(filePath);
+
+            // 检查作业文件是否存在
+            if (!transFile.exists()) {
+                throw new CostException(String.format("%s转换文件不存在!",filePath));
+            }
+            // 加载转换文件
+            TransMeta transMeta = new TransMeta(filePath);
+            Trans trans = new Trans(transMeta);
+            //添加参数
+            parameter.forEach((key,value)-> {
                 try {
-                    SqlRunner sqlRunner = new SqlRunner(getConnection());
-                    log.info("执行的语句"+sqlType+":"+executeSql);
-                    sqlRunner.run(executeSql);
-                } catch (SQLException | ClassNotFoundException e) {
-                    throw new RuntimeException(e);
+                    trans.setParameterValue(key,value );
+                } catch (UnknownParamException e) {
+                    e.printStackTrace();
                 }
+            });
+            //必须要有ParentJob,不然执行会报错
+            if(ObjectUtils.isEmpty(trans.getParentJob())) {
+                Job job = new Job();
+                job.setName("rootJob");
+                trans.setParentJob(job);
+            }
+            // 设置日志级别
+            trans.setLogLevel(LogLevel.BASIC);
+            // 执行转换
+            trans.execute(null);
+            trans.waitUntilFinished();
+            // 检查转换执行情况
+            if (trans.getErrors() > 0) {
+                throw new CostException(String.format("%s转换执行出错:%s",sql.getSqlDefinition(),trans.getErrors()));
             }
+    }
+
+    /**
+     * 执行kettle作业
+     * @param sql
+     * @param sqlType
+     * @param parameter
+     */
+    public void execKettleJobs(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
+        // 初始化 Kettle 环境
+        KettleEnvironment.init();
+
+        // 指定作业文件的路径
+        String jobFilePath = sql.getSql();
+        File jobFile = new File(jobFilePath);
+
+        // 检查作业文件是否存在
+        if (!jobFile.exists()) {
+            throw new CostException(String.format("%s作业文件不存在!",jobFilePath));
+        }
+        // 加载作业元数据
+        JobMeta jobMeta = new JobMeta(jobFilePath, null);
+        // 创建作业对象
+        Job job = new Job(null, jobMeta);
+        //添加参数
+        parameter.forEach((key,value)-> {
+            try {
+                job.setParameterValue(key,value );
+            } catch (UnknownParamException e) {
+                e.printStackTrace();
+            }
+        });
+        // 执行作业
+        job.start();
+        // 等待作业完成
+        job.waitUntilFinished();
+        // 检查作业是否成功
+        if (job.getErrors() > 0) {
+            throw new CostException(String.format("%s作业执行出错:%s",sql.getSqlDefinition(),job.getErrors()));
+        }
+    }
+
+    /**
+     * 执行自定义SQL脚本
+     * @param sql
+     * @param sqlType
+     * @param parameter
+     */
+    public void execSqlScript(Sql sql,String sqlType, Map<String, String> parameter) {
+        String executeSql = sql.getSql();
+        executeSql = MatchSystemParameter(executeSql);
+        //替换传参
+        if(!CollectionUtils.isEmpty(parameter)){
+            for(String s:parameter.keySet()){
+                //拼接 #
+                String sqlFilter = "#" + s;
+                if (executeSql.contains(sqlFilter)) {
+                    executeSql = executeSql.replace(sqlFilter, parameter.get(s));
+                }
+            }
+        }
+        try {
+            SqlRunner sqlRunner = new SqlRunner(getConnection());
+            log.info("执行的语句"+sqlType+":"+executeSql);
+            sqlRunner.run(executeSql);
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
         }
+
     }
 
+
     private Connection getConnection() throws SQLException, ClassNotFoundException {
         Class.forName(driver);
         return DriverManager.getConnection(url, username, password);