diff --git a/pkg/queue/rabbitmq/rabbitmq.go b/pkg/queue/rabbitmq/rabbitmq.go index 6429533..6dda3ca 100644 --- a/pkg/queue/rabbitmq/rabbitmq.go +++ b/pkg/queue/rabbitmq/rabbitmq.go @@ -50,76 +50,15 @@ func (c *Consumer) Connect() error { // Consume consumes messages from RabbitMQ func (c *Consumer) Consume(queueName string, handler func(msg []byte) error) error { - dlExchange := "dlx.exchange" - dlQueue := queueName + ".dlx" - - err := c.channel.ExchangeDeclare( - dlExchange, // name of the exchange - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) - if err != nil { - return fmt.Errorf("error declaring the DLX: %w", err) - } - - _, err = c.channel.QueueDeclare( - dlQueue, // name of the queue - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - return fmt.Errorf("error declaring the DLQ: %w", err) - } - - err = c.channel.QueueBind( - dlQueue, // queue name - queueName, // routing key - dlExchange, // exchange - false, - nil, - ) - if err != nil { - return fmt.Errorf("error binding the DLQ: %w", err) - } - - args := amqp.Table{ - "x-dead-letter-exchange": dlExchange, - "x-dead-letter-routing-key": queueName, - } - - _, err = c.channel.QueueDeclare( - queueName, // name of the queue - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - args, // arguments for dead-lettering - ) - if err != nil { - return fmt.Errorf("error declaring the main queue: %w", err) - } - slog.Debug("Starting to consume messages from RabbitMQ", "queueName", queueName) - - return c.startConsuming(queueName, handler) -} - -func (c *Consumer) startConsuming(queueName string, handler func(msg []byte) error) error { msgs, err := c.channel.Consume( queueName, - "", // Consumer tag - Identifier for the consumer - false, // Auto-Acknowledge, set to false for manual ack - false, // Exclusive - false, // No-local - false, // No-wait - nil, // Arguments + "konsume", // Consumer tag - Identifier for the consumer + false, // Auto-Acknowledge, set to false for manual ack + false, // Exclusive + false, // No-local + false, // No-wait + nil, // Arguments ) if err != nil { return fmt.Errorf("error starting to consume: %w", err) @@ -130,9 +69,15 @@ func (c *Consumer) startConsuming(queueName string, handler func(msg []byte) err err = handler(d.Body) if err != nil { slog.Error("Failed to process message sending to dead letter exchange", "message", string(d.Body), "error", err) - d.Nack(false, false) + err = d.Nack(false, false) + if err != nil { + slog.Error("Failed to nack the message", "error", err) + } } else { - d.Ack(false) + err = d.Ack(false) + if err != nil { + slog.Error("Failed to ack the message", "error", err) + } } } }()