| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- # -*- coding: utf-8 -*-
- from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
- from fastapi import Query
- from app.core.base_schema import BaseSchema, UserBySchema
- from app.core.validator import DateTimeStr, datetime_validator
- class JobCreateSchema(BaseModel):
- """
- 定时任务调度表对应pydantic模型
- """
- name: str = Field(..., max_length=64, description='任务名称')
- func: str = Field(..., description='任务函数')
- trigger: str = Field(..., description='触发器:控制此作业计划的 trigger 对象')
- args: str | None = Field(default=None, description='位置参数')
- kwargs: str | None = Field(default=None, description='关键字参数')
- coalesce: bool | None = Field(..., description='是否合并运行:是否在多个运行时间到期时仅运行作业一次')
- max_instances: int | None = Field(default=1, ge=1, description='最大实例数:允许的最大并发执行实例数')
- jobstore: str | None = Field(..., max_length=64, description='任务存储')
- executor: str | None = Field(..., max_length=64, description='任务执行器:将运行此作业的执行程序的名称')
- trigger_args: str | None = Field(default=None, description='触发器参数')
- start_date: str | None = Field(default=None, description='开始时间')
- end_date: str | None = Field(default=None, description='结束时间')
- description: str | None = Field(default=None, max_length=255, description='描述')
- status: str = Field(default='0', description='任务状态:启动,停止')
- @field_validator('trigger')
- @classmethod
- def _validate_trigger(cls, v: str) -> str:
- allowed = {'cron', 'interval', 'date'}
- v = v.strip()
- if v not in allowed:
- raise ValueError('触发器必须为 cron/interval/date')
- return v
- @model_validator(mode='after')
- def _validate_dates(self):
- """跨字段校验:结束时间不得早于开始时间。"""
- if self.start_date and self.end_date:
- try:
- start = datetime_validator(self.start_date)
- end = datetime_validator(self.end_date)
- except Exception:
- raise ValueError('时间格式必须为 YYYY-MM-DD HH:MM:SS')
- if end < start:
- raise ValueError('结束时间不能早于开始时间')
- return self
- class JobUpdateSchema(JobCreateSchema):
- """定时任务更新模型"""
- ...
-
- class JobOutSchema(JobCreateSchema, BaseSchema, UserBySchema):
- """定时任务响应模型"""
- model_config = ConfigDict(from_attributes=True)
- ...
- class JobLogCreateSchema(BaseModel):
- """
- 定时任务调度日志表对应pydantic模型
- """
- model_config = ConfigDict(from_attributes=True)
- job_name: str = Field(..., description='任务名称')
- job_group: str | None = Field(default=None, description='任务组名')
- job_executor: str | None = Field(default=None, description='任务执行器')
- invoke_target: str | None = Field(default=None, description='调用目标字符串')
- job_args: str | None = Field(default=None, description='位置参数')
- job_kwargs: str | None = Field(default=None, description='关键字参数')
- job_trigger: str | None = Field(default=None, description='任务触发器')
- job_message: str | None = Field(default=None, description='日志信息')
- exception_info: str | None = Field(default=None, description='异常信息')
- status: str = Field(default='0', description='任务状态:正常,失败')
- description: str | None = Field(default=None, max_length=255, description='描述')
- created_time: DateTimeStr | None = Field(default=None, description='创建时间')
- updated_time: DateTimeStr | None = Field(default=None, description='更新时间')
- class JobLogUpdateSchema(JobLogCreateSchema):
- """定时任务调度日志表更新模型"""
- ...
- id: int | None = Field(default=None, description='任务日志ID')
- class JobLogOutSchema(JobLogUpdateSchema, BaseSchema, UserBySchema):
- """定时任务调度日志表响应模型"""
- model_config = ConfigDict(from_attributes=True)
- ...
- class JobQueryParam:
- """定时任务查询参数"""
- def __init__(
- self,
- name: str | None = Query(None, description="任务名称"),
- status: str | None = Query(None, description="状态: 启动,停止"),
- created_time: list[DateTimeStr] | None = Query(None, description="创建时间范围", examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"]),
- updated_time: list[DateTimeStr] | None = Query(None, description="更新时间范围", examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"]),
- created_id: int | None = Query(None, description="创建人"),
- updated_id: int | None = Query(None, description="更新人"),
- ) -> None:
-
- # 模糊查询字段
- self.name = ("like", f"%{name}%") if name else None
-
- # 精确查询字段
- self.created_id = created_id
- self.updated_id = updated_id
- self.status = status
-
- # 时间范围查询
- if created_time and len(created_time) == 2:
- self.created_time = ("between", (created_time[0], created_time[1]))
- if updated_time and len(updated_time) == 2:
- self.updated_time = ("between", (updated_time[0], updated_time[1]))
- class JobLogQueryParam:
- """定时任务查询参数"""
- def __init__(
- self,
- job_id: int | None = Query(None, description="定时任务ID"),
- job_name: str | None = Query(None, description="任务名称"),
- status: str | None = Query(None, description="状态: 正常,失败"),
- created_time: list[DateTimeStr] | None = Query(None, description="创建时间范围", examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"]),
- updated_time: list[DateTimeStr] | None = Query(None, description="更新时间范围", examples=["2025-01-01 00:00:00", "2025-12-31 23:59:59"]),
- ) -> None:
- # 定时任务ID查询
- self.job_id = job_id
- # 模糊查询字段
- self.job_name = ("like", job_name)
- # 精确查询字段
- self.status = status
- # 时间范围查询
- if created_time and len(created_time) == 2:
- self.created_time = ("between", (created_time[0], created_time[1]))
- if updated_time and len(updated_time) == 2:
- self.updated_time = ("between", (updated_time[0], updated_time[1]))
|