Skip to content

Commit abe53f5

Browse files
committed
Add batching util for SQS sink.
2 parents 462dc2f + c7403f6 commit abe53f5

File tree

10 files changed

+504
-10
lines changed

10 files changed

+504
-10
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.3",
3+
"version": "1.1.9",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/connectors/s3.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import { Readable } from 'stream';
44
import {
55
CopyObjectCommand,
6-
DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
6+
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
77
} from '@aws-sdk/client-s3';
88
import { NodeHttpHandler } from '@smithy/node-http-handler';
99
import Promise from 'bluebird';
@@ -47,6 +47,15 @@ class Connector {
4747
return this.clients[pipelineId];
4848
}
4949

50+
headObject(inputParams, ctx) {
51+
const params = {
52+
Bucket: this.bucketName,
53+
...inputParams,
54+
};
55+
56+
return this._sendCommand(new HeadObjectCommand(params), ctx);
57+
}
58+
5059
putObject(inputParams, ctx) {
5160
const params = {
5261
Bucket: this.bucketName,
@@ -75,6 +84,16 @@ class Connector {
7584
.then(async (response) => ({ ...response, Body: await response.Body.transformToString() }));
7685
}
7786

87+
getObjectAsByteArray(inputParams, ctx) {
88+
const params = {
89+
Bucket: this.bucketName,
90+
...inputParams,
91+
};
92+
93+
return this._sendCommand(new GetObjectCommand(params), ctx)
94+
.then(async (response) => ({ ...response, Body: await response.Body.transformToByteArray() }));
95+
}
96+
7897
getObjectStream(inputParams, ctx) {
7998
const params = {
8099
Bucket: this.bucketName,

src/flavors/circuitbreaker.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ const toUpdateRequest = (opt) => faulty((uow) => ({
1515
UUID: process.env.ESM_ID, // Ref: TriggerEventSourceMappingDynamodbEntitiesTable
1616
Enabled: uow.event.alarmData.state.value !== 'ALARM',
1717
BatchSize: uow.event.alarmData.state.value === 'INSUFFICIENT_DATA'
18-
? 1 : process.env.BATCH_SIZE || 100,
18+
? 1 : Number(opt.circuitBreakerMaxBatchSize)
19+
|| Number(process.env.CIRCUIT_BREAKER_MAX_BATCH_SIZE)
20+
|| Number(process.env.BATCH_SIZE)
21+
|| 100,
1922
},
2023
}));

src/queries/s3.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,35 @@ export const getObjectFromS3AsStream = ({
8080
.flatMap(getObject);
8181
};
8282

83+
export const getObjectFromS3AsByteArray = ({
84+
id: pipelineId,
85+
debug = d('s3'),
86+
bucketName = process.env.BUCKET_NAME,
87+
getRequestField = 'getRequest',
88+
getResponseField = 'getResponse',
89+
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
90+
step = 'get',
91+
...opt
92+
} = {}) => {
93+
const connector = new Connector({
94+
pipelineId, debug, bucketName, ...opt,
95+
});
96+
97+
const getObject = (uow) => {
98+
if (!uow[getRequestField]) return _(Promise.resolve(uow));
99+
100+
const p = () => connector.getObjectAsByteArray(uow[getRequestField], uow)
101+
.then((getResponse) => ({ ...uow, [getResponseField]: getResponse })) // TODO decompress
102+
.catch(rejectWithFault(uow));
103+
104+
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
105+
};
106+
107+
return (s) => s
108+
.map(getObject)
109+
.parallel(parallel);
110+
};
111+
83112
export const splitS3Object = ({
84113
delimiter = '\n',
85114
getResponseField = 'getResponse',
@@ -193,3 +222,32 @@ export const pageObjectsFromS3 = ({
193222
.map(listObjects)
194223
.parallel(parallel);
195224
};
225+
226+
export const headS3Object = ({
227+
id: pipelineId,
228+
debug = d('s3'),
229+
bucketName = process.env.BUCKET_NAME,
230+
headRequestField = 'headRequest',
231+
headResponseField = 'headResponse',
232+
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
233+
step = 'get',
234+
...opt
235+
} = {}) => {
236+
const connector = new Connector({
237+
pipelineId, debug, bucketName, ...opt,
238+
});
239+
240+
const headObject = (uow) => {
241+
if (!uow[headRequestField]) return _(Promise.resolve(uow));
242+
243+
const p = () => connector.headObject(uow[headRequestField], uow)
244+
.then((headResponse) => ({ ...uow, [headResponseField]: headResponse }))
245+
.catch(rejectWithFault(uow));
246+
247+
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
248+
};
249+
250+
return (s) => s
251+
.map(headObject)
252+
.parallel(parallel);
253+
};

src/utils/encryption.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ export const encryptEvent = ({
128128
return _(Promise.resolve(uow));
129129
}
130130

131+
const eemMetadata = typeof eem === 'function' ? eem(uow[sourceField], uow) : eem;
131132
const p = encryptObject(uow[sourceField], {
132133
masterKeyAlias,
133134
regions,
134-
...eem, // fields and overrides
135+
...eemMetadata, // fields and overrides
135136
AES,
136137
})
137138
// .tap(debug)
@@ -165,11 +166,12 @@ export const encryptData = ({
165166
masterKeyAlias = process.env.MASTER_KEY_ALIAS,
166167
regions = (process.env.KMS_REGIONS && process.env.KMS_REGIONS.split(',')),
167168
AES = true,
168-
} = {}) => async (data) => {
169+
} = {}) => async (data, ctx = {}) => {
170+
const eemMetadata = typeof eem === 'function' ? eem(data, ctx) : eem;
169171
const result = await encryptObject(data, {
170172
masterKeyAlias,
171173
regions,
172-
...eem, // fields and overrides
174+
...eemMetadata, // fields and overrides
173175
AES,
174176
})
175177
// .tap(debug)

test/unit/connectors/s3.test.js

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Readable } from 'stream';
66
import { mockClient } from 'aws-sdk-client-mock';
77
import {
88
CopyObjectCommand,
9-
DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
9+
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
1010
} from '@aws-sdk/client-s3';
1111
import { sdkStreamMixin } from '@smithy/util-stream';
1212

@@ -95,6 +95,26 @@ describe('connectors/s3.js', () => {
9595
});
9696
expect(data).to.deep.equal({ Body: 'b' });
9797
});
98+
it('should get object as byte array', async () => {
99+
const arr = new Uint8Array([104, 101, 108, 108, 111]);
100+
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from('hello')) }));
101+
mockS3.on(GetObjectCommand).callsFake(spy);
102+
103+
const inputParams = {
104+
Key: 'k1',
105+
};
106+
107+
const data = await new Connector({
108+
debug: debug('s3'),
109+
bucketName: 'b1',
110+
}).getObjectAsByteArray(inputParams);
111+
112+
expect(spy).to.have.been.calledWith({
113+
Bucket: 'b1',
114+
Key: 'k1',
115+
});
116+
expect(data).to.deep.equal({ Body: arr });
117+
});
98118

99119
it('should get object as stream', (done) => {
100120
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from(Buffer.from('data'))) }));
@@ -199,4 +219,20 @@ describe('connectors/s3.js', () => {
199219
});
200220
expect(data).to.deep.equal({});
201221
});
222+
it('should head object', async () => {
223+
const spy = sinon.spy(() => ({}));
224+
mockS3.on(HeadObjectCommand).callsFake(spy);
225+
const inputParams = {
226+
Key: 'k1',
227+
};
228+
const data = await new Connector({
229+
debug: debug('s3'),
230+
bucketName: 'b1',
231+
}).headObject(inputParams);
232+
expect(spy).to.have.been.calledWith({
233+
Key: 'k1',
234+
Bucket: 'b1',
235+
});
236+
expect(data).to.deep.equal({});
237+
});
202238
});

test/unit/flavors/circuitbreaker.test.js

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import 'mocha';
22
import { expect } from 'chai';
33
import sinon from 'sinon';
44

5-
import { initialize } from '../../../src';
5+
import { initialize, initializeFrom } from '../../../src';
66

77
import { fromAlarm } from '../../../src/from/cw';
88

@@ -125,4 +125,85 @@ describe('flavors/circuitbreaker.js', () => {
125125
})
126126
.done(done);
127127
});
128+
129+
it('should enable event source mapping with batch size from opt)', (done) => {
130+
const rules = [
131+
{
132+
id: 'circuitBreaker',
133+
flavor: circuitBreaker,
134+
circuitBreakerMaxBatchSize: '10',
135+
},
136+
];
137+
const event = {
138+
source: 'aws.cloudwatch',
139+
alarmArn:
140+
'arn:aws:cloudwatch:us-east-1:444455556666:alarm:lambda-demo-metric-alarm',
141+
accountId: '444455556666',
142+
time: '2023-08-04T12:36:15.490+0000',
143+
region: 'us-east-1',
144+
alarmData: {
145+
alarmName: 'lambda-demo-metric-alarm',
146+
state: {
147+
value: 'OK',
148+
},
149+
previousState: {
150+
value: 'INSUFFICIENT_DATA',
151+
},
152+
},
153+
};
154+
155+
initialize({
156+
...initializeFrom(rules),
157+
})
158+
.assemble(fromAlarm(event), false)
159+
.collect()
160+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
161+
.tap((collected) => {
162+
expect(collected.length).to.equal(1);
163+
expect(collected[0].pipeline).to.equal('circuitBreaker');
164+
expect(collected[0].updateRequest).to.deep.equal({
165+
UUID: 'a092f90d-9948-4964-95b5-32c46093f734',
166+
Enabled: true,
167+
BatchSize: 10,
168+
});
169+
})
170+
.done(done);
171+
});
172+
it('should enable event source mapping and force Number type for env var of CIRCUIT_BREAKER_MAX_BATCH_SIZE', (done) => {
173+
process.env.CIRCUIT_BREAKER_MAX_BATCH_SIZE = '10';
174+
const event = {
175+
source: 'aws.cloudwatch',
176+
alarmArn:
177+
'arn:aws:cloudwatch:us-east-1:444455556666:alarm:lambda-demo-metric-alarm',
178+
accountId: '444455556666',
179+
time: '2023-08-04T12:36:15.490+0000',
180+
region: 'us-east-1',
181+
alarmData: {
182+
alarmName: 'lambda-demo-metric-alarm',
183+
state: {
184+
value: 'OK',
185+
},
186+
previousState: {
187+
value: 'INSUFFICIENT_DATA',
188+
},
189+
},
190+
};
191+
192+
initialize({
193+
circuitBreaker,
194+
})
195+
.assemble(fromAlarm(event), false)
196+
.collect()
197+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
198+
.tap((collected) => {
199+
expect(collected.length).to.equal(1);
200+
expect(collected[0].pipeline).to.equal('circuitBreaker');
201+
expect(collected[0].updateRequest).to.deep.equal({
202+
UUID: 'a092f90d-9948-4964-95b5-32c46093f734',
203+
Enabled: true,
204+
BatchSize: 10,
205+
});
206+
})
207+
.done(done);
208+
});
128209
});

0 commit comments

Comments
 (0)