| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- # -*- coding: utf-8 -*-
- from typing import Sequence
- from app.core.base_crud import CRUDBase
- from app.api.v1.module_system.auth.schema import AuthSchema
- from .model import BizCraneModel
- from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneOutSchema
- class BizCraneCRUD(CRUDBase[BizCraneModel, BizCraneCreateSchema, BizCraneUpdateSchema]):
- """行车信息数据层"""
- def __init__(self, auth: AuthSchema) -> None:
- """
- 初始化CRUD数据层
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- """
- super().__init__(model=BizCraneModel, auth=auth)
- async def get_by_id_crane_crud(self, id: int, preload: list | None = None) -> BizCraneModel | None:
- """
- 详情
-
- 参数:
- - id (int): 对象ID
- - preload (list | None): 预加载关系,未提供时使用模型默认项
-
- 返回:
- - BizCraneModel | None: 模型实例或None
- """
- return await self.get(id=id, preload=preload)
-
- async def list_crane_crud(self, search: dict | None = None, order_by: list[dict] | None = None, preload: list | None = None) -> Sequence[BizCraneModel]:
- """
- 列表查询
-
- 参数:
- - search (dict | None): 查询参数
- - order_by (list[dict] | None): 排序参数,未提供时使用模型默认项
- - preload (list | None): 预加载关系,未提供时使用模型默认项
-
- 返回:
- - Sequence[BizCraneModel]: 模型实例序列
- """
- return await self.list(search=search, order_by=order_by, preload=preload)
-
- async def create_crane_crud(self, data: BizCraneCreateSchema) -> BizCraneModel | None:
- """
- 创建
-
- 参数:
- - data (BizCraneCreateSchema): 创建模型
-
- 返回:
- - BizCraneModel | None: 模型实例或None
- """
- return await self.create(data=data)
-
- async def update_crane_crud(self, id: int, data: BizCraneUpdateSchema) -> BizCraneModel | None:
- """
- 更新
-
- 参数:
- - id (int): 对象ID
- - data (BizCraneUpdateSchema): 更新模型
-
- 返回:
- - BizCraneModel | None: 模型实例或None
- """
- return await self.update(id=id, data=data)
-
- async def delete_crane_crud(self, ids: list[int]) -> None:
- """
- 批量删除
-
- 参数:
- - ids (list[int]): 对象ID列表
-
- 返回:
- - None
- """
- return await self.delete(ids=ids)
-
- async def set_available_crane_crud(self, ids: list[int], status: str) -> None:
- """
- 批量设置可用状态
-
- 参数:
- - ids (list[int]): 对象ID列表
- - status (str): 可用状态
-
- 返回:
- - None
- """
- return await self.set(ids=ids, status=status)
-
- async def page_crane_crud(self, offset: int, limit: int, order_by: list[dict] | None = None, search: dict | None = None, preload: list | None = None) -> dict:
- """
- 分页查询
-
- 参数:
- - offset (int): 偏移量
- - limit (int): 每页数量
- - order_by (list[dict] | None): 排序参数,未提供时使用模型默认项
- - search (dict | None): 查询参数,未提供时查询所有
- - preload (list | None): 预加载关系,未提供时使用模型默认项
-
- 返回:
- - Dict: 分页数据
- """
- order_by_list = order_by or [{'id': 'asc'}]
- search_dict = search or {}
- return await self.page(
- offset=offset,
- limit=limit,
- order_by=order_by_list,
- search=search_dict,
- out_schema=BizCraneOutSchema,
- preload=preload
- )
|