CREATE USER AQT IDENTIFIED BY DMdba123;
GRANT DBA TO AQT;
–以下是ORACLE需要的权限
GRANT EXECUTE ON DBMS_AQ TO AQT;
GRANT EXECUTE ON DBMS_AQADM TO AQT;
create table t1_test(id varchar,name varchar);
create or replace Type T1_TEST_MSG_TYPE as OBJECT(MESSAGE varchar(1000));
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => ‘T1_TEST_QUEUE_TABLE’,
QUEUE_PAYLOAD_TYPE => ‘T1_TEST_MSG_TYPE’,
MULTIPLE_CONSUMERS => TRUE
);
DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
QUEUE_TABLE => ‘T1_TEST_QUEUE_TABLE’
);
DBMS_AQADM.ADD_SUBSCRIBER(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
SUBSCRIBER => SYS.AQ$_AGENT(‘T1_TEST_QUEUE_SUB’,null,null),
DELIVERY_MODE => SYS.DBMS_AQADM.BUFFERED
);
DBMS_AQADM.START_QUEUE(QUEUE_NAME => ‘T1_TEST_QUEUE’);
DBMS_AQ.REGISTER(
SYS.AQREGINFOLIST(SYS.AQ_REG_INFO(
‘T1_TEST_QUEUE:T1_TEST_QUEUE_SUB’,
DBMS_AQ.NAMESPACE_AQ,
‘PLSQL://T1_TEST_QUEUE_CALLBACK’,
HEXTORAW(‘FF’)
)
),
1);
create or replace trigger TRI_T1_TEST
after INSERT or UPDATE
on T1_TEST
referencing OLD ROW AS “OLD” NEW ROW AS “NEW”
for each row
DECLARE
pragma autonomous_transaction;
ENQUEUE_OPTIONS_ DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MESSAGE_HANDLE_ RAW(16);
T1_TEST_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;
ALREADY_NUM_ number;
BEGIN
T1_TEST_MSG_PAYLOAD_ := T1_TEST_MSG_TYPE(:new.name);
ENQUEUE_OPTIONS_.visibility := DBMS_AQ.IMMEDIATELY;
DBMS_AQ.ENQUEUE(QUEUE_NAME => ‘T1_TEST_QUEUE’,
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS_,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES_,
PAYLOAD => T1_TEST_MSG_PAYLOAD_,
MSGID => V_MESSAGE_HANDLE_);
–END IF;
commit;
Exception
when Others then
Rollback;
DBMS_OUTPUT.PUT_LINE(‘失败E:’||sqlerrm);
END;
/
CREATE OR REPLACE PROCEDURE T1_TEST_QUEUE_CALLBACK(con RAW,
reginfo SYS.AQREGINFO,descrSYS.AQ_DESCRIPTOR,
payload RAW,
payloadl NUMBER) AS
dequeue_options_ DBMS_AQ.DEQUEUE_OPTIONS_T;
–message_properties_ DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle_ RAW(16);
irr_msg_payload_ T1_TEST_MSG_TYPE;
qname VARCHAR2(200);
–pragma autonomous_transaction;
–wait_time NUMBER := 1000 * 20; – 设置等待时间为半分钟
BEGIN
dequeue_options_.msgid := descr.msg_id;
dequeue_options_.consumer_name := descr.consumer_name;
–dequeue_options_.wait := DBMS_AQ.no_wait + wait_time;
dequeue_options_.wait := DBMS_AQ.NO_WAIT; --测试不管用
dequeue_options_.VISIBILITY := DBMS_AQ.IMMEDIATELY; --测试不管用
dequeue_options_.DEQUEUE_MODE := DBMS_AQ.REMOVE;
qname := descr.queue_name;
PRINT qname;
–IF qname = ‘T1_TEST_QUEUE’ THEN
PRINT ‘OK’;
DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
dequeue_options => dequeue_options_,
message_properties => NULL,
payload => irr_msg_payload_,
msgid => message_handle_);
sleep(10); --模拟出队执行时间
PRINT ‘10sOK’;
– END IF;
END;
/
insert T1_TEST values (‘id’,‘name’);
commit;
出队过程中使用sleep(10)模拟出队时间测试。insert t1_test 表时,确实满足需求,但是执行时间会超过10s。
入队 DBMS_AQ.IMMEDIATE、DBMS_AQ.ON_COMMIT 不管用
订阅者 ‘PLSQL://AQ_TEST_DEQUEUE_PROC?PR=0’, 不管用
出队 DBMS_AQ.IMMEDIATE、DBMS_AQ.ON_COMMIT 不管用
###3.2 尝试使用中间过程
创建两个过程:其中一个是实际出队操作为过程1;另一个是订阅者调用为过程0。过程0中使用DBMS_JOB.SUBMIT调用过程1。
执行测试,报错 失败E:[-6521]:当前触发器不支持DDL语句 。
修改过程0,为插入中间表,再调用过程1。
修改过程1,出队参数从中间表获取,并修改中间表状态。
执行测试,报错 失败E:[-6521]:当前触发器不支持DDL语句 。
使用Job调用出队过程。中间表记录队列信息、队列状态(分为:待处理、处理中、完成、失败)
CREATE TABLE T1_QUEUE_MID_TABLE (
mid_id NUMBER PRIMARY KEY, – 中间表主键
queue_name VARCHAR(100) NOT NULL, – 队列名
msg_id RAW(16) NOT NULL, – 消息ID(出队唯一标识)
consumer_name VARCHAR(100) NOT NULL, – 订阅者名
create_time DATE DEFAULT SYSDATE, – 记录创建时间
process_status VARCHAR(20) DEFAULT ‘PENDING’, – 状态:PENDING(待处理)/PROCESSING(处理中)/COMPLETED(完成)/ERROR(失败)
error_msg VARCHAR(500) – 错误信息(仅失败时填充)
);
CREATE SEQUENCE T1_QUEUE_MID_SEQ
START WITH 1
INCREMENT BY 1
NOCYCLE;
COMMIT;
CREATE OR REPLACE TRIGGER TRI_T1_TEST
AFTER INSERT OR UPDATE
ON T1_TEST
REFERENCING OLD ROW AS “OLD” NEW ROW AS “NEW”
FOR EACH ROW
DECLARE
PRAGMA AUTONOMOUS_TRANSACTION; – 自治事务:避免影响主插入事务
– 入队相关变量
ENQUEUE_OPTIONS_ SYS.DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ SYS.DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MSG_HANDLE_ RAW(16); – 入队后返回的消息ID
T1_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;
– 中间表参数
V_MID_ID_ NUMBER;
BEGIN
– 构造队列消息(用T1_TEST的name字段作为消息内容)
T1_MSG_PAYLOAD_ := T1_TEST_MSG_TYPE(:NEW.name);
– 入队配置(立即可见)
ENQUEUE_OPTIONS_.VISIBILITY := SYS.DBMS_AQ.IMMEDIATELY;
– 执行入队
SYS.DBMS_AQ.ENQUEUE(
QUEUE_NAME => ‘T1_TEST_QUEUE’,
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS_,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES_,
PAYLOAD => T1_MSG_PAYLOAD_,
MSGID => V_MSG_HANDLE_ – 入队成功后,获取消息ID
);
– 生成中间表主键
V_MID_ID_ := T1_QUEUE_MID_SEQ.NEXTVAL;
– 将出队参数插入中间表(核心:隔离触发器与出队)
INSERT INTO T1_QUEUE_MID_TABLE (
mid_id, queue_name, msg_id, consumer_name
) VALUES (
V_MID_ID_, ‘T1_TEST_QUEUE’, V_MSG_HANDLE_, ‘T1_TEST_QUEUE_SUB’
);
COMMIT; – 提交自治事务(入队+插中间表)
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘触发器失败E:’ || SQLERRM);
END;
/
CREATE OR REPLACE PROCEDURE T1_SCHEDULE_DEQUEUE
AS
V_MID_ID_ NUMBER; – 中间表ID
PRAGMA AUTONOMOUS_TRANSACTION; – 自治事务:不影响主事务
– 出队相关变量
DEQUEUE_OPTIONS_ SYS.DBMS_AQ.DEQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES_ SYS.DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MSG_HANDLE_ RAW(16);
V_MSG_PAYLOAD_ T1_TEST_MSG_TYPE;
– 中间表参数
V_QUEUE_NAME_ VARCHAR(100);
V_MSG_ID_ RAW(16);
V_CONSUMER_NAME_ VARCHAR(100);
BEGIN
–读取1条未处理的中间表记录(加锁防并发:FOR UPDATE SKIP LOCKED)
SELECT mid_id , consumer_name , queue_name,msg_id
INTO V_MID_ID_ , V_CONSUMER_NAME_ , V_QUEUE_NAME_ ,V_MSG_ID_
FROM T1_QUEUE_MID_TABLE
WHERE process_status = ‘PENDING’
ORDER BY create_time ASC
FETCH FIRST 1 ROW ONLY FOR UPDATE SKIP LOCKED; – 避免多调度重复处理
– 标记为「处理中」
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘PROCESSING’
WHERE mid_id = V_MID_ID_;
COMMIT;
DEQUEUE_OPTIONS_.MSGID := V_MSG_ID_ ; – 仅出队指定消息
DEQUEUE_OPTIONS_.CONSUMER_NAME := V_CONSUMER_NAME_ ; – 匹配订阅者
DEQUEUE_OPTIONS_.DEQUEUE_MODE := SYS.DBMS_AQ.REMOVE; – 出队后删除消息
DEQUEUE_OPTIONS_.WAIT := 10; – 无消息时等待10秒(避免立即报错)
DEQUEUE_OPTIONS_.VISIBILITY := SYS.DBMS_AQ.IMMEDIATELY; – 出队立即可见
– 打印调试信息
DBMS_OUTPUT.PUT_LINE(‘开始出队:中间表ID=’ || V_MID_ID_ || ‘,队列名=’ || V_QUEUE_NAME_);
– 执行出队
SYS.DBMS_AQ.DEQUEUE(
queue_name => V_QUEUE_NAME_,
dequeue_options => DEQUEUE_OPTIONS_,
message_properties => MESSAGE_PROPERTIES_,
payload => V_MSG_PAYLOAD_,
msgid => V_MSG_HANDLE_
);
– 模拟业务延迟(10秒)
–SYS.DBMS_LOCK.SLEEP(10);
– 打印出队结果
DBMS_OUTPUT.PUT_LINE(‘出队成功:消息内容=’ || V_MSG_PAYLOAD_.MESSAGE || ‘,耗时10秒’);
– 更新中间表状态为「完成」
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘COMPLETED’,
create_time = SYSDATE
WHERE mid_id = V_MID_ID_;
COMMIT;
EXCEPTION
WHEN NO_DATA_FOUND THEN
– 无未处理记录,不报错
NULL;
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘调度程序失败E:’ || SQLERRM);
UPDATE T1_QUEUE_MID_TABLE
SET process_status = ‘ERROR’,
error_msg = SUBSTR(SQLERRM, 1, 500) – 截取错误信息(避免字段过长)
WHERE mid_id = V_MID_ID_;
ROLLBACK;
DBMS_OUTPUT.PUT_LINE(‘出队失败E:中间表ID=’ || V_MID_ID_ || ‘,原因=’ || SQLERRM);
RAISE; – 可选:抛出异常便于监控
END;
/
call SP_CREATE_JOB(‘DUILIE’,1,0,’’,0,0,’’,0,’’);
call SP_JOB_CONFIG_START(‘DUILIE’);
call SP_ADD_JOB_STEP_EX(‘DUILIE’, ‘DUILIE’, 0, ‘call AQT.T1_SCHEDULE_DEQUEUE;’, 0, 0, 0, 0, NULL, 0, ‘null’);
call SP_ADD_JOB_SCHEDULE(‘DUILIE’, ‘DUILIE’, 1, 1, 1, 0, 1, ‘00:00:00’, ‘23:59:59’, ‘2025-09-03 16:16:25’, NULL, ‘’);
call SP_JOB_CONFIG_COMMIT(‘DUILIE’);
call SP_JOB_SET_SCHEMA(‘DUILIE’, ‘AQT’);
insert T1_TEST values (‘bingo’,‘name’); commit;
–查看
手动执行call AQT.T1_SCHEDULE_DEQUEUE;
select *from T1_TEST_QUEUE_TABLE;
select *from T1_QUEUE_MID_TABLE;
文章
阅读量
获赞
