| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Body, Depends, Path
- from fastapi.responses import JSONResponse, StreamingResponse
- from app.common.response import StreamResponse, SuccessResponse
- from app.common.request import PaginationService
- from app.core.router_class import OperationLogRoute
- from app.utils.common_util import bytes2file_response
- from app.core.base_params import PaginationQueryParam
- from app.core.dependencies import AuthPermission
- from app.core.logger import log
- from app.api.v1.module_system.auth.schema import AuthSchema
- from .tools.ap_scheduler import SchedulerUtil
- from .service import JobService, JobLogService
- from .schema import (
- JobCreateSchema,
- JobUpdateSchema,
- JobQueryParam,
- JobLogQueryParam
- )
- JobRouter = APIRouter(route_class=OperationLogRoute, prefix="/job", tags=["定时任务"])
- @JobRouter.get("/detail/{id}", summary="获取定时任务详情", description="获取定时任务详情")
- async def get_obj_detail_controller(
- id: int = Path(..., description="定时任务ID"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
- ) -> JSONResponse:
- """
- 获取定时任务详情
-
- 参数:
- - id (int): 定时任务ID
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含定时任务详情的JSON响应
- """
- result_dict = await JobService.get_job_detail_service(id=id, auth=auth)
- log.info(f"获取定时任务详情成功 {id}")
- return SuccessResponse(data=result_dict, msg="获取定时任务详情成功")
- @JobRouter.get("/list", summary="查询定时任务", description="查询定时任务")
- async def get_obj_list_controller(
- page: PaginationQueryParam = Depends(),
- search: JobQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
- ) -> JSONResponse:
- """
- 查询定时任务
-
- 参数:
- - page (PaginationQueryParam): 分页查询参数模型
- - search (JobQueryParam): 查询参数模型
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含分页后的定时任务列表的JSON响应
- """
- result_dict_list = await JobService.get_job_list_service(auth=auth, search=search, order_by=page.order_by)
- result_dict = await PaginationService.paginate(data_list= result_dict_list, page_no= page.page_no, page_size = page.page_size)
- log.info(f"查询定时任务列表成功")
- return SuccessResponse(data=result_dict, msg="查询定时任务列表成功")
- @JobRouter.post("/create", summary="创建定时任务", description="创建定时任务")
- async def create_obj_controller(
- data: JobCreateSchema,
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:create"]))
- ) -> JSONResponse:
- """
- 创建定时任务
-
- 参数:
- - data (JobCreateSchema): 创建参数模型
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含创建定时任务结果的JSON响应
- """
- result_dict = await JobService.create_job_service(auth=auth, data=data)
- log.info(f"创建定时任务成功: {result_dict}")
- return SuccessResponse(data=result_dict, msg="创建定时任务成功")
- @JobRouter.put("/update/{id}", summary="修改定时任务", description="修改定时任务")
- async def update_obj_controller(
- data: JobUpdateSchema,
- id: int = Path(..., description="定时任务ID"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:update"]))
- ) -> JSONResponse:
- """
- 修改定时任务
-
- 参数:
- - data (JobUpdateSchema): 更新参数模型
- - id (int): 定时任务ID
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含修改定时任务结果的JSON响应
- """
- result_dict = await JobService.update_job_service(auth=auth, id=id, data=data)
- log.info(f"修改定时任务成功: {result_dict}")
- return SuccessResponse(data=result_dict, msg="修改定时任务成功")
- @JobRouter.delete("/delete", summary="删除定时任务", description="删除定时任务")
- async def delete_obj_controller(
- ids: list[int] = Body(..., description="ID列表"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
- ) -> JSONResponse:
- """
- 删除定时任务
-
- 参数:
- - ids (list[int]): ID列表
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含删除定时任务结果的JSON响应
- """
- await JobService.delete_job_service(auth=auth, ids=ids)
- log.info(f"删除定时任务成功: {ids}")
- return SuccessResponse(msg="删除定时任务成功")
- @JobRouter.post('/export', summary="导出定时任务", description="导出定时任务")
- async def export_obj_list_controller(
- search: JobQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:export"]))
- ) -> StreamingResponse:
- """
- 导出定时任务
-
- 参数:
- - search (JobQueryParam): 查询参数模型
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - StreamingResponse: 包含导出定时任务结果的流式响应
- """
- result_dict_list = await JobService.get_job_list_service(search=search, auth=auth)
- export_result = await JobService.export_job_service(data_list=result_dict_list)
- log.info('导出定时任务成功')
- return StreamResponse(
- data=bytes2file_response(export_result),
- media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- headers = {
- 'Content-Disposition': 'attachment; filename=job.xlsx'
- }
- )
- @JobRouter.delete("/clear", summary="清空定时任务日志", description="清空定时任务日志")
- async def clear_obj_log_controller(
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
- ) -> JSONResponse:
- """
- 清空定时任务日志
-
- 参数:
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含清空定时任务结果的JSON响应
- """
- await JobService.clear_job_service(auth=auth)
- log.info(f"清空定时任务成功")
- return SuccessResponse(msg="清空定时任务成功")
- @JobRouter.put("/option", summary="暂停/恢复/重启定时任务", description="暂停/恢复/重启定时任务")
- async def option_obj_controller(
- id: int = Body(..., description="定时任务ID"),
- option: int = Body(..., description="操作类型 1: 暂停 2: 恢复 3: 重启"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:update"]))
- ) -> JSONResponse:
- """
- 暂停/恢复/重启定时任务
-
- 参数:
- - id (int): 定时任务ID
- - option (int): 操作类型 1: 暂停 2: 恢复 3: 重启
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含操作定时任务结果的JSON响应
- """
- await JobService.option_job_service(auth=auth, id=id, option=option)
- log.info(f"操作定时任务成功: {id}")
- return SuccessResponse(msg="操作定时任务成功")
- @JobRouter.get("/log", summary="获取定时任务日志", description="获取定时任务日志", dependencies=[Depends(AuthPermission(["module_application:job:query"]))])
- async def get_job_log_controller():
- """
- 获取定时任务日志
-
- 返回:
- - JSONResponse: 获取定时任务日志的JSON响应
- """
- data = [
- {
- "id": i.id,
- "name": i.name,
- "trigger": i.trigger.__class__.__name__,
- "executor": i.executor,
- "func": i.func,
- "func_ref": i.func_ref,
- "args": i.args,
- "kwargs": i.kwargs,
- "misfire_grace_time": i.misfire_grace_time,
- "coalesce": i.coalesce,
- "max_instances": i.max_instances,
- "next_run_time": i.next_run_time,
- "state": SchedulerUtil.get_single_job_status(job_id=i.id)
- }
- for i in SchedulerUtil.get_all_jobs()
- ]
- return SuccessResponse(msg="获取定时任务日志成功", data=data)
- # 定时任务日志管理接口
- @JobRouter.get("/log/detail/{id}", summary="获取定时任务日志详情", description="获取定时任务日志详情")
- async def get_job_log_detail_controller(
- id: int = Path(..., description="定时任务日志ID"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
- ) -> JSONResponse:
- """
- 获取定时任务日志详情
-
- 参数:
- - id (int): 定时任务日志ID
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 获取定时任务日志详情的JSON响应
- """
- result_dict = await JobLogService.get_job_log_detail_service(id=id, auth=auth)
- log.info(f"获取定时任务日志详情成功 {id}")
- return SuccessResponse(data=result_dict, msg="获取定时任务日志详情成功")
- @JobRouter.get("/log/list", summary="查询定时任务日志", description="查询定时任务日志")
- async def get_job_log_list_controller(
- page: PaginationQueryParam = Depends(),
- search: JobLogQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:query"]))
- ) -> JSONResponse:
- """
- 查询定时任务日志
-
- 参数:
- - page (PaginationQueryParam): 分页查询参数模型
- - search (JobLogQueryParam): 查询参数模型
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 查询定时任务日志列表的JSON响应
- """
- order_by = [{"created_time": "desc"}]
- result_dict_list = await JobLogService.get_job_log_list_service(auth=auth, search=search, order_by=order_by)
- result_dict = await PaginationService.paginate(data_list=result_dict_list, page_no=page.page_no, page_size=page.page_size)
- log.info(f"查询定时任务日志列表成功")
- return SuccessResponse(data=result_dict, msg="查询定时任务日志列表成功")
- @JobRouter.delete("/log/delete", summary="删除定时任务日志", description="删除定时任务日志")
- async def delete_job_log_controller(
- ids: list[int] = Body(..., description="ID列表"),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
- ) -> JSONResponse:
- """
- 删除定时任务日志
-
- 参数:
- - ids (list[int]): ID列表
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含删除定时任务日志结果的JSON响应
- """
- await JobLogService.delete_job_log_service(auth=auth, ids=ids)
- log.info(f"删除定时任务日志成功: {ids}")
- return SuccessResponse(msg="删除定时任务日志成功")
- @JobRouter.delete("/log/clear", summary="清空定时任务日志", description="清空定时任务日志")
- async def clear_job_log_controller(
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:delete"]))
- ) -> JSONResponse:
- """
- 清空定时任务日志
-
- 参数:
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - JSONResponse: 包含清空定时任务日志结果的JSON响应
- """
- await JobLogService.clear_job_log_service(auth=auth)
- log.info(f"清空定时任务日志成功")
- return SuccessResponse(msg="清空定时任务日志成功")
- @JobRouter.post('/log/export', summary="导出定时任务日志", description="导出定时任务日志")
- async def export_job_log_list_controller(
- search: JobLogQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_application:job:export"]))
- ) -> StreamingResponse:
- """
- 导出定时任务日志
-
- 参数:
- - search (JobLogQueryParam): 查询参数模型
- - auth (AuthSchema): 认证信息模型
-
- 返回:
- - StreamingResponse: 包含导出定时任务日志结果的流式响应
- """
- result_dict_list = await JobLogService.get_job_log_list_service(search=search, auth=auth)
- export_result = await JobLogService.export_job_log_service(data_list=result_dict_list)
- log.info('导出定时任务日志成功')
- return StreamResponse(
- data=bytes2file_response(export_result),
- media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- headers={
- 'Content-Disposition': 'attachment; filename=job_log.xlsx'
- }
- )
|