本文结合
migrator.go的具体实现,深入解析达梦(DM8)数据库在 GORM V2 中的迁移器适配逻辑,帮助开发者理解其如何解决达梦与 MySQL 在结构迁移上的关键差异。
GORM作为Go语言最流行的ORM框架,其强大的自动迁移功能极大地简化了数据库模式管理。然而,不同数据库在SQL语法和系统表结构上存在显著差异,这使得为特定数据库定制迁移器成为必要。本文将根据达梦官网提供的go-20250513版本的go驱动分析达梦数据库(DM)专用的GORM迁移器实现,从表、列、视图、索引等维度解析其实现逻辑与技术细节,揭示如何高效适配特定数据库的复杂需求。
CreateTable函数达梦数据库迁移器在表创建流程中进行了三阶段精细化处理:
func (m Migrator) CreateTable(values ...interface{}) error {
// 布尔值特殊处理
for _, value := range values {
if err := m.RunWithValue(value, func(stmt *gorm.Statement) error {
for _, v := range stmt.Schema.Fields {
if v.HasDefaultValue {
if vv, ok := v.DefaultValueInterface.(bool); ok {
// true -> 1, false -> 0
if vv {
v.DefaultValueInterface = int64(1)
} else {
v.DefaultValueInterface = int64(0)
}
}
}
}
return nil
}); err != nil {
return err
}
}
// 调用父类创建基础表结构
if err := m.Migrator.CreateTable(values...); err != nil {
return err
}
// 为每列添加注释
for _, value := range values {
if err := m.RunWithValue(value, func(stmt *gorm.Statement) error {
for _, field := range stmt.Schema.FieldsByDBName {
if err := m.alterColumnComment(stmt, field); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
return nil
}
HasTable函数达梦的系统表结构复杂,迁移器通过多表连接和权限验证实现精准查询:
func (m Migrator) HasTable(value interface{}) bool {
tableSql := `
-- /*+ MAX_OPT_N_TABLES(5) */优化器提示,查询优化过程中最多考虑连接 5 张表,用 COUNT(TABS.NAME) 统计符合条件的对象数量
SELECT /*+ MAX_OPT_N_TABLES(5) */ COUNT(TABS.NAME) FROM
(SELECT ID, PID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCH' AND NAME = ?) SCHEMAS,
-- 子类型 SUBTYPE$ 为普通表(UTAB)、系统表(STAB)、视图(VIEW)、同义词(SYNOM)。对 UTAB 类型做了额外过滤,排除特定类型的表(如临时表、外部表等)
(SELECT ID, SCHID, NAME FROM SYS.SYSOBJECTS WHERE
NAME = ? AND TYPE$ = 'SCHOBJ' AND SUBTYPE$ IN ('UTAB', 'STAB', 'VIEW', 'SYNOM')
AND ((SUBTYPE$ ='UTAB' AND CAST((INFO3 & 0x00FF & 0x003F) AS INT) not in (9, 27, 29, 25, 12, 7, 21, 23, 18, 5))
OR SUBTYPE$ in ('STAB', 'VIEW', 'SYNOM'))) TABS
WHERE TABS.SCHID = SCHEMAS.ID
-- 函数 SF_CHECK_PRIV_OPT 来判断当前用户是否有访问对象的权限
AND SF_CHECK_PRIV_OPT(UID(), CURRENT_USERTYPE(), TABS.ID, SCHEMAS.PID, -1, TABS.ID) = 1;`
var count int64
m.RunWithValue(value, func(stmt *gorm.Statement) error {
return m.DB.Raw(tableSql, m.CurrentDatabase(), stmt.Table).Row().Scan(&count)
})
// count>0则说明表存在
return count > 0
}
技术细节:
SF_CHECK_PRIV_OPT确保只返回当前用户有权访问的表SUBTYPE$区分表、视图等不同对象类型INFO3 & 0x00FF & 0x003F排除特殊系统表MAX_OPT_N_TABLES(5)提示优化器限制连接表数量GetTables函数获取当前模式下所有表的实现与HasTable相似,但返回表名列表:
func (m Migrator) GetTables() (tableList []string, err error) {
tableSql := `SELECT /*+ MAX_OPT_N_TABLES(5) */ TABS.NAME FROM ...` // 与HasTable相同条件
err = m.DB.Raw(tableSql, m.CurrentDatabase()).Scan(&tableList).Error
return
}
应用场景:数据库迁移、模式同步、元数据检查等需要获取所有表的场景。
MigrateColumn函数这是迁移器最复杂的部分,实现了列属性的精准比对和最小化变更:
func (m Migrator) MigrateColumn(dst interface{}, field *schema.Field, columnType gorm.ColumnType) error {
// 获取当前定义和数据库实际类型
fullDataType := strings.TrimSpace(strings.ToLower(m.DB.Migrator().FullDataTypeOf(field).SQL))
realDataType := strings.ToLower(columnType.DatabaseTypeName())
containsUnique := false
// 检查当前列是否有UNIQUE约束
if unique, ok := columnType.Unique(); ok {
containsUnique = unique
}
var (
alterColumn bool
isSameType = fullDataType == realDataType
)
// 1. 检查主键外的列类型
if !field.PrimaryKey {
if !strings.HasPrefix(fullDataType, realDataType) {
// 1.1 检查类型别名 (如 INTEGER 和 INT)
aliases := m.DB.Migrator().GetTypeAliases(realDataType)
for _, alias := range aliases {
if strings.HasPrefix(fullDataType, alias) {
isSameType = true
break
}
}
if !isSameType {
alterColumn = true
}
}
}
// 2. 检查数据类型差异
if !isSameType {
// 2.1 检查长度
if length, ok := columnType.Length(); length != int64(field.Size) {
if length > 0 && field.Size > 0 {
alterColumn = true
} else {
// 2.1.1 从数据类型字符串提取数字
matches2 := regFullDataType.FindAllStringSubmatch(fullDataType, -1)
if !field.PrimaryKey && (len(matches2) == 1 && matches2[0][1] != fmt.Sprint(length) && ok) {
alterColumn = true
}
}
}
// 2.2 检查精度 (DECIMAL/NUMERIC类型)
if precision, _, ok := columnType.DecimalSize(); ok && int64(field.Precision) != precision {
if regexp.MustCompile(fmt.Sprintf("[^0-9]%d[^0-9]", field.Precision)).MatchString(m.Migrator.DataTypeOf(field)) {
alterColumn = true
}
}
}
// 3. 检查能否为null
if nullable, ok := columnType.Nullable(); ok && nullable == field.NotNull {
if !field.PrimaryKey && nullable { // 非主键且数据库允许NULL
alterColumn = true
}
}
// 4. 检查唯一性
if unique, ok := columnType.Unique(); ok && unique != field.Unique {
if !field.PrimaryKey { // 非主键
alterColumn = true
}
}
// 5. 检查默认值
if !field.PrimaryKey {
currentDefaultNotNull := field.HasDefaultValue &&
(field.DefaultValueInterface != nil || !strings.EqualFold(field.DefaultValue, "NULL"))
dv, dvNotNull := columnType.DefaultValue()
if dvNotNull && !currentDefaultNotNull {
alterColumn = true // 数据库有默认值,但字段定义没有
} else if !dvNotNull && currentDefaultNotNull {
alterColumn = true // 字段有默认值,但数据库没有
} else if (field.GORMDataType != schema.Time && dv != field.DefaultValue) ||
(field.GORMDataType == schema.Time && !strings.EqualFold(strings.TrimSuffix(dv, "()"), strings.TrimSuffix(field.DefaultValue, "()"))) {
// 默认值不相等且不都是NULL
if currentDefaultNotNull || dvNotNull {
alterColumn = true
}
}
}
// 6. 检查列注释
if comment, ok := columnType.Comment(); ok && comment != field.Comment {
if !field.PrimaryKey { // 非主键
alterColumn = true
}
}
// 仅当需要修改且未设置IgnoreMigration时执行ALTER
if alterColumn && !field.IgnoreMigration {
return m.DB.Migrator().(Migrator).alterColumn(dst, field.DBName, containsUnique)
}
return nil
}
迁移策略:
alterColumn函数处理列修改的核心逻辑,特别处理UNIQUE约束和列注释:
func (m Migrator) alterColumn(value interface{}, field string, containsUnique bool) error {
return m.RunWithValue(value, func(stmt *gorm.Statement) error {
if field := stmt.Schema.LookUpField(field); field != nil {
typeof := m.FullDataTypeOf(field)
// 关键优化:避免重复添加UNIQUE约束
// containsUnique: 原本是否就有UNIQUE字段
// 如果列原本就有UNIQUE,且修改后仍有UNIQUE,则移除SQL中的UNIQUE关键字
if containsUnique && field.Unique {
typeof.SQL = strings.Replace(typeof.SQL, " UNIQUE", "", 1)
}
// 执行ALTER TABLE修改列
if err := m.DB.Exec("ALTER TABLE ? MODIFY ? ?",
clause.Table{Name: stmt.Table},
clause.Column{Name: field.DBName},
typeof,
).Error; err != nil {
return err
}
// 更新列注释
if err := m.alterColumnComment(stmt, field); err != nil {
return err
}
}
return fmt.Errorf("failed to look up field with name: %s", field)
})
}
UNIQUE约束处理逻辑:
containsUnique && field.Unique(已有且仍需)时移除alterColumnComment函数达梦使用特定语法管理列注释:
func (m Migrator) alterColumnComment(stmt *gorm.Statement, field *schema.Field) error {
if field.Comment != "" {
if err := m.DB.Exec("COMMENT ON COLUMN ?.? IS ?",
m.CurrentTable(stmt),
clause.Column{Name: field.DBName},
gorm.Expr(m.Migrator.Dialector.Explain("?", field.Comment)),
).Error; err != nil {
return err
}
}
return nil
}
语法适配:
COMMENT ON COLUMN table.column IS 'comment'ColumnTypes函数获取列元数据是迁移器最复杂的操作之一,需要整合多源数据:
func (m Migrator) ColumnTypes(dst interface{}) ([]gorm.ColumnType, error) {
columnTypes := make([]gorm.ColumnType, 0)
execErr := m.RunWithValue(dst, func(stmt *gorm.Statement) error {
var (
currentDatabase = m.CurrentDatabase()
table = stmt.Table
// 1. 基础列信息查询
columnTypeSQL = `SELECT /*+ MAX_OPT_N_TABLES(5) */ COLS.NAME, COLS.DEFVAL FROM
(SELECT ID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCH' AND NAME = ?) SCHS,
(SELECT ID, SCHID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCHOBJ' AND SUBTYPE$ IN ('UTAB', 'STAB', 'VIEW') AND NAME = ?) TABS,
SYS.SYSCOLUMNS COLS
WHERE TABS.ID=COLS.ID AND SCHS.ID = TABS.SCHID`
// 2. 约束信息查询 (主键、唯一)
columnConsSQL = `SELECT /*+ MAX_OPT_N_TABLES(5) */ COLS.NAME, LNNVL(CONS.TYPE$!='P'), LNNVL(CONS.TYPE$!='U') FROM
(SELECT ID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCH' AND NAME = ?) SCHS,
(SELECT ID, SCHID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCHOBJ' AND SUBTYPE$ IN ('UTAB', 'STAB', 'VIEW') AND NAME = ?) TABS,
SYS.SYSCOLUMNS COLS,
SYS.SYSCONS CONS,
SYS.SYSINDEXES INDS
WHERE SCHS.ID=TABS.SCHID AND TABS.ID=COLS.ID AND COLS.ID=CONS.TABLEID and CONS.INDEXID=INDS.ID and SF_COL_IS_IDX_KEY(INDS.KEYNUM, INDS.KEYINFO, COLS.COLID)=1`
consMap = make(map[string][][]bool) // 列名 -> [是否主键, 是否唯一] 映射
rows, err = m.DB.Session(&gorm.Session{}).Table(stmt.Table).Limit(1).Rows()
)
// 3. 获取驱动标准列类型 (用于基础数据类型信息)
rawColumnTypes, err := rows.ColumnTypes()
// 确保资源释放
if err := rows.Close(); err != nil {
return err
}
// 4. 查询约束信息
cons, consErr := m.DB.Table(table).Raw(columnConsSQL, currentDatabase, table).Rows()
defer cons.Close()
for cons.Next() {
var (
colName string
values = make([]bool, 2) // [是否主键, 是否唯一]
)
cons.Scan(&colName, &values[0], &values[1])
consMap[colName] = append(consMap[colName], values)
}
// 5. 查询列信息
columns, rowErr := m.DB.Table(table).Raw(columnTypeSQL, currentDatabase, table).Rows()
defer columns.Close()
for columns.Next() {
var (
column migrator.ColumnType
values = []interface{}{&column.NameValue, &column.DefaultValueValue}
)
columns.Scan(values...)
// 5.1 清理默认值引号
column.DefaultValueValue.String = strings.Trim(column.DefaultValueValue.String, "'")
// 5.2 绑定约束信息
for key, value := range consMap {
if key == column.NameValue.String {
for _, con := range value {
if con[0] { // 是否主键
column.PrimaryKeyValue.Bool = true
column.PrimaryKeyValue.Valid = true
}
if con[1] { // 是否唯一
column.UniqueValue.Bool = true
column.UniqueValue.Valid = true
}
}
break
}
}
// 5.3 整合驱动标准列类型
for _, c := range rawColumnTypes {
if c.Name() == column.NameValue.String {
column.SQLColumnType = c
break
}
}
columnTypes = append(columnTypes, column)
}
return nil
})
return columnTypes, execErr
}
多源元数据整合:
LNNVL(CONS.TYPE$!='P'判断主键,CONS.TYPE$ = 'U'判断唯一键)和SF_COL_IS_IDX_KEY(判断该列是否是索引键列)处理复杂条件达梦迁移器对视图的支持有限,直接调用父类方法并标记不支持:
func (m Migrator) CreateView(name string, option gorm.ViewOption) error {
// super, not support
return m.Migrator.CreateView(name, option)
}
func (m Migrator) DropView(name string) error {
// super, not support
return m.Migrator.DropView(name)
}
HasConstraint函数通过系统表精确查询约束:
func (m Migrator) HasConstraint(value interface{}, name string) bool {
conSql := `
select count(CON_OBJ.NAME) from
(select ID from SYSOBJECTS where TYPE$='SCH' and NAME = ?) SCH_OBJ,
(select ID, SCHID from SYSOBJECTS where TYPE$='SCHOBJ' and SUBTYPE$ like '_TAB') TAB_OBJ,
(select ID, NAME from SYSOBJECTS where SUBTYPE$ = 'CONS' and NAME=?) CON_OBJ,
SYSCONS CONS
where CON_OBJ.ID=CONS.ID and TAB_OBJ.ID=CONS.TABLEID and TAB_OBJ.SCHID=SCH_OBJ.ID;`
var count int64
m.RunWithValue(value, func(stmt *gorm.Statement) error {
constraint, chk, _ := m.GuessConstraintAndTable(stmt, name)
if constraint != nil {
name = constraint.Name
} else if chk != nil {
name = chk.Name
}
return m.DB.Raw(conSql, m.CurrentDatabase(), name).Row().Scan(&count)
})
return count > 0
}
约束类型处理:
Create/DropConstraint函数)直接调用父类实现,依赖达梦标准SQL支持:
func (m Migrator) CreateConstraint(dst interface{}, name string) error {
return m.Migrator.CreateConstraint(dst, name)
}
func (m Migrator) DropConstraint(value interface{}, name string) error {
return m.Migrator.DropConstraint(value, name)
}
HasIndex函数处理普通索引和全文索引的联合查询:
func (m Migrator) HasIndex(value interface{}, name string) bool {
indexSql := `
WITH USERS(ID) AS (SELECT ID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCH' AND NAME = ?),
TAB(ID,SCHID) AS (SELECT ID, SCHID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCHOBJ' AND SUBTYPE$ = 'UTAB' AND NAME = ?)
SELECT COUNT(DISTINCT INDEX_NAME) FROM (
-- 普通索引
SELECT /*+ MAX_OPT_N_TABLES(5) */ OBJ_INDS.NAME AS INDEX_NAME FROM USERS, TAB, SYS.SYSINDEXES AS INDS, SYS.SYSCOLUMNS AS COLS,
(SELECT ID, PID, NAME FROM SYS.SYSOBJECTS WHERE SUBTYPE$='INDEX' AND NAME = ?) OBJ_INDS
WHERE TAB.ID =COLS.ID AND TAB.ID =OBJ_INDS.PID AND INDS.ID=OBJ_INDS.ID AND TAB.SCHID= USERS.ID
AND SF_COL_IS_IDX_KEY(INDS.KEYNUM, INDS.KEYINFO, COLS.COLID)=1
UNION
-- 全文索引
SELECT OBJ_INDS.NAME AS INDEX_NAME FROM USERS, TAB, SYSCONTEXTINDEXES AS OBJ_INDS, SYS.SYSCOLUMNS AS COLS
WHERE TAB.ID = COLS.ID AND TAB.ID = OBJ_INDS.TABLEID AND COLS.COLID = OBJ_INDS.COLID AND TAB.SCHID = USERS.ID AND OBJ_INDS.NAME = ?
)`
var count int64
m.RunWithValue(value, func(stmt *gorm.Statement) error {
if idx := stmt.Schema.LookIndex(name); idx != nil {
name = idx.Name
}
return m.DB.Raw(indexSql, m.CurrentDatabase(), stmt.Schema.Table, name, name).Row().Scan(&count)
})
return count > 0
}
索引类型处理:
GetIndexes函数最复杂的元数据查询,整合多表信息:
func (m Migrator) GetIndexes(value interface{}) ([]gorm.Index, error) {
//查询某个模式(Schema)下的某个表中,所有列相关的索引信息(包括普通索引和全文索引),并返回每列所属的索引名称、是否唯一、是否主键等信息
indexSql := `
-- 获取指定模式(Schema)的 ID
WITH USERS(ID) AS (SELECT ID FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCH' AND NAME = ?),
-- 获取指定表的 ID、模式 ID 和表名
TAB(ID,SCHID,NAME) AS (SELECT ID, SCHID, NAME FROM SYS.SYSOBJECTS WHERE TYPE$ = 'SCHOBJ' AND SUBTYPE$ = 'UTAB' AND NAME = ?)
-- 获取普通索引信息
SELECT /*+ MAX_OPT_N_TABLES(5) */
TAB.NAME AS TABLE_NAME,
COLS.NAME AS COLUMN_NAME,
OBJ_INDS.NAME AS INDEX_NAME,
CASE INDS.ISUNIQUE WHEN 'Y' THEN 0 ELSE 1 END AS NON_UNIQUE,
CASE OBJ_INDS.TYPE$ WHEN 'P' THEN 1 ELSE 0 END AS IS_PRIMARY
FROM USERS, TAB, SYS.SYSINDEXES AS INDS, SYS.SYSCOLUMNS AS COLS,
(SELECT INDS.ID, INDS.PID, INDS.NAME, CONS.TYPE$
FROM SYS.SYSOBJECTS AS INDS
LEFT JOIN SYS.SYSCONS AS CONS ON CONS.INDEXID=INDS.ID AND SUBTYPE$='INDEX') OBJ_INDS
WHERE TAB.ID =COLS.ID AND TAB.ID =OBJ_INDS.PID AND INDS.ID=OBJ_INDS.ID AND TAB.SCHID=USERS.ID
AND SF_COL_IS_IDX_KEY(INDS.KEYNUM, INDS.KEYINFO, COLS.COLID)=1
UNION
-- 获取全文索引信息
SELECT TAB.NAME AS TABLE_NAME, COLS.NAME AS COLUMN_NAME, OBJ_INDS.NAME AS INDEX_NAME, 1 AS NON_UNIQUE, 0 AS IS_PRIMARY
FROM USERS, TAB, SYSCONTEXTINDEXES AS OBJ_INDS, SYS.SYSCOLUMNS AS COLS
WHERE TAB.ID = COLS.ID AND TAB.ID = OBJ_INDS.TABLEID AND COLS.COLID = OBJ_INDS.COLID AND TAB.SCHID = USERS.ID;`
indexes := make([]gorm.Index, 0)
err := m.RunWithValue(value, func(stmt *gorm.Statement) error {
result := make([]*Index, 0)
m.DB.Raw(indexSql, m.CurrentDatabase(), stmt.Table).Scan(&result)
// 按索引名分组 (处理多列索引)
indexMap := groupByIndexName(result)
for _, idx := range indexMap {
tempIdx := &migrator.Index{
TableName: idx[0].TableName,
NameValue: idx[0].IndexName,
PrimaryKeyValue: sql.NullBool{Bool: idx[0].Primary, Valid: true},
UniqueValue: sql.NullBool{Bool: idx[0].NonUnique, Valid: true},
}
// 收集该索引的所有列
for _, x := range idx {
tempIdx.ColumnList = append(tempIdx.ColumnList, x.ColumnName)
}
indexes = append(indexes, tempIdx)
}
return nil
})
return indexes, err
}
核心辅助函数:
func groupByIndexName(indexList []*Index) map[string][]*Index {
columnIndexMap := make(map[string][]*Index, len(indexList))
for _, idx := range indexList {
columnIndexMap[idx.IndexName] = append(columnIndexMap[idx.IndexName], idx)
}
return columnIndexMap
}
多列索引处理:
Create/Drop/Rename函数)// 创建索引 - 直接调用父类
func (m Migrator) CreateIndex(dst interface{}, name string) error {
return m.Migrator.CreateIndex(dst, name)
}
// 删除索引 - 自定义实现
func (m Migrator) DropIndex(value interface{}, name string) error {
return m.RunWithValue(value, func(stmt *gorm.Statement) error {
if idx := stmt.Schema.LookIndex(name); idx != nil {
name = idx.Name // 获取实际索引名
}
return m.DB.Exec("DROP INDEX ?", clause.Column{Name: name}).Error
})
}
// 重命名索引
func (m Migrator) RenameIndex(value interface{}, oldName, newName string) error {
return m.RunWithValue(value, func(stmt *gorm.Statement) error {
return m.DB.Exec("ALTER INDEX ? RENAME TO ?",
clause.Column{Name: oldName},
clause.Column{Name: newName},
).Error
})
}
差异处理:
func (m Migrator) CurrentDatabase() (name string) {
m.DB.Raw("SELECT SYS_CONTEXT('USERENV', 'CURRENT_SCHEMA');").Row().Scan(&name)
return
}
var regFullDataType = regexp.MustCompile(`\D*(\d+)\D?`)
达梦GORM迁移器的实现体现了几个关键设计原则:
该实现方案为我们提供了处理复杂数据库适配问题的完整思路:从元数据查询到SQL语法适配,从数据类型映射到错误处理策略,全方位展示了企业级数据库驱动开发的关键技术要点。对于需要为特定数据库定制ORM功能的开发者,这份代码提供了宝贵的参考和实践指南。
文章
阅读量
获赞
