service.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # -*- coding: utf-8 -*-
  2. import io
  3. from typing import Any
  4. from fastapi import UploadFile
  5. import pandas as pd
  6. from app.core.base_schema import BatchSetAvailable
  7. from app.core.exceptions import CustomException
  8. from app.utils.excel_util import ExcelUtil
  9. from app.core.logger import log
  10. from app.api.v1.module_system.auth.schema import AuthSchema
  11. from .schema import DemoCreateSchema, DemoUpdateSchema, DemoOutSchema, DemoQueryParam
  12. from .crud import DemoCRUD
  13. class DemoService:
  14. """
  15. 示例管理模块服务层
  16. """
  17. @classmethod
  18. async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
  19. """
  20. 详情
  21. 参数:
  22. - auth (AuthSchema): 认证信息模型
  23. - id (int): 示例ID
  24. 返回:
  25. - dict: 示例模型实例字典
  26. """
  27. obj = await DemoCRUD(auth).get_by_id_crud(id=id)
  28. if not obj:
  29. raise CustomException(msg="该数据不存在")
  30. return DemoOutSchema.model_validate(obj).model_dump()
  31. @classmethod
  32. async def list_service(cls, auth: AuthSchema, search: DemoQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
  33. """
  34. 列表查询
  35. 参数:
  36. - auth (AuthSchema): 认证信息模型
  37. - search (DemoQueryParam | None): 查询参数
  38. - order_by (list[dict[str, str]] | None): 排序参数
  39. 返回:
  40. - list[dict]: 示例模型实例字典列表
  41. """
  42. search_dict = search.__dict__ if search else None
  43. obj_list = await DemoCRUD(auth).list_crud(search=search_dict, order_by=order_by)
  44. return [DemoOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  45. @classmethod
  46. async def page_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: DemoQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> dict:
  47. """
  48. 分页查询
  49. 参数:
  50. - auth (AuthSchema): 认证信息模型
  51. - page_no (int): 页码
  52. - page_size (int): 每页数量
  53. - search (DemoQueryParam | None): 查询参数
  54. - order_by (list[dict[str, str]] | None): 排序参数
  55. 返回:
  56. - dict: 分页数据
  57. """
  58. search_dict = search.__dict__ if search else {}
  59. order_by_list = order_by or [{'id': 'asc'}]
  60. offset = (page_no - 1) * page_size
  61. result = await DemoCRUD(auth).page_crud(
  62. offset=offset,
  63. limit=page_size,
  64. order_by=order_by_list,
  65. search=search_dict
  66. )
  67. return result
  68. @classmethod
  69. async def create_service(cls, auth: AuthSchema, data: DemoCreateSchema) -> dict:
  70. """
  71. 创建
  72. 参数:
  73. - auth (AuthSchema): 认证信息模型
  74. - data (DemoCreateSchema): 示例创建模型
  75. 返回:
  76. - dict: 示例模型实例字典
  77. """
  78. obj = await DemoCRUD(auth).get(name=data.name)
  79. if obj:
  80. raise CustomException(msg='创建失败,名称已存在')
  81. obj = await DemoCRUD(auth).create_crud(data=data)
  82. return DemoOutSchema.model_validate(obj).model_dump()
  83. @classmethod
  84. async def update_service(cls, auth: AuthSchema, id: int, data: DemoUpdateSchema) -> dict:
  85. """
  86. 更新
  87. 参数:
  88. - auth (AuthSchema): 认证信息模型
  89. - id (int): 示例ID
  90. - data (DemoUpdateSchema): 示例更新模型
  91. 返回:
  92. - dict: 示例模型实例字典
  93. """
  94. # 检查数据是否存在
  95. obj = await DemoCRUD(auth).get_by_id_crud(id=id)
  96. if not obj:
  97. raise CustomException(msg='更新失败,该数据不存在')
  98. # 检查名称是否重复
  99. exist_obj = await DemoCRUD(auth).get(name=data.name)
  100. if exist_obj and exist_obj.id != id:
  101. raise CustomException(msg='更新失败,名称重复')
  102. obj = await DemoCRUD(auth).update_crud(id=id, data=data)
  103. return DemoOutSchema.model_validate(obj).model_dump()
  104. @classmethod
  105. async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  106. """
  107. 删除
  108. 参数:
  109. - auth (AuthSchema): 认证信息模型
  110. - ids (list[int]): 示例ID列表
  111. 返回:
  112. - None
  113. """
  114. if len(ids) < 1:
  115. raise CustomException(msg='删除失败,删除对象不能为空')
  116. # 检查所有要删除的数据是否存在
  117. for id in ids:
  118. obj = await DemoCRUD(auth).get_by_id_crud(id=id)
  119. if not obj:
  120. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  121. await DemoCRUD(auth).delete_crud(ids=ids)
  122. @classmethod
  123. async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  124. """
  125. 批量设置状态
  126. 参数:
  127. - auth (AuthSchema): 认证信息模型
  128. - data (BatchSetAvailable): 批量设置状态模型
  129. 返回:
  130. - None
  131. """
  132. await DemoCRUD(auth).set_available_crud(ids=data.ids, status=data.status)
  133. @classmethod
  134. async def batch_export_service(cls, obj_list: list[dict[str, Any]]) -> bytes:
  135. """
  136. 批量导出
  137. 参数:
  138. - obj_list (list[dict[str, Any]]): 示例模型实例字典列表
  139. 返回:
  140. - bytes: Excel文件字节流
  141. """
  142. mapping_dict = {
  143. 'id': '编号',
  144. 'name': '名称',
  145. 'status': '状态',
  146. 'description': '备注',
  147. 'created_time': '创建时间',
  148. 'updated_time': '更新时间',
  149. 'created_id': '创建者',
  150. }
  151. # 复制数据并转换状态
  152. data = obj_list.copy()
  153. for item in data:
  154. # 处理状态
  155. item['status'] = '启用' if item.get('status') == '0' else '停用'
  156. # 处理创建者
  157. creator_info = item.get('created_id')
  158. if isinstance(creator_info, dict):
  159. item['created_id'] = creator_info.get('name', '未知')
  160. else:
  161. item['created_id'] = '未知'
  162. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  163. @classmethod
  164. async def batch_import_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  165. """
  166. 批量导入
  167. 参数:
  168. - auth (AuthSchema): 认证信息模型
  169. - file (UploadFile): 上传的Excel文件
  170. - update_support (bool): 是否支持更新存在数据
  171. 返回:
  172. - str: 导入结果信息
  173. """
  174. header_dict = {
  175. '名称': 'name',
  176. '状态': 'status',
  177. '描述': 'description'
  178. }
  179. try:
  180. # 读取Excel文件
  181. contents = await file.read()
  182. df = pd.read_excel(io.BytesIO(contents))
  183. await file.close()
  184. if df.empty:
  185. raise CustomException(msg="导入文件为空")
  186. # 检查表头是否完整
  187. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  188. if missing_headers:
  189. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  190. # 重命名列名
  191. df.rename(columns=header_dict, inplace=True)
  192. # 验证必填字段
  193. required_fields = ['name', 'status']
  194. errors = []
  195. for field in required_fields:
  196. missing_rows = df[df[field].isnull()].index.tolist()
  197. if missing_rows:
  198. field_name = [k for k,v in header_dict.items() if v == field][0]
  199. rows_str = "、".join([str(i+1) for i in missing_rows])
  200. errors.append(f"{field_name}不能为空,第{rows_str}行")
  201. if errors:
  202. raise CustomException(msg=f"导入失败,以下行缺少必要字段:\n{'; '.join(errors)}")
  203. error_msgs = []
  204. success_count = 0
  205. count = 0
  206. # 处理每一行数据
  207. for index, row in df.iterrows():
  208. count += 1
  209. try:
  210. # 数据转换前的类型检查
  211. try:
  212. status = True if row['status'] == '正常' else False
  213. except ValueError:
  214. error_msgs.append(f"第{count}行: 状态必须是'正常'或'停用'")
  215. continue
  216. # 构建用户数据
  217. data = {
  218. "name": str(row['name']),
  219. "status": status,
  220. "description": str(row['description']),
  221. }
  222. # 处理用户导入
  223. exists_obj = await DemoCRUD(auth).get(name=data["name"])
  224. if exists_obj:
  225. if update_support:
  226. await DemoCRUD(auth).update(id=exists_obj.id, data=data)
  227. success_count += 1
  228. else:
  229. error_msgs.append(f"第{count}行: 对象 {data['name']} 已存在")
  230. else:
  231. await DemoCRUD(auth).create(data=data)
  232. success_count += 1
  233. except Exception as e:
  234. error_msgs.append(f"第{count}行: {str(e)}")
  235. continue
  236. # 返回详细的导入结果
  237. result = f"成功导入 {success_count} 条数据"
  238. if error_msgs:
  239. result += "\n错误信息:\n" + "\n".join(error_msgs)
  240. return result
  241. except Exception as e:
  242. log.error(f"批量导入用户失败: {str(e)}")
  243. raise CustomException(msg=f"导入失败: {str(e)}")
  244. @classmethod
  245. async def import_template_download_service(cls) -> bytes:
  246. """
  247. 下载导入模板
  248. 返回:
  249. - bytes: Excel文件字节流
  250. """
  251. header_list = ['名称', '状态', '描述']
  252. selector_header_list = ['状态']
  253. option_list = [{'状态': ['正常', '停用']}]
  254. return ExcelUtil.get_excel_template(
  255. header_list=header_list,
  256. selector_header_list=selector_header_list,
  257. option_list=option_list
  258. )