service.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # -*- coding: utf-8 -*-
  2. from app.core.exceptions import CustomException
  3. from app.utils.cron_util import CronUtil
  4. from app.utils.excel_util import ExcelUtil
  5. from app.api.v1.module_system.auth.schema import AuthSchema
  6. from .tools.ap_scheduler import SchedulerUtil
  7. from .crud import JobCRUD, JobLogCRUD
  8. from .schema import (
  9. JobCreateSchema,
  10. JobUpdateSchema,
  11. JobOutSchema,
  12. JobLogOutSchema,
  13. JobQueryParam,
  14. JobLogQueryParam
  15. )
  16. class JobService:
  17. """
  18. 定时任务管理模块服务层
  19. """
  20. @classmethod
  21. async def get_job_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  22. """
  23. 获取定时任务详情
  24. 参数:
  25. - auth (AuthSchema): 认证信息模型
  26. - id (int): 定时任务ID
  27. 返回:
  28. - Dict: 定时任务详情字典
  29. """
  30. obj = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  31. return JobOutSchema.model_validate(obj).model_dump()
  32. @classmethod
  33. async def get_job_list_service(cls, auth: AuthSchema, search: JobQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
  34. """
  35. 获取定时任务列表
  36. 参数:
  37. - auth (AuthSchema): 认证信息模型
  38. - search (JobQueryParam | None): 查询参数模型
  39. - order_by (list[dict[str, str]] | None): 排序参数列表
  40. 返回:
  41. - List[Dict]: 定时任务详情字典列表
  42. """
  43. obj_list = await JobCRUD(auth).get_obj_list_crud(search=search.__dict__, order_by=order_by)
  44. return [JobOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  45. @classmethod
  46. async def create_job_service(cls, auth: AuthSchema, data: JobCreateSchema) -> dict:
  47. """
  48. 创建定时任务
  49. 参数:
  50. - auth (AuthSchema): 认证信息模型
  51. - data (JobCreateSchema): 定时任务创建模型
  52. 返回:
  53. - Dict: 定时任务详情字典
  54. """
  55. exist_obj = await JobCRUD(auth).get(name=data.name)
  56. if exist_obj:
  57. raise CustomException(msg='创建失败,该定时任务已存在')
  58. obj = await JobCRUD(auth).create_obj_crud(data=data)
  59. if not obj:
  60. raise CustomException(msg='创建失败,该数据定时任务不存在')
  61. SchedulerUtil().add_job(job_info=obj)
  62. return JobOutSchema.model_validate(obj).model_dump()
  63. @classmethod
  64. async def update_job_service(cls, auth: AuthSchema, id:int, data: JobUpdateSchema) -> dict:
  65. """
  66. 更新定时任务
  67. 参数:
  68. - auth (AuthSchema): 认证信息模型
  69. - id (int): 定时任务ID
  70. - data (JobUpdateSchema): 定时任务更新模型
  71. 返回:
  72. - dict: 定时任务详情字典
  73. """
  74. exist_obj = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  75. if not exist_obj:
  76. raise CustomException(msg='更新失败,该定时任务不存在')
  77. if data.trigger == 'cron' and data.trigger_args and not CronUtil.validate_cron_expression(data.trigger_args):
  78. raise CustomException(msg=f'新增定时任务{data.name}失败, Cron表达式不正确')
  79. obj = await JobCRUD(auth).update_obj_crud(id=id, data=data)
  80. if not obj:
  81. raise CustomException(msg='更新失败,该数据定时任务不存在')
  82. SchedulerUtil().modify_job(job_id=obj.id)
  83. return JobOutSchema.model_validate(obj).model_dump()
  84. @classmethod
  85. async def delete_job_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  86. """
  87. 删除定时任务
  88. 参数:
  89. - auth (AuthSchema): 认证信息模型
  90. - ids (list[int]): 定时任务ID列表
  91. """
  92. if len(ids) < 1:
  93. raise CustomException(msg='删除失败,删除对象不能为空')
  94. for id in ids:
  95. exist_obj = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  96. if not exist_obj:
  97. raise CustomException(msg='删除失败,该数据定时任务不存在')
  98. obj = await JobLogCRUD(auth).get(job_id=id)
  99. if obj:
  100. raise CustomException(msg=f'删除失败,该定时任务存 {exist_obj.name} 在日志记录')
  101. SchedulerUtil().remove_job(job_id=id)
  102. await JobCRUD(auth).delete_obj_crud(ids=ids)
  103. @classmethod
  104. async def clear_job_service(cls, auth: AuthSchema) -> None:
  105. """
  106. 清空所有定时任务
  107. 参数:
  108. - auth (AuthSchema): 认证信息模型
  109. """
  110. SchedulerUtil().clear_jobs()
  111. await JobLogCRUD(auth).clear_obj_log_crud()
  112. await JobCRUD(auth).clear_obj_crud()
  113. @classmethod
  114. async def option_job_service(cls, auth: AuthSchema, id: int, option: int) -> None:
  115. """
  116. 操作定时任务
  117. 参数:
  118. - auth (AuthSchema): 认证信息模型
  119. - id (int): 定时任务ID
  120. - option (int): 操作类型, 1: 暂停 2: 恢复 3: 重启
  121. """
  122. # 1: 暂停 2: 恢复 3: 重启
  123. obj = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  124. if not obj:
  125. raise CustomException(msg='操作失败,该数据定时任务不存在')
  126. if option == 1:
  127. SchedulerUtil().pause_job(job_id=id)
  128. await JobCRUD(auth).set_obj_field_crud(ids=[id], status=False)
  129. elif option == 2:
  130. SchedulerUtil().resume_job(job_id=id)
  131. await JobCRUD(auth).set_obj_field_crud(ids=[id], status=True)
  132. elif option == 3:
  133. # 重启任务:先移除再添加,确保使用最新的任务配置
  134. SchedulerUtil().remove_job(job_id=id)
  135. # 获取最新的任务配置
  136. updated_job = await JobCRUD(auth).get_obj_by_id_crud(id=id)
  137. if updated_job:
  138. # 重新添加任务
  139. SchedulerUtil.add_job(job_info=updated_job)
  140. # 设置状态为运行中
  141. await JobCRUD(auth).set_obj_field_crud(ids=[id], status=True)
  142. @classmethod
  143. async def export_job_service(cls, data_list: list[dict]) -> bytes:
  144. """
  145. 导出定时任务列表
  146. 参数:
  147. - data_list (list[dict]): 定时任务列表
  148. 返回:
  149. - bytes: Excel文件字节流
  150. """
  151. mapping_dict = {
  152. 'id': '编号',
  153. 'name': '任务名称',
  154. 'func': '任务函数',
  155. 'trigger': '触发器',
  156. 'args': '位置参数',
  157. 'kwargs': '关键字参数',
  158. 'coalesce': '是否合并运行',
  159. 'max_instances': '最大实例数',
  160. 'jobstore': '任务存储',
  161. 'executor': '任务执行器',
  162. 'trigger_args': '触发器参数',
  163. 'status': '任务状态',
  164. 'message': '日志信息',
  165. 'description': '备注',
  166. 'created_time': '创建时间',
  167. 'updated_time': '更新时间',
  168. 'created_id': '创建者ID',
  169. 'updated_id': '更新者ID',
  170. }
  171. # 复制数据并转换状态
  172. data = data_list.copy()
  173. for item in data:
  174. item['status'] = '已完成' if item['status'] == '0' else '运行中' if item['status'] == '1' else '暂停'
  175. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  176. class JobLogService:
  177. """
  178. 定时任务日志管理模块服务层
  179. """
  180. @classmethod
  181. async def get_job_log_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  182. """
  183. 获取定时任务日志详情
  184. 参数:
  185. - auth (AuthSchema): 认证信息模型
  186. - id (int): 定时任务日志ID
  187. 返回:
  188. - dict: 定时任务日志详情字典
  189. """
  190. obj = await JobLogCRUD(auth).get_obj_log_by_id_crud(id=id)
  191. return JobLogOutSchema.model_validate(obj).model_dump()
  192. @classmethod
  193. async def get_job_log_list_service(cls, auth: AuthSchema, search: JobLogQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  194. """
  195. 获取定时任务日志列表
  196. 参数:
  197. - auth (AuthSchema): 认证信息模型
  198. - search (JobLogQueryParam | None): 查询参数模型, 包含分页信息和查询条件
  199. - order_by (list[dict] | None): 排序参数列表, 每个元素为一个字典, 包含字段名和排序方向
  200. 返回:
  201. - list[dict]: 定时任务日志详情字典列表
  202. """
  203. obj_list = await JobLogCRUD(auth).get_obj_log_list_crud(search=search.__dict__, order_by=order_by)
  204. return [JobLogOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  205. @classmethod
  206. async def delete_job_log_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  207. """
  208. 删除定时任务日志
  209. 参数:
  210. - auth (AuthSchema): 认证信息模型
  211. - ids (list[int]): 定时任务日志ID列表
  212. """
  213. if len(ids) < 1:
  214. raise CustomException(msg='删除失败,删除对象不能为空')
  215. for id in ids:
  216. exist_obj = await JobLogCRUD(auth).get_obj_log_by_id_crud(id=id)
  217. if not exist_obj:
  218. raise CustomException(msg=f'删除失败,该定时任务日志ID为{id}的记录不存在')
  219. await JobLogCRUD(auth).delete_obj_log_crud(ids=ids)
  220. @classmethod
  221. async def clear_job_log_service(cls, auth: AuthSchema) -> None:
  222. """
  223. 清空定时任务日志
  224. 参数:
  225. - auth (AuthSchema): 认证信息模型
  226. """
  227. # 获取所有日志ID并批量删除
  228. all_logs = await JobLogCRUD(auth).get_obj_log_list_crud()
  229. if all_logs:
  230. ids = [log.id for log in all_logs]
  231. await JobLogCRUD(auth).delete_obj_log_crud(ids=ids)
  232. @classmethod
  233. async def export_job_log_service(cls, data_list: list[dict]) -> bytes:
  234. """
  235. 导出定时任务日志列表
  236. 参数:
  237. - data_list (List[Dict[str, Any]]): 定时任务日志列表
  238. 返回:
  239. - bytes: Excel文件字节流
  240. """
  241. mapping_dict = {
  242. 'id': '编号',
  243. 'job_name': '任务名称',
  244. 'job_group': '任务组名',
  245. 'job_executor': '任务执行器',
  246. 'invoke_target': '调用目标字符串',
  247. 'job_args': '位置参数',
  248. 'job_kwargs': '关键字参数',
  249. 'job_trigger': '任务触发器',
  250. 'job_message': '日志信息',
  251. 'exception_info': '异常信息',
  252. 'status': '执行状态',
  253. 'created_time': '创建时间',
  254. 'updated_time': '更新时间',
  255. }
  256. # 复制数据并转换状态
  257. data = data_list.copy()
  258. for item in data:
  259. item['status'] = '成功' if item.get('status') == '0' else '失败'
  260. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)