Skip to content

Commit 05f5a08

Browse files
author
John Gilbert
committed
add-s3-cdc
1 parent c7403f6 commit 05f5a08

File tree

6 files changed

+316
-6
lines changed

6 files changed

+316
-6
lines changed

package-lock.json

Lines changed: 119 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.8",
3+
"version": "1.1.9",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",
@@ -71,6 +71,7 @@
7171
"@aws-sdk/client-sts": "^3.450.0",
7272
"@aws-sdk/client-timestream-write": "^3.450.0",
7373
"@aws-sdk/lib-dynamodb": "^3.450.0",
74+
"@aws-sdk/s3-request-presigner": "3.490.0",
7475
"@aws-sdk/signature-v4-crt": "^3.450.0",
7576
"@aws-sdk/util-dynamodb": "^3.450.0",
7677
"@babel/cli": "^7.10.0",

src/connectors/s3.js

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33
import { Readable } from 'stream';
44
import {
55
CopyObjectCommand,
6-
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
6+
DeleteObjectCommand,
7+
GetObjectCommand,
8+
HeadObjectCommand,
9+
ListObjectVersionsCommand,
10+
ListObjectsV2Command,
11+
PutObjectCommand,
12+
S3Client,
713
} from '@aws-sdk/client-s3';
814
import { NodeHttpHandler } from '@smithy/node-http-handler';
915
import Promise from 'bluebird';
@@ -104,6 +110,23 @@ class Connector {
104110
.then((response) => Readable.from(response.Body));
105111
}
106112

113+
getSignedUrl(operation, Key, other = {}) {
114+
const params = {
115+
Bucket: this.bucketName,
116+
Key,
117+
...other,
118+
};
119+
120+
return import('@aws-sdk/s3-request-presigner')
121+
.then(({ getSignedUrl }) => Promise.resolve(getSignedUrl(this.client,
122+
operation === 'putObject'
123+
? new PutObjectCommand(params)
124+
: new GetObjectCommand(params),
125+
other))
126+
.tap(this.debug)
127+
.tapCatch(this.debug));
128+
}
129+
107130
listObjects(inputParams, ctx) {
108131
const params = {
109132
Bucket: this.bucketName,
@@ -113,6 +136,30 @@ class Connector {
113136
return this._sendCommand(new ListObjectsV2Command(params), ctx);
114137
}
115138

139+
listObjectVersions({
140+
last, limit, Bucket, Prefix,
141+
}, ctx) {
142+
const params = {
143+
Bucket: Bucket || this.bucketName,
144+
Prefix,
145+
MaxKeys: limit,
146+
...(last
147+
? /* istanbul ignore next */ JSON.parse(Buffer.from(last, 'base64').toString())
148+
: {}),
149+
};
150+
151+
return this._sendCommand(new ListObjectVersionsCommand(params), ctx)
152+
.then((data) => ({
153+
last: data.IsTruncated
154+
? /* istanbul ignore next */ Buffer.from(JSON.stringify({
155+
KeyMarker: data.NextKeyMarker,
156+
VersionIdMarker: data.NextVersionIdMarker,
157+
})).toString('base64')
158+
: undefined,
159+
data,
160+
}));
161+
}
162+
116163
copyObject(inputParams, ctx) {
117164
const params = {
118165
Bucket: this.bucketName,

src/from/s3.js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,32 @@ export const fromS3 = (event) =>
1515
record,
1616
}));
1717

18+
export const fromSqsS3 = (event) =>
19+
_(event.Records)
20+
// sqs
21+
.map((record) =>
22+
// create a unit-of-work for each message
23+
// so we can correlate related work for error handling
24+
({
25+
record: {
26+
sqs: record,
27+
},
28+
}))
29+
// s3
30+
.map((uow) => ({
31+
record: {
32+
...uow.record,
33+
s3: JSON.parse(uow.record.sqs.body),
34+
},
35+
}))
36+
.flatMap((uow) => fromS3(uow.record.s3)
37+
.map((uow2) => ({
38+
record: {
39+
...uow.record,
40+
s3: uow2.record,
41+
},
42+
})));
43+
1844
export const fromSqsSnsS3 = (event) =>
1945
_(event.Records)
2046
// sqs
@@ -109,6 +135,16 @@ export const toS3Records = (notifications) => ({
109135
})),
110136
});
111137

138+
export const toSqsS3Records = (notifications) => ({
139+
Records: ([{
140+
body: JSON.stringify({
141+
Records: notifications.map((s3) => ({
142+
s3,
143+
})),
144+
}),
145+
}]),
146+
});
147+
112148
export const toSqsSnsS3Records = (notifications) => ({
113149
Records: ([{
114150
body: JSON.stringify({

test/unit/connectors/s3.test.js

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,16 @@ import { Readable } from 'stream';
66
import { mockClient } from 'aws-sdk-client-mock';
77
import {
88
CopyObjectCommand,
9-
DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client,
9+
DeleteObjectCommand,
10+
GetObjectCommand,
11+
HeadObjectCommand,
12+
ListObjectsV2Command,
13+
ListObjectVersionsCommand,
14+
PutObjectCommand,
15+
S3Client,
1016
} from '@aws-sdk/client-s3';
1117
import { sdkStreamMixin } from '@smithy/util-stream';
18+
import * as s3RequestPresigner from '@aws-sdk/s3-request-presigner/dist-cjs/getSignedUrl';
1219

1320
import { v4 } from 'uuid';
1421
import Connector from '../../../src/connectors/s3';
@@ -27,6 +34,55 @@ describe('connectors/s3.js', () => {
2734
sinon.restore();
2835
});
2936

37+
it('should get a signed url', async () => {
38+
// const spy = sinon.spy(() => 'https://123/456');
39+
// mockS3.on(PutObjectCommand).callsFake(spy);
40+
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');
41+
42+
const data = await new Connector({ debug: debug('s3') })
43+
.getSignedUrl('putObject', '1/2');
44+
expect(spy).to.have.been.calledOnce;
45+
// expect(spy).to.have.been.calledWith('putObject', {
46+
// Bucket: 'b1',
47+
// Key: '1/2',
48+
// });
49+
expect(data).to.equal('https://123/456');
50+
});
51+
52+
it('should get a signed url for putObject', async () => {
53+
// const spy = sinon.spy(() => 'https://123/456');
54+
// mockS3.on(PutObjectCommand).callsFake(spy);
55+
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');
56+
57+
const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
58+
.getSignedUrl('putObject', '1/2');
59+
60+
expect(spy).to.have.been.calledOnce;
61+
// expect(spy).to.have.been.calledWith('putObject', {
62+
// Bucket: 'b1',
63+
// Key: '1/2',
64+
// // ContentType: 'application/pdf',
65+
// // ACL: 'private',
66+
// });
67+
expect(data).to.equal('https://123/456');
68+
});
69+
70+
it('should get a signed url for getObject', async () => {
71+
// const spy = sinon.spy(() => 'https://123/456');
72+
// mockS3.on(GetObjectCommand).callsFake(spy);
73+
const spy = sinon.stub(s3RequestPresigner, 'getSignedUrl').resolves('https://123/456');
74+
75+
const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
76+
.getSignedUrl('getObject', '1/2');
77+
78+
expect(spy).to.have.been.calledOnce;
79+
// expect(spy).to.have.been.calledWith('getObject', {
80+
// Bucket: 'b1',
81+
// Key: '1/2',
82+
// });
83+
expect(data).to.equal('https://123/456');
84+
});
85+
3086
it('should reuse client per pipeline', () => {
3187
const client1 = Connector.getClient('test1', debug('test'));
3288
const client2 = Connector.getClient('test1', debug('test'));
@@ -161,6 +217,29 @@ describe('connectors/s3.js', () => {
161217
expect(data).to.deep.equal({ DeleteMarker: false });
162218
});
163219

220+
it('should list object versions', async () => {
221+
const spy = sinon.spy(() => [{ VersionId: 'v1' }]);
222+
mockS3.on(ListObjectVersionsCommand).callsFake(spy);
223+
224+
const inputParams = {
225+
Prefix: 'k1',
226+
limit: 20,
227+
};
228+
229+
const data = await new Connector({ debug: debug('s3'), bucketName: 'b1' })
230+
.listObjectVersions(inputParams);
231+
232+
expect(spy).to.have.been.calledWith({
233+
Bucket: 'b1',
234+
Prefix: 'k1',
235+
MaxKeys: 20,
236+
});
237+
expect(data).to.deep.equal({
238+
last: undefined,
239+
data: [{ VersionId: 'v1' }],
240+
});
241+
});
242+
164243
it('should list objects', async () => {
165244
const spy = sinon.spy(() => ({
166245
IsTruncated: false,

0 commit comments

Comments
 (0)