Skip to content

Commit 13daa67

Browse files
committed
give rules control over throwing conditional check failures
1 parent d20f4a5 commit 13daa67

File tree

5 files changed

+65
-6
lines changed

5 files changed

+65
-6
lines changed

package-lock.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.0.29",
3+
"version": "1.0.30",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/connectors/dynamodb.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ class Connector {
2727
removeUndefinedValues = true,
2828
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
2929
retryConfig = defaultRetryConfig,
30+
throwConditionFailure = false,
3031
additionalClientOpts = {},
3132
...opt
3233
}) {
3334
this.debug = (msg) => debug('%j', msg);
3435
this.tableName = tableName || /* istanbul ignore next */ 'undefined';
3536
this.client = Connector.getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout, additionalClientOpts);
3637
this.retryConfig = retryConfig;
38+
this.throwConditionFailure = throwConditionFailure;
3739
this.opt = opt;
3840
}
3941

@@ -73,7 +75,7 @@ class Connector {
7375
return this._sendCommand(new UpdateCommand(params), ctx)
7476
.catch((err) => {
7577
/* istanbul ignore else */
76-
if (err.name === 'ConditionalCheckFailedException') {
78+
if (err.name === 'ConditionalCheckFailedException' && !this.throwConditionFailure) {
7779
return {};
7880
}
7981
/* istanbul ignore next */

src/sinks/dynamodb.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,12 @@ export const updateDynamoDB = ({
5959
parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4,
6060
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
6161
removeUndefinedValues = true,
62+
throwConditionFailure = false,
6263
step = 'save',
6364
...opt
6465
} = {}) => {
6566
const connector = new Connector({
66-
pipelineId, debug, tableName, timeout, removeUndefinedValues, ...opt,
67+
pipelineId, debug, tableName, timeout, removeUndefinedValues, throwConditionFailure, ...opt,
6768
});
6869

6970
const invoke = (uow) => {

test/unit/flavors/update.test.js

+58-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { expect } from 'chai';
33
import sinon from 'sinon';
44

55
import { KmsConnector, MOCK_GEN_DK_RESPONSE } from 'aws-kms-ee';
6+
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
7+
import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';
8+
import { mockClient } from 'aws-sdk-client-mock';
69

710
import {
811
initialize, initializeFrom,
@@ -17,16 +20,21 @@ import {
1720
import {
1821
updateExpression, timestampCondition,
1922
} from '../../../src/sinks/dynamodb';
20-
import { DynamoDBConnector } from '../../../src/connectors';
23+
import { DynamoDBConnector, EventBridgeConnector } from '../../../src/connectors';
2124

2225
import { update } from '../../../src/flavors/update';
2326

2427
describe('flavors/update.js', () => {
28+
let mockDdb;
29+
2530
beforeEach(() => {
2631
sinon.stub(DynamoDBConnector.prototype, 'update').resolves({});
2732
});
2833

29-
afterEach(sinon.restore);
34+
afterEach(() => {
35+
sinon.restore();
36+
mockDdb?.restore();
37+
});
3038

3139
it('should execute', (done) => {
3240
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
@@ -157,6 +165,54 @@ describe('flavors/update.js', () => {
157165
})
158166
.done(done);
159167
});
168+
169+
it('should optionally throw conditional check', (done) => {
170+
sinon.restore();
171+
sinon.stub(EventBridgeConnector.prototype, 'putEvents').resolves({});
172+
mockDdb = mockClient(DynamoDBDocumentClient);
173+
mockDdb.on(UpdateCommand).rejects(new ConditionalCheckFailedException({}));
174+
175+
const events = toDynamodbRecords([
176+
{
177+
timestamp: 1572832690,
178+
keys: {
179+
pk: '1',
180+
sk: 'thing',
181+
},
182+
newImage: {
183+
pk: '1',
184+
sk: 'thing',
185+
discriminator: 'thing',
186+
ttl: 1549053422,
187+
timestamp: 1548967022000,
188+
},
189+
},
190+
]);
191+
192+
const rule = {
193+
id: 'updateThrow',
194+
flavor: update,
195+
eventType: /thing-*/,
196+
toUpdateRequest: () => ({}),
197+
throwConditionFailure: true,
198+
};
199+
200+
initialize({
201+
...initializeFrom([
202+
rule,
203+
]),
204+
}, { ...defaultOptions, AES: false })
205+
.assemble(fromDynamodb(events), true)
206+
.collect()
207+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
208+
.tap((collected) => {
209+
expect(collected.length).to.equal(1);
210+
expect(collected[0].event.tags.pipeline).to.equal('updateThrow');
211+
expect(collected[0].event.type).to.equal('fault');
212+
expect(collected[0].event.err.name).to.equal('ConditionalCheckFailedException');
213+
})
214+
.done(done);
215+
});
160216
});
161217

162218
const toUpdateRequest = (uow) => ({

0 commit comments

Comments
 (0)