5.1 简介及安装
SQLAlchemy 是 python 下的开源软件,提供了 SQL 工具包及对象关系映射(ORM)工具,让应用程序开发人员使用上 SQL 的强大功能和灵活性。dmSQLAlchemy 方言包是 DM 提供用于 SQLAlchemy 连接 DM 数据库的方法。
1.SQLAlchemy 软件的安装。例如:SQLAlchemy-1.1.10.win-amd64-py2.7.exe。
2.dmSQLAlchemy 方言包的软件生成与安装。
dmSQLAlchemy 可以运行在任何安装了 Python 的平台上。生成工具 setup.py 位于 drivers\python\dmSQLAlchemy\dmSQLAlchemyx.x 目录中。
可以使用如下命令很方便地在 Windows 和 Linux 操作系统下编译并安装 dmSQLAlchemy:
//进入到setup.py所在的源码目录,执行以下命令:
python setup.py install
也可以先生成安装文件再进行安装,不同平台生成安装包的命令如下:
Windows:python setup.py bdist_wininst
Linux:python setup.py bdist_rpm
生成之后的安装包(例如 dmSQLAlchemy-1.1.10.win-amd64.exe)位于 drivers\python\dmSQLAlchemy\dmSQLAlchemyx.x\dist 目录中。点击安装包安装即可。
DM 提供的 dmSQLAlchemy 与 SQLAlchemy 的版本对应关系如下表:
| dmSQLAlchemy 版本 | SQLAlchemy 版本 |
|---|---|
| 1.1.x | 1.3.x |
| 1.4.x | 1.4.x |
| 2.0.x | 2.0.x |
5.2 engine 的配置
create_engine()返回一个数据库引擎,下面是 DM 数据库的配置方法。
from sqlalchemy import create_engine
engine =
create_engine('dm://SYSDBA:Dmsys_123@localhost:5236/',connect_args={'local_code':1,'connection_timeout':15})
或
engine =
create_engine('dm+dmPython://SYSDBA:Dmsys_123@localhost:5236/',connect_args={'local_code':1,'connection_timeout':15})
其中,connect_args 是字典选项,只要在 connect_args 中以字典对象的方式配置 dmPython.connect 支持的选项即可。可以包含多个字典对象,用逗号分隔。dmPython.connect 请参考 3.1.1.1 dmPython.connect。其他官方配置参考 SQLAlchemy 官网 http://docs.sqlalchemy.org 文档。
同时 dmSQLAlchemy 存在独属于 dmSQLAlchemy 的配置选项:
a) compatible_mode:此选项用来设置区分不同功能实现,可以设置为 DM、Oracle、MySQL 和 TSQL,缺省时为 DM。例如采用 MYSQL 时,对于 json 类型,返回值将返回为 dict 类型。
b) add_quote_all:此选项用来设置 dmSQLAlchemy 执行操作时是否对于所有表名,列名等增加引号,可以设置为 True 和 False,缺省为 False。例如设置为 True 时,将会使所有的表名与列名等完全遵循输入大小写,设置为 False 时将采用默认的 SQLAlchemy 大小写逻辑。
5.3 类型映射
5.3.1 简介
SQLAlchemy 支持了数据库中使用的大部分类型,根据映射关系,dmSQLAlchemy 方言包支持达梦数据库的下列类型:
ARRAY,BFILE,BIGINT,BINARY,BIT,BLOB,CHAR,CHARACTER,CLOB,DATE,DATETIME,DEC,DECIMAL,DOUBLE,DOUBLE PRECISION,FLOAT,JSON,JSONB,INT,INTEGER,INTERVAL YEAR,INTERVAL MONTH,INTERVAL DAY,INTERVAL HOUR,INTERVAL MINUTE,INTERVAL SECOND,INTERVAL YEAR TO MONTH,INTERVAL DAY TO HOUR,INTERVAL DAY TO MINUTE,INTERVAL DAY TO SECOND,INTERVAL HOUR TO MINUTE,INTERVAL HOUR TO SECOND,INTERVAL MINUTE TO SECOND,IMAGE,ROWID,LONG,NCLOB,NUMBER,NUMERIC,NVARCHAR2,REAL,SMALLINT,TEXT,TIME,TIME WITH TIME ZONE,TIMESTAMP,TIMESTAMP WITH LOCAL TIME ZONE,TIMESTAMP WITH TIME ZONE,TINYINT,VARBINARY,VARCHAR,VARCHAR2,VECTOR。
其中由于 SQLAlchemy 类型支持问题,DECIMAL,DEC,NUMBER 与 NUMERIC 类型在 SQLAlchemy 中均被映射为 NUMBER 类型,时间间隔数据类型(INTERVAL YEAR 等)均被映射为 INTERVAL 类型,CHARACTER 类型被映射为 CHAR 类型,TINYINT 类型被映射为 SMALLINT 类型,CLOB 类型与 LONGVARCHAR 类型被映射为 TEXT 类型 TIME WITH TIME ZONE 类型被映射为 TIME 类型。
5.3.2 向量类型
在 dmSQLAlchemy 2.0.8 版本新增了对于数据库中向量类型的支持,由于原生 SQLAlchemy 中不支持 VECTOR 类型,当前调用时需要使用 dmSQLAlchemy 中的方法直接调用。
5.3.2.1 所需环境
由于 VECTOR 类型中部分方法需要 numpy 库支持,如果需要使用 VECTOR 类型,则需要安装 numpy 模块:
// 在非离线状态下,执行以下命令:
pip install numpy
// 在离线状态下,可以将whl包放到环境中,使用pip install 安装指定的包,或者直接将源码包放到环境中使用pip install .或python setup.py install安装
如果需要启用 VectorWordSeek,则需要使用 sentence_transformers 库,同时需要准备本地模型,例如 all-MiniLM-L6-v2 轻量级模型等;启用 VectorImageSeek 时则需要 torch 库,同时也需要准备本地模型,例如 resnet50 轻量级模型等。
5.3.2.2 简单向量数据库示例
示例一 创建一个表,表中有向量列,向其中插入向量数据,查询列以及与指定向量的欧式距离,之后删除指定行,创建 ivf 索引,重建 ivf 索引,最终查询列定义。
from sqlalchemy import create_engine, inspect, Identity, Integer, select, Column, Sequence
from sqlalchemy.orm import Session, sessionmaker, declarative_base
from dmSQLAlchemy import VECTOR, VectorAdaptor
Base=declarative_base()
engine =create_engine('dm+dmPython://SYSDBA:SysDBA*00@localhost:5236', echo=True)
adaptor = VectorAdaptor(engine)
Session=sessionmaker(bind=engine)
session=Session()
inspector = inspect(engine)
class ItemModel(Base):
__tablename__ = "test_vector"
id = Column(Integer,Identity(), primary_key=True)
embedding = Column(VECTOR(dim=3, format='float64'))
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
session.add(ItemModel(embedding=[1, 2, 3]))# 插入Vector类型数据
session.add(ItemModel(embedding=[2, 2, 3]))
session.add(ItemModel(embedding=[5, 2, 1]))
distance = ItemModel.embedding.l1_distance([1,2,3])# 使用欧氏距离作为列输出
result = session.query(ItemModel.id, distance).all()
for row in result:
print(row)
session.query(ItemModel).filter(ItemModel.embedding == [1, 2, 3]).delete()# 删除vector列值为[1, 2, 3]的行
session.commit()
adaptor.create_vector_ivf_index(ItemModel.embedding, metric_name = "cosine", index_name = "indexc")# 创建ivf索引
adaptor.rebuild_vector_ivf_index(ItemModel.embedding, metric_name = "cosine", index_name = "indexc")# 重建ivf索引
session.commit()
columns = inspector.get_columns('test_vector')
for row in columns:
print(row)
示例二 创建以词搜词实例对象,向其中存入数据后进行以词搜词。
from dmSQLAlchemy import VECTOR, VectorAdaptor, VectorWordSeek
conn_str = 'dm+dmPython://SYSDBA:SysDBA*00@localhost:5236/'
wordseekmodel = VectorWordSeek(table_name='test_wordseek', connection_str = conn_str, model_path=r"path\to\model", drop_if_existing = True, engine_args = {'echo' : True})
wordseekmodel.drop_table()
wordseekmodel.create_table(True) #创建指定表
wordseekmodel.insert(texts = ['This is a flag', 'This is a notice', 'This is a camp'])
result = wordseekmodel.query(count = 2, DistanceMetric = 'COSINE', query_vector = ['dog'])
for row_dict in result:
print(row_dict)
示例三 创建以词搜词实例对象,向其中存入数据后进行以词搜词。
from dmSQLAlchemy import VECTOR, VectorAdaptor, VectorWordSeek, VectorImageSeek
conn_str = 'dm+dmPython://SYSDBA:Hust4400_DBA@localhost:5236/'
wordseekmodel = VectorImageSeek(table_name='test_imageseek', drop_if_existing = True, connection_str = conn_str, engine_args = {'echo' : True}, model_path=r"path\to\model")
wordseekmodel.drop_table()
wordseekmodel.create_table(True)
wordseekmodel.insert(paths = ["path\to\images"])
result = wordseekmodel.query(count = 10, DistanceMetric = 'COSINE', query_vector = 'path\to\image')
for row_dict in result:
print(row_dict)
5.4 异步功能
5.4.1 简介
dmSQLAlchemy 在 2.0.7 版本支持了 SQLAlchemy 的异步功能,由于功能要求限制,当前仅支持在 SQLAlchemy 版本大于 2.0.22 的版本上使用异步功能,同时需要 python 中安装了 dmAsync 包。
5.4.2 使用
可以通过创建异步 connection 或者创建异步 session 两种方法来使用 dmSQLAlchemy。
5.4.2.1 通过创建异步 connection 使用 dmSQLAlchemy
与正常使用 SQLAlchemy 的异步连接数据库方法相同,可以通过在创建 engine 时使用 create_async_engine 方法创建异步 engine,通过异步 engine 的 begin 接口创建连接,通过连接执行语句:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, MetaData, text
import asyncio
engine = create_async_engine("dm+dmAsync://SYSDBA:sysDBA*00@localhost:5236/", echo=True)
AsyncSessionLocal = sessionmaker(autoflush=False, bind=engine, class_=AsyncSession)
async def async_func():
async with AsyncSessionLocal() as session:
query = text('SELECT * FROM DUAL;')
result = await session.execute(query)
rows = result.fetchall()
for row in rows:
print(row)
await session.commit()
await engine.dispose()
asyncio.run(async_func())
5.4.2.2 通过创建异步 session 使用 dmSQLAlchemy
除了通过 connection 去执行语句,更常用的方法是通过创建异步 session 去使用 SQLAlclehmy。
在创建 engine 时使用 create_async_engine 方法可以创建异步 engine,根据异步 engine,使用 async_sessionmaker 方法创建 session,从而进行对数据库的异步操作
import asyncio
import datetime
from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
class Base(AsyncAttrs, DeclarativeBase):
pass
class B(Base):
__tablename__ = "b"
id: Mapped[int] = mapped_column(primary_key=True)
a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
data: Mapped[str]
class A(Base):
__tablename__ = "a"
id: Mapped[int] = mapped_column(primary_key=True)
data: Mapped[str]
create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
bs: Mapped[List[B]] = relationship()
async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
async with async_session() as session:
async with session.begin():
session.add_all(
[ A(bs=[B(data="b1", id=1), B(data="b2", id=2)], data="a1",id=1),
A(bs=[], data="a2",id=2),
A(bs=[B(data="b3", id=3), B(data="b4", id=4)], data="a3",id=3),
]
)
async def select_and_update_objects(
async_session: async_sessionmaker[AsyncSession],
) -> None:
async with async_session() as session:
stmt = select(A).order_by(A.id).options(selectinload(A.bs))
result = await session.execute(stmt)
for a in result.scalars():
print(a, a.data)
print(f"created at: {a.create_date}")
for b in a.bs:
print(b, b.data)
result = await session.execute(select(A).order_by(A.id).limit(1))
a1 = result.scalars().one()
a1.data = "new data"
await session.commit()
print(a1.data)
for b1 in await a1.awaitable_attrs.bs:
print(b1, b1.data)
async def async_main() -> None:
engine = create_async_engine("dm+dmAsync://SYSDBA:sysDBA*00@localhost:5236", echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
await insert_objects(async_session)
await select_and_update_objects(async_session)
await engine.dispose()
asyncio.run(async_main()
5.5 特殊说明
1、dmSQLalchemy 当前版本,主键 integer primary_key 不会再自动形成自增列了;
2、只有当 dmPython 版本大于 2.5.9 时才可以正常使用 executemany 返回多行数据集;
3、由于表名标准化问题,克隆或迁移表时,全大写表名将无法识别,请避免使用此类表名;
4、由于当 python 中的 True 或 False 作为 JSON 类型的值时,选取时使用的 json_value 函数在匹配时如果期望匹配 True 或 False,需要使用字符串’true’或’false’匹配。
5、由于 SQLAlchemy 对于类型返回值的检测问题,导致 JSON 类型无法返回为正常的 dict,因此暂时将从数据库中查询到的 JSON 或者 JSONB 类型列的值映射为字符串类型返回。
5.6 暂不支持功能
1、由于数据库语法暂不支持 LATERAL 功能,因此在 dmSQLAlchemy 中当前暂不支持使用 SQLAlchemy 提供的 lateral 方法构建子查询语句。
2、由于语法限制暂不支持使用 bulk_insert_mappings,supports_default_values,supports_empty_insert 等空值插入方法。
5.7 方言包特殊连接参数
为了适配不同情况,创建连接时可以通过 OPTIONS 设置选项,在 dmSQLAlchemy 中允许设置仅属于 dmSQLAlchemy 的特殊的连接选项,当前 dmSQLAlchemy 包含以下连接参数:
compatible_mode 选项
简介:用来适配不同兼容模式下处理
可选值:'DM','MYSQL','TSQL','ORACLE'
默认值:'DM'
不同选项含义:当前仅使用'MYSQL'参数值会有区别,其余参数对于结果无影响。当参数值被设置为'MYSQL'时,对于 JSON 类型,在从数据库内部获取数据时将被转换为 dict 类型,其余情况将返回为字符串类型。
parse_type 选项
简介:用来使方言包采用不同语法解析模块执行语句
可选值:'DM','MYSQL','TSQL'
默认值:'DM'
不同选项含义:当采用以上不同参数时,将采用对应的语法解析模块,同时,在该语法解析模式下 compatible_mode 选项将被默认设置为对应的兼容模式,如需更改,需显式指定为其他兼容模式。
add_quote_all 选项
简介:用来使方言包对于所有表名、列名等信息增加双引号,保证大小写完全一致
可选值:true 或 false
默认值:false
不同选项含义:当设置为 true 时,方言包在执行语句时,将对所有表名、列名、模式名等增加双引号,从而保证所有执行操作的表名、列名、模式名等与输入时保持大小写完全一致,当设置为 false 时,则遵循 SQLAlchemy 原始设置对于部分情况下名称增加双引号。