Skip to content

Commit

Permalink
[DSM] Fix an issue where RabbitMQ producers when producing a message …
Browse files Browse the repository at this point in the history
…to the default exchange were setting checkpoints that didn't work in DSM (#5150)
  • Loading branch information
ericfirth authored Jan 28, 2025
1 parent 4f22cf7 commit f5bec49
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
10 changes: 9 additions & 1 deletion packages/datadog-plugin-amqplib/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ class AmqplibProducerPlugin extends ProducerPlugin {
if (this.config.dsmEnabled) {
const hasRoutingKey = fields.routingKey != null
const payloadSize = getAmqpMessageSize({ content: message, headers: fields.headers })

// there are two ways to send messages in RabbitMQ:
// 1. using an exchange and a routing key in which DSM connects via the exchange
// 2. using an unnamed exchange and a routing key in which DSM connects via the topic
const exchangeOrTopicTag = hasRoutingKey && !fields.exchange
? `topic:${fields.routingKey}`
: `exchange:${fields.exchange}`

const dataStreamsContext = this.tracer
.setCheckpoint(
['direction:out', `exchange:${fields.exchange}`, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
, span, payloadSize)
DsmPathwayCodec.encode(dataStreamsContext, fields.headers)
}
Expand Down
46 changes: 38 additions & 8 deletions packages/datadog-plugin-amqplib/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,10 @@ describe('Plugin', () => {
describe('when data streams monitoring is enabled', function () {
this.timeout(10000)

const expectedProducerHash = '17191234428405871432'
const expectedConsumerHash = '18277095184718602853'
const expectedProducerHashWithTopic = '16804605750389532869'
const expectedProducerHashWithExchange = '2722596631431228032'

const expectedConsumerHash = '17529824252700998941'

before(() => {
tracer = require('../../dd-trace')
Expand All @@ -322,7 +324,7 @@ describe('Plugin', () => {
return agent.close({ ritmReset: false })
})

it('Should emit DSM stats to the agent when sending a message', done => {
it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
Expand All @@ -336,11 +338,11 @@ describe('Plugin', () => {
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'exchange:',
'has_routing_key:true',
'topic:testDSM',
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
Expand All @@ -350,6 +352,34 @@ describe('Plugin', () => {
})
})

it('Should emit DSM stats to the agent when sending a message on an named exchange', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'exchange:namedExchange',
'has_routing_key:true',
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHashWithExchange)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => {
if (err) return done(err)

channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test'))
})
})

it('Should emit DSM stats to the agent when receiving a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
Expand Down Expand Up @@ -390,11 +420,11 @@ describe('Plugin', () => {
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'exchange:',
'has_routing_key:true',
'topic:testDSM',
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
Expand Down Expand Up @@ -445,7 +475,7 @@ describe('Plugin', () => {
}

expect(produceSpanMeta).to.include({
'pathway.hash': expectedProducerHash
'pathway.hash': expectedProducerHashWithTopic
})
}, { timeoutMs: 10000 }).then(done, done)
})
Expand Down

0 comments on commit f5bec49

Please sign in to comment.