| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- # -*- coding: utf-8 -*-
- import platform
- import psutil
- import socket
- import time
- from pathlib import Path
- from app.utils.common_util import bytes2human
- from .schema import (
- CpuInfoSchema,
- MemoryInfoSchema,
- PyInfoSchema,
- ServerMonitorSchema,
- DiskInfoSchema,
- SysInfoSchema
- )
- class ServerService:
- """服务监控模块服务层"""
- @classmethod
- async def get_server_monitor_info_service(cls) -> dict:
- """
- 获取服务器监控信息
-
- 返回:
- - Dict: 包含服务器监控信息的字典。
- """
- return ServerMonitorSchema(
- cpu=cls._get_cpu_info(),
- mem=cls._get_memory_info(),
- sys=cls._get_system_info(),
- py=cls._get_python_info(),
- disks=cls._get_disk_info()
- ).model_dump()
- @classmethod
- def _get_cpu_info(cls) -> CpuInfoSchema:
- """
- 获取CPU信息
-
- 返回:
- - CpuInfoSchema: CPU信息模型。
- """
- cpu_times = psutil.cpu_times_percent()
- cpu_num=psutil.cpu_count(logical=True)
- if not cpu_num:
- cpu_num = 1
- return CpuInfoSchema(
- cpu_num=cpu_num,
- used=cpu_times.user,
- sys=cpu_times.system,
- free=cpu_times.idle
- )
- @classmethod
- def _get_memory_info(cls) -> MemoryInfoSchema:
- """
- 获取内存信息
-
- 返回:
- - MemoryInfoSchema: 内存信息模型。
- """
- memory = psutil.virtual_memory()
- return MemoryInfoSchema(
- total=bytes2human(memory.total),
- used=bytes2human(memory.used),
- free=bytes2human(memory.free),
- usage=memory.percent
- )
- @classmethod
- def _get_system_info(cls) -> SysInfoSchema:
- """
- 获取系统信息
-
- 返回:
- - SysInfoSchema: 系统信息模型。
- """
- hostname = socket.gethostname()
- return SysInfoSchema(
- computer_ip=socket.gethostbyname(hostname),
- computer_name=platform.node(),
- os_arch=platform.machine(),
- os_name=platform.platform(),
- user_dir=str(Path.cwd())
- )
- @classmethod
- def _get_python_info(cls) -> PyInfoSchema:
- """
- 获取Python解释器信息
-
- 返回:
- - PyInfoSchema: Python解释器信息模型。
- """
- current_process = psutil.Process()
- memory = psutil.virtual_memory()
- process_memory = current_process.memory_info()
-
- start_time = current_process.create_time()
- run_time = ServerService._calculate_run_time(start_time)
-
- return PyInfoSchema(
- name=current_process.name(),
- version=platform.python_version(),
- start_time=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),
- run_time=run_time,
- home=str(Path(current_process.exe())),
- memory_total=bytes2human(memory.available),
- memory_used=bytes2human(process_memory.rss),
- memory_free=bytes2human(memory.available - process_memory.rss),
- memory_usage=round((process_memory.rss / memory.available) * 100, 2)
- )
- @classmethod
- def _get_disk_info(cls) -> list[DiskInfoSchema]:
- """
- 获取磁盘信息
-
- 返回:
- - list[DiskInfoSchema]: 磁盘信息模型列表。
- """
- disk_info = []
- for partition in psutil.disk_partitions():
- try:
- # 使用mountpoint而不是device来获取磁盘使用情况
- usage = psutil.disk_usage(partition.mountpoint)
- mount_point = str(Path(partition.mountpoint))
- disk_info.append(
- DiskInfoSchema(
- dir_name=mount_point, # 使用mountpoint替代device
- sys_type_name=partition.fstype,
- type_name=f'本地固定磁盘({mount_point})',
- total=bytes2human(usage.total),
- used=bytes2human(usage.used),
- free=bytes2human(usage.free),
- usage=usage.percent # 直接使用数字而不是字符串
- )
- )
- except (PermissionError, FileNotFoundError):
- # 明确指定可能的异常
- continue
- return disk_info
- @classmethod
- def _calculate_run_time(cls,start_time: float) -> str:
- """
- 计算运行时间
-
- 参数:
- - start_time (float): 进程启动时间(时间戳)
-
- 返回:
- - str: 格式化后的运行时间字符串(例如:"1天2小时3分钟")
- """
- difference = time.time() - start_time
- days = int(difference // (24 * 60 * 60))
- hours = int((difference % (24 * 60 * 60)) // (60 * 60))
- minutes = int((difference % (60 * 60)) // 60)
- return f'{days}天{hours}小时{minutes}分钟'
|