service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # -*- coding: utf-8 -*-
  2. import platform
  3. import psutil
  4. import socket
  5. import time
  6. from pathlib import Path
  7. from app.utils.common_util import bytes2human
  8. from .schema import (
  9. CpuInfoSchema,
  10. MemoryInfoSchema,
  11. PyInfoSchema,
  12. ServerMonitorSchema,
  13. DiskInfoSchema,
  14. SysInfoSchema
  15. )
  16. class ServerService:
  17. """服务监控模块服务层"""
  18. @classmethod
  19. async def get_server_monitor_info_service(cls) -> dict:
  20. """
  21. 获取服务器监控信息
  22. 返回:
  23. - Dict: 包含服务器监控信息的字典。
  24. """
  25. return ServerMonitorSchema(
  26. cpu=cls._get_cpu_info(),
  27. mem=cls._get_memory_info(),
  28. sys=cls._get_system_info(),
  29. py=cls._get_python_info(),
  30. disks=cls._get_disk_info()
  31. ).model_dump()
  32. @classmethod
  33. def _get_cpu_info(cls) -> CpuInfoSchema:
  34. """
  35. 获取CPU信息
  36. 返回:
  37. - CpuInfoSchema: CPU信息模型。
  38. """
  39. cpu_times = psutil.cpu_times_percent()
  40. cpu_num=psutil.cpu_count(logical=True)
  41. if not cpu_num:
  42. cpu_num = 1
  43. return CpuInfoSchema(
  44. cpu_num=cpu_num,
  45. used=cpu_times.user,
  46. sys=cpu_times.system,
  47. free=cpu_times.idle
  48. )
  49. @classmethod
  50. def _get_memory_info(cls) -> MemoryInfoSchema:
  51. """
  52. 获取内存信息
  53. 返回:
  54. - MemoryInfoSchema: 内存信息模型。
  55. """
  56. memory = psutil.virtual_memory()
  57. return MemoryInfoSchema(
  58. total=bytes2human(memory.total),
  59. used=bytes2human(memory.used),
  60. free=bytes2human(memory.free),
  61. usage=memory.percent
  62. )
  63. @classmethod
  64. def _get_system_info(cls) -> SysInfoSchema:
  65. """
  66. 获取系统信息
  67. 返回:
  68. - SysInfoSchema: 系统信息模型。
  69. """
  70. hostname = socket.gethostname()
  71. return SysInfoSchema(
  72. computer_ip=socket.gethostbyname(hostname),
  73. computer_name=platform.node(),
  74. os_arch=platform.machine(),
  75. os_name=platform.platform(),
  76. user_dir=str(Path.cwd())
  77. )
  78. @classmethod
  79. def _get_python_info(cls) -> PyInfoSchema:
  80. """
  81. 获取Python解释器信息
  82. 返回:
  83. - PyInfoSchema: Python解释器信息模型。
  84. """
  85. current_process = psutil.Process()
  86. memory = psutil.virtual_memory()
  87. process_memory = current_process.memory_info()
  88. start_time = current_process.create_time()
  89. run_time = ServerService._calculate_run_time(start_time)
  90. return PyInfoSchema(
  91. name=current_process.name(),
  92. version=platform.python_version(),
  93. start_time=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),
  94. run_time=run_time,
  95. home=str(Path(current_process.exe())),
  96. memory_total=bytes2human(memory.available),
  97. memory_used=bytes2human(process_memory.rss),
  98. memory_free=bytes2human(memory.available - process_memory.rss),
  99. memory_usage=round((process_memory.rss / memory.available) * 100, 2)
  100. )
  101. @classmethod
  102. def _get_disk_info(cls) -> list[DiskInfoSchema]:
  103. """
  104. 获取磁盘信息
  105. 返回:
  106. - list[DiskInfoSchema]: 磁盘信息模型列表。
  107. """
  108. disk_info = []
  109. for partition in psutil.disk_partitions():
  110. try:
  111. # 使用mountpoint而不是device来获取磁盘使用情况
  112. usage = psutil.disk_usage(partition.mountpoint)
  113. mount_point = str(Path(partition.mountpoint))
  114. disk_info.append(
  115. DiskInfoSchema(
  116. dir_name=mount_point, # 使用mountpoint替代device
  117. sys_type_name=partition.fstype,
  118. type_name=f'本地固定磁盘({mount_point})',
  119. total=bytes2human(usage.total),
  120. used=bytes2human(usage.used),
  121. free=bytes2human(usage.free),
  122. usage=usage.percent # 直接使用数字而不是字符串
  123. )
  124. )
  125. except (PermissionError, FileNotFoundError):
  126. # 明确指定可能的异常
  127. continue
  128. return disk_info
  129. @classmethod
  130. def _calculate_run_time(cls,start_time: float) -> str:
  131. """
  132. 计算运行时间
  133. 参数:
  134. - start_time (float): 进程启动时间(时间戳)
  135. 返回:
  136. - str: 格式化后的运行时间字符串(例如:"1天2小时3分钟")
  137. """
  138. difference = time.time() - start_time
  139. days = int(difference // (24 * 60 * 60))
  140. hours = int((difference % (24 * 60 * 60)) // (60 * 60))
  141. minutes = int((difference % (60 * 60)) // 60)
  142. return f'{days}天{hours}小时{minutes}分钟'