service.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. # -*- coding: utf-8 -*-
  2. import io
  3. from typing import Any
  4. from fastapi import UploadFile
  5. import pandas as pd
  6. from app.core.exceptions import CustomException
  7. from app.utils.hash_bcrpy_util import PwdUtil
  8. from app.core.base_schema import BatchSetAvailable, UploadResponseSchema
  9. from app.core.logger import log
  10. from app.utils.common_util import traversal_to_tree
  11. from app.utils.excel_util import ExcelUtil
  12. from app.utils.upload_util import UploadUtil
  13. from ..position.crud import PositionCRUD
  14. from ..role.crud import RoleCRUD
  15. from ..menu.crud import MenuCRUD
  16. from ..dept.crud import DeptCRUD
  17. from ..auth.schema import AuthSchema
  18. from ..menu.schema import MenuOutSchema
  19. from .crud import UserCRUD
  20. from .schema import (
  21. CurrentUserUpdateSchema,
  22. ResetPasswordSchema,
  23. UserOutSchema,
  24. UserCreateSchema,
  25. UserUpdateSchema,
  26. UserChangePasswordSchema,
  27. UserRegisterSchema,
  28. UserForgetPasswordSchema,
  29. UserQueryParam
  30. )
  31. class UserService:
  32. """用户模块服务层"""
  33. @classmethod
  34. async def get_detail_by_id_service(cls, auth: AuthSchema, id: int) -> dict:
  35. """
  36. 根据ID获取用户详情
  37. 参数:
  38. - auth (AuthSchema): 认证信息模型
  39. - id (int): 用户ID
  40. 返回:
  41. - dict: 用户详情字典
  42. """
  43. user = await UserCRUD(auth).get_by_id_crud(id=id)
  44. if not user:
  45. raise CustomException(msg="用户不存在")
  46. # 如果用户绑定了部门,则获取部门名称
  47. if user.dept_id:
  48. dept = await DeptCRUD(auth).get_by_id_crud(id=user.dept_id)
  49. UserOutSchema.dept_name = dept.name if dept else None
  50. else:
  51. UserOutSchema.dept_name = None
  52. return UserOutSchema.model_validate(user).model_dump()
  53. @classmethod
  54. async def get_user_list_service(cls, auth: AuthSchema, search: UserQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
  55. """
  56. 获取用户列表
  57. 参数:
  58. - auth (AuthSchema): 认证信息模型
  59. - search (UserQueryParam | None): 查询参数对象。
  60. - order_by (list[dict[str, str]] | None): 排序参数列表。
  61. 返回:
  62. - list[dict]: 用户详情字典列表
  63. """
  64. user_list = await UserCRUD(auth).get_list_crud(search=search.__dict__, order_by=order_by)
  65. user_dict_list = []
  66. for user in user_list:
  67. user_dict = UserOutSchema.model_validate(user).model_dump()
  68. user_dict_list.append(user_dict)
  69. return user_dict_list
  70. @classmethod
  71. async def create_user_service(cls, data: UserCreateSchema, auth: AuthSchema) -> dict:
  72. """
  73. 创建用户
  74. 参数:
  75. - data (UserCreateSchema): 用户创建信息
  76. - auth (AuthSchema): 认证信息模型
  77. 返回:
  78. - dict: 创建后的用户详情字典
  79. """
  80. if not data.username:
  81. raise CustomException(msg="用户名不能为空")
  82. # 检查是否试图创建超级管理员
  83. if data.is_superuser:
  84. raise CustomException(msg='不允许创建超级管理员')
  85. # 检查用户名是否存在
  86. user = await UserCRUD(auth).get_by_username_crud(username=data.username)
  87. if user:
  88. raise CustomException(msg='已存在相同用户名称的账号')
  89. # 检查部门是否存在
  90. if data.dept_id:
  91. dept = await DeptCRUD(auth).get_by_id_crud(id=data.dept_id)
  92. if not dept:
  93. raise CustomException(msg='部门不存在')
  94. # 创建用户
  95. if data.password:
  96. data.password = PwdUtil.set_password_hash(password=data.password)
  97. user_dict = data.model_dump(exclude_unset=True, exclude={"role_ids", "position_ids"})
  98. # 创建用户
  99. new_user = await UserCRUD(auth).create(data=user_dict)
  100. # 设置角色
  101. if data.role_ids and len(data.role_ids) > 0:
  102. await UserCRUD(auth).set_user_roles_crud(user_ids=[new_user.id], role_ids=data.role_ids)
  103. # 设置岗位
  104. if data.position_ids and len(data.position_ids) > 0:
  105. await UserCRUD(auth).set_user_positions_crud(user_ids=[new_user.id], position_ids=data.position_ids)
  106. new_user_dict = UserOutSchema.model_validate(new_user).model_dump()
  107. return new_user_dict
  108. @classmethod
  109. async def update_user_service(cls, id: int, data: UserUpdateSchema, auth: AuthSchema) -> dict:
  110. """
  111. 更新用户
  112. 参数:
  113. - id (int): 用户ID
  114. - data (UserUpdateSchema): 用户更新信息
  115. - auth (AuthSchema): 认证信息模型
  116. 返回:
  117. - Dict: 更新后的用户详情字典
  118. """
  119. if not data.username:
  120. raise CustomException(msg="账号不能为空")
  121. # 检查用户是否存在
  122. user = await UserCRUD(auth).get_by_id_crud(id=id)
  123. if not user:
  124. raise CustomException(msg='用户不存在')
  125. # 检查是否尝试修改超级管理员
  126. if user.is_superuser:
  127. raise CustomException(msg='超级管理员不允许修改')
  128. # 检查用户名是否重复
  129. exist_user = await UserCRUD(auth).get_by_username_crud(username=data.username)
  130. if exist_user and exist_user.id != id:
  131. raise CustomException(msg='已存在相同的账号')
  132. # 新增:检查手机号是否重复
  133. if data.mobile:
  134. exist_mobile_user = await UserCRUD(auth).get_by_mobile_crud(mobile=data.mobile)
  135. if exist_mobile_user and exist_mobile_user.id != id:
  136. raise CustomException(msg='更新失败,手机号已存在')
  137. # 新增:检查邮箱是否重复
  138. if data.email:
  139. exist_email_user = await UserCRUD(auth).get(email=data.email)
  140. if exist_email_user and exist_email_user.id != id:
  141. raise CustomException(msg='更新失败,邮箱已存在')
  142. # 检查部门是否存在且可用
  143. if data.dept_id:
  144. dept = await DeptCRUD(auth).get_by_id_crud(id=data.dept_id)
  145. if not dept:
  146. raise CustomException(msg='部门不存在')
  147. if not dept.status:
  148. raise CustomException(msg='部门已被禁用')
  149. # 更新用户 - 排除不应被修改的字段, 更新不更新密码
  150. user_dict = data.model_dump(exclude_unset=True, exclude={"role_ids", "position_ids", "last_login", "password"})
  151. new_user = await UserCRUD(auth).update(id=id, data=user_dict)
  152. # 更新角色和岗位
  153. if data.role_ids and len(data.role_ids) > 0:
  154. # 检查角色是否都存在且可用
  155. roles = await RoleCRUD(auth).get_list_crud(search={"id": ("in", data.role_ids)})
  156. if len(roles) != len(data.role_ids):
  157. raise CustomException(msg='部分角色不存在')
  158. if not all(role.status for role in roles):
  159. raise CustomException(msg='部分角色已被禁用')
  160. await UserCRUD(auth).set_user_roles_crud(user_ids=[id], role_ids=data.role_ids)
  161. if data.position_ids and len(data.position_ids) > 0:
  162. # 检查岗位是否都存在且可用
  163. positions = await PositionCRUD(auth).get_list_crud(search={"id": ("in", data.position_ids)})
  164. if len(positions) != len(data.position_ids):
  165. raise CustomException(msg='部分岗位不存在')
  166. if not all(position.status for position in positions):
  167. raise CustomException(msg='部分岗位已被禁用')
  168. await UserCRUD(auth).set_user_positions_crud(user_ids=[id], position_ids=data.position_ids)
  169. user_dict = UserOutSchema.model_validate(new_user).model_dump()
  170. return user_dict
  171. @classmethod
  172. async def delete_user_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  173. """
  174. 删除用户
  175. 参数:
  176. - auth (AuthSchema): 认证信息模型
  177. - ids (list[int]): 用户ID列表
  178. 返回:
  179. - None
  180. """
  181. if len(ids) < 1:
  182. raise CustomException(msg='删除失败,删除对象不能为空')
  183. for id in ids:
  184. user = await UserCRUD(auth).get_by_id_crud(id=id)
  185. if not user:
  186. raise CustomException(msg="用户不存在")
  187. if user.is_superuser:
  188. raise CustomException(msg="超级管理员不能删除")
  189. if user.status:
  190. raise CustomException(msg="用户已启用,不能删除")
  191. if auth.user and auth.user.id == id:
  192. raise CustomException(msg="不能删除当前登陆用户")
  193. # 删除用户角色关联数据
  194. await UserCRUD(auth).set_user_roles_crud(user_ids=ids, role_ids=[])
  195. # 删除用户岗位关联数据
  196. await UserCRUD(auth).set_user_positions_crud(user_ids=ids, position_ids=[])
  197. # 删除用户
  198. await UserCRUD(auth).delete(ids=ids)
  199. @classmethod
  200. async def get_current_user_info_service(cls, auth: AuthSchema) -> dict:
  201. """
  202. 获取当前用户信息
  203. 参数:
  204. - auth (AuthSchema): 认证信息模型
  205. 返回:
  206. - Dict: 当前用户详情字典
  207. """
  208. # 获取用户基本信息
  209. if not auth.user or not auth.user.id:
  210. raise CustomException(msg="用户不存在")
  211. user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
  212. # 获取部门名称
  213. if user and user.dept:
  214. UserOutSchema.dept_name = user.dept.name
  215. user_dict = UserOutSchema.model_validate(user).model_dump()
  216. # 获取菜单权限
  217. if auth.user and auth.user.is_superuser:
  218. # 使用树形结构查询,预加载children关系
  219. menu_all = await MenuCRUD(auth).get_tree_list_crud(search={'type': ('in', [1, 2, 4]), 'status': '0'}, order_by=[{"order": "asc"}])
  220. menus = [MenuOutSchema.model_validate(menu).model_dump() for menu in menu_all]
  221. else:
  222. # 收集用户所有角色的菜单ID,使用列表推导式优化代码
  223. menu_ids = {
  224. menu.id
  225. for role in auth.user.roles or []
  226. for menu in role.menus
  227. if menu.status and menu.type in [1, 2, 4]
  228. }
  229. # 使用树形结构查询,预加载children关系
  230. menus = [
  231. MenuOutSchema.model_validate(menu).model_dump()
  232. for menu in await MenuCRUD(auth).get_tree_list_crud(search={'id': ('in', list(menu_ids))}, order_by=[{"order": "asc"}])
  233. ] if menu_ids else []
  234. user_dict["menus"] = traversal_to_tree(menus)
  235. return user_dict
  236. @classmethod
  237. async def update_current_user_info_service(cls, auth: AuthSchema, data: CurrentUserUpdateSchema) -> dict:
  238. """
  239. 更新当前用户信息
  240. 参数:
  241. - auth (AuthSchema): 认证信息模型
  242. - data (CurrentUserUpdateSchema): 当前用户更新信息
  243. 返回:
  244. - Dict: 更新后的当前用户详情字典
  245. """
  246. if not auth.user or not auth.user.id:
  247. raise CustomException(msg="用户不存在")
  248. user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
  249. if not user:
  250. raise CustomException(msg="用户不存在")
  251. if user.is_superuser:
  252. raise CustomException(msg="超级管理员不能修改个人信息")
  253. # 新增:检查手机号是否重复
  254. if data.mobile:
  255. exist_mobile_user = await UserCRUD(auth).get_by_mobile_crud(mobile=data.mobile)
  256. if exist_mobile_user and exist_mobile_user.id != auth.user.id:
  257. raise CustomException(msg='更新失败,手机号已存在')
  258. # 新增:检查邮箱是否重复
  259. if data.email:
  260. exist_email_user = await UserCRUD(auth).get(email=data.email)
  261. if exist_email_user and exist_email_user.id != auth.user.id:
  262. raise CustomException(msg='更新失败,邮箱已存在')
  263. user_update_data = UserUpdateSchema(**data.model_dump())
  264. new_user = await UserCRUD(auth).update(id=auth.user.id, data=user_update_data)
  265. return UserOutSchema.model_validate(new_user).model_dump()
  266. @classmethod
  267. async def set_user_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  268. """
  269. 设置用户状态
  270. 参数:
  271. - auth (AuthSchema): 认证信息模型
  272. - data (BatchSetAvailable): 批量设置用户状态数据
  273. 返回:
  274. - None
  275. """
  276. for id in data.ids:
  277. user = await UserCRUD(auth).get_by_id_crud(id=id)
  278. if not user:
  279. raise CustomException(msg=f"用户ID {id} 不存在")
  280. if user.is_superuser:
  281. raise CustomException(msg="超级管理员状态不能修改")
  282. await UserCRUD(auth).set_available_crud(ids=data.ids, status=data.status)
  283. @classmethod
  284. async def upload_avatar_service(cls, base_url: str, file: UploadFile) -> dict:
  285. """
  286. 上传用户头像
  287. 参数:
  288. - base_url (str): 基础URL
  289. - file (UploadFile): 上传的文件
  290. 返回:
  291. - Dict: 上传头像响应字典
  292. """
  293. filename, filepath, file_url = await UploadUtil.upload_file(file=file, base_url=base_url)
  294. return UploadResponseSchema(
  295. file_path=f'{filepath}',
  296. file_name=filename,
  297. origin_name=file.filename,
  298. file_url=f'{file_url}',
  299. ).model_dump()
  300. @classmethod
  301. async def change_user_password_service(cls, auth: AuthSchema, data: UserChangePasswordSchema) -> dict:
  302. """
  303. 修改用户密码
  304. 参数:
  305. - auth (AuthSchema): 认证信息模型
  306. - data (UserChangePasswordSchema): 用户密码修改数据
  307. 返回:
  308. - Dict: 更新后的当前用户详情字典
  309. """
  310. if not auth.user or not auth.user.id:
  311. raise CustomException(msg="用户不存在")
  312. if not data.old_password or not data.new_password:
  313. raise CustomException(msg='密码不能为空')
  314. # 验证原密码
  315. user = await UserCRUD(auth).get_by_id_crud(id=auth.user.id)
  316. if not user:
  317. raise CustomException(msg="用户不存在")
  318. if not PwdUtil.verify_password(plain_password=data.old_password, password_hash=user.password):
  319. raise CustomException(msg='原密码输入错误')
  320. # 更新密码
  321. new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
  322. new_user = await UserCRUD(auth).change_password_crud(id=user.id, password_hash=new_password_hash)
  323. return UserOutSchema.model_validate(new_user).model_dump()
  324. @classmethod
  325. async def reset_user_password_service(cls, auth: AuthSchema, data: ResetPasswordSchema) -> dict:
  326. """
  327. 重置用户密码
  328. 参数:
  329. - auth (AuthSchema): 认证信息模型
  330. - data (ResetPasswordSchema): 用户密码重置数据
  331. 返回:
  332. - Dict: 更新后的当前用户详情字典
  333. """
  334. if not data.password:
  335. raise CustomException(msg='密码不能为空')
  336. # 验证用户
  337. user = await UserCRUD(auth).get_by_id_crud(id=data.id)
  338. if not user:
  339. raise CustomException(msg="用户不存在")
  340. # 检查是否是超级管理员
  341. if user.is_superuser:
  342. raise CustomException(msg="超级管理员密码不能重置")
  343. # 更新密码
  344. new_password_hash = PwdUtil.set_password_hash(password=data.password)
  345. new_user = await UserCRUD(auth).change_password_crud(id=data.id, password_hash=new_password_hash)
  346. return UserOutSchema.model_validate(new_user).model_dump()
  347. @classmethod
  348. async def register_user_service(cls, auth: AuthSchema, data: UserRegisterSchema) -> dict:
  349. """
  350. 用户注册
  351. 参数:
  352. - auth (AuthSchema): 认证信息模型
  353. - data (UserRegisterSchema): 用户注册数据
  354. 返回:
  355. - Dict: 注册后的用户详情字典
  356. """
  357. # 检查用户名是否存在
  358. username_ok = await UserCRUD(auth).get_by_username_crud(username=data.username)
  359. if username_ok:
  360. raise CustomException(msg='账号已存在')
  361. data.password = PwdUtil.set_password_hash(password=data.password)
  362. data.name = data.username
  363. create_dict = data.model_dump(exclude_unset=True, exclude={"role_ids", "position_ids"})
  364. # 设置默认用户类型为普通用户
  365. create_dict.setdefault("user_type", "0")
  366. # 设置创建人ID
  367. if auth.user and auth.user.id:
  368. create_dict["created_id"] = auth.user.id
  369. result = await UserCRUD(auth).create(data=create_dict)
  370. if data.role_ids:
  371. await UserCRUD(auth).set_user_roles_crud(user_ids=[result.id], role_ids=data.role_ids)
  372. return UserOutSchema.model_validate(result).model_dump()
  373. @classmethod
  374. async def forget_password_service(cls, auth: AuthSchema, data: UserForgetPasswordSchema) -> dict:
  375. """
  376. 用户忘记密码
  377. 参数:
  378. - auth (AuthSchema): 认证信息模型
  379. - data (UserForgetPasswordSchema): 用户忘记密码数据
  380. 返回:
  381. - Dict: 更新后的当前用户详情字典
  382. """
  383. user = await UserCRUD(auth).get_by_username_crud(username=data.username)
  384. if not user:
  385. raise CustomException(msg="用户不存在")
  386. if not user.status:
  387. raise CustomException(msg="用户已停用")
  388. # 检查是否是超级管理员
  389. if user.is_superuser:
  390. raise CustomException(msg="超级管理员密码不能重置")
  391. new_password_hash = PwdUtil.set_password_hash(password=data.new_password)
  392. new_user = await UserCRUD(auth).forget_password_crud(id=user.id, password_hash=new_password_hash)
  393. return UserOutSchema.model_validate(new_user).model_dump()
  394. @classmethod
  395. async def batch_import_user_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  396. """
  397. 批量导入用户
  398. 参数:
  399. - auth (AuthSchema): 认证信息模型
  400. - file (UploadFile): 上传的Excel文件
  401. - update_support (bool, optional): 是否支持更新已存在用户. 默认值为False.
  402. 返回:
  403. - str: 导入结果消息
  404. """
  405. header_dict = {
  406. '部门编号': 'dept_id',
  407. '用户名': 'username',
  408. '名称': 'name',
  409. '邮箱': 'email',
  410. '手机号': 'mobile',
  411. '性别': 'gender',
  412. '状态': 'status'
  413. }
  414. try:
  415. # 读取Excel文件
  416. contents = await file.read()
  417. df = pd.read_excel(io.BytesIO(contents))
  418. await file.close()
  419. if df.empty:
  420. raise CustomException(msg="导入文件为空")
  421. # 检查表头是否完整
  422. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  423. if missing_headers:
  424. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  425. # 重命名列名
  426. df.rename(columns=header_dict, inplace=True)
  427. # 验证必填字段
  428. required_fields = ['username', 'name', 'dept_id']
  429. for field in required_fields:
  430. missing_rows = df[df[field].isnull()].index.tolist()
  431. raise CustomException(msg=f"{[k for k,v in header_dict.items() if v == field][0]}不能为空,第{[i+1 for i in missing_rows]}行")
  432. error_msgs = []
  433. success_count = 0
  434. count = 0
  435. # 处理每一行数据
  436. for index, row in df.iterrows():
  437. try:
  438. count = count + 1
  439. # 数据转换
  440. gender = 1 if row['gender'] == '男' else (2 if row['gender'] == '女' else 1)
  441. status = True if row['status'] == '正常' else False
  442. # 构建用户数据
  443. user_data = {
  444. "username": str(row['username']).strip(),
  445. "name": str(row['name']).strip(),
  446. "email": str(row['email']).strip(),
  447. "mobile": str(row['mobile']).strip(),
  448. "gender": gender,
  449. "status": status,
  450. "dept_id": int(row['dept_id']),
  451. "password": PwdUtil.set_password_hash(password="123456") # 设置默认密码
  452. }
  453. # 处理用户导入
  454. exists_user = await UserCRUD(auth).get_by_username_crud(username=user_data["username"])
  455. if exists_user:
  456. # 检查是否是超级管理员
  457. if exists_user.is_superuser:
  458. error_msgs.append(f"第{count}行: 超级管理员不允许修改")
  459. continue
  460. if update_support:
  461. user_update_data = UserUpdateSchema(**user_data)
  462. await UserCRUD(auth).update(id=exists_user.id, data=user_update_data)
  463. success_count += 1
  464. else:
  465. error_msgs.append(f"第{count}行: 用户 {user_data['username']} 已存在")
  466. else:
  467. user_create_data = UserCreateSchema(**user_data)
  468. await UserCRUD(auth).create(data=user_create_data)
  469. success_count += 1
  470. except Exception as e:
  471. error_msgs.append(f"第{count}行: 异常{str(e)}")
  472. continue
  473. # 返回详细的导入结果
  474. result = f"成功导入 {success_count} 条数据"
  475. if error_msgs:
  476. result += "\n错误信息:\n" + "\n".join(error_msgs)
  477. return result
  478. except Exception as e:
  479. log.error(f"批量导入用户失败: {str(e)}")
  480. raise CustomException(msg=f"导入失败: {str(e)}")
  481. @classmethod
  482. async def get_import_template_user_service(cls) -> bytes:
  483. """
  484. 获取用户导入模板
  485. 返回:
  486. - bytes: Excel文件字节流
  487. """
  488. header_list = ['部门编号', '用户名', '名称', '邮箱', '手机号', '性别', '状态']
  489. selector_header_list = ['性别', '状态']
  490. option_list = [{'性别': ['男', '女', '未知']}, {'状态': ['正常', '停用']}]
  491. return ExcelUtil.get_excel_template(
  492. header_list=header_list,
  493. selector_header_list=selector_header_list,
  494. option_list=option_list
  495. )
  496. @classmethod
  497. async def export_user_list_service(cls, user_list: list[dict[str, Any]]) -> bytes:
  498. """
  499. 导出用户列表为Excel文件
  500. 参数:
  501. - user_list (List[Dict[str, Any]]): 用户列表
  502. 返回:
  503. - bytes: Excel文件字节流
  504. """
  505. if not user_list:
  506. raise CustomException(msg="没有数据可导出")
  507. # 定义字段映射
  508. mapping_dict = {
  509. 'id': '用户编号',
  510. 'avatar': '头像',
  511. 'username': '用户名称',
  512. 'name': '用户昵称',
  513. 'dept_name': '部门',
  514. 'email': '邮箱',
  515. 'mobile': '手机号',
  516. 'gender': '性别',
  517. 'status': '状态',
  518. 'is_superuser': '是否超级管理员',
  519. 'last_login': '最后登录时间',
  520. 'description': '备注',
  521. 'created_time': '创建时间',
  522. 'updated_time': '更新时间',
  523. 'updated_id': '更新者ID',
  524. }
  525. # 复制数据并转换
  526. # creator = {'id': 1, 'name': '管理员', 'username': 'admin'}
  527. data = user_list.copy()
  528. for item in data:
  529. item['status'] = '启用' if item.get('status') == "0" else '停用'
  530. gender = item.get('gender')
  531. item['gender'] = '男' if gender == '1' else ('女' if gender == '2' else '未知')
  532. item['is_superuser'] = '是' if item.get('is_superuser') else '否'
  533. item['creator'] = item.get('creator', {}).get('name', '未知') if isinstance(item.get('creator'), dict) else '未知'
  534. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)