Skip to content

Commit e19b69c

Browse files
committed
feat: migrate to new pamqp
1 parent d97fd2b commit e19b69c

28 files changed

+943
-236
lines changed

.travis.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
dist: xenial
22
language: python
33
python:
4-
- 2.7
5-
- 3.4
6-
- 3.5
74
- 3.6
85
- 3.7
6+
- 3.8
97
services:
108
- docker
119
install:

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
rabbitmq:
2-
image: rabbitmq:3.7
2+
image: rabbitmq:3.8
33
ports:
44
- 5672
55
- 15672

docs/api/exceptions.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ rabbitpy contains two types of exceptions, exceptions that are specific to rabbi
2828
self._write_frame(frame_value)
2929
File "rabbitpy/base.py", line 311, in _write_frame
3030
raise exception
31-
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.specification.Channel.Close object at 0x10e86bd50>
31+
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.commands.Channel.Close object at 0x10e86bd50>
3232
3333
In this example, the channel that was created on the second line was closed and RabbitMQ is raising the :class:`AMQPPreconditionFailed <rabbitpy.exceptions.AMQPPreconditionFailed>` exception via RPC sent to your application using the AMQP Channel.Close method.
3434

rabbitpy/amqp.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
AMQP Adapter
33
44
"""
5-
from pamqp import specification as spec
6-
5+
from pamqp import commands
76
from rabbitpy import base
87
from rabbitpy import message
98
from rabbitpy import exceptions
@@ -35,7 +34,7 @@ def basic_ack(self, delivery_tag=0, multiple=False):
3534
:param bool multiple: Acknowledge multiple messages
3635
3736
"""
38-
self._write_frame(spec.Basic.Ack(delivery_tag, multiple))
37+
self._write_frame(commands.Basic.Ack(delivery_tag, multiple))
3938

4039
def basic_consume(self, queue='', consumer_tag='', no_local=False,
4140
no_ack=False, exclusive=False, nowait=False,
@@ -70,7 +69,7 @@ def basic_consume(self, queue='', consumer_tag='', no_local=False,
7069
consumer_tag = self.consumer_tag
7170
# pylint: disable=protected-access
7271
self.channel._consumers[consumer_tag] = (self, no_ack)
73-
self._rpc(spec.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
72+
self._rpc(commands.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
7473
exclusive, nowait, arguments))
7574
self._consuming = True
7675
try:
@@ -119,7 +118,7 @@ def basic_get(self, queue='', no_ack=False):
119118
:param bool no_ack: No acknowledgement needed
120119
121120
"""
122-
self._rpc(spec.Basic.Get(0, queue, no_ack))
121+
self._rpc(commands.Basic.Get(0, queue, no_ack))
123122

124123
def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
125124
"""Reject one or more incoming messages.
@@ -137,7 +136,7 @@ def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
137136
:param bool requeue: Requeue the message
138137
139138
"""
140-
self._write_frame(spec.Basic.Nack(delivery_tag, multiple, requeue))
139+
self._write_frame(commands.Basic.Nack(delivery_tag, multiple, requeue))
141140

142141
def basic_publish(self, exchange='', routing_key='', body='',
143142
properties=None, mandatory=False, immediate=False):
@@ -178,7 +177,7 @@ def basic_qos(self, prefetch_size=0, prefetch_count=0, global_flag=False):
178177
:param bool global_flag: Apply to entire connection
179178
180179
"""
181-
self._rpc(spec.Basic.Qos(prefetch_size, prefetch_count, global_flag))
180+
self._rpc(commands.Basic.Qos(prefetch_size, prefetch_count, global_flag))
182181

183182
def basic_reject(self, delivery_tag=0, requeue=True):
184183
"""Reject an incoming message
@@ -192,7 +191,7 @@ def basic_reject(self, delivery_tag=0, requeue=True):
192191
:param bool requeue: Requeue the message
193192
194193
"""
195-
self._write_frame(spec.Basic.Reject(delivery_tag, requeue))
194+
self._write_frame(commands.Basic.Reject(delivery_tag, requeue))
196195

197196
def basic_recover(self, requeue=False):
198197
"""Redeliver unacknowledged messages
@@ -204,14 +203,14 @@ def basic_recover(self, requeue=False):
204203
:param bool requeue: Requeue the message
205204
206205
"""
207-
self._rpc(spec.Basic.Recover(requeue))
206+
self._rpc(commands.Basic.Recover(requeue))
208207

209208
def confirm_select(self):
210209
"""This method sets the channel to use publisher acknowledgements. The
211210
client can only use this method on a non-transactional channel.
212211
213212
"""
214-
self._rpc(spec.Confirm.Select())
213+
self._rpc(commands.Confirm.Select())
215214

216215
def exchange_declare(self, exchange='', exchange_type='direct',
217216
passive=False, durable=False, auto_delete=False,
@@ -232,7 +231,7 @@ def exchange_declare(self, exchange='', exchange_type='direct',
232231
:param dict arguments: Arguments for declaration
233232
234233
"""
235-
self._rpc(spec.Exchange.Declare(0, exchange, exchange_type, passive,
234+
self._rpc(commands.Exchange.Declare(0, exchange, exchange_type, passive,
236235
durable, auto_delete, internal, nowait,
237236
arguments))
238237

@@ -248,7 +247,7 @@ def exchange_delete(self, exchange='', if_unused=False,
248247
:param bool nowait: Do not send a reply method
249248
250249
"""
251-
self._rpc(spec.Exchange.Delete(0, exchange, if_unused, nowait))
250+
self._rpc(commands.Exchange.Delete(0, exchange, if_unused, nowait))
252251

253252
def exchange_bind(self, destination='', source='',
254253
routing_key='', nowait=False, arguments=None):
@@ -263,7 +262,7 @@ def exchange_bind(self, destination='', source='',
263262
:param dict arguments: Optional arguments
264263
265264
"""
266-
self._rpc(spec.Exchange.Bind(0, destination, source, routing_key,
265+
self._rpc(commands.Exchange.Bind(0, destination, source, routing_key,
267266
nowait, arguments))
268267

269268
def exchange_unbind(self, destination='', source='',
@@ -279,7 +278,7 @@ def exchange_unbind(self, destination='', source='',
279278
:param dict arguments: Optional arguments
280279
281280
"""
282-
self._rpc(spec.Exchange.Unbind(0, destination, source, routing_key,
281+
self._rpc(commands.Exchange.Unbind(0, destination, source, routing_key,
283282
nowait, arguments))
284283

285284
def queue_bind(self, queue='', exchange='', routing_key='',
@@ -298,7 +297,7 @@ def queue_bind(self, queue='', exchange='', routing_key='',
298297
:param dict arguments: Arguments for binding
299298
300299
"""
301-
self._rpc(spec.Queue.Bind(0, queue, exchange, routing_key, nowait,
300+
self._rpc(commands.Queue.Bind(0, queue, exchange, routing_key, nowait,
302301
arguments))
303302

304303
def queue_declare(self, queue='', passive=False, durable=False,
@@ -319,7 +318,7 @@ def queue_declare(self, queue='', passive=False, durable=False,
319318
:param dict arguments: Arguments for declaration
320319
321320
"""
322-
self._rpc(spec.Queue.Declare(0, queue, passive, durable, exclusive,
321+
self._rpc(commands.Queue.Declare(0, queue, passive, durable, exclusive,
323322
auto_delete, nowait, arguments))
324323

325324
def queue_delete(self, queue='', if_unused=False, if_empty=False,
@@ -336,7 +335,7 @@ def queue_delete(self, queue='', if_unused=False, if_empty=False,
336335
:param bool nowait: Do not send a reply method
337336
338337
"""
339-
self._rpc(spec.Queue.Delete(0, queue, if_unused, if_empty, nowait))
338+
self._rpc(commands.Queue.Delete(0, queue, if_unused, if_empty, nowait))
340339

341340
def queue_purge(self, queue='', nowait=False):
342341
"""Purge a queue
@@ -348,7 +347,7 @@ def queue_purge(self, queue='', nowait=False):
348347
:param bool nowait: Do not send a reply method
349348
350349
"""
351-
self._rpc(spec.Queue.Purge(0, queue, nowait))
350+
self._rpc(commands.Queue.Purge(0, queue, nowait))
352351

353352
def queue_unbind(self, queue='', exchange='', routing_key='',
354353
arguments=None):
@@ -362,7 +361,7 @@ def queue_unbind(self, queue='', exchange='', routing_key='',
362361
:param dict arguments: Arguments of binding
363362
364363
"""
365-
self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key,
364+
self._rpc(commands.Queue.Unbind(0, queue, exchange, routing_key,
366365
arguments))
367366

368367
def tx_select(self):
@@ -373,7 +372,7 @@ def tx_select(self):
373372
or Rollback methods.
374373
375374
"""
376-
self._rpc(spec.Tx.Select())
375+
self._rpc(commands.Tx.Select())
377376

378377
def tx_commit(self):
379378
"""Commit the current transaction
@@ -383,7 +382,7 @@ def tx_commit(self):
383382
immediately after a commit.
384383
385384
"""
386-
self._rpc(spec.Tx.Commit())
385+
self._rpc(commands.Tx.Commit())
387386

388387
def tx_rollback(self):
389388
"""Abandon the current transaction
@@ -395,4 +394,4 @@ def tx_rollback(self):
395394
recover call should be issued.
396395
397396
"""
398-
self._rpc(spec.Tx.Rollback())
397+
self._rpc(commands.Tx.Rollback())

rabbitpy/amqp_queue.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import logging
3030
import warnings
3131

32-
from pamqp import specification
32+
from pamqp import commands
3333

3434
from rabbitpy import base
3535
from rabbitpy import exceptions
@@ -172,12 +172,12 @@ def bind(self, source, routing_key=None, arguments=None):
172172
"""
173173
if hasattr(source, 'name'):
174174
source = source.name
175-
frame = specification.Queue.Bind(queue=self.name,
175+
frame = commands.Queue.Bind(queue=self.name,
176176
exchange=source,
177177
routing_key=routing_key or '',
178178
arguments=arguments)
179179
response = self._rpc(frame)
180-
return isinstance(response, specification.Queue.BindOk)
180+
return isinstance(response, commands.Queue.BindOk)
181181

182182
def consume(self, no_ack=False, prefetch=None, priority=None,
183183
consumer_tag=None):
@@ -286,7 +286,7 @@ def delete(self, if_unused=False, if_empty=False):
286286
:param bool if_empty: Delete only if empty
287287
288288
"""
289-
self._rpc(specification.Queue.Delete(queue=self.name,
289+
self._rpc(commands.Queue.Delete(queue=self.name,
290290
if_unused=if_unused,
291291
if_empty=if_empty))
292292

@@ -305,7 +305,7 @@ def get(self, acknowledge=True):
305305
:rtype: :class:`~rabbitpy.Message` or None
306306
307307
"""
308-
self._write_frame(specification.Basic.Get(queue=self.name,
308+
self._write_frame(commands.Basic.Get(queue=self.name,
309309
no_ack=not acknowledge))
310310

311311
return self.channel._get_message() # pylint: disable=protected-access
@@ -332,7 +332,7 @@ def ha_declare(self, nodes=None):
332332

333333
def purge(self):
334334
"""Purge the queue of all of its messages."""
335-
self._rpc(specification.Queue.Purge())
335+
self._rpc(commands.Queue.Purge())
336336

337337
def stop_consuming(self):
338338
"""Stop consuming messages. This is usually invoked if you want to
@@ -361,7 +361,7 @@ def unbind(self, source, routing_key=None):
361361
if hasattr(source, 'name'):
362362
source = source.name
363363
routing_key = routing_key or self.name
364-
self._rpc(specification.Queue.Unbind(queue=self.name, exchange=source,
364+
self._rpc(commands.Queue.Unbind(queue=self.name, exchange=source,
365365
routing_key=routing_key))
366366

367367
def _consume(self, no_ack=False, prefetch=None, priority=None):
@@ -381,12 +381,12 @@ def _consume(self, no_ack=False, prefetch=None, priority=None):
381381
self.consuming = True
382382

383383
def _declare(self, passive=False):
384-
"""Return a specification.Queue.Declare class pre-composed for the rpc
384+
"""Return a commands.Queue.Declare class pre-composed for the rpc
385385
method since this can be called multiple times.
386386
387387
:param bool passive: Passive declare to retrieve message count and
388388
consumer count information
389-
:rtype: pamqp.specification.Queue.Declare
389+
:rtype: pamqp.commands.Queue.Declare
390390
391391
"""
392392
arguments = dict(self.arguments)
@@ -406,7 +406,7 @@ def _declare(self, passive=False):
406406
'exclusive=%s, auto_delete=%s, arguments=%r',
407407
self.name, self.durable, passive, self.exclusive,
408408
self.auto_delete, arguments)
409-
return specification.Queue.Declare(queue=self.name,
409+
return commands.Queue.Declare(queue=self.name,
410410
durable=self.durable,
411411
passive=passive,
412412
exclusive=self.exclusive,

0 commit comments

Comments
 (0)