tdengine_util.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # common.py
  2. import httpx
  3. import base64
  4. from fastapi import HTTPException
  5. from app.config.setting import settings
  6. # TDengine REST配置(跨平台通用)
  7. TDENGINE_CONFIG = {
  8. "host": settings.TDENGINE_HOST,
  9. "port": settings.TDENGINE_PORT,
  10. "user": settings.TDENGINE_USER,
  11. "password": settings.TDENGINE_PWD,
  12. "database": "crane_data"
  13. }
  14. async def tdengine_rest_query(sql: str) -> dict:
  15. """改用httpx的REST查询函数(跨平台)"""
  16. rest_url = f"http://{TDENGINE_CONFIG['host']}:{TDENGINE_CONFIG['port']}/rest/sql/{TDENGINE_CONFIG['database']}?tz=Asia%2FShanghai"
  17. auth_str = f"{TDENGINE_CONFIG['user']}:{TDENGINE_CONFIG['password']}"
  18. auth_base64 = base64.b64encode(auth_str.encode("utf-8")).decode("utf-8")
  19. headers = {
  20. "Authorization": f"Basic {auth_base64}",
  21. "Content-Type": "text/plain",
  22. "TZ": "Asia/Shanghai"
  23. }
  24. try:
  25. # 替换requests.post为httpx.post(同步调用)
  26. response = httpx.post(rest_url, content=sql, headers=headers, timeout=30)
  27. response.raise_for_status()
  28. result = response.json()
  29. if result.get("code") != 0:
  30. raise Exception(f"SQL执行失败: {result.get('desc', '未知错误')}")
  31. return result
  32. except httpx.HTTPError as e: # 替换requests.exceptions为httpx.HTTPError
  33. raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}")
  34. except Exception as e:
  35. raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}")
  36. async def format_rest_result(rest_result: dict) -> list[dict]:
  37. """公共数据格式化函数(适配Element UI)"""
  38. columns = [col[0] for col in rest_result["column_meta"]]
  39. rows = rest_result["data"]
  40. formatted_data = []
  41. for row in rows:
  42. row_dict = dict(zip(columns, row))
  43. # 二进制数据转字符串
  44. for key, value in row_dict.items():
  45. if isinstance(value, bytes):
  46. row_dict[key] = value.decode("utf-8", errors="ignore")
  47. formatted_data.append(row_dict)
  48. return formatted_data
  49. async def get_table_total_count(table_name: str, filter_sql: str = "") -> int:
  50. """获取表的总条数(用于分页)"""
  51. count_sql = f"SELECT COUNT(*) FROM {table_name} {filter_sql}"
  52. count_result = await tdengine_rest_query(count_sql)
  53. return count_result["data"][0][0] if count_result["data"] else 0