service.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # -*- coding: utf-8 -*-
  2. import json
  3. from redis.asyncio.client import Redis
  4. from app.common.enums import RedisInitKeyConfig
  5. from app.core.redis_crud import RedisCURD
  6. from app.core.security import decode_access_token
  7. from app.core.logger import log
  8. from .schema import OnlineQueryParam
  9. class OnlineService:
  10. """在线用户管理模块服务层"""
  11. @classmethod
  12. async def get_online_list_service(cls, redis: Redis, search: OnlineQueryParam | None = None) -> list[dict]:
  13. """
  14. 获取在线用户列表信息(支持分页和搜索)
  15. 参数:
  16. - redis (Redis): Redis异步客户端实例。
  17. - search (OnlineQueryParam | None): 查询参数模型。
  18. 返回:
  19. - list[dict]: 在线用户详情字典列表。
  20. """
  21. keys = await RedisCURD(redis).get_keys(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:*")
  22. tokens = await RedisCURD(redis).mget(keys)
  23. online_users = []
  24. for token in tokens:
  25. if not token:
  26. continue
  27. try:
  28. payload = decode_access_token(token=token)
  29. session_info = json.loads(payload.sub)
  30. if cls._match_search_conditions(session_info, search):
  31. online_users.append(session_info)
  32. except Exception as e:
  33. log.error(f"解析在线用户数据失败: {e}")
  34. continue
  35. # 按照 login_time 倒序排序
  36. online_users.sort(key=lambda x: x.get('login_time', ''), reverse=True)
  37. return online_users
  38. @classmethod
  39. async def delete_online_service(cls, redis: Redis, session_id: str) -> bool:
  40. """
  41. 强制下线指定在线用户
  42. 参数:
  43. - redis (Redis): Redis异步客户端实例。
  44. - session_id (str): 在线用户会话ID。
  45. 返回:
  46. - bool: 如果操作成功则返回True,否则返回False。
  47. """
  48. # 删除 token
  49. await RedisCURD(redis).delete(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:{session_id}")
  50. await RedisCURD(redis).delete(f"{RedisInitKeyConfig.REFRESH_TOKEN.key}:{session_id}")
  51. log.info(f"强制下线用户会话: {session_id}")
  52. return True
  53. @classmethod
  54. async def clear_online_service(cls, redis: Redis) -> bool:
  55. """
  56. 强制下线所有在线用户
  57. 参数:
  58. - redis (Redis): Redis异步客户端实例。
  59. 返回:
  60. - bool: 如果操作成功则返回True,否则返回False。
  61. """
  62. # 删除 token
  63. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.ACCESS_TOKEN.key}:*")
  64. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.REFRESH_TOKEN.key}:*")
  65. log.info(f"清除所有在线用户会话成功")
  66. return True
  67. @staticmethod
  68. def _match_search_conditions(online_info: dict, search: OnlineQueryParam | None = None) -> bool:
  69. """
  70. 检查是否匹配搜索条件
  71. 参数:
  72. - online_info (dict): 在线用户信息字典。
  73. - search (OnlineQueryParam | None): 查询参数模型。
  74. 返回:
  75. - bool: 如果匹配则返回True,否则返回False。
  76. """
  77. if not search:
  78. return True
  79. if search.name and search.name[1]:
  80. keyword = search.name[1].strip('%')
  81. if keyword.lower() not in online_info.get("name", "").lower():
  82. return False
  83. if search.ipaddr and search.ipaddr[1]:
  84. keyword = search.ipaddr[1].strip('%')
  85. if keyword not in online_info.get("ipaddr", ""):
  86. return False
  87. if search.login_location and search.login_location[1]:
  88. keyword = search.login_location[1].strip('%')
  89. if keyword.lower() not in online_info.get("login_location", "").lower():
  90. return False
  91. return True