permission.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # -*- coding: utf-8 -*-
  2. from typing import Any, Optional, Dict, Set
  3. from pluggy import Result
  4. from sqlalchemy.sql.elements import ColumnElement
  5. from sqlalchemy import select
  6. from app.api.v1.module_system.user.model import UserModel
  7. from app.api.v1.module_system.dept.model import DeptModel
  8. from app.api.v1.module_system.auth.schema import AuthSchema
  9. from app.utils.common_util import get_child_id_map, get_child_recursion
  10. class Permission:
  11. """
  12. 为业务模型提供数据权限过滤功能
  13. """
  14. # 数据权限常量定义,提高代码可读性
  15. DATA_SCOPE_SELF = 1 # 仅本人数据
  16. DATA_SCOPE_DEPT = 2 # 本部门数据
  17. DATA_SCOPE_DEPT_AND_CHILD = 3 # 本部门及以下数据
  18. DATA_SCOPE_ALL = 4 # 全部数据
  19. DATA_SCOPE_CUSTOM = 5 # 自定义数据
  20. def __init__(self, model: Any, auth: AuthSchema):
  21. """
  22. 初始化权限过滤器实例
  23. Args:
  24. db: 数据库会话
  25. model: 数据模型类
  26. current_user: 当前用户对象
  27. auth: 认证信息对象
  28. """
  29. self.model = model
  30. self.auth = auth
  31. self.conditions: list[ColumnElement] = [] # 权限条件列表
  32. async def filter_query(self, query: Any) -> Any:
  33. """
  34. 异步过滤查询对象
  35. Args:
  36. query: SQLAlchemy查询对象
  37. Returns:
  38. 过滤后的查询对象
  39. """
  40. condition = await self.__permission_condition()
  41. return query.where(condition) if condition is not None else query
  42. async def __permission_condition(self) -> ColumnElement | None:
  43. """
  44. 应用数据范围权限隔离
  45. 基于角色的五种数据权限范围过滤
  46. 支持五种权限类型:
  47. 1. 仅本人数据权限 - 只能查看自己创建的数据
  48. 2. 本部门数据权限 - 只能查看同部门的数据
  49. 3. 本部门及以下数据权限 - 可以查看本部门及所有子部门的数据
  50. 4. 全部数据权限 - 可以查看所有数据
  51. 5. 自定义数据权限 - 通过role_dept_relation表定义可访问的部门列表
  52. 权限处理原则:
  53. - 多个角色的权限取并集(最宽松原则)
  54. - 优先级:全部数据 > 部门权限(2、3、5的并集)> 仅本人
  55. - 构造权限过滤表达式,返回None表示不限制
  56. """
  57. # 如果不需要检查数据权限,则不限制
  58. if not self.auth.user:
  59. return None
  60. # 如果检查数据权限为False,则不限制
  61. if not self.auth.check_data_scope:
  62. return None
  63. # 如果模型没有创建人created_id字段,则不限制
  64. if not hasattr(self.model, "created_id"):
  65. return None
  66. # 超级管理员可以查看所有数据
  67. if self.auth.user.is_superuser:
  68. return None
  69. # 如果用户没有角色,则只能查看自己的数据
  70. roles = getattr(self.auth.user, "roles", []) or []
  71. if not roles:
  72. created_id_attr = getattr(self.model, "created_id", None)
  73. if created_id_attr is not None:
  74. return created_id_attr == self.auth.user.id
  75. return None
  76. # 获取用户所有角色的权限范围
  77. data_scopes = set()
  78. custom_dept_ids = set() # 自定义权限(data_scope=5)关联的部门ID集合
  79. for role in roles:
  80. data_scopes.add(role.data_scope)
  81. # 收集自定义权限(data_scope=5)关联的部门ID
  82. if role.data_scope == self.DATA_SCOPE_CUSTOM and hasattr(role, 'depts') and role.depts:
  83. for dept in role.depts:
  84. custom_dept_ids.add(dept.id)
  85. # 权限优先级处理:全部数据权限最高优先级
  86. if self.DATA_SCOPE_ALL in data_scopes:
  87. return None
  88. # 收集所有可访问的部门ID(2、3、5权限的并集)
  89. accessible_dept_ids = set()
  90. user_dept_id = getattr(self.auth.user, "dept_id", None)
  91. # 处理自定义数据权限(5)
  92. if self.DATA_SCOPE_CUSTOM in data_scopes:
  93. accessible_dept_ids.update(custom_dept_ids)
  94. # 处理本部门数据权限(2)
  95. if self.DATA_SCOPE_DEPT in data_scopes:
  96. if user_dept_id is not None:
  97. accessible_dept_ids.add(user_dept_id)
  98. # 处理本部门及以下数据权限(3)
  99. if self.DATA_SCOPE_DEPT_AND_CHILD in data_scopes:
  100. if user_dept_id is not None:
  101. try:
  102. # 查询所有部门并递归获取子部门
  103. dept_sql = select(DeptModel)
  104. dept_result = await self.auth.db.execute(dept_sql)
  105. dept_objs = dept_result.scalars().all()
  106. id_map = get_child_id_map(dept_objs)
  107. # get_child_recursion返回的结果已包含自身ID和所有子部门ID
  108. dept_with_children_ids = get_child_recursion(id=user_dept_id, id_map=id_map)
  109. accessible_dept_ids.update(dept_with_children_ids)
  110. except Exception:
  111. # 查询失败时降级到本部门
  112. accessible_dept_ids.add(user_dept_id)
  113. # 如果有部门权限(2、3、5任一),使用部门过滤
  114. if accessible_dept_ids:
  115. creator_rel = getattr(self.model, "created_by", None)
  116. # 优先使用关系过滤(性能更好)
  117. if creator_rel is not None and hasattr(UserModel, 'dept_id'):
  118. return creator_rel.has(getattr(UserModel, 'dept_id').in_(list(accessible_dept_ids)))
  119. # 降级方案:如果模型没有created_by关系但有created_id,则只能查看自己的数据
  120. else:
  121. created_id_attr = getattr(self.model, "created_id", None)
  122. if created_id_attr is not None:
  123. return created_id_attr == self.auth.user.id
  124. return None
  125. # 处理仅本人数据权限(1)
  126. if self.DATA_SCOPE_SELF in data_scopes:
  127. created_id_attr = getattr(self.model, "created_id", None)
  128. if created_id_attr is not None:
  129. return created_id_attr == self.auth.user.id
  130. return None
  131. # 默认情况:如果用户有角色但没有任何有效权限范围,只能查看自己的数据
  132. created_id_attr = getattr(self.model, "created_id", None)
  133. if created_id_attr is not None:
  134. return created_id_attr == self.auth.user.id
  135. return None
  136. async def filter_query_sql(self, sql: str) -> (str, Optional[Dict[str, Any]]):
  137. """
  138. 异步过滤**原始SQL字符串**,拼接完整权限条件(无逻辑省略)
  139. Args:
  140. sql: 原始SQL字符串(如"SELECT * FROM biz_var_dict")
  141. Returns:
  142. 拼接权限条件后的完整SQL字符串、权限参数字典(防SQL注入,可直接传入execute_raw_sql)
  143. """
  144. # 1. 获取完整的权限SQL条件和参数(无省略,包含所有角色/部门逻辑)
  145. permission_condition, permission_params = await self.__permission_condition_sql()
  146. # 2. 无权限条件,直接返回原始SQL和空参数(保持原有逻辑)
  147. if not permission_condition:
  148. return sql.strip(), None
  149. # 3. 清理原始SQL,保证拼接语法正确(移除末尾分号、多余空格)
  150. cleaned_sql = sql.strip().rstrip("; \n\t") # 处理多种空白字符和分号
  151. # 4. 智能判断原始SQL是否包含WHERE子句,进行拼接(兼容所有场景)
  152. if "WHERE" in cleaned_sql.upper():
  153. # 已有WHERE,用AND拼接权限条件(保留原有查询条件)
  154. final_sql = f"{cleaned_sql} AND {permission_condition}"
  155. else:
  156. # 无WHERE,直接添加WHERE和权限条件
  157. final_sql = f"{cleaned_sql} WHERE {permission_condition}"
  158. # 5. 返回拼接后的完整SQL和权限参数(后续执行可直接使用)
  159. return final_sql, permission_params
  160. async def __permission_condition_sql(self) -> (Optional[str], Optional[Dict[str, Any]]):
  161. """
  162. 应用数据范围权限隔离(完整逻辑,无任何省略)
  163. 基于角色的五种数据权限范围过滤,返回原始SQL条件字符串和参数(防注入)
  164. Returns:
  165. 权限条件字符串(如"created_id = :current_user_id AND dept_id IN (:accessible_dept_ids)")
  166. 权限参数字典(如{"current_user_id": 100, "accessible_dept_ids": [1,2,3]})
  167. """
  168. # 初始化返回结果(权限SQL条件、权限参数)
  169. permission_sql: Optional[str] = None
  170. permission_params: Dict[str, Any] = {}
  171. # --------------- 原有逻辑:前置判断(无任何省略)---------------
  172. # 1. 未登录用户,不限制数据权限
  173. if not self.auth.user:
  174. return permission_sql, permission_params
  175. # 2. 关闭数据权限检查,不限制数据权限
  176. if not self.auth.check_data_scope:
  177. return permission_sql, permission_params
  178. # 3. 模型无created_id字段,无法进行权限过滤,不限制
  179. if not hasattr(self.model, "created_id"):
  180. return permission_sql, permission_params
  181. # 4. 超级管理员,拥有全部数据权限,不限制
  182. if self.auth.user.is_superuser:
  183. return permission_sql, permission_params
  184. # --------------- 原有逻辑:获取用户角色与数据范围(无任何省略)---------------
  185. current_user_id = self.auth.user.id
  186. roles = getattr(self.auth.user, "roles", []) or []
  187. # 5. 无角色用户,仅能查看自己创建的数据(基础权限)
  188. if not roles:
  189. permission_sql = "created_id = :current_user_id"
  190. permission_params["current_user_id"] = current_user_id
  191. return permission_sql, permission_params
  192. # 6. 收集所有角色的权限范围和自定义部门ID
  193. data_scopes: Set[int] = set()
  194. custom_dept_ids: Set[int] = set()
  195. for role in roles:
  196. data_scopes.add(role.data_scope)
  197. # 收集自定义权限(DATA_SCOPE_CUSTOM=5)关联的部门ID
  198. if role.data_scope == self.DATA_SCOPE_CUSTOM and hasattr(role, 'depts') and role.depts:
  199. for dept in role.depts:
  200. custom_dept_ids.add(dept.id)
  201. # 7. 全部数据权限(最高优先级),不限制数据权限
  202. if self.DATA_SCOPE_ALL in data_scopes:
  203. return None, None
  204. # --------------- 收集可访问的部门ID---------------
  205. accessible_dept_ids: Set[int] = set()
  206. user_dept_id = getattr(self.auth.user, "dept_id", None)
  207. # 8. 处理自定义数据权限(DATA_SCOPE_CUSTOM=5)
  208. if self.DATA_SCOPE_CUSTOM in data_scopes:
  209. accessible_dept_ids.update(custom_dept_ids)
  210. # 9. 处理本部门数据权限(DATA_SCOPE_DEPT=3)
  211. if self.DATA_SCOPE_DEPT in data_scopes:
  212. if user_dept_id is not None:
  213. accessible_dept_ids.add(user_dept_id)
  214. # 10. 处理本部门及以下数据权限(DATA_SCOPE_DEPT_AND_CHILD=4)
  215. if self.DATA_SCOPE_DEPT_AND_CHILD in data_scopes:
  216. if user_dept_id is not None:
  217. try:
  218. # 完整查询所有部门,递归获取子部门ID(无省略)
  219. dept_sql = select(DeptModel)
  220. dept_result = await self.auth.db.execute(dept_sql)
  221. dept_objs = dept_result.scalars().all()
  222. id_map = get_child_id_map(dept_objs)
  223. # 递归获取当前部门及所有子部门ID(包含自身)
  224. dept_with_children_ids = get_child_recursion(id=user_dept_id, id_map=id_map)
  225. # 合并到可访问部门ID集合
  226. accessible_dept_ids.update(dept_with_children_ids)
  227. except Exception:
  228. # 查询失败降级:仅保留本部门ID(与原有逻辑一致)
  229. accessible_dept_ids.add(user_dept_id)
  230. # --------------- 原有逻辑:构建部门权限过滤条件(无任何省略)---------------
  231. if accessible_dept_ids:
  232. # 转换部门ID集合为列表(方便SQL IN条件使用)
  233. accessible_dept_list = list(accessible_dept_ids)
  234. # 优先使用模型的created_by关联(关联UserModel.dept_id,性能更优)
  235. creator_rel = getattr(self.model, "created_by", None)
  236. if creator_rel is not None and hasattr(UserModel, 'dept_id'):
  237. # 构建部门权限SQL条件(参数化,防注入)
  238. permission_sql = "created_by_dept_id IN (:accessible_dept_ids)"
  239. permission_params["accessible_dept_ids"] = accessible_dept_list
  240. # 补充:同时保留创建人ID条件(可选,根据你的业务需求调整)
  241. permission_sql = f"created_id = :current_user_id AND {permission_sql}"
  242. permission_params["current_user_id"] = current_user_id
  243. else:
  244. # 降级方案:无created_by关联,仅能查看自己创建的数据
  245. permission_sql = "created_id = :current_user_id"
  246. permission_params["current_user_id"] = current_user_id
  247. return permission_sql, permission_params
  248. # --------------- 原有逻辑:处理仅本人数据权限(DATA_SCOPE_SELF=2)---------------
  249. if self.DATA_SCOPE_SELF in data_scopes:
  250. permission_sql = "created_id = :current_user_id"
  251. permission_params["current_user_id"] = current_user_id
  252. return permission_sql, permission_params
  253. # --------------- 原有逻辑:默认情况(无有效权限范围,仅能查看自己数据)---------------
  254. permission_sql = "created_id = :current_user_id"
  255. permission_params["current_user_id"] = current_user_id
  256. return permission_sql, permission_params