Skip to content

Commit 2bd2131

Browse files
Added support for array enqueue and dequeue for AQ in thin mode.
1 parent 34280f0 commit 2bd2131

File tree

15 files changed

+701
-47
lines changed

15 files changed

+701
-47
lines changed

doc/src/api_manual/aq.rst

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ used to enqueue and dequeue messages.
2121
Queue Methods
2222
-------------
2323

24-
.. method:: Queue.deqmany(maxMessages)
24+
.. method:: Queue.deqmany(max_num_messages)
2525

2626
Dequeues up to the specified number of messages from the queue and returns
2727
a list of these messages. Each element of the returned list is a
@@ -50,12 +50,14 @@ Queue Methods
5050

5151
.. warning::
5252

53-
Prior to Oracle Database 21c, calling this function in parallel on
54-
different connections acquired from the same pool may fail due to
55-
Oracle bug 29928074. Either ensure that this function is not run in
56-
parallel, use standalone connections or connections from different
57-
pools, or make multiple calls to :meth:`Queue.enqone()` instead. The
58-
function :meth:`Queue.deqmany()` call is not affected.
53+
In python-oracledb Thick mode using Oracle Client libraries prior to
54+
21c, calling :meth:`Queue.enqmany()` in parallel on different
55+
connections acquired from the same connection pool may fail due to
56+
Oracle bug 29928074. To avoid this, do one of: upgrade the client
57+
libraries, ensure that :meth:`Queue.enqmany()` is not run in parallel,
58+
use standalone connections or connections from different pools, or make
59+
multiple calls to :meth:`Queue.enqone()`. The function
60+
:meth:`Queue.deqmany()` call is not affected.
5961

6062
For consistency and compliance with the PEP 8 naming style, the name of
6163
the method was changed from `enqMany()`. The old name will continue

doc/src/api_manual/async_aq.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,25 @@ are used to enqueue and dequeue messages.
2323
AsyncQueue Methods
2424
------------------
2525

26+
.. method:: AsyncQueue.deqmany(max_num_messages)
27+
28+
Dequeues up to the specified number of messages from the queue and returns
29+
a list of these messages. Each element of the returned list is a
30+
:ref:`message property <msgproperties>` object.
31+
2632
.. method:: AsyncQueue.deqone()
2733

2834
Dequeues at most one message from the queue. If a message is dequeued, it
2935
will be a :ref:`message property <asyncmsgproperties>` object; otherwise,
3036
the value *None* will be returned.
3137

38+
.. method:: AsyncQueue.enqmany(messages)
39+
40+
Enqueues multiple messages into the queue. The ``messages`` parameter must
41+
be a sequence containing :ref:`message property <msgproperties>` objects
42+
which have all had their payload attribute set to a value that the queue
43+
supports.
44+
3245
.. method:: AsyncQueue.enqone(message)
3346

3447
Enqueues a single message into the queue. The message must be a

doc/src/release_notes.rst

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ oracledb 3.1.0 (TBD)
1717
Thin Mode Changes
1818
+++++++++++++++++
1919

20-
#) Added :ref:`Oracle Advanced Queuing <aqusermanual>` support for single
21-
enqueue and dequeue of JSON payloads.
22-
#) Added Async :ref:`Oracle Advanced Queuing <asyncaq>` support for single
23-
enqueue and dequeue of RAW and Oracle object payload types.
20+
#) Improved support for :ref:`Oracle Advanced Queuing <aqusermanual>`:
21+
22+
- added support for JSON payloads
23+
- added support for bulk enqueuing and dequeuing
24+
- added support for using AQ with asyncio
25+
2426
#) Improved error message when the cryptography package cannot be imported
2527
(`issue 455 <https://github.com/oracle/python-oracledb/issues/455>`__).
2628
#) Fixed decoding of nested PL/SQL records

doc/src/user_guide/aq.rst

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ types.
2626
- JSON payloads require Oracle Database 21c (or later). In python-oracle Thick
2727
mode, Oracle Client libraries 21c (or later) are also needed.
2828

29-
JMS payloads, array message queuing and dequeuing operations, and
30-
:ref:`Recipient Lists <reciplists>` are only supported in python-oracledb
31-
:ref:`Thick mode <enablingthick>`.
29+
JMS payloads and :ref:`Recipient Lists <reciplists>` are only supported in
30+
python-oracledb :ref:`Thick mode <enablingthick>`.
3231

3332
There are examples of AQ Classic Queues in the `GitHub samples
3433
<https://github.com/oracle/python-oracledb/tree/main/samples>`__ directory.
@@ -343,8 +342,9 @@ message will be dropped from the queue.
343342
Bulk Enqueue and Dequeue
344343
========================
345344

346-
The :meth:`~Queue.enqmany()` and :meth:`~Queue.deqmany()` methods can be used
347-
for efficient bulk message handling.
345+
The :meth:`Queue.enqmany()`, :meth:`Queue.deqmany()`,
346+
:meth:`AsyncQueue.enqmany()`, and :meth:`AsyncQueue.deqmany()` methods can be
347+
used for efficient bulk message handling.
348348

349349
The :meth:`~Queue.enqmany()` method is similar to :meth:`~Queue.enqone()` but
350350
accepts an array of messages:
@@ -362,16 +362,18 @@ accepts an array of messages:
362362
363363
.. warning::
364364

365-
Calling :meth:`~Queue.enqmany()` in parallel on different connections
366-
acquired from the same pool may fail due to Oracle bug 29928074. To avoid
367-
this, ensure that :meth:`~Queue.enqmany()` is not run in parallel, use
368-
standalone connections or connections from different pools, or make
369-
multiple calls to :meth:`~Queue.enqone()` instead. The function
370-
:meth:`~Queue.deqmany()` call is not affected.
371-
372-
To dequeue multiple messages at one time, use :meth:`~Queue.deqmany()`. This
373-
takes an argument specifying the maximum number of messages to dequeue at one
374-
time:
365+
In python-oracledb Thick mode using Oracle Client libraries prior to 21c,
366+
calling :meth:`Queue.enqmany()` in parallel on different connections
367+
acquired from the same connection pool may fail due to Oracle
368+
bug 29928074. To avoid this, do one of: upgrade the client libraries,
369+
ensure that :meth:`Queue.enqmany()` is not run in parallel, use standalone
370+
connections or connections from different pools, or make multiple calls to
371+
:meth:`Queue.enqone()`. The function :meth:`Queue.deqmany()` call is not
372+
affected.
373+
374+
To dequeue multiple messages at one time, use :meth:`Queue.deqmany()` or
375+
:meth:`AsyncQueue.deqmany()`. This takes an argument specifying the maximum
376+
number of messages to dequeue at one time:
375377

376378
.. code-block:: python
377379

samples/bulk_aq.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353
"The twelfth and final message",
5454
]
5555

56-
# this script is currently only supported in python-oracledb thick mode
57-
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
56+
# determine whether to use python-oracledb thin mode or thick mode
57+
if not sample_env.get_is_thin():
58+
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
5859

5960
# connect to database
6061
connection = oracledb.connect(

samples/bulk_aq_async.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# -----------------------------------------------------------------------------
2+
# Copyright (c) 2025, Oracle and/or its affiliates.
3+
#
4+
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
5+
#
6+
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
7+
# Canada. All rights reserved.
8+
#
9+
# This software is dual-licensed to you under the Universal Permissive License
10+
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
11+
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
12+
# either license.
13+
#
14+
# If you elect to accept the software under the Apache License, Version 2.0,
15+
# the following applies:
16+
#
17+
# Licensed under the Apache License, Version 2.0 (the "License");
18+
# you may not use this file except in compliance with the License.
19+
# You may obtain a copy of the License at
20+
#
21+
# https://www.apache.org/licenses/LICENSE-2.0
22+
#
23+
# Unless required by applicable law or agreed to in writing, software
24+
# distributed under the License is distributed on an "AS IS" BASIS,
25+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26+
# See the License for the specific language governing permissions and
27+
# limitations under the License.
28+
# -----------------------------------------------------------------------------
29+
30+
# -----------------------------------------------------------------------------
31+
# bulk_aq_async.py
32+
#
33+
# Demonstrates how to use bulk enqueuing and dequeuing of messages with
34+
# advanced queuing using asyncio. It makes use of a RAW queue created in the
35+
# sample setup.
36+
# -----------------------------------------------------------------------------
37+
38+
import asyncio
39+
40+
import oracledb
41+
import sample_env
42+
43+
QUEUE_NAME = "DEMO_RAW_QUEUE"
44+
PAYLOAD_DATA = [
45+
"The first message",
46+
"The second message",
47+
"The third message",
48+
"The fourth message",
49+
"The fifth message",
50+
"The sixth message",
51+
"The seventh message",
52+
"The eighth message",
53+
"The ninth message",
54+
"The tenth message",
55+
"The eleventh message",
56+
"The twelfth and final message",
57+
]
58+
59+
60+
async def main():
61+
62+
# connect to database
63+
async with oracledb.connect_async(
64+
user=sample_env.get_main_user(),
65+
password=sample_env.get_main_password(),
66+
dsn=sample_env.get_connect_string(),
67+
) as connection:
68+
69+
# create a queue
70+
queue = connection.queue(QUEUE_NAME)
71+
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
72+
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
73+
74+
# dequeue all existing messages to ensure the queue is empty, just so
75+
# that the results are consistent
76+
while await queue.deqone():
77+
pass
78+
79+
# enqueue a few messages
80+
print("Enqueuing messages...")
81+
batch_size = 6
82+
data_to_enqueue = PAYLOAD_DATA
83+
while data_to_enqueue:
84+
batch_data = data_to_enqueue[:batch_size]
85+
data_to_enqueue = data_to_enqueue[batch_size:]
86+
messages = [
87+
connection.msgproperties(payload=d) for d in batch_data
88+
]
89+
for data in batch_data:
90+
print(data)
91+
await queue.enqmany(messages)
92+
await connection.commit()
93+
94+
# dequeue the messages
95+
print("\nDequeuing messages...")
96+
batch_size = 8
97+
while True:
98+
messages = await queue.deqmany(batch_size)
99+
if not messages:
100+
break
101+
for props in messages:
102+
print(props.payload.decode())
103+
await connection.commit()
104+
print("\nDone.")
105+
106+
107+
asyncio.run(main())

src/oracledb/aq.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def enqmany(self, messages: list) -> None:
180180
acquired from the same pool may fail due to Oracle bug 29928074. Ensure
181181
that this function is not run in parallel, use standalone connections
182182
or connections from different pools, or make multiple calls to
183-
enqOne() instead. The function Queue.deqMany() call is not affected.
183+
enqone() instead. The function Queue.deqmany() call is not affected.
184184
"""
185185
for message in messages:
186186
self._verify_message(message)
@@ -211,6 +211,14 @@ def enqOne(self, message: "MessageProperties") -> None:
211211

212212
class AsyncQueue(BaseQueue):
213213

214+
async def deqmany(self, max_num_messages: int) -> list:
215+
"""
216+
Dequeues up to the specified number of messages from the queue and
217+
returns a list of these messages.
218+
"""
219+
message_impls = await self._impl.deq_many(max_num_messages)
220+
return [MessageProperties._from_impl(impl) for impl in message_impls]
221+
214222
async def deqone(self) -> Union["MessageProperties", None]:
215223
"""
216224
Dequeues at most one message from the queue and returns it. If no
@@ -220,6 +228,23 @@ async def deqone(self) -> Union["MessageProperties", None]:
220228
if message_impl is not None:
221229
return MessageProperties._from_impl(message_impl)
222230

231+
async def enqmany(self, messages: list) -> None:
232+
"""
233+
Enqueues multiple messages into the queue. The messages parameter must
234+
be a sequence containing message property objects which have all had
235+
their payload attribute set to a value that the queue supports.
236+
237+
Warning: calling this function in parallel on different connections
238+
acquired from the same pool may fail due to Oracle bug 29928074. Ensure
239+
that this function is not run in parallel, use standalone connections
240+
or connections from different pools, or make multiple calls to
241+
enqone() instead. The function Queue.deqmany() call is not affected.
242+
"""
243+
for message in messages:
244+
self._verify_message(message)
245+
message_impls = [m._impl for m in messages]
246+
await self._impl.enq_many(message_impls)
247+
223248
async def enqone(self, message: "MessageProperties") -> None:
224249
"""
225250
Enqueues a single message into the queue. The message must be a message

src/oracledb/impl/thin/constants.pxi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ cdef enum:
103103
cdef enum:
104104
TNS_AQ_MSG_NO_DELAY = 0
105105
TNS_AQ_MSG_NO_EXPIRATION = -1
106+
TNS_AQ_ARRAY_ENQ = 0x01
107+
TNS_AQ_ARRAY_DEQ = 0x02
108+
TNS_AQ_ARRAY_FLAGS_RETURN_MESSAGE_ID = 0x01
109+
TNS_TTC_ENQ_STREAMING_ENABLED = 0x00000001
110+
TNS_TTC_ENQ_STREAMING_DISABLED = 0x00000000
106111

107112
# AQ flags
108113
cdef enum:
@@ -340,6 +345,7 @@ cdef enum:
340345
TNS_FUNC_LOB_OP = 96
341346
TNS_FUNC_AQ_ENQ = 121
342347
TNS_FUNC_AQ_DEQ = 122
348+
TNS_FUNC_ARRAY_AQ = 145
343349
TNS_FUNC_LOGOFF = 9
344350
TNS_FUNC_PING = 147
345351
TNS_FUNC_PIPELINE_BEGIN = 199

0 commit comments

Comments
 (0)