ap_scheduler.py

# -*- coding: utf-8 -*-
import json
import importlib
from datetime import datetime
from typing import Any
from asyncio import iscoroutinefunction
from apscheduler.job import Job
from apscheduler.events import JobExecutionEvent, EVENT_ALL, JobEvent
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from concurrent.futures import ThreadPoolExecutor
from app.config.setting import settings
from app.core.database import engine, db_session, async_db_session
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.cron_util import CronUtil
from app.api.v1.module_application.job.model import JobModel

# Configure job stores
job_stores = {
    'default': MemoryJobStore(),
    'sqlalchemy': SQLAlchemyJobStore(url=settings.DB_URI, engine=engine),
    'redis': RedisJobStore(
        host=settings.REDIS_HOST,
        port=int(settings.REDIS_PORT),
        username=settings.REDIS_USER,
        password=settings.REDIS_PASSWORD,
        db=int(settings.REDIS_DB_NAME),
    ),
}

# Configure executors
executors = {
    'default': AsyncIOExecutor(),
    'processpool': ProcessPoolExecutor(max_workers=1),  # keep the process pool small to save resources
}

# Default job options
job_defaults = {
    'coalesce': True,     # coalesce missed runs into a single execution
    'max_instances': 1,   # maximum number of concurrent instances per job
}

# Configure the scheduler
scheduler = AsyncIOScheduler()
scheduler.configure(
    jobstores=job_stores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai'
)
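
# The aliases configured above are what jobs refer to when they are added. A minimal
# illustration (not executed here; `my_task` is a hypothetical callable):
#
#   scheduler.add_job(my_task, IntervalTrigger(seconds=30),
#                     id='demo', jobstore='redis', executor='default')
#
# would persist the job definition in the Redis job store and run it on the asyncio executor.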

class SchedulerUtil:
    """
    Helpers for working with scheduled jobs.
    """

    @classmethod
    def scheduler_event_listener(cls, event: JobEvent | JobExecutionEvent) -> None:
        """
        Listen for job execution events.

        Args:
        - event (JobEvent | JobExecutionEvent): the job event object.

        Returns:
        - None
        """
        # Imported lazily to avoid a circular import
        from app.api.v1.module_application.job.model import JobLogModel
        # Event type and job ID
        event_type = event.__class__.__name__
        # Initial job status
        status = True
        exception_info = ''
        if isinstance(event, JobExecutionEvent) and event.exception:
            exception_info = str(event.exception)
            status = False
        if hasattr(event, 'job_id'):
            job_id = event.job_id
            query_job = cls.get_job(job_id=job_id)
            if query_job:
                query_job_info = query_job.__getstate__()
                # Job name
                job_name = query_job_info.get('name')
                # Job store alias (used as the job group)
                job_group = query_job._jobstore_alias
                # Executor alias
                job_executor = query_job_info.get('executor')
                # Invocation target (the scheduled callable)
                invoke_target = query_job_info.get('func')
                # Positional arguments
                job_args = ','.join(map(str, query_job_info.get('args', [])))
                # Keyword arguments
                job_kwargs = json.dumps(query_job_info.get('kwargs'))
                # Trigger
                job_trigger = str(query_job_info.get('trigger'))
                # Build the log message
                job_message = (
                    f"Event type: {event_type}, job ID: {job_id}, job name: {job_name}, "
                    f"status: {status}, job group: {job_group}, error details: {exception_info}, "
                    f"executed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )
                # Build the ORM object
                job_log = JobLogModel(
                    job_name=job_name,
                    job_group=job_group,
                    job_executor=job_executor,
                    invoke_target=invoke_target,
                    job_args=job_args,
                    job_kwargs=job_kwargs,
                    job_trigger=job_trigger,
                    job_message=job_message,
                    status=status,
                    exception_info=exception_info,
                    created_time=datetime.now(),
                    updated_time=datetime.now(),
                    job_id=job_id,
                )
                # Persist from a worker thread so the scheduler loop is not blocked
                # and database locking issues are avoided
                executor = ThreadPoolExecutor(max_workers=1)
                executor.submit(cls._save_job_log_async_wrapper, job_log)
                executor.shutdown(wait=False)

    @classmethod
    def _save_job_log_async_wrapper(cls, job_log) -> None:
        """
        Wrapper that saves a job log entry; runs in a dedicated thread.

        Args:
        - job_log (JobLogModel): the job log object.

        Returns:
        - None
        """
        with db_session.begin() as session:
            try:
                session.add(job_log)
                session.commit()
            except Exception as e:
                session.rollback()
                log.error(f"Failed to save job log: {str(e)}")
            finally:
                session.close()

    @classmethod
    async def init_system_scheduler(cls) -> None:
        """
        Initialize scheduled jobs on application startup.

        Returns:
        - None
        """
        # Imported lazily to avoid circular imports
        from app.api.v1.module_application.job.crud import JobCRUD
        from app.api.v1.module_system.auth.schema import AuthSchema
        log.info('🔎 Starting scheduled jobs...')
        # Start the scheduler
        scheduler.start()
        # Register the event listener
        scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
        async with async_db_session() as session:
            async with session.begin():
                auth = AuthSchema(db=session)
                job_list = await JobCRUD(auth).get_obj_list_crud()
        # Initialize jobs on a single instance only:
        # a Redis lock ensures that exactly one instance performs the initialization
        import redis.asyncio as redis
        redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=int(settings.REDIS_PORT),
            username=settings.REDIS_USER,
            password=settings.REDIS_PASSWORD,
            db=int(settings.REDIS_DB_NAME),
        )
        # Try to acquire the lock with a 10-second expiry
        lock_key = "scheduler_init_lock"
        lock_acquired = await redis_client.set(lock_key, "1", ex=10, nx=True)
        if lock_acquired:
            try:
                for item in job_list:
                    # Remove the job first if it already exists
                    existing_job = cls.get_job(job_id=item.id)
                    if existing_job:
                        cls.remove_job(job_id=item.id)
                    # Add the job
                    cls.add_job(item)
                    # Apply the status persisted in the database
                    if hasattr(item, 'status') and item.status == "1":
                        # A status of "1" means paused, so pause the job right after adding it
                        cls.pause_job(job_id=item.id)
                log.info('✅️ Initial system scheduled jobs loaded successfully')
            finally:
                # Release the lock
                await redis_client.delete(lock_key)
        else:
            # Wait for the other instance to finish initialization
            import asyncio
            await asyncio.sleep(2)
            log.info('✅️ Scheduled jobs were already initialized by another instance')
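
    # Lock semantics, as a sketch: `SET scheduler_init_lock "1" EX 10 NX` succeeds only for the
    # first instance to reach it; every other instance gets a falsy result, sleeps briefly and
    # assumes the winner has (re)loaded the jobs from the database.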

    @classmethod
    async def close_system_scheduler(cls) -> None:
        """
        Shut down the system scheduler.

        Returns:
        - None
        """
        try:
            # Remove all jobs
            scheduler.remove_all_jobs()
            # Wait for running jobs to finish before shutting down
            scheduler.shutdown(wait=True)
            log.info('✅️ Scheduler shut down successfully')
        except Exception as e:
            log.error(f'Failed to shut down scheduler: {str(e)}')

    @classmethod
    def get_job(cls, job_id: str | int) -> Job | None:
        """
        Get a job object by its ID.

        Args:
        - job_id (str | int): job ID.

        Returns:
        - Job | None: the job object, or None if not found.
        """
        return scheduler.get_job(job_id=str(job_id))

    @classmethod
    def get_all_jobs(cls) -> list[Job]:
        """
        Get the full list of scheduled jobs.

        Returns:
        - list[Job]: list of jobs.
        """
        return scheduler.get_jobs()

    @classmethod
    async def _task_wrapper(cls, job_id, func, *args, **kwargs):
        """
        Execution wrapper that adds a distributed lock so the same job is not
        executed by multiple instances at the same time.

        Args:
        - job_id: job ID.
        - func: the actual task callable.
        - *args: positional arguments for the task.
        - **kwargs: keyword arguments for the task.

        Returns:
        - The task callable's return value.
        """
        import redis.asyncio as redis
        import asyncio
        from app.config.setting import settings
        # Create a Redis client
        redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=int(settings.REDIS_PORT),
            username=settings.REDIS_USER,
            password=settings.REDIS_PASSWORD,
            db=int(settings.REDIS_DB_NAME),
        )
        # Build the lock key
        lock_key = f"job_lock:{job_id}"
        # Lock expiry in seconds (tune per task type; 30 seconds here)
        lock_expire = 30
        lock_acquired = False
        try:
            # Try to acquire the lock
            lock_acquired = await redis_client.set(lock_key, "1", ex=lock_expire, nx=True)
            if lock_acquired:
                log.info(f"Job {job_id} acquired the execution lock")
                # Run the task
                if iscoroutinefunction(func):
                    return await func(*args, **kwargs)
                else:
                    # Synchronous callables run in the default thread pool;
                    # run_in_executor does not accept keyword arguments, so wrap the call
                    loop = asyncio.get_running_loop()
                    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
            else:
                # Lock not acquired: another instance is running this job, skip this run
                log.info(f"Job {job_id} could not acquire the execution lock, skipping this run")
                return None
        finally:
            # Release the lock
            if lock_acquired:
                await redis_client.delete(lock_key)
                log.info(f"Job {job_id} released the execution lock")
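
    # Lock key layout, as an illustration: a job whose ID is 42 serializes its runs under the
    # Redis key "job_lock:42". If a run outlives the 30-second expiry the lock can lapse while
    # the task is still executing, so long-running tasks would need a larger `lock_expire`.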

    @classmethod
    def add_job(cls, job_info: JobModel) -> Job:
        """
        Create and add a scheduled job from a job configuration.

        Args:
        - job_info (JobModel): job definition (trigger, callable, arguments, etc.).

        Returns:
        - Job: the newly added job.
        """
        # 1. Resolve the invocation target (dynamic import)
        module_path, func_name = str(job_info.func).rsplit('.', 1)
        module_path = "app.api.v1.module_application.job.function_task." + module_path
        try:
            module = importlib.import_module(module_path)
            job_func = getattr(module, func_name)
            # 2. Pick the job store: prefer Redis so jobs stay in sync across instances
            if job_info.jobstore is None:
                job_info.jobstore = 'redis'
            # 3. Pick the executor
            job_executor = job_info.executor
            if job_executor is None:
                job_executor = 'default'
            if job_info.trigger_args is None:
                raise ValueError("Trigger arguments are missing")
            # Coroutine functions must run on the default (asyncio) executor
            if iscoroutinefunction(job_func):
                job_executor = 'default'
            # 4. Build the trigger
            if job_info.trigger == 'date':
                trigger = DateTrigger(run_date=job_info.trigger_args)
            elif job_info.trigger == 'interval':
                # Split the interval expression into its fields:
                # seconds minutes hours days weeks, e.g. "* * * * 1"
                fields = job_info.trigger_args.strip().split()
                if len(fields) != 5:
                    raise ValueError("Invalid interval expression")
                second, minute, hour, day, week = tuple([int(field) if field != '*' else 0 for field in fields])
                trigger = IntervalTrigger(
                    weeks=week,
                    days=day,
                    hours=hour,
                    minutes=minute,
                    seconds=second,
                    start_date=job_info.start_date,
                    end_date=job_info.end_date,
                    timezone='Asia/Shanghai',
                    jitter=None
                )
            elif job_info.trigger == 'cron':
                # Fields: second minute hour day month day_of_week [year]
                fields = job_info.trigger_args.strip().split()
                if len(fields) not in (6, 7):
                    raise ValueError("Invalid cron expression")
                if not CronUtil.validate_cron_expression(job_info.trigger_args):
                    raise ValueError(f'Job {job_info.name}: invalid cron expression')
                parsed_fields = [None if field in ('*', '?') else field for field in fields]
                if len(fields) == 6:
                    parsed_fields.append(None)
                second, minute, hour, day, month, day_of_week, year = tuple(parsed_fields)
                trigger = CronTrigger(
                    second=second,
                    minute=minute,
                    hour=hour,
                    day=day,
                    month=month,
                    day_of_week=day_of_week,
                    year=year,
                    start_date=job_info.start_date,
                    end_date=job_info.end_date,
                    timezone='Asia/Shanghai'
                )
            else:
                raise ValueError("Invalid trigger type")
            # 5. Add the job (wrapped so the distributed lock applies)
            job = scheduler.add_job(
                func=cls._task_wrapper,
                trigger=trigger,
                args=[str(job_info.id), job_func] + (str(job_info.args).split(',') if job_info.args else []),
                kwargs=json.loads(job_info.kwargs) if job_info.kwargs else {},
                id=str(job_info.id),
                name=job_info.name,
                coalesce=job_info.coalesce,
                max_instances=1,  # make sure only one instance runs at a time
                jobstore=job_info.jobstore,
                executor=job_executor,
            )
            log.info(f"Job {job_info.id} added to the {job_info.jobstore} job store")
            return job
        except ModuleNotFoundError:
            raise ValueError(f"Module not found: {module_path}")
        except AttributeError:
            raise ValueError(f"Function not found in module: {func_name}")
        except Exception as e:
            raise CustomException(msg=f"Failed to add job: {str(e)}")
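
    # Example invocation, as a sketch (field values are hypothetical; only fields referenced
    # in `add_job` are shown):
    #
    #   job_info = JobModel(id=1, name='sync_users', func='demo_task.sync_users',
    #                       trigger='cron', trigger_args='0 0 2 * * *',
    #                       args=None, kwargs=None, jobstore='redis', executor='default',
    #                       coalesce=True, start_date=None, end_date=None)
    #   SchedulerUtil.add_job(job_info)
    #
    # With these values the callable would be resolved from
    # app.api.v1.module_application.job.function_task.demo_task and fire daily at 02:00.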

    @classmethod
    def remove_job(cls, job_id: str | int) -> None:
        """
        Remove a scheduled job by its ID.

        Args:
        - job_id (str | int): job ID.

        Returns:
        - None
        """
        query_job = cls.get_job(job_id=str(job_id))
        if query_job:
            scheduler.remove_job(job_id=str(job_id))

    @classmethod
    def clear_jobs(cls) -> None:
        """
        Remove all scheduled jobs.

        Returns:
        - None
        """
        scheduler.remove_all_jobs()

    @classmethod
    def modify_job(cls, job_id: str | int, **changes) -> Job:
        """
        Update a job's configuration (takes effect on the next run for a running job).

        Args:
        - job_id (str | int): job ID.
        - **changes: job attributes to change, passed through to the scheduler.

        Returns:
        - Job: the updated job.

        Raises:
        - CustomException: if the job does not exist.
        """
        query_job = cls.get_job(job_id=str(job_id))
        if not query_job:
            raise CustomException(msg=f"Job not found: {job_id}")
        return scheduler.modify_job(job_id=str(job_id), **changes)

    @classmethod
    def pause_job(cls, job_id: str | int) -> None:
        """
        Pause a job (only running jobs can be paused; terminated jobs cannot).

        Args:
        - job_id (str | int): job ID.

        Returns:
        - None

        Raises:
        - ValueError: if the job does not exist.
        """
        query_job = cls.get_job(job_id=str(job_id))
        if not query_job:
            raise ValueError(f"Job not found: {job_id}")
        scheduler.pause_job(job_id=str(job_id))

    @classmethod
    def resume_job(cls, job_id: str | int) -> None:
        """
        Resume a job (only paused jobs can be resumed; terminated jobs cannot).

        Args:
        - job_id (str | int): job ID.

        Returns:
        - None

        Raises:
        - ValueError: if the job does not exist.
        """
        query_job = cls.get_job(job_id=str(job_id))
        if not query_job:
            raise ValueError(f"Job not found: {job_id}")
        scheduler.resume_job(job_id=str(job_id))

    @classmethod
    def reschedule_job(cls, job_id: str | int, trigger=None, **trigger_args) -> Job | None:
        """
        Reschedule a job's trigger.

        Args:
        - job_id (str | int): job ID.
        - trigger: trigger type or instance.
        - **trigger_args: trigger arguments.

        Returns:
        - Job: the updated job.

        Raises:
        - CustomException: if the job does not exist.
        """
        query_job = cls.get_job(job_id=str(job_id))
        if not query_job:
            raise CustomException(msg=f"Job not found: {job_id}")
        if trigger is None:
            # No new trigger supplied: reschedule with the job's current trigger
            current_trigger = query_job.trigger
            return scheduler.reschedule_job(job_id=str(job_id), trigger=current_trigger)
        else:
            # Use the newly supplied trigger
            return scheduler.reschedule_job(job_id=str(job_id), trigger=trigger, **trigger_args)
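
    # For example, `SchedulerUtil.reschedule_job(42, trigger='cron', hour=3)` would hand the
    # alias 'cron' plus `hour=3` straight to APScheduler's reschedule_job, i.e. job 42 would
    # then fire daily at 03:00; calling it with no trigger simply re-applies the existing one.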

    @classmethod
    def get_single_job_status(cls, job_id: str | int) -> str:
        """
        Get the current status of a single job.

        Args:
        - job_id (str | int): job ID.

        Returns:
        - str: job status ('running' | 'paused' | 'stopped' | 'unknown').
        """
        job = cls.get_job(job_id=str(job_id))
        if not job:
            return 'unknown'
        # A paused job has no next run time scheduled
        if job.next_run_time is None:
            return 'paused'
        # Check the scheduler state
        if scheduler.state == 0:  # STATE_STOPPED
            return 'stopped'
        return 'running'

    @classmethod
    def print_jobs(cls, jobstore: Any | None = None, out: Any | None = None):
        """
        Print the list of scheduled jobs.

        Args:
        - jobstore (Any | None): job store alias.
        - out (Any | None): output target.

        Returns:
        - None
        """
        scheduler.print_jobs(jobstore=jobstore, out=out)

    @classmethod
    def get_job_status(cls) -> str:
        """
        Get the scheduler's current state.

        Returns:
        - str: state string ('stopped' | 'running' | 'paused' | 'unknown').
        """
        #: constant indicating a scheduler's stopped state
        STATE_STOPPED = 0
        #: constant indicating a scheduler's running state (started and processing jobs)
        STATE_RUNNING = 1
        #: constant indicating a scheduler's paused state (started but not processing jobs)
        STATE_PAUSED = 2
        if scheduler.state == STATE_STOPPED:
            return 'stopped'
        elif scheduler.state == STATE_RUNNING:
            return 'running'
        elif scheduler.state == STATE_PAUSED:
            return 'paused'
        else:
            return 'unknown'
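

# Typical lifecycle, as a sketch (the surrounding application is assumed; e.g. a FastAPI
# lifespan hook, which is not defined in this module):
#
#   from contextlib import asynccontextmanager
#
#   @asynccontextmanager
#   async def lifespan(app):
#       await SchedulerUtil.init_system_scheduler()
#       try:
#           yield
#       finally:
#           await SchedulerUtil.close_system_scheduler()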