| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # -*- coding: utf-8 -*-
- from app.core.base_schema import BatchSetAvailable
- from app.core.exceptions import CustomException
- from app.api.v1.module_system.auth.schema import AuthSchema
- from .schema import (
- ApplicationCreateSchema,
- ApplicationUpdateSchema,
- ApplicationOutSchema,
- ApplicationQueryParam
- )
- from .crud import ApplicationCRUD
- class ApplicationService:
- """
- 应用系统管理服务层
- """
-
- @classmethod
- async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
- """
- 获取应用详情
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 应用ID
-
- 返回:
- - dict: 应用详情字典
- """
- obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg='应用不存在')
- return ApplicationOutSchema.model_validate(obj).model_dump()
-
- @classmethod
- async def list_service(cls, auth: AuthSchema, search: ApplicationQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
- """
- 获取应用列表
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - search (ApplicationQueryParam | None): 查询参数模型
- - order_by (list[dict[str, str]] | None): 排序参数,支持字符串或字典列表
-
- 返回:
- - list[dict]: 应用详情字典列表
- """
- # 过滤空值
- search_dict = search.__dict__ if search else None
- obj_list = await ApplicationCRUD(auth).list_crud(search=search_dict, order_by=order_by)
- return [ApplicationOutSchema.model_validate(obj).model_dump() for obj in obj_list]
-
- @classmethod
- async def create_service(cls, auth: AuthSchema, data: ApplicationCreateSchema) -> dict:
- """
- 创建应用
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (ApplicationCreateSchema): 应用创建模型
-
- 返回:
- - Dict: 应用详情字典
- """
- # 检查名称是否重复
- obj = await ApplicationCRUD(auth).get(name=data.name)
- if obj:
- raise CustomException(msg='创建失败,应用名称已存在')
-
- obj = await ApplicationCRUD(auth).create_crud(data=data)
- return ApplicationOutSchema.model_validate(obj).model_dump()
-
- @classmethod
- async def update_service(cls, auth: AuthSchema, id: int, data: ApplicationUpdateSchema) -> dict:
- """
- 更新应用
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - id (int): 应用ID
- - data (ApplicationUpdateSchema): 应用更新模型
-
- 返回:
- - Dict: 应用详情字典
- """
- obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg='更新失败,该应用不存在')
-
- # 检查名称重复
- exist_obj = await ApplicationCRUD(auth).get(name=data.name)
- if exist_obj and exist_obj.id != id:
- raise CustomException(msg='更新失败,应用名称重复')
-
- obj = await ApplicationCRUD(auth).update_crud(id=id, data=data)
- return ApplicationOutSchema.model_validate(obj).model_dump()
-
- @classmethod
- async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
- """
- 删除应用
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - ids (list[int]): 应用ID列表
-
- 返回:
- - None
- """
- if len(ids) < 1:
- raise CustomException(msg='删除失败,删除对象不能为空')
- for id in ids:
- obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
- if not obj:
- raise CustomException(msg=f'删除失败,应用 {id} 不存在')
- await ApplicationCRUD(auth).delete_crud(ids=ids)
-
- @classmethod
- async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
- """
- 批量设置应用状态
-
- 参数:
- - auth (AuthSchema): 认证信息模型
- - data (BatchSetAvailable): 批量设置应用状态模型
-
- 返回:
- - None
- """
- await ApplicationCRUD(auth).set_available_crud(ids=data.ids, status=data.status)
|