5.5 DBMS_AQADM: Performing AQ Administrative Tasks (Oracle8 only)Before AQ users can enqueue and dequeue, they must have queues with which to work. The AQ administrator must create queue tables and queues within those tables and then start the queues. Additional administrative tasks include stopping queues and removing queue tables, managing lists of subscribers, and starting and stopping the Queue Monitor. The DBMS_AQADM package provides an interface to the administrative tasks of Oracle AQ. The DBMS_AQADM programs are listed in Table 5.2 . In order to use these procedures, a DBMS_AQADM user must have been granted the role AQ_ADMINISTRATOR_ROLE from the SYS account.
The following sections describe these programs in a number of categories. 5.5.1 Creating Queue TablesFirst, you need to create your queue tables and grant the necessary capabilities. 5.5.1.1 The DBMS_AQADM. GRANT_TYPE_ACCESS procedureIf you would like to support multiple consumers with your queue (that is, so that the same or different messages can be dequeued by more than one agent), call the GRANT_TYPE_ACCESS program to grant that capability, PROCEDURE DBMS_AQADM.GRANT_TYPE_ACCESS (user_name IN VARCHAR2); where user_name is the name of the user who needs to perform multiple consumer queue activities. The SYS account must call this procedure to grant the privilege to the AQ administrator. The AQ administrator then runs this program to grant the privilege to AQ users. Here is an example of granting multiple consumer capabilities to the SCOTT account: SQL> exec DBMS_AQADM.GRANT_TYPE_ACCESS ('scott'); 5.5.1.2 The DBMS_AQADM. CREATE_QUEUE_TABLE procedureThe CREATE_QUEUE_TABLE procedure creates a queue table. A queue table is the named repository for a set of queues and their messages. A queue table may contain numerous queues, each of which may have many messages. But a given queue and its messages may exist in only one queue table. The specification for the procedure follows: PROCEDURE DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table IN VARCHAR2 ,queue_payload_type IN VARCHAR2 ,storage_clause IN VARCHAR2 DEFAULT NULL ,sort_list IN VARCHAR2 DEFAULT NULL ,multiple_consumers IN BOOLEAN DEFAULT FALSE ,message_grouping IN BINARY_INTEGER DEFAULT NONE ,comment IN VARCHAR2 DEFAULT NULL ,auto_commit IN BOOLEAN DEFAULT TRUE); Parameters are summarized in the following table.
The sort_list has the following format, '<sort_column_1>,<sort_column_2>' where each sort_column_N is either PRIORITY or ENQ_TIME. If both columns are specified, then <sort_column_1> defines the most significant order. In other words, these are the only valid values for sort_list besides NULL: 'PRIORITY' 'PRIORITY,ENQ_TIME' 'ENQ_TIME' (this is the default) 'ENQ_TIME,PRIORITY' Once a queue table is created with a specific ordering mechanism, all queues in the queue table inherit the same default ordering. This order cannot be altered once the queue table has been created. If no sort list is specified, all the queues in this queue table will be sorted by the enqueue time in ascending order. This order is equivalent to FIFO (first-in-first-out) order. Even with the default ordering defined, a consumer can override this order by specifying the msgid or correlation value for a specific message. The msgid, correlation, and sequence_deviation take precedence over the default dequeueing order if they are specified. When you create a queue table, the following objects are created:
5.5.1.2.1 ExampleIn the following example, I construct a basic queue table in the current schema with a comment as to when it was created: BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'msg', queue_payload_type => 'message_type', comment => 'General message queue table created on ' || TO_CHAR(SYSDATE,'MON-DD-YYYY HH24:MI:SS')); END; / Notice that I pass the payload type as a string: the name of the object type I defined in the section explaining how to enqueue messages. I can verify the creation of this queue table by querying the USER_QUEUE_TABLES. SQL> SELECT queue_table, user_comment FROM USER_QUEUE_TABLES; QUEUETABLE USER_COMMENT MSG General message queue table created on JUN-08-1997 14:22:01 The following request to create a queue table specifies support for multiple consumers of a single message and also enables message grouping: BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'msg', queue_payload_type => 'message_type', multiple_consumers => TRUE, message_grouping => DBMS_AQADM.TRANSACTIONAL, comment => 'Specialized queue table created on ' || TO_CHAR(SYSDATE,'MON-DD-YYYY HH24:MI:SS')); END; / Notice the extensive use of named notation (the "=>" symbols). This feature of PL/SQL comes in very handy when working with programs that have long lists of parameters, or with programs that are used infrequently. The named notation approach, which explicitly associates a parameter with an argument value, documents more clearly how the program is being used. See the Section 5.7 " section for a more thorough explanation of the message grouping and multiple consumers feature. 5.5.1.2.2 Notes on usageNote the following about using the CREATE_QUEUE_TABLE procedure:
5.5.2 Creating and Starting QueuesOnce a queue table has been created, you can create and then start individual queues in that queue table. 5.5.2.1 The DBMS_AQADM. CREATE_QUEUE procedureCall the CREATE_QUEUE to create a queue in the specified queue table. All queue names must be unique within a schema. Once a queue is created, it can be enabled by calling DBMS_AQADM.START_QUEUE. By default, the queue is created with both enqueue and dequeue disabled. PROCEDURE DBMS_AQADM.CREATE_QUEUE (queue_name IN VARCHAR2, queue_table IN VARCHAR2, queue_type IN BINARY_INTEGER default DBMS_AQADM.NORMAL_QUEUE, max_retries IN NUMBER default 0, retry_delay IN NUMBER default 0, retention_time IN NUMBER default 0, dependency_tracking IN BOOLEAN default FALSE, comment IN VARCHAR2 default NULL, auto_commit IN BOOLEAN default TRUE); Parameters are summarized in the following table.
5.5.2.1.1 ExampleIn the following example, I create a new message queue within the previously created message queue table. I want it to allow for up to ten retries at hourly delays and keep ten days worth of history before deleting processed messages. BEGIN DBMS_AQADM.CREATE_QUEUE( queue_name => 'never_give_up_queue', queue_table => 'msg', max_retries => 10, retry_delay => 3600, retention_time => 10 * 24 * 60 * 60, comment => 'Test Queue Number 1'); END; / 5.5.2.2 The DBMS_AQADM. START_QUEUE procedureIt is not enough to simply create a queue inside a queue table. You must also enable it for enqueuing and/or dequeuing operation, with the START_QUEUE procedure: PROCEDURE DBMS_AQADM.START_QUEUE (queue_name IN VARCHAR2, enqueue IN BOOLEAN DEFAULT TRUE, dequeue IN BOOLEAN DEFAULT TRUE); Parameters are summarized in the following table.
Notice that a value of FALSE for either of the Boolean parameters does not disable a queue for the respective operation. It simply does not change the current status of that operation on the specified queue. To disable queuing or enqueuing on a queue, you must call the DBMS_AQADM.STOP_QUEUE procedure. 5.5.2.2.1 ExampleThe following block starts a queue for enqueuing actions only: BEGIN VP DBMS_AQADM.START_QUEUE ('number_stack', dequeue=>FALSE); END;VPU / You will often want to perform the following steps in sequence:
The following files on the companion disk provide various examples of these steps:
5.5.2.3 The DBMS_AQADM. ALTER_QUEUE procedureThe ALTER_QUEUE procedure of the DBMS_AQADM package modifies an existing message queue. An error is returned if the message queue does not exist. The specification for the procedure follows: PROCEDURE DBMS_AQADM.ALTER_QUEUE queue_name IN VARCHAR2, max_retries IN NUMBER default NULL, retry_delay IN NUMBER default NULL, retention_time IN NUMBER default NULL, auto_commit IN BOOLEAN default TRUE); Parameters are summarized in the following table.
5.5.2.3.1 ExampleIn the following example, I modify the properties of the queue created under CREATE_QUEUE. I now want it to allow for up to 20 retries at hourly delays and to keep 30 days worth of history before deleting processed messages. BEGIN DBMS_AQADM.ALTER_QUEUE( queue_name => 'never_give_up_queue', max_retries => 20, retention_time => 30 * 24 * 60 * 60); END; / I can verify the impact of this call by querying the USER_QUEUES data dictionary view. SQL> SELECT name, max_retries, retention FROM USER_QUEUES; NAME MAX_RETRIES RETENTION ------------------------------ ----------- ---------- AQ$_MSG_E 0 0 MSGQUEUE 0 0 NEVER_GIVE_UP_QUEUE 20 2592000 The first line in the listing is the exception queue for the "msg" queue table. The "msgqueue" queue in the "msg" queue table is a previously defined queue. The third line displays the information for the queue modified by the call to DBMS_AQADM.ALTER_QUEUE. 5.5.3 Managing Queue SubscribersA program can enqueue messages to a specific list of recipients or to the default list of subscribers. A subscriber to a queue is an agent that is registered to dequeue messages from a queue. You can add and remove subscribers, as well as retrieve the current set of subscribers for a queue. These operations will work only with queues that allow multiple consumers (i.e., the multiple_consumers parameter is set to TRUE when you called DBMS_AQADM.CREATE_QUEUE_TABLE). The command takes effect immediately, and the containing transaction is committed. Enqueue requests executed after the completion of this call will reflect the new behavior. Users attempting to modify the subscriber list of a queue must have been granted type access by executing the DBMS_AQADM.GRANT_TYPE_ACCESS procedure. 5.5.3.1 The DBMS_AQADM. ADD_SUBSCRIBER procedureTo add a subscriber to a queue, call the ADD_SUBSCRIBER procedure: PROCEDURE DBMS_AQADM.ADD_SUBSCRIBER (queue_name IN VARCHAR2, subscriber IN SYS.AQ$_AGENT); Parameters are summarized in the following table. 5.5.3.1.1 ExampleHere is an example of adding a subscriber to a queue: BEGIN DBMS_AQADM.ADD_SUBSCRIBER ('msgqueue', SYS.AQ$_AGENT ('multiconsqueue', NULL, NULL)); In this case, I have embedded the call to the object constructor method to convert a name to an agent. You can also perform this task in two steps as follows: DECLARE v_agent SYS.AQ$_AGENT; BEGIN v_agent := SYS.AQ$_AGENT ('Danielle', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER ('multiconsqueue', v_agent); 5.5.3.2 The DBMS_AQADM. REMOVE_SUBSCRIBER procedureTo remove a default subscriber from a queue, call the REMOVE_SUBSCRIBER procedure: PROCEDURE DBMS_AQADM.REMOVE_SUBSCRIBER (queue_name IN VARCHAR2, subscriber IN SYS.AQ$_AGENT); Parameters are summarized in the following table.
5.5.3.2.1 ExampleHere is an example of removing a subscriber from a queue: BEGIN DBMS_AQADM.REMOVE_SUBSCRIBER ('multiconsqueue', SYS.AQ$_AGENT ('CEO', NULL, NULL)); In this case I have embedded the call to the object constructor method to convert a name to an agent. You can also perform this task in two steps as follows: DECLARE v_agent SYS.AQ$_AGENT; BEGIN v_agent := SYS.AQ$_AGENT ('CEO', NULL, NULL); DBMS_AQADM.REMOVE_SUBSCRIBER ('multiconsqueue', v_agent); All references to the subscriber in existing messages are removed as part of the operation. If you try to remove a subscriber that does not exist for this queue, you will receive this error message: ORA-24035: application <subscriber> is not a subscriber for queue <queue> 5.5.3.3 The DBMS_AQADM. QUEUE_SUBSCRIBERS procedureThe QUEUE_SUBSCRIBERS function returns the list of subscribers associated with the specified queue. This list is an index-by table, as shown in the header, FUNCTION DBMS_AQADM.QUEUE_SUBSCRIBERS (queue_name IN VARCHAR2) RETURN DBMS_AQADM.AQ$_SUBSCRIBER_LIST_T; where queue_name is the name of the queue. 5.5.3.3.1 ExampleThe following procedure encapsulates the steps needed to obtain this list and then to display it: /* Filename on companion disk: showsubs.sp */* CREATE OR REPLACE PROCEDURE showsubs (qname IN VARCHAR2) IS sublist DBMS_AQADM.AQ$_SUBSCRIBER_LIST_T; v_row PLS_INTEGER; BEGIN /* Retrieve the list. */ sublist := DBMS_AQADM.QUEUE_SUBSCRIBERS (qname); v_row := sublist.FIRST; LOOP EXIT WHEN v_row IS NULL; DBMS_OUTPUT.PUT_LINE (v_row); DBMS_OUTPUT.PUT_LINE (sublist(v_row).name); v_row := sublist.NEXT (v_row); END LOOP; END; / Now let's put the procedure to use. First of all, you can associate a set of subscribers only with a queue that supports multiple consumers. Here are the steps: /* Filename on companion disk: aqcremq.sql */* BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'multicons', queue_payload_type => 'message_type', multiple_consumers => TRUE); DBMS_AQADM.CREATE_QUEUE (queue_name => 'multiconsqueue', queue_table => 'multicons'); DBMS_AQADM.START_QUEUE (queue_name => 'multiconsqueue'); END; / You can then add subscribers to the multicons queue and display the results: /* Filename on companion disk: showsubs.sql */* DECLARE v_queue VARCHAR2(10) := 'multiconsqueue'; BEGIN DBMS_AQADM.ADD_SUBSCRIBER (v_queue, SYS.AQ$_AGENT ('Danielle', NULL, NULL)); DBMS_AQADM.ADD_SUBSCRIBER (v_queue, SYS.AQ$_AGENT ('Benjamin', NULL, NULL)); DBMS_AQADM.ADD_SUBSCRIBER (v_queue, SYS.AQ$_AGENT ('Masada', NULL, NULL)); DBMS_AQADM.ADD_SUBSCRIBER (v_queue, SYS.AQ$_AGENT ('Timnah', NULL, NULL)); showsubs (v_queue); END; / 5.5.4 Stopping and Dropping QueuesDBMS_AQADM offers two programs to clean up queues: STOP_QUEUE and DROP_QUEUE. The stop program disables activity on the queue. The drop program actually removes that queue from the queue table. 5.5.4.1 The DBMS_AQADM. STOP_QUEUE procedureTo disable enqueuing and/or dequeuing on a particular queue, call the STOP_QUEUE procedure: PROCEDURE DBMS_AQADM.STOP_QUEUE (queue_name IN VARCHAR2, enqueue IN BOOLEAN DEFAULT TRUE, dequeue IN BOOLEAN DEFAULT TRUE, wait IN BOOLEAN DEFAULT TRUE); Parameters are summarized in the following table.
5.5.4.1.1 ExampleThe following example shows the disabling of a queue for enqueuing purposes only. I also request that the program wait until all outstanding transactions are completed. You might take these steps in order to allow consumers to empty the queue, while not allowing any new messages to be placed on the queue. BEGIN DBMS_AQADM.STOP_QUEUE ('msgqueue', enqueue=>TRUE, dequeue=>FALSE, wait=>TRUE); END; You can check the status of your queue by querying the USER_QUEUES data dictionary view: SQL> SELECT name, enqueue, dequeue FROM USER_QUEUES 2 WHERE name = 'MSGQUEUE'; NAME ENQUEUE DEQUEUE MSGQUEUE NO YES 5.5.4.2 The DBMS_AQADM.DROP_QUEUE procedureThe DROP_QUEUE procedure drops an existing message queue. An error is returned if the message queue does not exist. In addition, this operation is not allowed unless DBMS_AQADM.STOP_QUEUE has been called to disable both enqueuing and dequeuing. If the message queue has not been stopped, then DROP_QUEUE returns an error of queue resource (ORA-24023). Here's the header for the procedure: PROCEDURE DBMS_AQADM.DROP_QUEUE (queue_name IN VARCHAR2, auto_commit IN BOOLEAN DEFAULT TRUE); Parameters are summarized in the following table.
5.5.4.3 The DBMS_AQADM.DROP_QUEUE_TABLE procedureOnce you have stopped and dropped all queues in a queue table, you can remove that entire queue table with the DROP_QUEUE_TABLE procedure: PROCEDURE DBMS_AQADM.DROP_QUEUE_TABLE (queue_table IN VARCHAR2, force IN BOOLEAN default FALSE, auto_commit IN BOOLEAN default TRUE); Parameters are summarized in the following table.
5.5.4.3.1 ExampleThe following example forces the dropping of the msg queue table, stopping and dropping all queues along the way. BEGIN DBMS_AQADM.DROP_QUEUE_TABLE ('msg', force => TRUE); END; / 5.5.5 Managing Propagation of MessagesIn order to propagate messages from one queue to another (an Oracle 8.0.4 and later feature), you need to schedule propagation between queues. You can also unschedule propagation of those messages. 5.5.5.1 The DBMS_AQADM.SCHEDULE_PROPAGATION procedureCall the SCHEDULE_PROPAGATION procedure to schedule propagation of messages. The header for this procedure follows: PROCEDURE DBMS_AQADM.SCHEDULE_PROPAGATION (src_queue_name IN VARCHAR2, destination IN VARCHAR2 DEFAULT NULL, start_time IN DATE DEFAULT SYSDATE, duration IN NUMBER DEFAULT NULL, next_time IN VARCHAR2 DEFAULT NULL, latency IN NUMBER DEFAULT 60); Parameters are summarized in the following table.
To summarize that relatively complex set of parameters and their interactions, when you schedule propagation you identify an initial start date/time and a span of time (number of seconds) in which messages are available to be propagated out of the queue to other queues. You can request that this window of time be opened on a regular basis (every day, once a week, every morning at 10 a.m., etc.). Finally, you can specify that the Queue Monitor check no less frequently than every N seconds (latency) during the time the propagation window is open to see if there are messages to propagate. 5.5.5.1.1 ExampleIn this example, I schedule the propagation of a queue to the Boston brokerage office to occur every two hours. The propagation window is five minutes, and during that period of time, I want messages to be flushed out at least every 30 seconds. BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION ('sell_orders', 'broker@boston', SYSDATE, 5 * 60, 'SYSDATE + 2/24', 30); END; / If I do not specify a destination, then propagation occurs to the same database in which the source queue is defined. The following call to DBMS_AQADM.SCHEDULE_PROPAGATION takes all default values (including a local destination database), except that it requests a latency of ten minutes by using named notation: BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION ('share_the_blame', latency => 60 * 10); END; / 5.5.5.2 The DBMS_AQADM.UNSCHEDULE_PROPAGATION procedureYou can stop or unschedule propagation of a queue with the UNSCHEDULE_PROPAGATION procedure: PROCEDURE DBMS_AQADM.UNSCHEDULE_PROPAGATION (src_queue_name IN VARCHAR2, destination IN VARCHAR2 DEFAULT NULL); Parameters are summarized in the following table:
5.5.6 Verifying Queue Types5.5.6.1 The DBMS_AQADM.VERIFY_QUEUE_TYPES procedureThe VERIFY_QUEUE_TYPES procedure allows you to determine whether two different queues have the same payload type. Here's the header: PROCEDURE DBMS_AQADM.VERIFY_QUEUE_TYPES (src_queue_name IN VARCHAR2, dest_queue_name IN VARCHAR2 destination IN VARCHAR2 DEFAULT NULL, rc OUT BINARY_INTEGER); Parameters are summarized in the following table.
Whenever this program is run (either by Oracle AQ itself or by an AQ administrator), it updates the SYS.AQ$_MESSAGE_TYPES table. You can access this table (to verify the success of the type match after propagation has taken place) using the OID (Object ID) of the source queue and the address of the destination queue. 5.5.7 Starting and Stopping the Queue MonitorIf you want to use the delay and expiration features of AQ, you must have the Queue Monitor process running in the background. Before you can do this, you must add an AQ_TM_PROCESS parameter to your database initialization file (see Section 5.2, "Getting Started with Oracle AQ" " at the beginning of this chapter for more information). 5.5.7.1 The DBMS_AQADM. START_TIME_MANAGER procedureTo start the Queue Monitor process, call the START_TIME_MANAGER procedure: PROCEDURE DBMS_AQADM.START_TIME_MANAGER; The operation takes effect when the call completes; there are no transactional dependencies. You can use the START_TIME_MANAGER to restart the Queue Monitor after you stopped it with a call to STOP_TIME_MANAGER. You can also use it to start the Queue Monitor process if the database initialization parameter was set to 0. In other words, you can override the default state of the database with this programmatic call. 5.5.7.2 The DBMS_AQADM. STOP_TIME_MANAGER procedureTo stop the Queue Monitor process, call the STOP_TIME_MANAGER procedure: PROCEDURE DBMS_AQADM.STOP_TIME_MANAGER; The operation takes effect when the call completes; there are no transactional dependencies. The STOP_TIME_MANAGER procedure does not actually stop the physical process running in the background. This process is started when the database is initialized. The procedure simply disables the time management features of Oracle AQ in that database instance. Copyright (c) 2000 O'Reilly & Associates. All rights reserved. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|