# -*- coding: utf-8 -*-

import platform
import socket
import time
from pathlib import Path

import psutil

from app.utils.common_util import bytes2human
from .schema import (
    CpuInfoSchema,
    MemoryInfoSchema,
    PyInfoSchema,
    ServerMonitorSchema,
    DiskInfoSchema,
    SysInfoSchema
)


class ServerService:
    """Service layer of the server-monitoring module.

    Every method is a classmethod; the class holds no state and only groups
    the collectors that feed ``ServerMonitorSchema``.
    """

    @classmethod
    async def get_server_monitor_info_service(cls) -> dict:
        """
        Collect all server monitoring information.

        Returns:
        - dict: ``ServerMonitorSchema`` dumped to a dictionary, with the keys
          ``cpu``, ``mem``, ``sys``, ``py`` and ``disks``.
        """
        monitor = ServerMonitorSchema(
            cpu=cls._get_cpu_info(),
            mem=cls._get_memory_info(),
            sys=cls._get_system_info(),
            py=cls._get_python_info(),
            disks=cls._get_disk_info()
        )
        return monitor.model_dump()

    @classmethod
    def _get_cpu_info(cls) -> CpuInfoSchema:
        """
        Collect CPU information.

        Returns:
        - CpuInfoSchema: logical CPU count plus user / system / idle
          percentages.
        """
        # NOTE(review): called without an interval, so per the psutil docs the
        # very first reading after import may be a meaningless 0.0. A blocking
        # interval is deliberately not used here because the caller is async.
        cpu_times = psutil.cpu_times_percent()
        # cpu_count() may return None when the count cannot be determined;
        # report at least one CPU in that case.
        cpu_num = psutil.cpu_count(logical=True) or 1
        return CpuInfoSchema(
            cpu_num=cpu_num,
            used=cpu_times.user,
            sys=cpu_times.system,
            free=cpu_times.idle
        )

    @classmethod
    def _get_memory_info(cls) -> MemoryInfoSchema:
        """
        Collect system memory information.

        Returns:
        - MemoryInfoSchema: total / used / free sizes formatted by
          ``bytes2human`` and the usage percentage reported by psutil.
        """
        memory = psutil.virtual_memory()
        return MemoryInfoSchema(
            total=bytes2human(memory.total),
            used=bytes2human(memory.used),
            free=bytes2human(memory.free),
            usage=memory.percent
        )

    @classmethod
    def _resolve_host_ip(cls, hostname: str) -> str:
        """
        Resolve ``hostname`` to an IPv4 address, falling back to loopback.

        Parameters:
        - hostname (str): host name to resolve.

        Returns:
        - str: the resolved address, or ``127.0.0.1`` when resolution fails.
        """
        try:
            return socket.gethostbyname(hostname)
        except OSError:
            # socket.gaierror / socket.herror are OSError subclasses. An
            # unresolvable host name must not break the whole monitoring
            # payload, so report the loopback address instead.
            return '127.0.0.1'

    @classmethod
    def _get_system_info(cls) -> SysInfoSchema:
        """
        Collect host / operating-system information.

        Returns:
        - SysInfoSchema: host IP, host name, machine architecture, platform
          string and the current working directory.
        """
        hostname = socket.gethostname()
        return SysInfoSchema(
            computer_ip=cls._resolve_host_ip(hostname),
            computer_name=platform.node(),
            os_arch=platform.machine(),
            os_name=platform.platform(),
            user_dir=str(Path.cwd())
        )

    @classmethod
    def _get_python_info(cls) -> PyInfoSchema:
        """
        Collect information about the current Python interpreter process.

        Note that ``memory_total`` is the memory currently *available* on the
        system (``psutil.virtual_memory().available``), not the physical
        total; ``memory_usage`` is the process RSS as a percentage of that
        figure.

        Returns:
        - PyInfoSchema: process name, Python version, start time, run time,
          executable path and memory figures.
        """
        current_process = psutil.Process()
        available = psutil.virtual_memory().available
        rss = current_process.memory_info().rss
        start_time = current_process.create_time()

        # Guard against a zero "available" reading (division by zero) and
        # against RSS exceeding it (negative free memory).
        memory_usage = round((rss / available) * 100, 2) if available else 0.0
        memory_free = max(available - rss, 0)

        return PyInfoSchema(
            name=current_process.name(),
            version=platform.python_version(),
            start_time=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),
            # Use cls (not the hard-coded class name) so subclasses can
            # override the formatter.
            run_time=cls._calculate_run_time(start_time),
            home=str(Path(current_process.exe())),
            memory_total=bytes2human(available),
            memory_used=bytes2human(rss),
            memory_free=bytes2human(memory_free),
            memory_usage=memory_usage
        )

    @classmethod
    def _get_disk_info(cls) -> list[DiskInfoSchema]:
        """
        Collect usage information for every mounted partition.

        Partitions whose usage cannot be read (``PermissionError`` or
        ``FileNotFoundError``) are skipped.

        Returns:
        - list[DiskInfoSchema]: one entry per readable partition.
        """
        disk_info = []
        for partition in psutil.disk_partitions():
            try:
                # Query by mount point rather than by device.
                usage = psutil.disk_usage(partition.mountpoint)
            except (PermissionError, FileNotFoundError):
                # Best effort: an unreadable partition is simply omitted.
                continue

            mount_point = str(Path(partition.mountpoint))
            disk_info.append(
                DiskInfoSchema(
                    dir_name=mount_point,  # mount point, not the device
                    sys_type_name=partition.fstype,
                    type_name=f'本地固定磁盘({mount_point})',
                    total=bytes2human(usage.total),
                    used=bytes2human(usage.used),
                    free=bytes2human(usage.free),
                    usage=usage.percent  # numeric percentage, not a string
                )
            )
        return disk_info

    @classmethod
    def _calculate_run_time(cls, start_time: float) -> str:
        """
        Format the time elapsed since ``start_time``.

        Parameters:
        - start_time (float): process start time as a Unix timestamp.

        Returns:
        - str: elapsed time as "<days>天<hours>小时<minutes>分钟"
          (for example "1天2小时3分钟"). A start time in the future is
          reported as zero elapsed time.
        """
        # Clamp at zero so a clock adjustment cannot yield negative days.
        elapsed = max(time.time() - start_time, 0.0)
        total_minutes = int(elapsed // 60)
        days, remainder = divmod(total_minutes, 24 * 60)
        hours, minutes = divmod(remainder, 60)
        return f'{days}天{hours}小时{minutes}分钟'