为提高效率,提问时请提供以下信息,问题描述清晰可优先响应。
【DM版本】: DM8,sqlalchemy2.0+
【操作系统】:
【CPU】:
【问题描述】*:达梦驱动在sqlalchemy2.0 中 不支持异步
解决方案:(伪异步,仅支持 await语法)
import asyncio
from typing import Optional, Type, Any
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
class CustomAsyncSession(AsyncSession):
def __init__(
self,
bind= None,
*,
sync_session_class: Optional[Type[Session]] = None,
**kw: Any,
):
if sync_session_class:
self.sync_session_class = sync_session_class
self.sync_session = self._proxied = self._assign_proxied(
self.sync_session_class(bind=bind, binds=None, **kw)
)
SQLALCHEMY_DATABASE_URL = 'dm+dmPython://SYSDBA:xxx@127.0.0.1:30236'
engine_sync = create_engine(
url=SQLALCHEMY_DATABASE_URL,
pool_recycle=300,
pool_size=20,
max_overflow=15,
pool_timeout=15,
echo=False)
session_maker: async_sessionmaker[AsyncSession] = async_sessionmaker(bind=engine_sync, autoflush=False, autocommit=False, class_=CustomAsyncSession)
async def main():
async with session_maker() as session:
from model.model_info import ModelInfo
query = select(ModelInfo.id, ModelInfo.create_time).limit(1)
res = await session.execute(query)
print(res.fetchall())
if __name__ == '__main__':
asyncio.run(main())
6