diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index 032b8fcc..487d07fc 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -24,7 +24,6 @@ from threading import Event, local import pika -from pika.exceptions import ChannelClosedByBroker from ..broker import Broker, Consumer, MessageProxy from ..common import current_millis, dq_name, q_name, xq_name @@ -309,7 +308,8 @@ def enqueue(self, message, *, delay=None): ConnectionClosed: If the underlying channel or connection has been closed. """ - queue_name = message.queue_name + canonical_queue_name = message.queue_name + queue_name = canonical_queue_name if delay is not None: queue_name = dq_name(queue_name) @@ -324,7 +324,7 @@ def enqueue(self, message, *, delay=None): attempts = 1 while True: try: - self.declare_queue(queue_name, ensure=True) + self.declare_queue(canonical_queue_name, ensure=True) self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) self.emit_before("enqueue", message, delay) self.channel.basic_publish( @@ -345,13 +345,6 @@ def enqueue(self, message, *, delay=None): # next caller/attempt may initiate new ones of each. del self.connection - # When a queue has been deleted, attempt to get it - # redeclared by removing it from the known set. The next - # time a message is enqueued -- which could be when we - # retry this block -- it will be redeclared. - if isinstance(e, ChannelClosedByBroker) and e.reply_code == 404: - self.queues.remove(q_name(queue_name)) - attempts += 1 if attempts > MAX_ENQUEUE_ATTEMPTS: raise ConnectionClosed(e) from None diff --git a/tests/test_rabbitmq.py b/tests/test_rabbitmq.py index f12af772..20539550 100644 --- a/tests/test_rabbitmq.py +++ b/tests/test_rabbitmq.py @@ -1,6 +1,4 @@ import os -import random -import string import time from threading import Event from unittest.mock import Mock, patch