注册
达梦队列的使用
专栏/技术分享/ 文章详情 /

达梦队列的使用

BinGoSTop 2025/11/07 229 0 0
摘要

达梦异步队列使用

1 数据写入触发事件捕获 -> 队列订阅与通知机制 -> 异步任务调度 -> 后台异步处理

2 创建测试案例:

2.1 创建测试用户及相关权限

CREATE USER AQT IDENTIFIED BY DMdba123;
GRANT DBA TO AQT;
–以下是ORACLE需要的权限
GRANT EXECUTE ON DBMS_AQ TO AQT;
GRANT EXECUTE ON DBMS_AQADM TO AQT;

2.2 创建基础测试表

create table t1_test(id varchar,name varchar);

2.3 创建队列消息类型

create or replace Type T1_TEST_MSG_TYPE as OBJECT(MESSAGE varchar(1000));

2.4 创建队列表

DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => ‘T1_TEST_QUEUE_TABLE’,
QUEUE_PAYLOAD_TYPE => ‘T1_TEST_MSG_TYPE’,
MULTIPLE_CONSUMERS => TRUE
);

2.5 创建队列

DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
QUEUE_TABLE => ‘T1_TEST_QUEUE_TABLE’
);

2.6 添加订阅者

DBMS_AQADM.ADD_SUBSCRIBER(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
SUBSCRIBER => SYS.AQ$_AGENT(‘T1_TEST_QUEUE_SUB’,null,null),
DELIVERY_MODE => SYS.DBMS_AQADM.BUFFERED
);

2.7 启动队列(队列必须启动才能入队/出队)

DBMS_AQADM.START_QUEUE(QUEUE_NAME => ‘T1_TEST_QUEUE’);

2.8 创建订阅 - 回调模式,启用自动通知

DBMS_AQ.REGISTER(
SYS.AQREGINFOLIST(SYS.AQ_REG_INFO_LIST( SYS.AQ_REG_INFO(
‘T1_TEST_QUEUE:T1_TEST_QUEUE_SUB’,
DBMS_AQ.NAMESPACE_AQ,
‘PLSQL://T1_TEST_QUEUE_CALLBACK’,
HEXTORAW(‘FF’)
)
),
1);

2.9 创建触发器

create or replace trigger TRI_T1_TEST
after INSERT or UPDATE
on T1_TEST
referencing OLD ROW AS “OLD” NEW ROW AS “NEW”
for each row
DECLARE
pragma autonomous_transaction;
ENQUEUE_OPTIONS_ DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MESSAGE_HANDLE_ RAW(16);
T1_TEST_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;

ALREADY_NUM_ number;
BEGIN
T1_TEST_MSG_PAYLOAD_ := T1_TEST_MSG_TYPE(:new.name);

ENQUEUE_OPTIONS_.visibility := DBMS_AQ.IMMEDIATELY;

DBMS_AQ.ENQUEUE(QUEUE_NAME => ‘T1_TEST_QUEUE’,
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS_,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES_,
PAYLOAD => T1_TEST_MSG_PAYLOAD_,
MSGID => V_MESSAGE_HANDLE_);

–END IF;
commit;
Exception
when Others then
Rollback;
DBMS_OUTPUT.PUT_LINE(‘失败E:’||sqlerrm);
END;
/

2.10 创建出队过程

CREATE OR REPLACE PROCEDURE T1_TEST_QUEUE_CALLBACK(con RAW,
reginfo SYS.AQREGINFO,descrSYS.AQ_REG_INFO, descr SYS.AQ_DESCRIPTOR,
payload RAW,
payloadl NUMBER) AS

dequeue_options_ DBMS_AQ.DEQUEUE_OPTIONS_T;
–message_properties_ DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle_ RAW(16);
irr_msg_payload_ T1_TEST_MSG_TYPE;
qname VARCHAR2(200);
–pragma autonomous_transaction;

–wait_time NUMBER := 1000 * 20; – 设置等待时间为半分钟

BEGIN

dequeue_options_.msgid := descr.msg_id;
dequeue_options_.consumer_name := descr.consumer_name;
–dequeue_options_.wait := DBMS_AQ.no_wait + wait_time;
dequeue_options_.wait := DBMS_AQ.NO_WAIT; --测试不管用
dequeue_options_.VISIBILITY := DBMS_AQ.IMMEDIATELY; --测试不管用
dequeue_options_.DEQUEUE_MODE := DBMS_AQ.REMOVE;
qname := descr.queue_name;

PRINT qname;
–IF qname = ‘T1_TEST_QUEUE’ THEN
PRINT ‘OK’;
DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
dequeue_options => dequeue_options_,
message_properties => NULL,
payload => irr_msg_payload_,
msgid => message_handle_);
sleep(10); --模拟出队执行时间
PRINT ‘10sOK’;
– END IF;
END;
/

2.11 测试

insert T1_TEST values (‘id’,‘name’);
commit;

2.12 测试结果

出队过程中使用sleep(10)模拟出队时间测试。insert t1_test 表时,确实满足需求,但是执行时间会超过10s。

3 测试异步方法

3.1 出队、入队、订阅者参数

入队 DBMS_AQ.IMMEDIATE、DBMS_AQ.ON_COMMIT 不管用
订阅者 ‘PLSQL://AQ_TEST_DEQUEUE_PROC?PR=0’, 不管用
出队 DBMS_AQ.IMMEDIATE、DBMS_AQ.ON_COMMIT 不管用
###3.2 尝试使用中间过程
创建两个过程:其中一个是实际出队操作为过程1;另一个是订阅者调用为过程0。过程0中使用DBMS_JOB.SUBMIT调用过程1。
执行测试,报错 失败E:[-6521]:当前触发器不支持DDL语句 。
修改过程0,为插入中间表,再调用过程1。
修改过程1,出队参数从中间表获取,并修改中间表状态。
执行测试,报错 失败E:[-6521]:当前触发器不支持DDL语句 。

4 最终版本

4.1 异步思路

使用Job调用出队过程。中间表记录队列信息、队列状态(分为:待处理、处理中、完成、失败)

4.2 创建「中间表」(核心:存储出队参数,隔离触发器与出队逻辑)

CREATE TABLE T1_QUEUE_MID_TABLE (
mid_id NUMBER PRIMARY KEY, – 中间表主键
queue_name VARCHAR(100) NOT NULL, – 队列名
msg_id RAW(16) NOT NULL, – 消息ID(出队唯一标识)
consumer_name VARCHAR(100) NOT NULL, – 订阅者名
create_time DATE DEFAULT SYSDATE, – 记录创建时间
process_status VARCHAR(20) DEFAULT ‘PENDING’, – 状态:PENDING(待处理)/PROCESSING(处理中)/COMPLETED(完成)/ERROR(失败)
error_msg VARCHAR(500) – 错误信息(仅失败时填充)
);

4.3 创建中间表序列(生成主键用)

CREATE SEQUENCE T1_QUEUE_MID_SEQ
START WITH 1
INCREMENT BY 1
NOCYCLE;
COMMIT;

4.4 修改触发器:仅入队+插中间表(纯DML,无DDL)

CREATE OR REPLACE TRIGGER TRI_T1_TEST
AFTER INSERT OR UPDATE
ON T1_TEST
REFERENCING OLD ROW AS “OLD” NEW ROW AS “NEW”
FOR EACH ROW
DECLARE
PRAGMA AUTONOMOUS_TRANSACTION; – 自治事务:避免影响主插入事务
– 入队相关变量
ENQUEUE_OPTIONS_ SYS.DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ SYS.DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MSG_HANDLE_ RAW(16); – 入队后返回的消息ID
T1_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;
– 中间表参数
V_MID_ID_ NUMBER;
BEGIN
– 构造队列消息(用T1_TEST的name字段作为消息内容)
T1_MSG_PAYLOAD_ := T1_TEST_MSG_TYPE(:NEW.name);

– 入队配置(立即可见)
ENQUEUE_OPTIONS_.VISIBILITY := SYS.DBMS_AQ.IMMEDIATELY;

– 执行入队
SYS.DBMS_AQ.ENQUEUE(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS_,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES_,
PAYLOAD => T1_MSG_PAYLOAD_,
MSGID => V_MSG_HANDLE_ – 入队成功后,获取消息ID
);

– 生成中间表主键
V_MID_ID_ := T1_QUEUE_MID_SEQ.NEXTVAL;

– 将出队参数插入中间表(核心:隔离触发器与出队)
INSERT INTO T1_QUEUE_MID_TABLE (
mid_id, queue_name, msg_id, consumer_name
) VALUES (
V_MID_ID_, ‘T1_TEST_QUEUE’, V_MSG_HANDLE_, ‘T1_TEST_QUEUE_SUB’
);

COMMIT; – 提交自治事务(入队+插中间表)
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘触发器失败E:’ || SQLERRM);
END;
/

4.5 创建消费过程便于job调用

CREATE OR REPLACE PROCEDURE T1_SCHEDULE_DEQUEUE
AS
V_MID_ID_ NUMBER; – 中间表ID
PRAGMA AUTONOMOUS_TRANSACTION; – 自治事务:不影响主事务
– 出队相关变量
DEQUEUE_OPTIONS_ SYS.DBMS_AQ.DEQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ SYS.DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MSG_HANDLE_ RAW(16);
V_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;
– 中间表参数
V_QUEUE_NAME_ VARCHAR(100);
V_MSG_ID_ RAW(16);
V_CONSUMER_NAME_ VARCHAR(100);
BEGIN
–读取1条未处理的中间表记录(加锁防并发:FOR UPDATE SKIP LOCKED)
SELECT mid_id , consumer_name , queue_name,msg_id
INTO V_MID_ID_ , V_CONSUMER_NAME_ , V_QUEUE_NAME_ ,V_MSG_ID_
FROM T1_QUEUE_MID_TABLE
WHERE process_status = ‘PENDING’
ORDER BY create_time ASC
FETCH FIRST 1 ROW ONLY FOR UPDATE SKIP LOCKED; – 避免多调度重复处理

– 标记为「处理中」
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘PROCESSING’
WHERE mid_id = V_MID_ID_;
COMMIT;

DEQUEUE_OPTIONS_.MSGID := V_MSG_ID_ ; – 仅出队指定消息
DEQUEUE_OPTIONS_.CONSUMER_NAME := V_CONSUMER_NAME_ ; – 匹配订阅者
DEQUEUE_OPTIONS_.DEQUEUE_MODE := SYS.DBMS_AQ.REMOVE; – 出队后删除消息
DEQUEUE_OPTIONS_.WAIT := 10; – 无消息时等待10秒(避免立即报错)
DEQUEUE_OPTIONS_.VISIBILITY := SYS.DBMS_AQ.IMMEDIATELY; – 出队立即可见

– 打印调试信息
DBMS_OUTPUT.PUT_LINE(‘开始出队:中间表ID=’ || V_MID_ID_ || ‘,队列名=’ || V_QUEUE_NAME_);

– 执行出队
SYS.DBMS_AQ.DEQUEUE(
queue_name => V_QUEUE_NAME_,
dequeue_options => DEQUEUE_OPTIONS_,
message_properties => MESSAGE_PROPERTIES_,
payload => V_MSG_PAYLOAD_,
msgid => V_MSG_HANDLE_
);

– 模拟业务延迟(10秒)
–SYS.DBMS_LOCK.SLEEP(10);

– 打印出队结果
DBMS_OUTPUT.PUT_LINE(‘出队成功:消息内容=’ || V_MSG_PAYLOAD_.MESSAGE || ‘,耗时10秒’);

– 更新中间表状态为「完成」
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘COMPLETED’,
create_time = SYSDATE
WHERE mid_id = V_MID_ID_;

COMMIT;

EXCEPTION
WHEN NO_DATA_FOUND THEN
– 无未处理记录,不报错
NULL;
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘调度程序失败E:’ || SQLERRM);
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘ERROR’,
error_msg = SUBSTR(SQLERRM, 1, 500) – 截取错误信息(避免字段过长)
WHERE mid_id = V_MID_ID_;

ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘出队失败E:中间表ID=’ || V_MID_ID_ || ‘,原因=’ || SQLERRM);
RAISE; – 可选:抛出异常便于监控
END;
/

4.6 创建job调用过程

call SP_CREATE_JOB(‘DUILIE’,1,0,’’,0,0,’’,0,’’);
call SP_JOB_CONFIG_START(‘DUILIE’);
call SP_ADD_JOB_STEP_EX(‘DUILIE’, ‘DUILIE’, 0, ‘call AQT.T1_SCHEDULE_DEQUEUE;’, 0, 0, 0, 0, NULL, 0, ‘null’);
call SP_ADD_JOB_SCHEDULE(‘DUILIE’, ‘DUILIE’, 1, 1, 1, 0, 1, ‘00:00:00’, ‘23:59:59’, ‘2025-09-03 16:16:25’, NULL, ‘’);
call SP_JOB_CONFIG_COMMIT(‘DUILIE’);
call SP_JOB_SET_SCHEMA(‘DUILIE’, ‘AQT’);

4.7 测试

insert T1_TEST values (‘bingo’,‘name’); commit;
–查看
手动执行call AQT.T1_SCHEDULE_DEQUEUE;
select *from T1_TEST_QUEUE_TABLE;
select *from T1_QUEUE_MID_TABLE;

4.8 测试结果

4.8.1 执行insert 语句

image.png

4.8.2 查看队列表

image.png

4.8.3 查看中间表

image.png

4.8.4 执行出队过程

image.png

4.8.5 队列表在10s后消失

image.png

4.8.6 中间表:状态从 PENDING -> PROCESSING,过程执行完毕 从PROCESSING-> COMPLETED

image.png
image.png

评论
后发表回复

作者

文章

阅读量

获赞

扫一扫
联系客服