# common.py import httpx import base64 from fastapi import HTTPException from app.config.setting import settings # TDengine REST配置(跨平台通用) TDENGINE_CONFIG = { "host": settings.TDENGINE_HOST, "port": settings.TDENGINE_PORT, "user": settings.TDENGINE_USER, "password": settings.TDENGINE_PWD, "database": "crane_data" } async def tdengine_rest_query(sql: str) -> dict: """改用httpx的REST查询函数(跨平台)""" rest_url = f"http://{TDENGINE_CONFIG['host']}:{TDENGINE_CONFIG['port']}/rest/sql/{TDENGINE_CONFIG['database']}?tz=Asia%2FShanghai" auth_str = f"{TDENGINE_CONFIG['user']}:{TDENGINE_CONFIG['password']}" auth_base64 = base64.b64encode(auth_str.encode("utf-8")).decode("utf-8") headers = { "Authorization": f"Basic {auth_base64}", "Content-Type": "text/plain", "TZ": "Asia/Shanghai" } try: # 替换requests.post为httpx.post(同步调用) response = httpx.post(rest_url, content=sql, headers=headers, timeout=30) response.raise_for_status() result = response.json() if result.get("code") != 0: raise Exception(f"SQL执行失败: {result.get('desc', '未知错误')}") return result except httpx.HTTPError as e: # 替换requests.exceptions为httpx.HTTPError raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}") async def format_rest_result(rest_result: dict) -> list[dict]: """公共数据格式化函数(适配Element UI)""" columns = [col[0] for col in rest_result["column_meta"]] rows = rest_result["data"] formatted_data = [] for row in rows: row_dict = dict(zip(columns, row)) # 二进制数据转字符串 for key, value in row_dict.items(): if isinstance(value, bytes): row_dict[key] = value.decode("utf-8", errors="ignore") formatted_data.append(row_dict) return formatted_data async def get_table_total_count(table_name: str, filter_sql: str = "") -> int: """获取表的总条数(用于分页)""" count_sql = f"SELECT COUNT(*) FROM {table_name} {filter_sql}" count_result = await tdengine_rest_query(count_sql) return count_result["data"][0][0] if count_result["data"] else 0