package com.kcim.service.impl; import com.kcim.common.constants.NumberConstant; import com.kcim.common.constants.SQLParameter; import com.kcim.common.exception.CostException; import com.kcim.common.util.SnowflakeUtil; import com.kcim.common.util.UserContext; import com.kcim.dao.model.Sql; import com.kcim.dao.repository.SqlRepository; import com.kcim.service.CenterService; import com.kcim.service.SqlService; import com.kcim.vo.DictDataVo; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.jdbc.SqlRunner; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LogLevel; import org.pentaho.di.core.parameters.UnknownParamException; import org.pentaho.di.job.Job; import org.pentaho.di.job.JobMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import java.io.File; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import static com.kcim.common.constants.Constant.CUSTOMIZE_SQL_TYPE; /** * @program: CostAccount * @description: * @author: Wang.YS * @create: 2024-03-19 11:10 **/ @Service("SqlService") @Slf4j public class SqlServiceImpl implements SqlService { @Value("${spring.datasource.driver-class-name}") private String driver; @Value("${spring.datasource.url}") private String url; @Value("${spring.datasource.username}") private String username; @Value("${spring.datasource.password}") private String password; public SqlServiceImpl(SqlRepository repository, CenterService centerService) { this.repository = repository; this.centerService = centerService; } SqlRepository repository; CenterService centerService; /** * @return */ @Override public Object getSqlType() { DictDataVo dict = centerService.getDict(CUSTOMIZE_SQL_TYPE); if(Objects.isNull(dict)){ throw new CostException("未找到【取数项目类型】字典"); } List dataVoList = dict.getDataVoList(); if(CollectionUtils.isEmpty(dataVoList)){ throw new CostException("未找到【取数项目类型】字典"); } return dataVoList; } /** * @param sqlType * @param sqlDefinition * @return */ @Override public Object getSql(String sqlType, String sqlDefinition) { List byFilter = repository.getByFilter(sqlType, sqlDefinition); if(!CollectionUtils.isEmpty(byFilter)){ byFilter.sort(Comparator.comparing(Sql::getSqlType).thenComparing(Sql::getSort)); return byFilter; } return new ArrayList<>(); } /** * @param sql */ @Override public void addSql(Sql sql) { sql.setHospId(UserContext.getHospId()); sql.setCreateTime(new Date()); sql.setCreateUser(String.valueOf(UserContext.getCurrentUser().getId())); sql.setStatus(NumberConstant.ONE); sql.setCode(SnowflakeUtil.getId()); repository.save(sql); } /** * @param sql */ @Override public void editSql(Sql sql) { Integer id = sql.getId(); if(id ==null){ throw new CostException("编辑时主键必填"); } Sql byId = repository.getById(id); if(Objects.nonNull(byId)){ byId.setSqlCode(sql.getSqlCode()); byId.setSqlDefinition(sql.getSqlDefinition()); byId.setSql(sql.getSql()); byId.setStatus(sql.getStatus()); byId.setSort(sql.getSort()); byId.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId())); byId.setUpdateTime(new Date()); repository.updateById(byId); } } /** * @param id */ @Override public void deleteSql(Integer id) { if(id ==null){ throw new CostException("删除时主键必填"); } repository.deleteSql(id); } /** * @param sql */ @Override public void sortSql(List sql) { List collect = sql.stream().filter(f -> f.getId() != null).collect(Collectors.toList()); sql.removeAll(collect); if(!CollectionUtils.isEmpty(sql)){ //插入的数据 sql.forEach(this::addSql); } if(!CollectionUtils.isEmpty(collect)){ for (Sql sql1 : collect) { sql1.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId())); sql1.setUpdateTime(new Date()); } repository.updateBatchById(collect); } } @Override public List getSqlBySqlType(String sqlType) { return repository.getSqlBySqlType(sqlType); } /** * 调用自定义SQL * @param sqlType * @param parameter */ @Override public void autoExecuteSql(String sqlType, Map parameter) { try{ List sqlList = getSqlBySqlType(sqlType); //取出需要执行的sql if(!CollectionUtils.isEmpty(sqlList)){ sqlList.sort(Comparator.comparing(Sql::getSort,Comparator.nullsLast(Integer::compareTo))); for(Sql sql:sqlList){ if(sql.getDataSourceType().equals(NumberConstant.TWO)) { //kettle转换 execKettleTrans(sql, sqlType, parameter); }else if(sql.getDataSourceType().equals(NumberConstant.THREE)){ //kettle作业 execKettleJobs(sql, sqlType, parameter); }else{ //自定义SQL脚本 execSqlScript(sql, sqlType, parameter); } } } } catch (KettleException e) { throw new CostException(String.format("%s执行失败:%s",sqlType,e.getMessage())); } } /** * 执行kettle转换 * @param sql * @param sqlType * @param parameter */ public void execKettleTrans(Sql sql,String sqlType, Map parameter) throws KettleException { // 初始化Kettle环境 KettleEnvironment.init(); // 指定作业文件的路径 String filePath = sql.getSql(); File transFile = new File(filePath); // 检查作业文件是否存在 if (!transFile.exists()) { throw new CostException(String.format("%s转换文件不存在!",filePath)); } // 加载转换文件 TransMeta transMeta = new TransMeta(filePath); Trans trans = new Trans(transMeta); //添加参数 parameter.forEach((key,value)-> { try { trans.setParameterValue(key,value ); } catch (UnknownParamException e) { e.printStackTrace(); } }); //必须要有ParentJob,不然执行会报错 if(ObjectUtils.isEmpty(trans.getParentJob())) { Job job = new Job(); job.setName("rootJob"); trans.setParentJob(job); } // 设置日志级别 trans.setLogLevel(LogLevel.BASIC); // 执行转换 trans.execute(null); trans.waitUntilFinished(); // 检查转换执行情况 if (trans.getErrors() > 0) { throw new CostException(String.format("%s转换执行出错:%s",sql.getSqlDefinition(),trans.getErrors())); } } /** * 执行kettle作业 * @param sql * @param sqlType * @param parameter */ public void execKettleJobs(Sql sql,String sqlType, Map parameter) throws KettleException { // 初始化 Kettle 环境 KettleEnvironment.init(); // 指定作业文件的路径 String jobFilePath = sql.getSql(); File jobFile = new File(jobFilePath); // 检查作业文件是否存在 if (!jobFile.exists()) { throw new CostException(String.format("%s作业文件不存在!",jobFilePath)); } // 加载作业元数据 JobMeta jobMeta = new JobMeta(jobFilePath, null); // 创建作业对象 Job job = new Job(null, jobMeta); //添加参数 parameter.forEach((key,value)-> { try { job.setParameterValue(key,value ); } catch (UnknownParamException e) { e.printStackTrace(); } }); // 执行作业 job.start(); // 等待作业完成 job.waitUntilFinished(); // 检查作业是否成功 if (job.getErrors() > 0) { throw new CostException(String.format("%s作业执行出错:%s",sql.getSqlDefinition(),job.getErrors())); } } /** * 执行自定义SQL脚本 * @param sql * @param sqlType * @param parameter */ public void execSqlScript(Sql sql,String sqlType, Map parameter) { String executeSql = sql.getSql(); executeSql = MatchSystemParameter(executeSql); //替换传参 if(!CollectionUtils.isEmpty(parameter)){ for(String s:parameter.keySet()){ //拼接 # String sqlFilter = "#" + s; if (executeSql.contains(sqlFilter)) { executeSql = executeSql.replace(sqlFilter, parameter.get(s)); } } } try { SqlRunner sqlRunner = new SqlRunner(getConnection()); log.info("执行的语句"+sqlType+":"+executeSql); sqlRunner.run(executeSql); } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } } private Connection getConnection() throws SQLException, ClassNotFoundException { Class.forName(driver); return DriverManager.getConnection(url, username, password); } private static String MatchSystemParameter(String sql) { if(StringUtils.isEmpty(sql)){ throw new CostException("无效自定义sql语句"); } if (sql.contains(SQLParameter.HOSP_ID_CODE)) { sql = sql.replace(SQLParameter.HOSP_ID_CODE, String.valueOf(UserContext.getCurrentLoginHospId())); } return sql; } }