# -*- coding: utf-8 -*- from typing import Sequence from app.core.base_crud import CRUDBase from app.api.v1.module_system.auth.schema import AuthSchema from .model import BizCraneModel from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneOutSchema class BizCraneCRUD(CRUDBase[BizCraneModel, BizCraneCreateSchema, BizCraneUpdateSchema]): """行车信息数据层""" def __init__(self, auth: AuthSchema) -> None: """ 初始化CRUD数据层 参数: - auth (AuthSchema): 认证信息模型 """ super().__init__(model=BizCraneModel, auth=auth) async def get_by_id_crane_crud(self, id: int, preload: list | None = None) -> BizCraneModel | None: """ 详情 参数: - id (int): 对象ID - preload (list | None): 预加载关系,未提供时使用模型默认项 返回: - BizCraneModel | None: 模型实例或None """ return await self.get(id=id, preload=preload) async def get_by_id_crane_crud_for_no(self, crane_no: str, preload: list | None = None) -> BizCraneModel | None: """ 详情 参数: - id (int): 对象ID - preload (list | None): 预加载关系,未提供时使用模型默认项 返回: - BizCraneModel | None: 模型实例或None """ return await self.get(crane_no=crane_no, preload=preload) async def list_crane_crud(self, search: dict | None = None, order_by: list[dict] | None = None, preload: list | None = None) -> Sequence[BizCraneModel]: """ 列表查询 参数: - search (dict | None): 查询参数 - order_by (list[dict] | None): 排序参数,未提供时使用模型默认项 - preload (list | None): 预加载关系,未提供时使用模型默认项 返回: - Sequence[BizCraneModel]: 模型实例序列 """ return await self.list(search=search, order_by=order_by, preload=preload) async def create_crane_crud(self, data: BizCraneCreateSchema) -> BizCraneModel | None: """ 创建 参数: - data (BizCraneCreateSchema): 创建模型 返回: - BizCraneModel | None: 模型实例或None """ return await self.create(data=data) async def update_crane_crud(self, id: int, data: BizCraneUpdateSchema) -> BizCraneModel | None: """ 更新 参数: - id (int): 对象ID - data (BizCraneUpdateSchema): 更新模型 返回: - BizCraneModel | None: 模型实例或None """ return await self.update(id=id, data=data) async def delete_crane_crud(self, ids: list[int]) -> None: """ 批量删除 参数: - ids (list[int]): 对象ID列表 返回: - None """ return await self.delete(ids=ids) async def set_available_crane_crud(self, ids: list[int], status: str) -> None: """ 批量设置可用状态 参数: - ids (list[int]): 对象ID列表 - status (str): 可用状态 返回: - None """ return await self.set(ids=ids, status=status) async def page_crane_crud(self, offset: int, limit: int, order_by: list[dict] | None = None, search: dict | None = None, preload: list | None = None) -> dict: """ 分页查询 参数: - offset (int): 偏移量 - limit (int): 每页数量 - order_by (list[dict] | None): 排序参数,未提供时使用模型默认项 - search (dict | None): 查询参数,未提供时查询所有 - preload (list | None): 预加载关系,未提供时使用模型默认项 返回: - Dict: 分页数据 """ order_by_list = order_by or [{'id': 'asc'}] search_dict = search or {} return await self.page( offset=offset, limit=limit, order_by=order_by_list, search=search_dict, out_schema=BizCraneOutSchema, preload=preload )