permission.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # -*- coding: utf-8 -*-
  2. from typing import Any
  3. from sqlalchemy.sql.elements import ColumnElement
  4. from sqlalchemy import select
  5. from app.api.v1.module_system.user.model import UserModel
  6. from app.api.v1.module_system.dept.model import DeptModel
  7. from app.api.v1.module_system.auth.schema import AuthSchema
  8. from app.utils.common_util import get_child_id_map, get_child_recursion
  9. class Permission:
  10. """
  11. 为业务模型提供数据权限过滤功能
  12. """
  13. # 数据权限常量定义,提高代码可读性
  14. DATA_SCOPE_SELF = 1 # 仅本人数据
  15. DATA_SCOPE_DEPT = 2 # 本部门数据
  16. DATA_SCOPE_DEPT_AND_CHILD = 3 # 本部门及以下数据
  17. DATA_SCOPE_ALL = 4 # 全部数据
  18. DATA_SCOPE_CUSTOM = 5 # 自定义数据
  19. def __init__(self, model: Any, auth: AuthSchema):
  20. """
  21. 初始化权限过滤器实例
  22. Args:
  23. db: 数据库会话
  24. model: 数据模型类
  25. current_user: 当前用户对象
  26. auth: 认证信息对象
  27. """
  28. self.model = model
  29. self.auth = auth
  30. self.conditions: list[ColumnElement] = [] # 权限条件列表
  31. async def filter_query(self, query: Any) -> Any:
  32. """
  33. 异步过滤查询对象
  34. Args:
  35. query: SQLAlchemy查询对象
  36. Returns:
  37. 过滤后的查询对象
  38. """
  39. condition = await self.__permission_condition()
  40. return query.where(condition) if condition is not None else query
  41. async def __permission_condition(self) -> ColumnElement | None:
  42. """
  43. 应用数据范围权限隔离
  44. 基于角色的五种数据权限范围过滤
  45. 支持五种权限类型:
  46. 1. 仅本人数据权限 - 只能查看自己创建的数据
  47. 2. 本部门数据权限 - 只能查看同部门的数据
  48. 3. 本部门及以下数据权限 - 可以查看本部门及所有子部门的数据
  49. 4. 全部数据权限 - 可以查看所有数据
  50. 5. 自定义数据权限 - 通过role_dept_relation表定义可访问的部门列表
  51. 权限处理原则:
  52. - 多个角色的权限取并集(最宽松原则)
  53. - 优先级:全部数据 > 部门权限(2、3、5的并集)> 仅本人
  54. - 构造权限过滤表达式,返回None表示不限制
  55. """
  56. # 如果不需要检查数据权限,则不限制
  57. if not self.auth.user:
  58. return None
  59. # 如果检查数据权限为False,则不限制
  60. if not self.auth.check_data_scope:
  61. return None
  62. # 如果模型没有创建人created_id字段,则不限制
  63. if not hasattr(self.model, "created_id"):
  64. return None
  65. # 超级管理员可以查看所有数据
  66. if self.auth.user.is_superuser:
  67. return None
  68. # 如果用户没有角色,则只能查看自己的数据
  69. roles = getattr(self.auth.user, "roles", []) or []
  70. if not roles:
  71. created_id_attr = getattr(self.model, "created_id", None)
  72. if created_id_attr is not None:
  73. return created_id_attr == self.auth.user.id
  74. return None
  75. # 获取用户所有角色的权限范围
  76. data_scopes = set()
  77. custom_dept_ids = set() # 自定义权限(data_scope=5)关联的部门ID集合
  78. for role in roles:
  79. data_scopes.add(role.data_scope)
  80. # 收集自定义权限(data_scope=5)关联的部门ID
  81. if role.data_scope == self.DATA_SCOPE_CUSTOM and hasattr(role, 'depts') and role.depts:
  82. for dept in role.depts:
  83. custom_dept_ids.add(dept.id)
  84. # 权限优先级处理:全部数据权限最高优先级
  85. if self.DATA_SCOPE_ALL in data_scopes:
  86. return None
  87. # 收集所有可访问的部门ID(2、3、5权限的并集)
  88. accessible_dept_ids = set()
  89. user_dept_id = getattr(self.auth.user, "dept_id", None)
  90. # 处理自定义数据权限(5)
  91. if self.DATA_SCOPE_CUSTOM in data_scopes:
  92. accessible_dept_ids.update(custom_dept_ids)
  93. # 处理本部门数据权限(2)
  94. if self.DATA_SCOPE_DEPT in data_scopes:
  95. if user_dept_id is not None:
  96. accessible_dept_ids.add(user_dept_id)
  97. # 处理本部门及以下数据权限(3)
  98. if self.DATA_SCOPE_DEPT_AND_CHILD in data_scopes:
  99. if user_dept_id is not None:
  100. try:
  101. # 查询所有部门并递归获取子部门
  102. dept_sql = select(DeptModel)
  103. dept_result = await self.auth.db.execute(dept_sql)
  104. dept_objs = dept_result.scalars().all()
  105. id_map = get_child_id_map(dept_objs)
  106. # get_child_recursion返回的结果已包含自身ID和所有子部门ID
  107. dept_with_children_ids = get_child_recursion(id=user_dept_id, id_map=id_map)
  108. accessible_dept_ids.update(dept_with_children_ids)
  109. except Exception:
  110. # 查询失败时降级到本部门
  111. accessible_dept_ids.add(user_dept_id)
  112. # 如果有部门权限(2、3、5任一),使用部门过滤
  113. if accessible_dept_ids:
  114. creator_rel = getattr(self.model, "created_by", None)
  115. # 优先使用关系过滤(性能更好)
  116. if creator_rel is not None and hasattr(UserModel, 'dept_id'):
  117. return creator_rel.has(getattr(UserModel, 'dept_id').in_(list(accessible_dept_ids)))
  118. # 降级方案:如果模型没有created_by关系但有created_id,则只能查看自己的数据
  119. else:
  120. created_id_attr = getattr(self.model, "created_id", None)
  121. if created_id_attr is not None:
  122. return created_id_attr == self.auth.user.id
  123. return None
  124. # 处理仅本人数据权限(1)
  125. if self.DATA_SCOPE_SELF in data_scopes:
  126. created_id_attr = getattr(self.model, "created_id", None)
  127. if created_id_attr is not None:
  128. return created_id_attr == self.auth.user.id
  129. return None
  130. # 默认情况:如果用户有角色但没有任何有效权限范围,只能查看自己的数据
  131. created_id_attr = getattr(self.model, "created_id", None)
  132. if created_id_attr is not None:
  133. return created_id_attr == self.auth.user.id
  134. return None