Skip to content

feat(sns-client): add support for batch publish. #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 142 additions & 83 deletions packages/lambda-powertools-sns-client/__tests__/index.js
Original file line number Diff line number Diff line change
@@ -1,110 +1,169 @@
const AWS = require('aws-sdk')
const AWS = require("aws-sdk");

global.console.log = jest.fn()
global.console.log = jest.fn();

const mockPublish = jest.fn()
AWS.SNS.prototype.publish = mockPublish
const mockPublish = jest.fn();
const mockPublishBatch = jest.fn();
AWS.SNS.prototype.publish = mockPublish;
AWS.SNS.prototype.publishBatch = mockPublishBatch;

const SNS = require('../index')
const CorrelationIds = require('@dazn/lambda-powertools-correlation-ids')
const SNS = require("../index");
const CorrelationIds = require("@dazn/lambda-powertools-correlation-ids");

beforeEach(() => {
mockPublish.mockReturnValueOnce({
promise: async () => Promise.resolve()
})
})
promise: async () => Promise.resolve(),
});
mockPublishBatch.mockReturnValueOnce({
promise: async () => Promise.resolve(),
});
});

afterEach(() => {
mockPublish.mockClear()
CorrelationIds.clearAll()
})

describe('SNS client', () => {
describe('.publish', () => {
describe('when there are no correlationIds', () => {
it('sends empty MessageAttributes', async () => {
mockPublish.mockClear();
mockPublishBatch.mockClear();
CorrelationIds.clearAll();
});

describe("SNS client", () => {
describe(".publish", () => {
describe("when there are no correlationIds", () => {
it("sends empty MessageAttributes", async () => {
const params = {
Message: 'test',
TopicArn: 'topic-arn'
}
await SNS.publish(params).promise()
Message: "test",
TopicArn: "topic-arn",
};
await SNS.publish(params).promise();

expect(mockPublish).toBeCalledWith({
Message: 'test',
TopicArn: 'topic-arn',
MessageAttributes: {}
})
})
})

describe('when there are global correlationIds', () => {
it('forwards them in MessageAttributes', async () => {
Message: "test",
TopicArn: "topic-arn",
MessageAttributes: {},
});
});
});

describe("when there are global correlationIds", () => {
it("forwards them in MessageAttributes", async () => {
CorrelationIds.replaceAllWith({
'x-correlation-id': 'id',
'debug-log-enabled': 'true',
'call-chain-length': 1
})
"x-correlation-id": "id",
"debug-log-enabled": "true",
"call-chain-length": 1,
});

const params = {
Message: 'test',
TopicArn: 'topic-arn'
}
await SNS.publish(params).promise()
Message: "test",
TopicArn: "topic-arn",
};
await SNS.publish(params).promise();

expect(mockPublish).toBeCalledWith({
Message: 'test',
TopicArn: 'topic-arn',
Message: "test",
TopicArn: "topic-arn",
MessageAttributes: {
'x-correlation-id': {
DataType: 'String',
StringValue: 'id'
"x-correlation-id": {
DataType: "String",
StringValue: "id",
},
'debug-log-enabled': {
DataType: 'String',
StringValue: 'true'
"debug-log-enabled": {
DataType: "String",
StringValue: "true",
},
'call-chain-length': {
DataType: 'String',
StringValue: '1'
}
}
})
})
})
})

describe('.publishWithCorrelationIds', () => {
it('forwards given correlationIds in MessageAttributes field', async () => {
"call-chain-length": {
DataType: "String",
StringValue: "1",
},
},
});
});
});
});

describe(".publishWithCorrelationIds", () => {
it("forwards given correlationIds in MessageAttributes field", async () => {
const correlationIds = new CorrelationIds({
'x-correlation-id': 'child-id',
'debug-log-enabled': 'true',
'call-chain-length': 1
})
"x-correlation-id": "child-id",
"debug-log-enabled": "true",
"call-chain-length": 1,
});

const params = {
Message: 'test',
TopicArn: 'topic-arn'
}
await SNS.publishWithCorrelationIds(correlationIds, params).promise()
Message: "test",
TopicArn: "topic-arn",
};
await SNS.publishWithCorrelationIds(correlationIds, params).promise();

expect(mockPublish).toBeCalledWith({
Message: 'test',
TopicArn: 'topic-arn',
Message: "test",
TopicArn: "topic-arn",
MessageAttributes: {
'x-correlation-id': {
DataType: 'String',
StringValue: 'child-id'
"x-correlation-id": {
DataType: "String",
StringValue: "child-id",
},
"debug-log-enabled": {
DataType: "String",
StringValue: "true",
},
'debug-log-enabled': {
DataType: 'String',
StringValue: 'true'
"call-chain-length": {
DataType: "String",
StringValue: "1",
},
'call-chain-length': {
DataType: 'String',
StringValue: '1'
}
}
})
})
})
})
},
});
});
});

describe(".publishBatch", () => {
describe("when there are no correlationIds", () => {
it("sends empty MessageAttributes", async () => {
const params = {
PublishBatchRequestEntries: [{
Message: "test",
Id: "1"
}],
TopicArn: "topic-arn",
};
await SNS.publishBatch(params).promise();

expect(mockPublishBatch).toBeCalledWith({
PublishBatchRequestEntries: [{
Message: "test",
Id: "1",
MessageAttributes: {}
}],
TopicArn: "topic-arn",
});
});
});

describe("when there are global correlationIds", () => {
it("forwards them in MessageAttributes", async () => {
CorrelationIds.replaceAllWith({
"x-correlation-id": "id",
"debug-log-enabled": "true",
"call-chain-length": 1,
});

const params = {
PublishBatchRequestEntries: [{
Message: "test",
Id: "1"
}],
TopicArn: "topic-arn",
};
await SNS.publishBatch(params).promise();

expect(mockPublishBatch).toBeCalledWith({
PublishBatchRequestEntries:
[{
Id: "1",
Message: "test",
MessageAttributes: {"call-chain-length": {"DataType": "String", "StringValue": "1"}, "debug-log-enabled": {"DataType": "String", "StringValue": "true"}, "x-correlation-id": {"DataType": "String", "StringValue": "id"}}
}],
TopicArn: "topic-arn"
});
});
});
});
});
7 changes: 6 additions & 1 deletion packages/lambda-powertools-sns-client/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,10 @@ declare const SNS: awsSNS & {
params: awsSNS.Types.PublishInput,
callback?: (err: AWSError, data: awsSNS.Types.PublishResponse) => void
): Request<awsSNS.Types.PublishResponse, AWSError>;
};
publishBatchWithCorrelationIds(
correlationId: CorrelationIds,
params: awsSNS.Types.PublishBatchInput,
callback?: (err: AWSError, data: awsSNS.Types.PublishBatchResponse) => void
): Request<awsSNS.Types.PublishBatchResponse, AWSError>;
}
export default SNS;
69 changes: 48 additions & 21 deletions packages/lambda-powertools-sns-client/index.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,64 @@
process.env.AWS_NODEJS_CONNECTION_REUSE_ENABLED = '1'
const SNS = require('aws-sdk/clients/sns')
const client = new SNS()
const CorrelationIds = require('@dazn/lambda-powertools-correlation-ids')

function addCorrelationIds (correlationIds, messageAttributes) {
const attributes = {}
const ids = correlationIds.get()
process.env.AWS_NODEJS_CONNECTION_REUSE_ENABLED = "1";
const SNS = require("aws-sdk/clients/sns");
const client = new SNS();
const CorrelationIds = require("@dazn/lambda-powertools-correlation-ids");

function addCorrelationIds(correlationIds, messageAttributes) {
const attributes = {};
const ids = correlationIds.get();
for (const key in ids) {
attributes[key] = {
DataType: 'String',
StringValue: `${ids[key]}`
}
DataType: "String",
StringValue: `${ids[key]}`,
};
}

// use `attributes` as base so if the user's message attributes would override
// our correlation IDs
return Object.assign(attributes, messageAttributes || {})
return Object.assign(attributes, messageAttributes || {});
}

client._publish = client.publish
client._publish = client.publish;

client.publish = (...args) => {
return client.publishWithCorrelationIds(CorrelationIds, ...args)
}
return client.publishWithCorrelationIds(CorrelationIds, ...args);
};

client.publishWithCorrelationIds = (correlationIds, params, ...args) => {
const newMessageAttributes = addCorrelationIds(correlationIds, params.MessageAttributes)
const newMessageAttributes = addCorrelationIds(
correlationIds,
params.MessageAttributes
);
const extendedParams = {
...params,
MessageAttributes: newMessageAttributes
}
MessageAttributes: newMessageAttributes,
};

return client._publish(extendedParams, ...args)
}
return client._publish(extendedParams, ...args);
};

client._publishBatch = client.publishBatch;

client.publishBatch = (...args) => {
return client.publishBatchWithCorrelationIds(CorrelationIds, ...args);
};

client.publishBatchWithCorrelationIds = (correlationIds, params, ...args) => {
const extendedBatchEntries = params.PublishBatchRequestEntries.map(entry => {
const newMessageAttributes = addCorrelationIds(
correlationIds,
entry.MessageAttributes
);
return {
...entry,
MessageAttributes: newMessageAttributes
}
});
const extendedParams = {
...params,
PublishBatchRequestEntries: extendedBatchEntries
}
return client._publishBatch(extendedParams, ...args);
};

module.exports = client
module.exports = client;
2 changes: 1 addition & 1 deletion packages/lambda-powertools-sns-client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/lambda-powertools-sns-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dazn/lambda-powertools-sns-client",
"version": "1.28.1",
"version": "1.28.2",
"description": "SNS client wrapper that knows how to forward correlation IDs (captured via @dazn/lambda-powertools-correlation-ids)",
"main": "index.js",
"types": "index.d.ts",
Expand Down