DBMS_AQ 包

DM的DBMS_AQ包部分兼容ORACLE的DBMS_AQ包,提供高级队列的以下常用功能:消息入队出队,注册、注销通知等。

39.1 数据类型

  1. SYS.AQ$_RECIPIENT_LIST_T

系统对象类型,用于描述订阅者信息组。

语法如下:

TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF SYS.AQ\$_AGENT INDEX BY INTEGER;
  1. SYS.AQ$_REG_INFO

系统对象类型,用于描述注册信息。

语法如下:

TYPE SYS.AQ$_REG_INFO AS OBJECT(

	name VARCHAR(128),

	namespace NUMBER,

	callback VARCHAR(256),

	ctx VARBINARY(2000),

	anyctx "<ADT_1>",

	ctxtype NUMBER,

	qosflags NUMBER,

	payloadcbk VARCHAR(4000),

	timeout NUMBER

);

参数详解

  • NAME指定注册信息,格式为:[模式名.]队列名:订阅者。非正规表示符要用""括起来,如SYSDBA."@que_name":sub_name。
  • NAMESPACE指定订阅者命名空间,取值包括NAMESPACE_ANONYMOUS或NAMESPACE_AQ。仅做语法支持。
  • CALLBACK指定回调过程名,格式为:PLSQL://过程名。
  • 剩余参数仅做语法支持。
  1. SYS.AQ$_REG_INFO_LIST

系统对象类型,用于描述注册信息组。

语法如下:

TYPE SYS.AQ$_REG_INFO_LIST AS VARRAY(1024) OF SYS.AQ$_REG_INFO;
  1. SYS.AQ$_DESCRIPTOR

系统对象类型,用于描述回调过程参数。

语法如下:

TYPE SYS.AQ$_DESCRIPTOR AS OBJEC (

	queue_name VARCHAR(65),

	consumer_name VARCHAR(30),

	msg_id VARBINARY(16)

);

参数详解

  • queue_name指定触发回调的队列名,格式为:模式名.队列名。
  • CONSUMER_NAME指定触发回调的订阅者名。
  • MSG_ID为触发回调的消息id。
  1. DBMS_AQ. ENQUEUE_OPTIONS_T

DBMS_AQ包专有类型,指定入队选项。

语法如下:

TYPE ENQUEUE_OPTIONS_T IS RECORD (

	visibility INTEGER DEFAULT ON_COMMIT,

	relative_msgid VARBINARY(16) DEFAULT NULL,

	sequence_deviation INTEGER DEFAULT NULL

);

参数详解

  • 参数仅做语法支持。
  1. DBMS_AQ.DEQUEUE_OPTIONS_T

DBMS_AQ包专有类型,指定出队选项。

语法如下:

TYPE DEQUEUE_OPTIONS_T IS RECORD (

	consumer_name VARCHAR(30) DEFAULT NULL,

	dequeue_mode INTEGER DEFAULT REMOVE,

	navigation INTEGER DEFAULT NEXT_MESSAGE,

	visibility INTEGER DEFAULT ON_COMMIT,

	wait INTEGER DEFAULT FOREVER,

	msgid VARBINARY(16) DEFAULT NULL

);

参数详解

  • DEQUEUE_MODE指定出队模式,取值包括:BROWSE(直接读取消息)、REMOVE(读取后删除消息)。
  • WAIT指定没有消息时的等待时间(秒),值FOREVER和NO_WAIT分别表示一直等待和不等待。
  • 剩余参数仅做语法支持。
  1. DBMS_AQ. MESSAGE_PROPERTIES_T

DBMS_AQ包专有类型,指定消息属性。

语法如下:

TYPE MESSAGE_PROPERTIES_T IS RECORD (

	priority INTEGER NOT NULL DEFAULT 1,

	delay INTEGER NOT NULL DEFAULT 0,

	expiration INTEGER NOT NULL DEFAULT -1,

	attempts INTEGER,

	recipient_list SYS.AQ$_RECIPIENT_LIST_T,

	exception_queue VARCHAR(61) DEFAULT NULL,

	state INTEGER

);

参数详解

  • 参数仅做语法支持。

39.2 相关方法

  1. REGISTER

注册订阅者和回调信息。

语法如下:

PROCEDURE REGISTER(

	reg_list IN SYS.AQ$_REG_INFO_LIST

	reg_count IN NUMBER

);

参数详解

  • REG_LIST指定注册信息数组。参考2.1数据类型说明。
  • REG_COUNT指定REG_LIST中的注册信息个数。
  1. UNREGISTER

注销订阅者和回调信息。

语法如下:

PROCEDURE UNREGISTER(

	reg_list IN SYS.AQ\$_REG_INFO_LIST

	reg_count IN NUMBER

);

参数详解

  • REG_LIST指定注销信息数组。参考2.1数据类型说明。
  • REG_COUNT指定REG_LIST中的注销信息个数。
  1. ENQUEUE

消息入队,若队列注册过订阅者和回调信息,则入队消息后会尝试为每个订阅者执行其注册的回调过程。

语法如下:

PROCEDURE ENQUEUE(

	queue_name IN VARCHAR,

	enqueue_options IN ENQUEUE_OPTIONS_T,

	message_properties IN MESSAGE_PROPERTIES_T,

	payload IN "<ADT_1>",

	msgid OUT VARBINARY

);

参数详解

  • QUEUE_NAME指定要入队的队列名,格式为:[模式名.]队列名。
  • ENQUEUE_OPTIONS指定入队选项,仅做语法支持。参考2.1数据类型说明。
  • MESSAGE_PROPERTIES指定消息属性,仅做语法支持。参考2.1数据类型说明。
  • PAYLOAD指定消息载荷,必须与创建队列表指定的消息载荷类型对应。
  • MSGID输出生成的入队消息id。
  1. DEQUEUE

消息出队,队列中存在多条消息时总是优先出队第一条。

语法如下:

PROCEDURE DEQUEUE(

	queue_name IN VARCHAR,

	dequeue_options IN DEQUEUE_OPTIONS_T,

	message_properties OUT MESSAGE_PROPERTIES_T,

	payload OUT "<ADT_1>",

	msgid OUT VARBINARY

);

参数详解

  • QUEUE_NAME指定要出队的队列名,格式为:[模式名.]队列名。
  • DEQUEUE_OPTIONS指定出队选项。参考2.1数据类型说明。
  • MESSAGE_PROPERTIES指定消息属性,仅做语法支持。参考2.1数据类型说明。
  • PAYLOAD输出消息载荷,必须与创建队列表指定的消息载荷类型对应。
  • MSGID输出消息id。

39.3 回调说明

每个订阅者可注册多个回调过程,但消息入队时最多仅触发一个回调。触发的回调过程必须有效,与队列处于同一模式,且参数数目、类型和顺序必须满足如下定义,不符合预期的回调会跳过执行:

PROCEDURE CALLBACK(

	CONTEXT1 RAW,

	REGINFO SYS.AQ$_REG_INFO,

	DESCR SYS.AQ$_DESCRIPTOR,

	PAYLOAD RAW,

	PAYLOADL NUMBER

);

参数详解

  • REGINFO指定触发回调的注册信息,同过程REGISTER参数。参考2.1节数据类型说明。
  • DESCR指定触发回调的消息和队列信息。参考2.1节数据类型说明。
  • 剩余参数仅做语法支持。

39.4 举例说明

使用包过程和函数之前,如果还未创建过系统包,请先调用系统过程创建。

SP_CREATE_SYSTEM_PACKAGES (1,'DBMS_AQ');

例1 在38.3节例3之后执行以下操作:创建回调过程MY_CALLBACK并注册到订阅者MY_SUB。

CREATE OR REPLACE PROCEDURE SSS.MY_CALLBACK(

	context1 RAW,

	reginfo SYS.AQ$_REG_INFO,

	descr SYS.AQ$_DESCRIPTOR,

	payload RAW,

	payloadl NUMBER

) AS

	DOP DBMS_AQ.DEQUEUE_OPTIONS_T;

	MSG SYSDBA.MY_MSG_TYPE;

	MSGID RAW(16);

BEGIN

	DOP.dequeue_mode := DBMS_AQ.BROWSE;

	DBMS_AQ.DEQUEUE(

	queue_name => descr.queue_name,

	dequeue_options => DOP,

	message_properties => NULL,

	payload => MSG,

	msgid => MSGID

  );
  PRINT MSG.MESSAGE;

END;

/

DBMS_AQ.REGISTER(

	SYS.AQ$_REG_INFO_LIST(

		SYS.AQ$_REG_INFO(

		'SSS.MY_QUEUE:MY_SUB',

		DBMS_AQ.NAMESPACE_AQ,

		'plsql://MY_CALLBACK',

		HEXTORAW('FF')

  )
  
  ),
  
1);

例2 消息入队,入队后会触发回调MY_CALLBACK,从而打印入队的消息载荷。

DECLARE

	EOP DBMS_AQ.ENQUEUE_OPTIONS_T;

	MSG MY_MSG_TYPE;

	MSGID RAW(16);

BEGIN

	MSG := MY_MSG_type(TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3'));

	DBMS_AQ.ENQUEUE(

		queue_name => 'SSS.MY_QUEUE',

		enqueue_options => EOP,

		message_properties => NULL,

		payload => MSG,

		msgid => MSGID

);

COMMIT;

END;

/

例3 消息以REMOVE模式出队,出队后消息会被删除。

DECLARE

	DOP DBMS_AQ.DEQUEUE_OPTIONS_T;

	MSG MY_MSG_TYPE;

	MSGID RAW(16);

BEGIN

	DBMS_AQ.DEQUEUE(

		queue_name => 'SSS.MY_QUEUE',

		dequeue_options => DOP,

		message_properties => NULL,

		payload => MSG,

		msgid => MSGID

);
	PRINT MSG.MESSAGE;

END;

/
微信扫码
分享文档
扫一扫
联系客服