123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- package com.kcim.service.impl;
- import com.kcim.common.constants.NumberConstant;
- import com.kcim.common.constants.SQLParameter;
- import com.kcim.common.exception.CostException;
- import com.kcim.common.util.SnowflakeUtil;
- import com.kcim.common.util.UserContext;
- import com.kcim.dao.model.Sql;
- import com.kcim.dao.repository.SqlRepository;
- import com.kcim.service.CenterService;
- import com.kcim.service.SqlService;
- import com.kcim.vo.DictDataVo;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.ibatis.jdbc.SqlRunner;
- import org.pentaho.di.core.KettleEnvironment;
- import org.pentaho.di.core.exception.KettleException;
- import org.pentaho.di.core.logging.LogLevel;
- import org.pentaho.di.core.parameters.UnknownParamException;
- import org.pentaho.di.job.Job;
- import org.pentaho.di.job.JobMeta;
- import org.pentaho.di.trans.Trans;
- import org.pentaho.di.trans.TransMeta;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import org.springframework.util.StringUtils;
- import java.io.File;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.SQLException;
- import java.util.*;
- import java.util.stream.Collectors;
- import static com.kcim.common.constants.Constant.CUSTOMIZE_SQL_TYPE;
- /**
- * @program: CostAccount
- * @description:
- * @author: Wang.YS
- * @create: 2024-03-19 11:10
- **/
- @Service("SqlService")
- @Slf4j
- public class SqlServiceImpl implements SqlService {
- @Value("${spring.datasource.driver-class-name}")
- private String driver;
- @Value("${spring.datasource.url}")
- private String url;
- @Value("${spring.datasource.username}")
- private String username;
- @Value("${spring.datasource.password}")
- private String password;
- public SqlServiceImpl(SqlRepository repository, CenterService centerService) {
- this.repository = repository;
- this.centerService = centerService;
- }
- SqlRepository repository;
- CenterService centerService;
- /**
- * @return
- */
- @Override
- public Object getSqlType() {
- DictDataVo dict = centerService.getDict(CUSTOMIZE_SQL_TYPE);
- if(Objects.isNull(dict)){
- throw new CostException("未找到【取数项目类型】字典");
- }
- List<DictDataVo> dataVoList = dict.getDataVoList();
- if(CollectionUtils.isEmpty(dataVoList)){
- throw new CostException("未找到【取数项目类型】字典");
- }
- return dataVoList;
- }
- /**
- * @param sqlType
- * @param sqlDefinition
- * @return
- */
- @Override
- public Object getSql(String sqlType, String sqlDefinition) {
- List<Sql> byFilter = repository.getByFilter(sqlType, sqlDefinition);
- if(!CollectionUtils.isEmpty(byFilter)){
- byFilter.sort(Comparator.comparing(Sql::getSqlType).thenComparing(Sql::getSort));
- return byFilter;
- }
- return new ArrayList<>();
- }
- /**
- * @param sql
- */
- @Override
- public void addSql(Sql sql) {
- sql.setHospId(UserContext.getHospId());
- sql.setCreateTime(new Date());
- sql.setCreateUser(String.valueOf(UserContext.getCurrentUser().getId()));
- sql.setStatus(NumberConstant.ONE);
- sql.setCode(SnowflakeUtil.getId());
- repository.save(sql);
- }
- /**
- * @param sql
- */
- @Override
- public void editSql(Sql sql) {
- Integer id = sql.getId();
- if(id ==null){
- throw new CostException("编辑时主键必填");
- }
- Sql byId = repository.getById(id);
- if(Objects.nonNull(byId)){
- byId.setSqlCode(sql.getSqlCode());
- byId.setSqlDefinition(sql.getSqlDefinition());
- byId.setSql(sql.getSql());
- byId.setStatus(sql.getStatus());
- byId.setSort(sql.getSort());
- byId.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId()));
- byId.setUpdateTime(new Date());
- repository.updateById(byId);
- }
- }
- /**
- * @param id
- */
- @Override
- public void deleteSql(Integer id) {
- if(id ==null){
- throw new CostException("删除时主键必填");
- }
- repository.deleteSql(id);
- }
- /**
- * @param sql
- */
- @Override
- public void sortSql(List<Sql> sql) {
- List<Sql> collect = sql.stream().filter(f -> f.getId() != null).collect(Collectors.toList());
- sql.removeAll(collect);
- if(!CollectionUtils.isEmpty(sql)){
- //插入的数据
- sql.forEach(this::addSql);
- }
- if(!CollectionUtils.isEmpty(collect)){
- for (Sql sql1 : collect) {
- sql1.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId()));
- sql1.setUpdateTime(new Date());
- }
- repository.updateBatchById(collect);
- }
- }
- @Override
- public List<Sql> getSqlBySqlType(String sqlType) {
- return repository.getSqlBySqlType(sqlType);
- }
- /**
- * 调用自定义SQL
- * @param sqlType
- * @param parameter
- */
- @Override
- public void autoExecuteSql(String sqlType, Map<String, String> parameter) {
- try{
- List<Sql> sqlList = getSqlBySqlType(sqlType);
- //取出需要执行的sql
- if(!CollectionUtils.isEmpty(sqlList)){
- sqlList.sort(Comparator.comparing(Sql::getSort,Comparator.nullsLast(Integer::compareTo)));
- for(Sql sql:sqlList){
- if(sql.getDataSourceType().equals(NumberConstant.TWO)) {
- //kettle转换
- execKettleTrans(sql, sqlType, parameter);
- }else if(sql.getDataSourceType().equals(NumberConstant.THREE)){
- //kettle作业
- execKettleJobs(sql, sqlType, parameter);
- }else{
- //自定义SQL脚本
- execSqlScript(sql, sqlType, parameter);
- }
- }
- }
- } catch (KettleException e) {
- throw new CostException(String.format("%s执行失败:%s",sqlType,e.getMessage()));
- }
- }
- /**
- * 执行kettle转换
- * @param sql
- * @param sqlType
- * @param parameter
- */
- public void execKettleTrans(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
- // 初始化Kettle环境
- KettleEnvironment.init();
- // 指定作业文件的路径
- String filePath = sql.getSql();
- File transFile = new File(filePath);
- // 检查作业文件是否存在
- if (!transFile.exists()) {
- throw new CostException(String.format("%s转换文件不存在!",filePath));
- }
- // 加载转换文件
- TransMeta transMeta = new TransMeta(filePath);
- Trans trans = new Trans(transMeta);
- //添加参数
- parameter.forEach((key,value)-> {
- try {
- trans.setParameterValue(key,value );
- } catch (UnknownParamException e) {
- e.printStackTrace();
- }
- });
- //必须要有ParentJob,不然执行会报错
- if(ObjectUtils.isEmpty(trans.getParentJob())) {
- Job job = new Job();
- job.setName("rootJob");
- trans.setParentJob(job);
- }
- // 设置日志级别
- trans.setLogLevel(LogLevel.BASIC);
- // 执行转换
- trans.execute(null);
- trans.waitUntilFinished();
- // 检查转换执行情况
- if (trans.getErrors() > 0) {
- throw new CostException(String.format("%s转换执行出错:%s",sql.getSqlDefinition(),trans.getErrors()));
- }
- }
- /**
- * 执行kettle作业
- * @param sql
- * @param sqlType
- * @param parameter
- */
- public void execKettleJobs(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
- // 初始化 Kettle 环境
- KettleEnvironment.init();
- // 指定作业文件的路径
- String jobFilePath = sql.getSql();
- File jobFile = new File(jobFilePath);
- // 检查作业文件是否存在
- if (!jobFile.exists()) {
- throw new CostException(String.format("%s作业文件不存在!",jobFilePath));
- }
- // 加载作业元数据
- JobMeta jobMeta = new JobMeta(jobFilePath, null);
- // 创建作业对象
- Job job = new Job(null, jobMeta);
- //添加参数
- parameter.forEach((key,value)-> {
- try {
- job.setParameterValue(key,value );
- } catch (UnknownParamException e) {
- e.printStackTrace();
- }
- });
- // 执行作业
- job.start();
- // 等待作业完成
- job.waitUntilFinished();
- // 检查作业是否成功
- if (job.getErrors() > 0) {
- throw new CostException(String.format("%s作业执行出错:%s",sql.getSqlDefinition(),job.getErrors()));
- }
- }
- /**
- * 执行自定义SQL脚本
- * @param sql
- * @param sqlType
- * @param parameter
- */
- public void execSqlScript(Sql sql,String sqlType, Map<String, String> parameter) {
- String executeSql = sql.getSql();
- executeSql = MatchSystemParameter(executeSql);
- //替换传参
- if(!CollectionUtils.isEmpty(parameter)){
- for(String s:parameter.keySet()){
- //拼接 #
- String sqlFilter = "#" + s;
- if (executeSql.contains(sqlFilter)) {
- executeSql = executeSql.replace(sqlFilter, parameter.get(s));
- }
- }
- }
- try {
- SqlRunner sqlRunner = new SqlRunner(getConnection());
- log.info("执行的语句"+sqlType+":"+executeSql);
- sqlRunner.run(executeSql);
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- private Connection getConnection() throws SQLException, ClassNotFoundException {
- Class.forName(driver);
- return DriverManager.getConnection(url, username, password);
- }
- private static String MatchSystemParameter(String sql) {
- if(StringUtils.isEmpty(sql)){
- throw new CostException("无效自定义sql语句");
- }
- if (sql.contains(SQLParameter.HOSP_ID_CODE)) {
- sql = sql.replace(SQLParameter.HOSP_ID_CODE, String.valueOf(UserContext.getCurrentLoginHospId()));
- }
- return sql;
- }
- }
|