controller.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. # -*- coding: utf-8 -*-
  2. from fastapi import APIRouter, Body, Depends, Path
  3. from fastapi.responses import JSONResponse, StreamingResponse
  4. from app.common.response import StreamResponse, SuccessResponse
  5. from app.common.request import PaginationService
  6. from app.core.router_class import OperationLogRoute
  7. from app.utils.common_util import bytes2file_response
  8. from app.core.base_params import PaginationQueryParam
  9. from app.core.dependencies import AuthPermission
  10. from app.core.logger import log
  11. from app.api.v1.module_system.auth.schema import AuthSchema
  12. from .tools.ap_scheduler import SchedulerUtil
  13. from .service import JobService, JobLogService
  14. from .schema import (
  15. JobCreateSchema,
  16. JobUpdateSchema,
  17. JobQueryParam,
  18. JobLogQueryParam
  19. )
  20. JobRouter = APIRouter(route_class=OperationLogRoute, prefix="/job", tags=["定时任务"])
  21. @JobRouter.get("/detail/{id}", summary="获取定时任务详情", description="获取定时任务详情")
  22. async def get_obj_detail_controller(
  23. id: int = Path(..., description="定时任务ID"),
  24. auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
  25. ) -> JSONResponse:
  26. """
  27. 获取定时任务详情
  28. 参数:
  29. - id (int): 定时任务ID
  30. - auth (AuthSchema): 认证信息模型
  31. 返回:
  32. - JSONResponse: 包含定时任务详情的JSON响应
  33. """
  34. result_dict = await JobService.get_job_detail_service(id=id, auth=auth)
  35. log.info(f"获取定时任务详情成功 {id}")
  36. return SuccessResponse(data=result_dict, msg="获取定时任务详情成功")
  37. @JobRouter.get("/list", summary="查询定时任务", description="查询定时任务")
  38. async def get_obj_list_controller(
  39. page: PaginationQueryParam = Depends(),
  40. search: JobQueryParam = Depends(),
  41. auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
  42. ) -> JSONResponse:
  43. """
  44. 查询定时任务
  45. 参数:
  46. - page (PaginationQueryParam): 分页查询参数模型
  47. - search (JobQueryParam): 查询参数模型
  48. - auth (AuthSchema): 认证信息模型
  49. 返回:
  50. - JSONResponse: 包含分页后的定时任务列表的JSON响应
  51. """
  52. result_dict_list = await JobService.get_job_list_service(auth=auth, search=search, order_by=page.order_by)
  53. result_dict = await PaginationService.paginate(data_list= result_dict_list, page_no= page.page_no, page_size = page.page_size)
  54. log.info(f"查询定时任务列表成功")
  55. return SuccessResponse(data=result_dict, msg="查询定时任务列表成功")
  56. @JobRouter.post("/create", summary="创建定时任务", description="创建定时任务")
  57. async def create_obj_controller(
  58. data: JobCreateSchema,
  59. auth: AuthSchema = Depends(AuthPermission(["module_application:job:create"]))
  60. ) -> JSONResponse:
  61. """
  62. 创建定时任务
  63. 参数:
  64. - data (JobCreateSchema): 创建参数模型
  65. - auth (AuthSchema): 认证信息模型
  66. 返回:
  67. - JSONResponse: 包含创建定时任务结果的JSON响应
  68. """
  69. result_dict = await JobService.create_job_service(auth=auth, data=data)
  70. log.info(f"创建定时任务成功: {result_dict}")
  71. return SuccessResponse(data=result_dict, msg="创建定时任务成功")
  72. @JobRouter.put("/update/{id}", summary="修改定时任务", description="修改定时任务")
  73. async def update_obj_controller(
  74. data: JobUpdateSchema,
  75. id: int = Path(..., description="定时任务ID"),
  76. auth: AuthSchema = Depends(AuthPermission(["module_application:job:update"]))
  77. ) -> JSONResponse:
  78. """
  79. 修改定时任务
  80. 参数:
  81. - data (JobUpdateSchema): 更新参数模型
  82. - id (int): 定时任务ID
  83. - auth (AuthSchema): 认证信息模型
  84. 返回:
  85. - JSONResponse: 包含修改定时任务结果的JSON响应
  86. """
  87. result_dict = await JobService.update_job_service(auth=auth, id=id, data=data)
  88. log.info(f"修改定时任务成功: {result_dict}")
  89. return SuccessResponse(data=result_dict, msg="修改定时任务成功")
  90. @JobRouter.delete("/delete", summary="删除定时任务", description="删除定时任务")
  91. async def delete_obj_controller(
  92. ids: list[int] = Body(..., description="ID列表"),
  93. auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
  94. ) -> JSONResponse:
  95. """
  96. 删除定时任务
  97. 参数:
  98. - ids (list[int]): ID列表
  99. - auth (AuthSchema): 认证信息模型
  100. 返回:
  101. - JSONResponse: 包含删除定时任务结果的JSON响应
  102. """
  103. await JobService.delete_job_service(auth=auth, ids=ids)
  104. log.info(f"删除定时任务成功: {ids}")
  105. return SuccessResponse(msg="删除定时任务成功")
  106. @JobRouter.post('/export', summary="导出定时任务", description="导出定时任务")
  107. async def export_obj_list_controller(
  108. search: JobQueryParam = Depends(),
  109. auth: AuthSchema = Depends(AuthPermission(["module_application:job:export"]))
  110. ) -> StreamingResponse:
  111. """
  112. 导出定时任务
  113. 参数:
  114. - search (JobQueryParam): 查询参数模型
  115. - auth (AuthSchema): 认证信息模型
  116. 返回:
  117. - StreamingResponse: 包含导出定时任务结果的流式响应
  118. """
  119. result_dict_list = await JobService.get_job_list_service(search=search, auth=auth)
  120. export_result = await JobService.export_job_service(data_list=result_dict_list)
  121. log.info('导出定时任务成功')
  122. return StreamResponse(
  123. data=bytes2file_response(export_result),
  124. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  125. headers = {
  126. 'Content-Disposition': 'attachment; filename=job.xlsx'
  127. }
  128. )
  129. @JobRouter.delete("/clear", summary="清空定时任务日志", description="清空定时任务日志")
  130. async def clear_obj_log_controller(
  131. auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
  132. ) -> JSONResponse:
  133. """
  134. 清空定时任务日志
  135. 参数:
  136. - auth (AuthSchema): 认证信息模型
  137. 返回:
  138. - JSONResponse: 包含清空定时任务结果的JSON响应
  139. """
  140. await JobService.clear_job_service(auth=auth)
  141. log.info(f"清空定时任务成功")
  142. return SuccessResponse(msg="清空定时任务成功")
  143. @JobRouter.put("/option", summary="暂停/恢复/重启定时任务", description="暂停/恢复/重启定时任务")
  144. async def option_obj_controller(
  145. id: int = Body(..., description="定时任务ID"),
  146. option: int = Body(..., description="操作类型 1: 暂停 2: 恢复 3: 重启"),
  147. auth: AuthSchema = Depends(AuthPermission(["module_application:job:update"]))
  148. ) -> JSONResponse:
  149. """
  150. 暂停/恢复/重启定时任务
  151. 参数:
  152. - id (int): 定时任务ID
  153. - option (int): 操作类型 1: 暂停 2: 恢复 3: 重启
  154. - auth (AuthSchema): 认证信息模型
  155. 返回:
  156. - JSONResponse: 包含操作定时任务结果的JSON响应
  157. """
  158. await JobService.option_job_service(auth=auth, id=id, option=option)
  159. log.info(f"操作定时任务成功: {id}")
  160. return SuccessResponse(msg="操作定时任务成功")
  161. @JobRouter.get("/log", summary="获取定时任务日志", description="获取定时任务日志", dependencies=[Depends(AuthPermission(["module_application:job:query"]))])
  162. async def get_job_log_controller():
  163. """
  164. 获取定时任务日志
  165. 返回:
  166. - JSONResponse: 获取定时任务日志的JSON响应
  167. """
  168. data = [
  169. {
  170. "id": i.id,
  171. "name": i.name,
  172. "trigger": i.trigger.__class__.__name__,
  173. "executor": i.executor,
  174. "func": i.func,
  175. "func_ref": i.func_ref,
  176. "args": i.args,
  177. "kwargs": i.kwargs,
  178. "misfire_grace_time": i.misfire_grace_time,
  179. "coalesce": i.coalesce,
  180. "max_instances": i.max_instances,
  181. "next_run_time": i.next_run_time,
  182. "state": SchedulerUtil.get_single_job_status(job_id=i.id)
  183. }
  184. for i in SchedulerUtil.get_all_jobs()
  185. ]
  186. return SuccessResponse(msg="获取定时任务日志成功", data=data)
  187. # 定时任务日志管理接口
  188. @JobRouter.get("/log/detail/{id}", summary="获取定时任务日志详情", description="获取定时任务日志详情")
  189. async def get_job_log_detail_controller(
  190. id: int = Path(..., description="定时任务日志ID"),
  191. auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
  192. ) -> JSONResponse:
  193. """
  194. 获取定时任务日志详情
  195. 参数:
  196. - id (int): 定时任务日志ID
  197. - auth (AuthSchema): 认证信息模型
  198. 返回:
  199. - JSONResponse: 获取定时任务日志详情的JSON响应
  200. """
  201. result_dict = await JobLogService.get_job_log_detail_service(id=id, auth=auth)
  202. log.info(f"获取定时任务日志详情成功 {id}")
  203. return SuccessResponse(data=result_dict, msg="获取定时任务日志详情成功")
  204. @JobRouter.get("/log/list", summary="查询定时任务日志", description="查询定时任务日志")
  205. async def get_job_log_list_controller(
  206. page: PaginationQueryParam = Depends(),
  207. search: JobLogQueryParam = Depends(),
  208. auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
  209. ) -> JSONResponse:
  210. """
  211. 查询定时任务日志
  212. 参数:
  213. - page (PaginationQueryParam): 分页查询参数模型
  214. - search (JobLogQueryParam): 查询参数模型
  215. - auth (AuthSchema): 认证信息模型
  216. 返回:
  217. - JSONResponse: 查询定时任务日志列表的JSON响应
  218. """
  219. order_by = [{"created_time": "desc"}]
  220. result_dict_list = await JobLogService.get_job_log_list_service(auth=auth, search=search, order_by=order_by)
  221. result_dict = await PaginationService.paginate(data_list=result_dict_list, page_no=page.page_no, page_size=page.page_size)
  222. log.info(f"查询定时任务日志列表成功")
  223. return SuccessResponse(data=result_dict, msg="查询定时任务日志列表成功")
  224. @JobRouter.delete("/log/delete", summary="删除定时任务日志", description="删除定时任务日志")
  225. async def delete_job_log_controller(
  226. ids: list[int] = Body(..., description="ID列表"),
  227. auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
  228. ) -> JSONResponse:
  229. """
  230. 删除定时任务日志
  231. 参数:
  232. - ids (list[int]): ID列表
  233. - auth (AuthSchema): 认证信息模型
  234. 返回:
  235. - JSONResponse: 包含删除定时任务日志结果的JSON响应
  236. """
  237. await JobLogService.delete_job_log_service(auth=auth, ids=ids)
  238. log.info(f"删除定时任务日志成功: {ids}")
  239. return SuccessResponse(msg="删除定时任务日志成功")
  240. @JobRouter.delete("/log/clear", summary="清空定时任务日志", description="清空定时任务日志")
  241. async def clear_job_log_controller(
  242. auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
  243. ) -> JSONResponse:
  244. """
  245. 清空定时任务日志
  246. 参数:
  247. - auth (AuthSchema): 认证信息模型
  248. 返回:
  249. - JSONResponse: 包含清空定时任务日志结果的JSON响应
  250. """
  251. await JobLogService.clear_job_log_service(auth=auth)
  252. log.info(f"清空定时任务日志成功")
  253. return SuccessResponse(msg="清空定时任务日志成功")
  254. @JobRouter.post('/log/export', summary="导出定时任务日志", description="导出定时任务日志")
  255. async def export_job_log_list_controller(
  256. search: JobLogQueryParam = Depends(),
  257. auth: AuthSchema = Depends(AuthPermission(["module_application:job:export"]))
  258. ) -> StreamingResponse:
  259. """
  260. 导出定时任务日志
  261. 参数:
  262. - search (JobLogQueryParam): 查询参数模型
  263. - auth (AuthSchema): 认证信息模型
  264. 返回:
  265. - StreamingResponse: 包含导出定时任务日志结果的流式响应
  266. """
  267. result_dict_list = await JobLogService.get_job_log_list_service(search=search, auth=auth)
  268. export_result = await JobLogService.export_job_log_service(data_list=result_dict_list)
  269. log.info('导出定时任务日志成功')
  270. return StreamResponse(
  271. data=bytes2file_response(export_result),
  272. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  273. headers={
  274. 'Content-Disposition': 'attachment; filename=job_log.xlsx'
  275. }
  276. )