注册

dmAsync 使用异常数据库连接异常:<class 'dmPython.Connection'> returned a result with an exception set

陈柏恺 2026/01/06 63 1

为提高效率,提问时请提供以下信息,问题描述清晰可优先响应。
【DM版本】: V8
【操作系统】:Windows
【CPU】: X86
【问题描述】*:

Python执行异步数据库连接失败:(同步数据库连接正常)
(xzyd-langflow) PS E:\projects\python\xzyd-langflow\src\backend\base\langflow\services\database_ext> python .\test_async.py
2026-01-06 12:17:40.202 | INFO | main:setup_dm_env:16 - 成功加载达梦 DLL 目录: D:\Tools\dmdbms\bin
SQLAlchemy 版本: 2.0.41 (需 > 2.0.22)
dmSQLAlchemy 版本: 2.0.41 (需 > 2.0.7)
dmAsync 是否安装: True
2026-01-06 12:17:40.528 | ERROR | main:<module>:71 - 运行失败: <class 'dmPython.Connection'> returned a result with an exception set

-------------------test_async.py------------------------

import os
import platform
from loguru import logger

import sqlalchemy

1. 达梦 DLL 环境配置 (必须在任何数据库操作之前)

def setup_dm_env():
dm_bin_path = r"D:\Tools\dmdbms\bin"
if platform.system().lower() == 'windows':
if os.path.exists(dm_bin_path):
os.environ['PATH'] = dm_bin_path + os.pathsep + os.environ.get('PATH', '')
if hasattr(os, 'add_dll_directory'):
try:
os.add_dll_directory(dm_bin_path)
logger.info(f"成功加载达梦 DLL 目录: {dm_bin_path}")
except Exception as e:
logger.error(f"加载 DLL 目录失败: {e}")

setup_dm_env()

import dmSQLAlchemy
try:
import dmAsync
has_dm_async = True
except:
has_dm_async = False

print(f"SQLAlchemy 版本: {sqlalchemy.version} (需 > 2.0.22)")
print(f"dmSQLAlchemy 版本: {dmSQLAlchemy.current_version} (需 > 2.0.7)")
print(f"dmAsync 是否安装: {has_dm_async}")

2. 导入异步支持组件

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import text

3. 使用 create_async_engine

确保 URI 使用 dm+dmAsync 协议

engine = create_async_engine(
'dm+dmAsync://LANGFLOW:LANGFLOWLANGFLOWLANGFLOW@192.168.206.132:5236/XZYD_LANGFLOW',
echo=True
)

4. 使用 async_sessionmaker (SQLAlchemy 2.0+ 推荐)

AsyncSessionLocal = async_sessionmaker(
bind=engine,
class_=AsyncSession,
autoflush=False
)

async def async_func():
async with AsyncSessionLocal() as session:
# 执行异步查询
query = text('SELECT * FROM DUAL')
result = await session.execute(query)
rows = result.fetchall()
for row in rows:
print(f"查询结果: {row}")

# 异步清理
await engine.dispose()

if name == "main":
try:
asyncio.run(async_func())
except Exception as e:
logger.error(f"运行失败: {e}")

回答 0
暂无回答
扫一扫
联系客服