SqlServiceImpl.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package com.kcim.service.impl;
  2. import com.kcim.common.constants.NumberConstant;
  3. import com.kcim.common.constants.SQLParameter;
  4. import com.kcim.common.exception.CostException;
  5. import com.kcim.common.util.SnowflakeUtil;
  6. import com.kcim.common.util.UserContext;
  7. import com.kcim.dao.model.Sql;
  8. import com.kcim.dao.repository.SqlRepository;
  9. import com.kcim.service.CenterService;
  10. import com.kcim.service.SqlService;
  11. import com.kcim.vo.DictDataVo;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.ibatis.jdbc.SqlRunner;
  14. import org.pentaho.di.core.KettleEnvironment;
  15. import org.pentaho.di.core.exception.KettleException;
  16. import org.pentaho.di.core.logging.LogLevel;
  17. import org.pentaho.di.core.parameters.UnknownParamException;
  18. import org.pentaho.di.job.Job;
  19. import org.pentaho.di.job.JobMeta;
  20. import org.pentaho.di.trans.Trans;
  21. import org.pentaho.di.trans.TransMeta;
  22. import org.springframework.beans.factory.annotation.Value;
  23. import org.springframework.stereotype.Service;
  24. import org.springframework.util.CollectionUtils;
  25. import org.springframework.util.ObjectUtils;
  26. import org.springframework.util.StringUtils;
  27. import java.io.File;
  28. import java.sql.Connection;
  29. import java.sql.DriverManager;
  30. import java.sql.SQLException;
  31. import java.util.*;
  32. import java.util.stream.Collectors;
  33. import static com.kcim.common.constants.Constant.CUSTOMIZE_SQL_TYPE;
  34. /**
  35. * @program: CostAccount
  36. * @description:
  37. * @author: Wang.YS
  38. * @create: 2024-03-19 11:10
  39. **/
  40. @Service("SqlService")
  41. @Slf4j
  42. public class SqlServiceImpl implements SqlService {
  43. @Value("${spring.datasource.driver-class-name}")
  44. private String driver;
  45. @Value("${spring.datasource.url}")
  46. private String url;
  47. @Value("${spring.datasource.username}")
  48. private String username;
  49. @Value("${spring.datasource.password}")
  50. private String password;
  51. public SqlServiceImpl(SqlRepository repository, CenterService centerService) {
  52. this.repository = repository;
  53. this.centerService = centerService;
  54. }
  55. SqlRepository repository;
  56. CenterService centerService;
  57. /**
  58. * @return
  59. */
  60. @Override
  61. public Object getSqlType() {
  62. DictDataVo dict = centerService.getDict(CUSTOMIZE_SQL_TYPE);
  63. if(Objects.isNull(dict)){
  64. throw new CostException("未找到【取数项目类型】字典");
  65. }
  66. List<DictDataVo> dataVoList = dict.getDataVoList();
  67. if(CollectionUtils.isEmpty(dataVoList)){
  68. throw new CostException("未找到【取数项目类型】字典");
  69. }
  70. return dataVoList;
  71. }
  72. /**
  73. * @param sqlType
  74. * @param sqlDefinition
  75. * @return
  76. */
  77. @Override
  78. public Object getSql(String sqlType, String sqlDefinition) {
  79. List<Sql> byFilter = repository.getByFilter(sqlType, sqlDefinition);
  80. if(!CollectionUtils.isEmpty(byFilter)){
  81. byFilter.sort(Comparator.comparing(Sql::getSqlType).thenComparing(Sql::getSort));
  82. return byFilter;
  83. }
  84. return new ArrayList<>();
  85. }
  86. /**
  87. * @param sql
  88. */
  89. @Override
  90. public void addSql(Sql sql) {
  91. sql.setHospId(UserContext.getHospId());
  92. sql.setCreateTime(new Date());
  93. sql.setCreateUser(String.valueOf(UserContext.getCurrentUser().getId()));
  94. sql.setStatus(NumberConstant.ONE);
  95. sql.setCode(SnowflakeUtil.getId());
  96. repository.save(sql);
  97. }
  98. /**
  99. * @param sql
  100. */
  101. @Override
  102. public void editSql(Sql sql) {
  103. Integer id = sql.getId();
  104. if(id ==null){
  105. throw new CostException("编辑时主键必填");
  106. }
  107. Sql byId = repository.getById(id);
  108. if(Objects.nonNull(byId)){
  109. byId.setSqlCode(sql.getSqlCode());
  110. byId.setSqlDefinition(sql.getSqlDefinition());
  111. byId.setSql(sql.getSql());
  112. byId.setStatus(sql.getStatus());
  113. byId.setSort(sql.getSort());
  114. byId.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId()));
  115. byId.setUpdateTime(new Date());
  116. repository.updateById(byId);
  117. }
  118. }
  119. /**
  120. * @param id
  121. */
  122. @Override
  123. public void deleteSql(Integer id) {
  124. if(id ==null){
  125. throw new CostException("删除时主键必填");
  126. }
  127. repository.deleteSql(id);
  128. }
  129. /**
  130. * @param sql
  131. */
  132. @Override
  133. public void sortSql(List<Sql> sql) {
  134. List<Sql> collect = sql.stream().filter(f -> f.getId() != null).collect(Collectors.toList());
  135. sql.removeAll(collect);
  136. if(!CollectionUtils.isEmpty(sql)){
  137. //插入的数据
  138. sql.forEach(this::addSql);
  139. }
  140. if(!CollectionUtils.isEmpty(collect)){
  141. for (Sql sql1 : collect) {
  142. sql1.setUpdateUser(String.valueOf(UserContext.getCurrentUser().getId()));
  143. sql1.setUpdateTime(new Date());
  144. }
  145. repository.updateBatchById(collect);
  146. }
  147. }
  148. @Override
  149. public List<Sql> getSqlBySqlType(String sqlType) {
  150. return repository.getSqlBySqlType(sqlType);
  151. }
  152. /**
  153. * 调用自定义SQL
  154. * @param sqlType
  155. * @param parameter
  156. */
  157. @Override
  158. public void autoExecuteSql(String sqlType, Map<String, String> parameter) {
  159. try{
  160. List<Sql> sqlList = getSqlBySqlType(sqlType);
  161. //取出需要执行的sql
  162. if(!CollectionUtils.isEmpty(sqlList)){
  163. sqlList.sort(Comparator.comparing(Sql::getSort,Comparator.nullsLast(Integer::compareTo)));
  164. for(Sql sql:sqlList){
  165. if(sql.getDataSourceType().equals(NumberConstant.TWO)) {
  166. //kettle转换
  167. execKettleTrans(sql, sqlType, parameter);
  168. }else if(sql.getDataSourceType().equals(NumberConstant.THREE)){
  169. //kettle作业
  170. execKettleJobs(sql, sqlType, parameter);
  171. }else{
  172. //自定义SQL脚本
  173. execSqlScript(sql, sqlType, parameter);
  174. }
  175. }
  176. }
  177. } catch (KettleException e) {
  178. throw new CostException(String.format("%s执行失败:%s",sqlType,e.getMessage()));
  179. }
  180. }
  181. /**
  182. * 执行kettle转换
  183. * @param sql
  184. * @param sqlType
  185. * @param parameter
  186. */
  187. public void execKettleTrans(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
  188. // 初始化Kettle环境
  189. KettleEnvironment.init();
  190. // 指定作业文件的路径
  191. String filePath = sql.getSql();
  192. File transFile = new File(filePath);
  193. // 检查作业文件是否存在
  194. if (!transFile.exists()) {
  195. throw new CostException(String.format("%s转换文件不存在!",filePath));
  196. }
  197. // 加载转换文件
  198. TransMeta transMeta = new TransMeta(filePath);
  199. Trans trans = new Trans(transMeta);
  200. //添加参数
  201. parameter.forEach((key,value)-> {
  202. try {
  203. trans.setParameterValue(key,value );
  204. } catch (UnknownParamException e) {
  205. e.printStackTrace();
  206. }
  207. });
  208. //必须要有ParentJob,不然执行会报错
  209. if(ObjectUtils.isEmpty(trans.getParentJob())) {
  210. Job job = new Job();
  211. job.setName("rootJob");
  212. trans.setParentJob(job);
  213. }
  214. // 设置日志级别
  215. trans.setLogLevel(LogLevel.BASIC);
  216. // 执行转换
  217. trans.execute(null);
  218. trans.waitUntilFinished();
  219. // 检查转换执行情况
  220. if (trans.getErrors() > 0) {
  221. throw new CostException(String.format("%s转换执行出错:%s",sql.getSqlDefinition(),trans.getErrors()));
  222. }
  223. }
  224. /**
  225. * 执行kettle作业
  226. * @param sql
  227. * @param sqlType
  228. * @param parameter
  229. */
  230. public void execKettleJobs(Sql sql,String sqlType, Map<String, String> parameter) throws KettleException {
  231. // 初始化 Kettle 环境
  232. KettleEnvironment.init();
  233. // 指定作业文件的路径
  234. String jobFilePath = sql.getSql();
  235. File jobFile = new File(jobFilePath);
  236. // 检查作业文件是否存在
  237. if (!jobFile.exists()) {
  238. throw new CostException(String.format("%s作业文件不存在!",jobFilePath));
  239. }
  240. // 加载作业元数据
  241. JobMeta jobMeta = new JobMeta(jobFilePath, null);
  242. // 创建作业对象
  243. Job job = new Job(null, jobMeta);
  244. //添加参数
  245. parameter.forEach((key,value)-> {
  246. try {
  247. job.setParameterValue(key,value );
  248. } catch (UnknownParamException e) {
  249. e.printStackTrace();
  250. }
  251. });
  252. // 执行作业
  253. job.start();
  254. // 等待作业完成
  255. job.waitUntilFinished();
  256. // 检查作业是否成功
  257. if (job.getErrors() > 0) {
  258. throw new CostException(String.format("%s作业执行出错:%s",sql.getSqlDefinition(),job.getErrors()));
  259. }
  260. }
  261. /**
  262. * 执行自定义SQL脚本
  263. * @param sql
  264. * @param sqlType
  265. * @param parameter
  266. */
  267. public void execSqlScript(Sql sql,String sqlType, Map<String, String> parameter) {
  268. String executeSql = sql.getSql();
  269. executeSql = MatchSystemParameter(executeSql);
  270. //替换传参
  271. if(!CollectionUtils.isEmpty(parameter)){
  272. for(String s:parameter.keySet()){
  273. //拼接 #
  274. String sqlFilter = "#" + s;
  275. if (executeSql.contains(sqlFilter)) {
  276. executeSql = executeSql.replace(sqlFilter, parameter.get(s));
  277. }
  278. }
  279. }
  280. try {
  281. SqlRunner sqlRunner = new SqlRunner(getConnection());
  282. log.info("执行的语句"+sqlType+":"+executeSql);
  283. sqlRunner.run(executeSql);
  284. } catch (SQLException | ClassNotFoundException e) {
  285. throw new RuntimeException(e);
  286. }
  287. }
  288. private Connection getConnection() throws SQLException, ClassNotFoundException {
  289. Class.forName(driver);
  290. return DriverManager.getConnection(url, username, password);
  291. }
  292. private static String MatchSystemParameter(String sql) {
  293. if(StringUtils.isEmpty(sql)){
  294. throw new CostException("无效自定义sql语句");
  295. }
  296. if (sql.contains(SQLParameter.HOSP_ID_CODE)) {
  297. sql = sql.replace(SQLParameter.HOSP_ID_CODE, String.valueOf(UserContext.getCurrentLoginHospId()));
  298. }
  299. return sql;
  300. }
  301. }