- # -*- coding: utf-8 -*-
- import json
- import importlib
- from datetime import datetime
- from typing import Any
- from asyncio import iscoroutinefunction
- from apscheduler.job import Job
- from apscheduler.events import JobExecutionEvent, EVENT_ALL, JobEvent
- from apscheduler.executors.asyncio import AsyncIOExecutor
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.executors.pool import ProcessPoolExecutor
- from apscheduler.jobstores.memory import MemoryJobStore
- from apscheduler.jobstores.redis import RedisJobStore
- from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
- from apscheduler.triggers.cron import CronTrigger
- from apscheduler.triggers.date import DateTrigger
- from apscheduler.triggers.interval import IntervalTrigger
- from concurrent.futures import ThreadPoolExecutor
- from app.config.setting import settings
- from app.core.database import engine, db_session, async_db_session
- from app.core.exceptions import CustomException
- from app.core.logger import log
- from app.utils.cron_util import CronUtil
- from app.api.v1.module_application.job.model import JobModel
# Job stores: in-memory by default, with SQLAlchemy and Redis backends available.
job_stores = {
    'default': MemoryJobStore(),
    'sqlalchemy': SQLAlchemyJobStore(url=settings.DB_URI, engine=engine),
    'redis': RedisJobStore(
        host=settings.REDIS_HOST,
        port=int(settings.REDIS_PORT),
        username=settings.REDIS_USER,
        password=settings.REDIS_PASSWORD,
        db=int(settings.REDIS_DB_NAME),
    ),
}

# Executors: asyncio executor for coroutine jobs, a small process pool for the rest.
executors = {
    'default': AsyncIOExecutor(),
    'processpool': ProcessPoolExecutor(max_workers=1),  # keep the pool small to limit resource usage
}

# Defaults applied to every job unless overridden at add time.
job_defaults = {
    'coalesce': True,     # collapse missed runs into a single execution
    'max_instances': 1,   # never run two instances of the same job concurrently
}

# The scheduler, wired up with the stores/executors/defaults above.
# (Passing the config to the constructor is equivalent to calling configure().)
scheduler = AsyncIOScheduler(
    jobstores=job_stores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Shanghai',
)
class SchedulerUtil:
    """
    Helper methods for managing APScheduler scheduled jobs.
    """

    # Single shared worker thread used to persist job logs.  The previous
    # implementation created (and never properly shut down) a fresh
    # ThreadPoolExecutor per event, leaking a thread on every job run.
    _log_executor = ThreadPoolExecutor(max_workers=1)

    @classmethod
    def scheduler_event_listener(cls, event: JobEvent | JobExecutionEvent) -> None:
        """
        Listen for scheduler events and persist an execution log entry.

        Args:
            event (JobEvent | JobExecutionEvent): the scheduler event object.

        Returns:
            None
        """
        # Imported lazily to avoid a circular import.
        from app.api.v1.module_application.job.model import JobLogModel

        event_type = event.__class__.__name__
        # Assume success unless the event carries an exception.
        status = True
        exception_info = ''
        if isinstance(event, JobExecutionEvent) and event.exception:
            exception_info = str(event.exception)
            status = False

        # Events without a job_id (e.g. scheduler-level events) are not logged.
        if not hasattr(event, 'job_id'):
            return
        job_id = event.job_id
        query_job = cls.get_job(job_id=job_id)
        if not query_job:
            return

        query_job_info = query_job.__getstate__()
        job_name = query_job_info.get('name')
        job_group = query_job._jobstore_alias
        job_executor = query_job_info.get('executor')
        invoke_target = query_job_info.get('func')
        job_args = ','.join(map(str, query_job_info.get('args', [])))
        job_kwargs = json.dumps(query_job_info.get('kwargs'))
        job_trigger = str(query_job_info.get('trigger'))
        job_message = f"事件类型: {event_type}, 任务ID: {job_id}, 任务名称: {job_name}, 状态: {status}, 任务组: {job_group}, 错误详情: {exception_info}, 执行于{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

        job_log = JobLogModel(
            job_name=job_name,
            job_group=job_group,
            job_executor=job_executor,
            invoke_target=invoke_target,
            job_args=job_args,
            job_kwargs=job_kwargs,
            job_trigger=job_trigger,
            job_message=job_message,
            status=status,
            exception_info=exception_info,
            created_time=datetime.now(),
            updated_time=datetime.now(),
            job_id=job_id,
        )

        # Hand the DB write to the shared worker thread so it never blocks
        # the scheduler's event loop.
        cls._log_executor.submit(cls._save_job_log_async_wrapper, job_log)
@classmethod
def _save_job_log_async_wrapper(cls, job_log) -> None:
    """
    Persist one job-log entry; runs inside the dedicated log worker thread.

    Args:
        job_log (JobLogModel): the log record to save.

    Returns:
        None
    """
    try:
        # db_session.begin() (sessionmaker-style — TODO confirm) opens a
        # session/transaction that commits on successful exit and rolls back
        # on error.  The original also called commit() and close() inside the
        # context, which made the context manager's own commit run against an
        # already-closed session.
        with db_session.begin() as session:
            session.add(job_log)
    except Exception as e:
        log.error(f"保存任务日志失败: {str(e)}")
@classmethod
async def init_system_scheduler(cls) -> None:
    """
    Initialize scheduled jobs at application startup.

    Starts the scheduler, attaches the event listener, loads all jobs from
    the database and registers them.  A short-lived Redis lock ensures only
    one application instance performs the initialization.

    Returns:
        None
    """
    # Imported lazily to avoid circular imports.
    from app.api.v1.module_application.job.crud import JobCRUD
    from app.api.v1.module_system.auth.schema import AuthSchema
    import redis.asyncio as redis
    import asyncio

    log.info('🔎 开始启动定时任务...')

    scheduler.start()
    scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)

    async with async_db_session() as session:
        async with session.begin():
            auth = AuthSchema(db=session)
            job_list = await JobCRUD(auth).get_obj_list_crud()

    # Distributed lock so only one instance initializes the jobs.
    redis_client = redis.Redis(
        host=settings.REDIS_HOST,
        port=int(settings.REDIS_PORT),
        username=settings.REDIS_USER,
        password=settings.REDIS_PASSWORD,
        db=int(settings.REDIS_DB_NAME),
    )
    try:
        # Lock expires after 10 seconds in case the holder crashes.
        lock_key = "scheduler_init_lock"
        lock_acquired = await redis_client.set(lock_key, "1", ex=10, nx=True)

        if lock_acquired:
            try:
                for item in job_list:
                    # Replace any stale copy of the job before re-adding it.
                    if cls.get_job(job_id=item.id):
                        cls.remove_job(job_id=item.id)
                    cls.add_job(item)
                    # status "1" in the DB marks the job as paused.
                    if hasattr(item, 'status') and item.status == "1":
                        cls.pause_job(job_id=item.id)
                log.info('✅️ 系统初始定时任务加载成功')
            finally:
                await redis_client.delete(lock_key)
        else:
            # Another instance holds the lock; give it time to finish.
            await asyncio.sleep(2)
            log.info('✅️ 定时任务已由其他实例初始化完成')
    finally:
        # The original leaked this connection; release it explicitly.
        await redis_client.close()
@classmethod
async def close_system_scheduler(cls) -> None:
    """
    Shut down the system scheduler.

    Removes every registered job, then stops the scheduler, waiting for
    any in-flight executions to finish.

    Returns:
        None
    """
    try:
        scheduler.remove_all_jobs()
        # wait=True blocks until running jobs complete before shutdown.
        scheduler.shutdown(wait=True)
        log.info('✅️ 关闭定时任务成功')
    except Exception as e:
        log.error(f'关闭定时任务失败: {str(e)}')
@classmethod
def get_job(cls, job_id: str | int) -> Job | None:
    """
    Look up a scheduled job by its ID.

    Args:
        job_id (str | int): job identifier.

    Returns:
        Job | None: the matching job, or None when not found.
    """
    # Scheduler job IDs are stored as strings; normalize before lookup.
    return scheduler.get_job(job_id=str(job_id))
@classmethod
def get_all_jobs(cls) -> list[Job]:
    """
    Return every job currently registered with the scheduler.

    Returns:
        list[Job]: all scheduled jobs.
    """
    return scheduler.get_jobs()
@classmethod
async def _task_wrapper(cls, job_id, func, *args, **kwargs):
    """
    Execution wrapper guarding each run with a distributed Redis lock so the
    same job is never executed by multiple instances simultaneously.

    Args:
        job_id: job identifier.
        func: the actual task callable.
        *args: positional arguments for the task.
        **kwargs: keyword arguments for the task.

    Returns:
        The task's return value, or None when the lock was not acquired.
    """
    import redis.asyncio as redis
    import asyncio
    import functools
    from app.config.setting import settings

    redis_client = redis.Redis(
        host=settings.REDIS_HOST,
        port=int(settings.REDIS_PORT),
        username=settings.REDIS_USER,
        password=settings.REDIS_PASSWORD,
        db=int(settings.REDIS_DB_NAME),
    )

    lock_key = f"job_lock:{job_id}"
    # Lock TTL guards against a crashed holder keeping the lock forever.
    # NOTE(review): a task running longer than 30s loses its lock — confirm
    # this bound against the longest expected task.
    lock_expire = 30
    lock_acquired = False

    try:
        lock_acquired = await redis_client.set(lock_key, "1", ex=lock_expire, nx=True)

        if not lock_acquired:
            log.info(f"任务 {job_id} 获取执行锁失败,跳过本次执行")
            return None

        log.info(f"任务 {job_id} 获取执行锁成功")
        if iscoroutinefunction(func):
            return await func(*args, **kwargs)
        # run_in_executor() accepts positional args only — the original
        # passed **kwargs directly, which raises TypeError whenever the job
        # has keyword arguments.  Bind them with functools.partial instead.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
    finally:
        if lock_acquired:
            await redis_client.delete(lock_key)
            log.info(f"任务 {job_id} 释放执行锁")
        # The original leaked one Redis connection per run; close it.
        await redis_client.close()
-
@classmethod
def add_job(cls, job_info: JobModel) -> Job:
    """
    Create and register a scheduled job from a job configuration.

    Args:
        job_info (JobModel): job definition (trigger, target function,
            arguments, store/executor choices, etc.).

    Returns:
        Job: the newly added job.

    Raises:
        ValueError: when the target module/function cannot be resolved.
        CustomException: when the job cannot be created for any other reason.
    """
    # 1. Resolve the callable: "module.func" relative to the task package.
    module_path, func_name = str(job_info.func).rsplit('.', 1)
    module_path = "app.api.v1.module_application.job.function_task." + module_path
    # Keep these try blocks narrow: the original wrapped the whole method,
    # so any stray AttributeError was misreported as "method not found".
    try:
        module = importlib.import_module(module_path)
    except ModuleNotFoundError:
        raise ValueError(f"未找到该模块:{module_path}")
    try:
        job_func = getattr(module, func_name)
    except AttributeError:
        raise ValueError(f"未找到该模块下的方法:{func_name}")

    try:
        # 2. Job store: prefer redis so jobs stay in sync across instances.
        if job_info.jobstore is None:
            job_info.jobstore = 'redis'

        # 3. Executor selection.
        job_executor = job_info.executor
        if job_executor is None:
            job_executor = 'default'

        if job_info.trigger_args is None:
            raise ValueError("触发器缺少参数")

        # Coroutines must run on the asyncio (default) executor.
        if iscoroutinefunction(job_func):
            job_executor = 'default'

        # 4. Build the trigger.
        if job_info.trigger == 'date':
            trigger = DateTrigger(run_date=job_info.trigger_args)
        elif job_info.trigger == 'interval':
            # Interval expression fields: second minute hour day week
            # (e.g. "* * * * 1"); '*' means 0 for that unit.
            fields = job_info.trigger_args.strip().split()
            if len(fields) != 5:
                raise ValueError("无效的 interval 表达式")
            second, minute, hour, day, week = (
                int(field) if field != '*' else 0 for field in fields
            )
            trigger = IntervalTrigger(
                weeks=week,
                days=day,
                hours=hour,
                minutes=minute,
                seconds=second,
                start_date=job_info.start_date,
                end_date=job_info.end_date,
                timezone='Asia/Shanghai',
                jitter=None,
            )
        elif job_info.trigger == 'cron':
            # Cron fields: second minute hour day month day_of_week [year].
            fields = job_info.trigger_args.strip().split()
            if len(fields) not in (6, 7):
                raise ValueError("无效的 Cron 表达式")
            if not CronUtil.validate_cron_expression(job_info.trigger_args):
                raise ValueError(f'定时任务{job_info.name}, Cron表达式不正确')
            # '*' / '?' become None so CronTrigger applies its own defaults.
            parsed_fields = [None if field in ('*', '?') else field for field in fields]
            if len(fields) == 6:
                parsed_fields.append(None)  # year omitted
            second, minute, hour, day, month, day_of_week, year = parsed_fields
            trigger = CronTrigger(
                second=second,
                minute=minute,
                hour=hour,
                day=day,
                month=month,
                day_of_week=day_of_week,
                year=year,
                start_date=job_info.start_date,
                end_date=job_info.end_date,
                timezone='Asia/Shanghai',
            )
        else:
            raise ValueError("无效的 trigger 触发器")

        # 5. Register via the distributed-lock wrapper (see _task_wrapper).
        job = scheduler.add_job(
            func=cls._task_wrapper,
            trigger=trigger,
            args=[str(job_info.id), job_func] + (str(job_info.args).split(',') if job_info.args else []),
            kwargs=json.loads(job_info.kwargs) if job_info.kwargs else {},
            id=str(job_info.id),
            name=job_info.name,
            coalesce=job_info.coalesce,
            max_instances=1,  # only one concurrent instance per job
            jobstore=job_info.jobstore,
            executor=job_executor,
        )
        log.info(f"任务 {job_info.id} 添加到 {job_info.jobstore} 存储器成功")
        return job
    except Exception as e:
        raise CustomException(msg=f"添加任务失败: {str(e)}")
@classmethod
def remove_job(cls, job_id: str | int) -> None:
    """
    Remove a scheduled job by its ID; silently does nothing when the job
    does not exist.

    Args:
        job_id (str | int): job identifier.

    Returns:
        None
    """
    if cls.get_job(job_id=str(job_id)) is not None:
        scheduler.remove_job(job_id=str(job_id))
@classmethod
def clear_jobs(cls) -> None:
    """
    Remove every job from the scheduler.

    Returns:
        None
    """
    scheduler.remove_all_jobs()
@classmethod
def modify_job(cls, job_id: str | int, **changes) -> Job:
    """
    Update a job's attributes (takes effect on its next run).

    Args:
        job_id (str | int): job identifier.
        **changes: job attributes to modify (e.g. name, args, kwargs).
            Calling with no changes keeps the previous no-op behavior,
            so existing callers are unaffected.

    Returns:
        Job: the updated job.

    Raises:
        CustomException: when the job does not exist.
    """
    query_job = cls.get_job(job_id=str(job_id))
    if not query_job:
        raise CustomException(msg=f"未找到该任务:{job_id}")
    # The original passed no changes, making this call a guaranteed no-op;
    # forwarding **changes lets callers actually modify the job.
    return scheduler.modify_job(job_id=str(job_id), **changes)
@classmethod
def pause_job(cls, job_id: str | int) -> None:
    """
    Pause a job (only a running job can be paused).

    Args:
        job_id (str | int): job identifier.

    Returns:
        None

    Raises:
        ValueError: when the job does not exist.
    """
    existing = cls.get_job(job_id=str(job_id))
    if existing is None:
        raise ValueError(f"未找到该任务:{job_id}")
    scheduler.pause_job(job_id=str(job_id))
@classmethod
def resume_job(cls, job_id: str | int) -> None:
    """
    Resume a paused job (only a paused job can be resumed).

    Args:
        job_id (str | int): job identifier.

    Returns:
        None

    Raises:
        ValueError: when the job does not exist.
    """
    existing = cls.get_job(job_id=str(job_id))
    if existing is None:
        raise ValueError(f"未找到该任务:{job_id}")
    scheduler.resume_job(job_id=str(job_id))
@classmethod
def reschedule_job(cls, job_id: str | int, trigger=None, **trigger_args) -> Job | None:
    """
    Reschedule a job's trigger.

    Args:
        job_id (str | int): job identifier.
        trigger: new trigger; when None, the job's current trigger is
            re-applied (effectively restarting its schedule).
        **trigger_args: arguments for the new trigger.

    Returns:
        Job | None: the rescheduled job.

    Raises:
        CustomException: when the job does not exist.
    """
    existing = cls.get_job(job_id=str(job_id))
    if not existing:
        raise CustomException(msg=f"未找到该任务:{job_id}")

    if trigger is None:
        # No new trigger supplied: re-apply the one the job already has.
        return scheduler.reschedule_job(job_id=str(job_id), trigger=existing.trigger)
    return scheduler.reschedule_job(job_id=str(job_id), trigger=trigger, **trigger_args)
-
@classmethod
def get_single_job_status(cls, job_id: str | int) -> str:
    """
    Report the current state of a single job.

    Args:
        job_id (str | int): job identifier.

    Returns:
        str: one of 'running' | 'paused' | 'stopped' | 'unknown'.
    """
    job = cls.get_job(job_id=str(job_id))
    if not job:
        return 'unknown'

    # APScheduler marks a paused job by clearing its next_run_time.
    # (The original read a private `_paused_jobs` attribute that no
    # APScheduler jobstore defines, raising AttributeError at runtime.)
    if job.next_run_time is None:
        return 'paused'

    # STATE_STOPPED == 0
    if scheduler.state == 0:
        return 'stopped'

    return 'running'
@classmethod
def print_jobs(cls, jobstore: Any | None = None, out: Any | None = None):
    """
    Print the list of scheduled jobs.

    Args:
        jobstore (Any | None): job store alias to restrict the listing to.
        out (Any | None): output target (defaults to stdout).

    Returns:
        None
    """
    scheduler.print_jobs(jobstore=jobstore, out=out)
@classmethod
def get_job_status(cls) -> str:
    """
    Report the scheduler's current state.

    Returns:
        str: 'stopped' | 'running' | 'paused' | 'unknown'.
    """
    # APScheduler state constants: 0 = stopped, 1 = running (started and
    # processing jobs), 2 = paused (started but not processing jobs).
    state_names = {0: 'stopped', 1: 'running', 2: 'paused'}
    return state_names.get(scheduler.state, 'unknown')