Skip to content

Commit 71d1c37

Browse files
authored
add-timestream-support (#384)
* add-timestream-support * clean up
1 parent 32d50b1 commit 71d1c37

17 files changed

+2047
-431
lines changed

package-lock.json

Lines changed: 1651 additions & 421 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.0.25",
3+
"version": "1.0.26",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",
@@ -69,6 +69,7 @@
6969
"@aws-sdk/client-sns": "^3.450.0",
7070
"@aws-sdk/client-sqs": "^3.450.0",
7171
"@aws-sdk/client-sts": "^3.450.0",
72+
"@aws-sdk/client-timestream-write": "^3.450.0",
7273
"@aws-sdk/lib-dynamodb": "^3.450.0",
7374
"@aws-sdk/util-dynamodb": "^3.450.0",
7475
"@babel/cli": "^7.10.0",

src/connectors/timestream.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */
2+
import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';
3+
import { NodeHttpHandler } from '@smithy/node-http-handler';
4+
import Promise from 'bluebird';
5+
import { omit, pick } from 'lodash';
6+
import { defaultDebugLogger } from '../utils/log';
7+
8+
class Connector {
9+
constructor({
10+
debug,
11+
pipelineId,
12+
timeout = Number(process.env.CW_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
13+
additionalClientOpts = {},
14+
...opt
15+
}) {
16+
this.debug = (msg) => debug('%j', msg);
17+
this.client = Connector.getClient(pipelineId, debug, timeout, additionalClientOpts); this.opt = opt;
18+
}
19+
20+
static clients = {};
21+
22+
static getClient(pipelineId, debug, timeout, additionalClientOpts) {
23+
const addlRequestHandlerOpts = pick(additionalClientOpts, ['requestHandler']);
24+
const addlClientOpts = omit(additionalClientOpts, ['requestHandler']);
25+
26+
if (!this.clients[pipelineId]) {
27+
this.clients[pipelineId] = new TimestreamWriteClient({
28+
requestHandler: new NodeHttpHandler({
29+
requestTimeout: timeout,
30+
connectionTimeout: timeout,
31+
...addlRequestHandlerOpts,
32+
}),
33+
logger: defaultDebugLogger(debug),
34+
...addlClientOpts,
35+
});
36+
}
37+
return this.clients[pipelineId];
38+
}
39+
40+
writeRecords(params, ctx) {
41+
return this._sendCommand(new WriteRecordsCommand(params), ctx);
42+
}
43+
44+
_sendCommand(command, ctx) {
45+
this.opt.metrics?.capture(this.client, command, 'timestream', this.opt, ctx);
46+
return Promise.resolve(this.client.send(command))
47+
.tap(this.debug)
48+
.tapCatch(this.debug);
49+
}
50+
}
51+
52+
export default Connector;

src/flavors/materializeTimestream.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import {
2+
printStartPipeline, printEndPipeline,
3+
faulty,
4+
splitObject, compact,
5+
} from '../utils';
6+
import {
7+
writeRecords,
8+
} from '../sinks/timestream';
9+
import {
10+
filterOnEventType, filterOnContent,
11+
} from '../filters';
12+
13+
export const materializeTimestream = (rule) => (s) => s // eslint-disable-line import/prefer-default-export
14+
.filter(onEventType(rule))
15+
.tap(printStartPipeline)
16+
17+
.filter(onContent(rule))
18+
19+
.through(compact(rule))
20+
.through(splitObject(rule))
21+
22+
.map(toWriteRequest(rule))
23+
.through(writeRecords(rule))
24+
25+
.tap(printEndPipeline);
26+
27+
const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow));
28+
const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow));
29+
30+
const toWriteRequest = (rule) => faulty((uow) => ({
31+
...uow,
32+
writeRequest: rule.toWriteRequest(uow, rule),
33+
}));

src/sinks/timestream.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import _ from 'highland';
2+
3+
import Connector from '../connectors/timestream';
4+
5+
import { rejectWithFault } from '../utils/faults';
6+
import { debug as d } from '../utils/print';
7+
8+
export const writeRecords = ({ // eslint-disable-line import/prefer-default-export
9+
id: pipelineId,
10+
debug = d('ts'),
11+
writeRequestField = 'writeRequest',
12+
writeResponseField = 'writeResponse',
13+
parallel = Number(process.env.TIMESTREAM_PARALLEL) || Number(process.env.PARALLEL) || 8,
14+
step = 'save',
15+
...opt
16+
} = {}) => {
17+
const connector = new Connector({ pipelineId, debug, ...opt });
18+
19+
const write = (uow) => {
20+
// istanbul ignore next
21+
if (!uow[writeRequestField]) return _(Promise.resolve(uow));
22+
23+
const p = () => connector.writeRecords(uow[writeRequestField])
24+
.then((writeResponse) => ({ ...uow, [writeResponseField]: writeResponse }))
25+
.catch(rejectWithFault(uow));
26+
27+
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
28+
};
29+
30+
return (s) => s
31+
.map(write)
32+
.parallel(parallel);
33+
};
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
import debug from 'debug';
5+
import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';
6+
import { mockClient } from 'aws-sdk-client-mock';
7+
8+
import Connector from '../../../src/connectors/timestream';
9+
10+
describe('connectors/timestream.js', () => {
11+
let mockCloudWatch;
12+
13+
beforeEach(() => {
14+
mockCloudWatch = mockClient(TimestreamWriteClient);
15+
});
16+
17+
afterEach(() => {
18+
mockCloudWatch.restore();
19+
});
20+
21+
it('should reuse client per pipeline', () => {
22+
const client1 = Connector.getClient('test1', debug('test'));
23+
const client2 = Connector.getClient('test1', debug('test'));
24+
const client3 = Connector.getClient('test2', debug('test'));
25+
26+
expect(client1).to.eq(client2);
27+
expect(client2).to.not.eq(client3);
28+
});
29+
30+
it('should write', async () => {
31+
const spy = sinon.spy((_) => ({
32+
RecordsIngested: {
33+
Total: 1,
34+
},
35+
}));
36+
mockCloudWatch.on(WriteRecordsCommand).callsFake(spy);
37+
38+
const params = {
39+
DatabaseName: 'd1',
40+
TableName: 't1',
41+
Records: [{
42+
Dimensions: [
43+
{
44+
Name: 'account',
45+
Value: 'dev',
46+
}, {
47+
Name: 'region',
48+
Value: 'us-east-1',
49+
}, {
50+
Name: 'source',
51+
Value: 'service-x',
52+
}, {
53+
Name: 'function',
54+
Value: 'f1',
55+
}, {
56+
Name: 'pipeline',
57+
Value: 'p1',
58+
}, {
59+
Name: 'type',
60+
Value: 'thing-created',
61+
},
62+
],
63+
MeasureName: 'domain.event',
64+
MeasureValue: '1',
65+
MeasureValueType: 'BIGINT',
66+
Time: '1726940256001',
67+
TimeUnit: 'MILLISECONDS',
68+
}],
69+
};
70+
const data = await new Connector({ debug: debug('cw') })
71+
.writeRecords(params);
72+
73+
expect(spy).to.have.been.calledOnce;
74+
expect(spy).to.have.been.calledWith({
75+
DatabaseName: 'd1',
76+
TableName: 't1',
77+
Records: [{
78+
Dimensions: [
79+
{
80+
Name: 'account',
81+
Value: 'dev',
82+
}, {
83+
Name: 'region',
84+
Value: 'us-east-1',
85+
}, {
86+
Name: 'source',
87+
Value: 'service-x',
88+
}, {
89+
Name: 'function',
90+
Value: 'f1',
91+
}, {
92+
Name: 'pipeline',
93+
Value: 'p1',
94+
}, {
95+
Name: 'type',
96+
Value: 'thing-created',
97+
},
98+
],
99+
MeasureName: 'domain.event',
100+
MeasureValue: '1',
101+
MeasureValueType: 'BIGINT',
102+
Time: '1726940256001',
103+
TimeUnit: 'MILLISECONDS',
104+
}],
105+
});
106+
expect(data).to.deep.equal({
107+
RecordsIngested: {
108+
Total: 1,
109+
},
110+
});
111+
});
112+
});
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
5+
import {
6+
initialize, initializeFrom,
7+
ttl,
8+
} from '../../../src';
9+
10+
import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';
11+
12+
import Connector from '../../../src/connectors/timestream';
13+
14+
import { materializeTimestream } from '../../../src/flavors/materializeTimestream';
15+
16+
describe('flavors/materializeTimestream.js', () => {
17+
beforeEach(() => {
18+
sinon.stub(Connector.prototype, 'writeRecords').resolves({});
19+
});
20+
21+
afterEach(sinon.restore);
22+
23+
it('should execute', (done) => {
24+
const events = toKinesisRecords([
25+
{
26+
type: 'thing-submitted',
27+
timestamp: 1548967022000,
28+
thing: {
29+
id: '1',
30+
status: 's1',
31+
},
32+
},
33+
{
34+
type: 'thing-submitted',
35+
timestamp: 1548967022000,
36+
thing: {
37+
id: '2',
38+
status: 's1',
39+
},
40+
},
41+
]);
42+
43+
initialize({
44+
...initializeFrom(rules),
45+
})
46+
.assemble(fromKinesis(events), false)
47+
.collect()
48+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
49+
.tap((collected) => {
50+
expect(collected.length).to.equal(2);
51+
expect(collected[0].pipeline).to.equal('m1');
52+
expect(collected[0].event.type).to.equal('thing-submitted');
53+
expect(collected[0].writeRequest).to.deep.equal({
54+
DatabaseName: 'd1',
55+
TableName: 't1',
56+
Records: [
57+
{
58+
Dimensions: [
59+
{
60+
Name: 'type',
61+
Value: 'thing-submitted',
62+
},
63+
{
64+
Name: 'status',
65+
Value: 's1',
66+
},
67+
],
68+
MeasureName: 'domain.event',
69+
MeasureValue: '1',
70+
MeasureValueType: 'BIGINT',
71+
Time: '1548967022000',
72+
TimeUnit: 'MILLISECONDS',
73+
},
74+
],
75+
});
76+
})
77+
.done(done);
78+
});
79+
});
80+
81+
const toWriteRequest = (uow) => ({
82+
DatabaseName: 'd1',
83+
TableName: 't1',
84+
Records: [{
85+
Dimensions: [
86+
{
87+
Name: 'type',
88+
Value: uow.event.type,
89+
}, {
90+
Name: 'status',
91+
Value: uow.event.thing.status,
92+
},
93+
],
94+
MeasureName: 'domain.event',
95+
MeasureValue: '1',
96+
MeasureValueType: 'BIGINT',
97+
Time: `${uow.event.timestamp}`,
98+
TimeUnit: 'MILLISECONDS',
99+
}],
100+
});
101+
102+
const rules = [
103+
{
104+
id: 'm1',
105+
flavor: materializeTimestream,
106+
eventType: 'thing-submitted',
107+
toWriteRequest,
108+
},
109+
];

test/unit/sinks/cloudwatch.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { putMetrics } from '../../../src/sinks/cloudwatch';
77

88
import Connector from '../../../src/connectors/cloudwatch';
99

10-
describe('utils/cloudwatch.js', () => {
10+
describe('sinks/cloudwatch.js', () => {
1111
afterEach(sinon.restore);
1212

1313
it('should putMetrics', (done) => {

test/unit/sinks/dynamodb.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212

1313
import Connector from '../../../src/connectors/dynamodb';
1414

15-
describe('utils/dynamodb.js', () => {
15+
describe('sinks/dynamodb.js', () => {
1616
afterEach(sinon.restore);
1717

1818
it('should calculate ttl', () => {

test/unit/sinks/eventbridge.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { publishToEventBridge as publish } from '../../../src/sinks/eventbridge'
77

88
import Connector from '../../../src/connectors/eventbridge';
99

10-
describe('utils/eventbridge.js', () => {
10+
describe('sinks/eventbridge.js', () => {
1111
afterEach(sinon.restore);
1212

1313
it('should batch and publish', (done) => {

0 commit comments

Comments
 (0)