Skip to content

feat: migrate to new pamqp #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
dist: xenial
language: python
python:
- 2.7
- 3.4
- 3.5
- 3.6
- 3.7
- 3.8
services:
- docker
install:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
rabbitmq:
image: rabbitmq:3.7
image: rabbitmq:3.8
ports:
- 5672
- 15672
2 changes: 1 addition & 1 deletion docs/api/exceptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rabbitpy contains two types of exceptions, exceptions that are specific to rabbi
self._write_frame(frame_value)
File "rabbitpy/base.py", line 311, in _write_frame
raise exception
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.specification.Channel.Close object at 0x10e86bd50>
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.commands.Channel.Close object at 0x10e86bd50>

In this example, the channel that was created on the second line was closed and RabbitMQ is raising the :class:`AMQPPreconditionFailed <rabbitpy.exceptions.AMQPPreconditionFailed>` exception via RPC sent to your application using the AMQP Channel.Close method.

Expand Down
43 changes: 21 additions & 22 deletions rabbitpy/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
AMQP Adapter

"""
from pamqp import specification as spec

from pamqp import commands
from rabbitpy import base
from rabbitpy import message
from rabbitpy import exceptions
Expand Down Expand Up @@ -35,7 +34,7 @@ def basic_ack(self, delivery_tag=0, multiple=False):
:param bool multiple: Acknowledge multiple messages

"""
self._write_frame(spec.Basic.Ack(delivery_tag, multiple))
self._write_frame(commands.Basic.Ack(delivery_tag, multiple))

def basic_consume(self, queue='', consumer_tag='', no_local=False,
no_ack=False, exclusive=False, nowait=False,
Expand Down Expand Up @@ -70,7 +69,7 @@ def basic_consume(self, queue='', consumer_tag='', no_local=False,
consumer_tag = self.consumer_tag
# pylint: disable=protected-access
self.channel._consumers[consumer_tag] = (self, no_ack)
self._rpc(spec.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
self._rpc(commands.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
exclusive, nowait, arguments))
self._consuming = True
try:
Expand Down Expand Up @@ -119,7 +118,7 @@ def basic_get(self, queue='', no_ack=False):
:param bool no_ack: No acknowledgement needed

"""
self._rpc(spec.Basic.Get(0, queue, no_ack))
self._rpc(commands.Basic.Get(0, queue, no_ack))

def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
"""Reject one or more incoming messages.
Expand All @@ -137,7 +136,7 @@ def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
:param bool requeue: Requeue the message

"""
self._write_frame(spec.Basic.Nack(delivery_tag, multiple, requeue))
self._write_frame(commands.Basic.Nack(delivery_tag, multiple, requeue))

def basic_publish(self, exchange='', routing_key='', body='',
properties=None, mandatory=False, immediate=False):
Expand Down Expand Up @@ -178,7 +177,7 @@ def basic_qos(self, prefetch_size=0, prefetch_count=0, global_flag=False):
:param bool global_flag: Apply to entire connection

"""
self._rpc(spec.Basic.Qos(prefetch_size, prefetch_count, global_flag))
self._rpc(commands.Basic.Qos(prefetch_size, prefetch_count, global_flag))

def basic_reject(self, delivery_tag=0, requeue=True):
"""Reject an incoming message
Expand All @@ -192,7 +191,7 @@ def basic_reject(self, delivery_tag=0, requeue=True):
:param bool requeue: Requeue the message

"""
self._write_frame(spec.Basic.Reject(delivery_tag, requeue))
self._write_frame(commands.Basic.Reject(delivery_tag, requeue))

def basic_recover(self, requeue=False):
"""Redeliver unacknowledged messages
Expand All @@ -204,14 +203,14 @@ def basic_recover(self, requeue=False):
:param bool requeue: Requeue the message

"""
self._rpc(spec.Basic.Recover(requeue))
self._rpc(commands.Basic.Recover(requeue))

def confirm_select(self):
"""This method sets the channel to use publisher acknowledgements. The
client can only use this method on a non-transactional channel.

"""
self._rpc(spec.Confirm.Select())
self._rpc(commands.Confirm.Select())

def exchange_declare(self, exchange='', exchange_type='direct',
passive=False, durable=False, auto_delete=False,
Expand All @@ -232,7 +231,7 @@ def exchange_declare(self, exchange='', exchange_type='direct',
:param dict arguments: Arguments for declaration

"""
self._rpc(spec.Exchange.Declare(0, exchange, exchange_type, passive,
self._rpc(commands.Exchange.Declare(0, exchange, exchange_type, passive,
durable, auto_delete, internal, nowait,
arguments))

Expand All @@ -248,7 +247,7 @@ def exchange_delete(self, exchange='', if_unused=False,
:param bool nowait: Do not send a reply method

"""
self._rpc(spec.Exchange.Delete(0, exchange, if_unused, nowait))
self._rpc(commands.Exchange.Delete(0, exchange, if_unused, nowait))

def exchange_bind(self, destination='', source='',
routing_key='', nowait=False, arguments=None):
Expand All @@ -263,7 +262,7 @@ def exchange_bind(self, destination='', source='',
:param dict arguments: Optional arguments

"""
self._rpc(spec.Exchange.Bind(0, destination, source, routing_key,
self._rpc(commands.Exchange.Bind(0, destination, source, routing_key,
nowait, arguments))

def exchange_unbind(self, destination='', source='',
Expand All @@ -279,7 +278,7 @@ def exchange_unbind(self, destination='', source='',
:param dict arguments: Optional arguments

"""
self._rpc(spec.Exchange.Unbind(0, destination, source, routing_key,
self._rpc(commands.Exchange.Unbind(0, destination, source, routing_key,
nowait, arguments))

def queue_bind(self, queue='', exchange='', routing_key='',
Expand All @@ -298,7 +297,7 @@ def queue_bind(self, queue='', exchange='', routing_key='',
:param dict arguments: Arguments for binding

"""
self._rpc(spec.Queue.Bind(0, queue, exchange, routing_key, nowait,
self._rpc(commands.Queue.Bind(0, queue, exchange, routing_key, nowait,
arguments))

def queue_declare(self, queue='', passive=False, durable=False,
Expand All @@ -319,7 +318,7 @@ def queue_declare(self, queue='', passive=False, durable=False,
:param dict arguments: Arguments for declaration

"""
self._rpc(spec.Queue.Declare(0, queue, passive, durable, exclusive,
self._rpc(commands.Queue.Declare(0, queue, passive, durable, exclusive,
auto_delete, nowait, arguments))

def queue_delete(self, queue='', if_unused=False, if_empty=False,
Expand All @@ -336,7 +335,7 @@ def queue_delete(self, queue='', if_unused=False, if_empty=False,
:param bool nowait: Do not send a reply method

"""
self._rpc(spec.Queue.Delete(0, queue, if_unused, if_empty, nowait))
self._rpc(commands.Queue.Delete(0, queue, if_unused, if_empty, nowait))

def queue_purge(self, queue='', nowait=False):
"""Purge a queue
Expand All @@ -348,7 +347,7 @@ def queue_purge(self, queue='', nowait=False):
:param bool nowait: Do not send a reply method

"""
self._rpc(spec.Queue.Purge(0, queue, nowait))
self._rpc(commands.Queue.Purge(0, queue, nowait))

def queue_unbind(self, queue='', exchange='', routing_key='',
arguments=None):
Expand All @@ -362,7 +361,7 @@ def queue_unbind(self, queue='', exchange='', routing_key='',
:param dict arguments: Arguments of binding

"""
self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key,
self._rpc(commands.Queue.Unbind(0, queue, exchange, routing_key,
arguments))

def tx_select(self):
Expand All @@ -373,7 +372,7 @@ def tx_select(self):
or Rollback methods.

"""
self._rpc(spec.Tx.Select())
self._rpc(commands.Tx.Select())

def tx_commit(self):
"""Commit the current transaction
Expand All @@ -383,7 +382,7 @@ def tx_commit(self):
immediately after a commit.

"""
self._rpc(spec.Tx.Commit())
self._rpc(commands.Tx.Commit())

def tx_rollback(self):
"""Abandon the current transaction
Expand All @@ -395,4 +394,4 @@ def tx_rollback(self):
recover call should be issued.

"""
self._rpc(spec.Tx.Rollback())
self._rpc(commands.Tx.Rollback())
20 changes: 10 additions & 10 deletions rabbitpy/amqp_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import logging
import warnings

from pamqp import specification
from pamqp import commands

from rabbitpy import base
from rabbitpy import exceptions
Expand Down Expand Up @@ -172,12 +172,12 @@ def bind(self, source, routing_key=None, arguments=None):
"""
if hasattr(source, 'name'):
source = source.name
frame = specification.Queue.Bind(queue=self.name,
frame = commands.Queue.Bind(queue=self.name,
exchange=source,
routing_key=routing_key or '',
arguments=arguments)
response = self._rpc(frame)
return isinstance(response, specification.Queue.BindOk)
return isinstance(response, commands.Queue.BindOk)

def consume(self, no_ack=False, prefetch=None, priority=None,
consumer_tag=None):
Expand Down Expand Up @@ -286,7 +286,7 @@ def delete(self, if_unused=False, if_empty=False):
:param bool if_empty: Delete only if empty

"""
self._rpc(specification.Queue.Delete(queue=self.name,
self._rpc(commands.Queue.Delete(queue=self.name,
if_unused=if_unused,
if_empty=if_empty))

Expand All @@ -305,7 +305,7 @@ def get(self, acknowledge=True):
:rtype: :class:`~rabbitpy.Message` or None

"""
self._write_frame(specification.Basic.Get(queue=self.name,
self._write_frame(commands.Basic.Get(queue=self.name,
no_ack=not acknowledge))

return self.channel._get_message() # pylint: disable=protected-access
Expand All @@ -332,7 +332,7 @@ def ha_declare(self, nodes=None):

def purge(self):
"""Purge the queue of all of its messages."""
self._rpc(specification.Queue.Purge())
self._rpc(commands.Queue.Purge())

def stop_consuming(self):
"""Stop consuming messages. This is usually invoked if you want to
Expand Down Expand Up @@ -361,7 +361,7 @@ def unbind(self, source, routing_key=None):
if hasattr(source, 'name'):
source = source.name
routing_key = routing_key or self.name
self._rpc(specification.Queue.Unbind(queue=self.name, exchange=source,
self._rpc(commands.Queue.Unbind(queue=self.name, exchange=source,
routing_key=routing_key))

def _consume(self, no_ack=False, prefetch=None, priority=None):
Expand All @@ -381,12 +381,12 @@ def _consume(self, no_ack=False, prefetch=None, priority=None):
self.consuming = True

def _declare(self, passive=False):
"""Return a specification.Queue.Declare class pre-composed for the rpc
"""Return a commands.Queue.Declare class pre-composed for the rpc
method since this can be called multiple times.

:param bool passive: Passive declare to retrieve message count and
consumer count information
:rtype: pamqp.specification.Queue.Declare
:rtype: pamqp.commands.Queue.Declare

"""
arguments = dict(self.arguments)
Expand All @@ -406,7 +406,7 @@ def _declare(self, passive=False):
'exclusive=%s, auto_delete=%s, arguments=%r',
self.name, self.durable, passive, self.exclusive,
self.auto_delete, arguments)
return specification.Queue.Declare(queue=self.name,
return commands.Queue.Declare(queue=self.name,
durable=self.durable,
passive=passive,
exclusive=self.exclusive,
Expand Down
Loading