Skip to content

Commit

Permalink
feat: remove dead letter queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
bugrakocabay committed Jun 16, 2024
1 parent 64026d1 commit da419b0
Showing 1 changed file with 14 additions and 69 deletions.
83 changes: 14 additions & 69 deletions pkg/queue/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,76 +50,15 @@ func (c *Consumer) Connect() error {

// Consume consumes messages from RabbitMQ
func (c *Consumer) Consume(queueName string, handler func(msg []byte) error) error {
dlExchange := "dlx.exchange"
dlQueue := queueName + ".dlx"

err := c.channel.ExchangeDeclare(
dlExchange, // name of the exchange
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("error declaring the DLX: %w", err)
}

_, err = c.channel.QueueDeclare(
dlQueue, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("error declaring the DLQ: %w", err)
}

err = c.channel.QueueBind(
dlQueue, // queue name
queueName, // routing key
dlExchange, // exchange
false,
nil,
)
if err != nil {
return fmt.Errorf("error binding the DLQ: %w", err)
}

args := amqp.Table{
"x-dead-letter-exchange": dlExchange,
"x-dead-letter-routing-key": queueName,
}

_, err = c.channel.QueueDeclare(
queueName, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments for dead-lettering
)
if err != nil {
return fmt.Errorf("error declaring the main queue: %w", err)
}

slog.Debug("Starting to consume messages from RabbitMQ", "queueName", queueName)

return c.startConsuming(queueName, handler)
}

func (c *Consumer) startConsuming(queueName string, handler func(msg []byte) error) error {
msgs, err := c.channel.Consume(
queueName,
"", // Consumer tag - Identifier for the consumer
false, // Auto-Acknowledge, set to false for manual ack
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Arguments
"konsume", // Consumer tag - Identifier for the consumer
false, // Auto-Acknowledge, set to false for manual ack
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("error starting to consume: %w", err)
Expand All @@ -130,9 +69,15 @@ func (c *Consumer) startConsuming(queueName string, handler func(msg []byte) err
err = handler(d.Body)
if err != nil {
slog.Error("Failed to process message sending to dead letter exchange", "message", string(d.Body), "error", err)
d.Nack(false, false)
err = d.Nack(false, false)
if err != nil {
slog.Error("Failed to nack the message", "error", err)
}
} else {
d.Ack(false)
err = d.Ack(false)
if err != nil {
slog.Error("Failed to ack the message", "error", err)
}
}
}
}()
Expand Down

0 comments on commit da419b0

Please sign in to comment.