service.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # -*- coding: utf-8 -*-
  2. from app.core.base_schema import BatchSetAvailable
  3. from app.core.exceptions import CustomException
  4. from app.api.v1.module_system.auth.schema import AuthSchema
  5. from .schema import (
  6. ApplicationCreateSchema,
  7. ApplicationUpdateSchema,
  8. ApplicationOutSchema,
  9. ApplicationQueryParam
  10. )
  11. from .crud import ApplicationCRUD
  12. class ApplicationService:
  13. """
  14. 应用系统管理服务层
  15. """
  16. @classmethod
  17. async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
  18. """
  19. 获取应用详情
  20. 参数:
  21. - auth (AuthSchema): 认证信息模型
  22. - id (int): 应用ID
  23. 返回:
  24. - dict: 应用详情字典
  25. """
  26. obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
  27. if not obj:
  28. raise CustomException(msg='应用不存在')
  29. return ApplicationOutSchema.model_validate(obj).model_dump()
  30. @classmethod
  31. async def list_service(cls, auth: AuthSchema, search: ApplicationQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
  32. """
  33. 获取应用列表
  34. 参数:
  35. - auth (AuthSchema): 认证信息模型
  36. - search (ApplicationQueryParam | None): 查询参数模型
  37. - order_by (list[dict[str, str]] | None): 排序参数,支持字符串或字典列表
  38. 返回:
  39. - list[dict]: 应用详情字典列表
  40. """
  41. # 过滤空值
  42. search_dict = search.__dict__ if search else None
  43. obj_list = await ApplicationCRUD(auth).list_crud(search=search_dict, order_by=order_by)
  44. return [ApplicationOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  45. @classmethod
  46. async def create_service(cls, auth: AuthSchema, data: ApplicationCreateSchema) -> dict:
  47. """
  48. 创建应用
  49. 参数:
  50. - auth (AuthSchema): 认证信息模型
  51. - data (ApplicationCreateSchema): 应用创建模型
  52. 返回:
  53. - Dict: 应用详情字典
  54. """
  55. # 检查名称是否重复
  56. obj = await ApplicationCRUD(auth).get(name=data.name)
  57. if obj:
  58. raise CustomException(msg='创建失败,应用名称已存在')
  59. obj = await ApplicationCRUD(auth).create_crud(data=data)
  60. return ApplicationOutSchema.model_validate(obj).model_dump()
  61. @classmethod
  62. async def update_service(cls, auth: AuthSchema, id: int, data: ApplicationUpdateSchema) -> dict:
  63. """
  64. 更新应用
  65. 参数:
  66. - auth (AuthSchema): 认证信息模型
  67. - id (int): 应用ID
  68. - data (ApplicationUpdateSchema): 应用更新模型
  69. 返回:
  70. - Dict: 应用详情字典
  71. """
  72. obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
  73. if not obj:
  74. raise CustomException(msg='更新失败,该应用不存在')
  75. # 检查名称重复
  76. exist_obj = await ApplicationCRUD(auth).get(name=data.name)
  77. if exist_obj and exist_obj.id != id:
  78. raise CustomException(msg='更新失败,应用名称重复')
  79. obj = await ApplicationCRUD(auth).update_crud(id=id, data=data)
  80. return ApplicationOutSchema.model_validate(obj).model_dump()
  81. @classmethod
  82. async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  83. """
  84. 删除应用
  85. 参数:
  86. - auth (AuthSchema): 认证信息模型
  87. - ids (list[int]): 应用ID列表
  88. 返回:
  89. - None
  90. """
  91. if len(ids) < 1:
  92. raise CustomException(msg='删除失败,删除对象不能为空')
  93. for id in ids:
  94. obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
  95. if not obj:
  96. raise CustomException(msg=f'删除失败,应用 {id} 不存在')
  97. await ApplicationCRUD(auth).delete_crud(ids=ids)
  98. @classmethod
  99. async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  100. """
  101. 批量设置应用状态
  102. 参数:
  103. - auth (AuthSchema): 认证信息模型
  104. - data (BatchSetAvailable): 批量设置应用状态模型
  105. 返回:
  106. - None
  107. """
  108. await ApplicationCRUD(auth).set_available_crud(ids=data.ids, status=data.status)