Skip to content

Commit 5c72a55

Browse files
author
John Gilbert
committed
add-firehose-support-2
1 parent e00f09e commit 5c72a55

File tree

8 files changed

+299
-2
lines changed

8 files changed

+299
-2
lines changed

src/filters/event.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export const filterOnEventType = (rule, uow) => {
1919

2020
export const prefilterOnEventTypes = (rules) =>
2121
(uow) =>
22-
rules.reduce((a, c) => a || filterOnEventType(c, uow), false);
22+
rules.reduce((a, r) => a || filterOnEventType(r, uow), false);
2323

2424
export const filterOnContent = (rule, uow) => {
2525
/* istanbul ignore else */
@@ -29,3 +29,7 @@ export const filterOnContent = (rule, uow) => {
2929
return true;
3030
}
3131
};
32+
33+
export const prefilterOnContent = (rules) =>
34+
(uow) =>
35+
rules.reduce((a, r) => a || filterOnContent(r, uow), false);

src/flavors/firehose.js

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import {
2+
printStartPipeline, printEndPipeline,
3+
faulty,
4+
} from '../utils';
5+
import {
6+
filterOnEventType, filterOnContent,
7+
prefilterOnEventTypes, prefilterOnContent,
8+
} from '../filters';
9+
10+
export const firehoseTransform = (rule) => (s) => s
11+
.filter(onEventType(rule))
12+
.tap(printStartPipeline)
13+
14+
.filter(onContent(rule))
15+
16+
.map(transform(rule))
17+
18+
.tap(printEndPipeline);
19+
20+
export const firehoseDrop = (rules) => (opt) => {
21+
console.log('%j', { opt });
22+
console.log('%j', { rules });
23+
return (s) => s
24+
.filter((uow) => !(prefilterOnEventTypes(rules)(uow) && prefilterOnContent(rules)(uow)))
25+
.tap(printEndPipeline);
26+
};
27+
28+
const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow));
29+
const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow));
30+
31+
const spreadDateTime = (dt) => {
32+
const date = new Date(dt);
33+
34+
return {
35+
year: `${date.getFullYear()}`,
36+
month: `${date.getMonth() + 1}`, // JavaScript months are 0-indexed
37+
day: `${date.getDate()}`,
38+
hour: `${date.getHours()}`,
39+
minute: `${date.getMinutes()}`,
40+
};
41+
};
42+
43+
const metadata = (rule, uow) => (rule.metadata
44+
? /* istanbul ignore next */ rule.metadata(uow, rule)
45+
: {
46+
partitionKeys: {
47+
table: rule.tableName,
48+
...spreadDateTime(uow.event.timestamp),
49+
},
50+
});
51+
52+
const transform = (rule) => faulty((uow) => {
53+
const transformed = rule.transform
54+
? rule.transform(uow, rule)
55+
: /* istanbul ignore next */ {};
56+
57+
return {
58+
...uow,
59+
transformed,
60+
result: 'Ok',
61+
data: Buffer.from(JSON.stringify(transformed), 'utf-8').toString('base64'),
62+
metadata: metadata(rule, uow),
63+
};
64+
});

src/flavors/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export * from './correlate';
33
export * from './cdc';
44
export * from './evaluate';
55
export * from './expired';
6+
export * from './firehose';
67
export * from './job';
78
export * from './materialize';
89
export * from './materializeS3';

src/from/firehose.js

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import _ from 'highland';
2+
3+
import {
4+
faulty, decompress, compress,
5+
} from '../utils';
6+
import { outSkip } from '../filters';
7+
import { redeemClaimcheck } from '../queries';
8+
9+
export const fromFirehose = (event) =>
10+
11+
_(event.records)
12+
.map((record) =>
13+
// create a unit-of-work for each event
14+
// so we can correlate related work for error handling
15+
({
16+
record,
17+
event: Buffer.from(record.data, 'base64').toString('utf8'),
18+
recordId: record.recordId,
19+
result: 'Dropped', // by default, set to 'Ok' in transform, 'ProcessingFailed' in fault processing ???
20+
}))
21+
22+
.map(faulty((uow) => ({
23+
...uow,
24+
event: JSON.parse(uow.event, decompress),
25+
})))
26+
.filter(outSkip)
27+
.through(redeemClaimcheck());
28+
29+
// test helper
30+
export const toFirehoseRecords = (events, approximateArrivalTimestamp) => ({
31+
invocationId: 'invocationIdExample',
32+
deliveryStreamArn: 'arn:aws:kinesis:TEST',
33+
region: process.env.AWS_REGION || /* istanbul ignore next */ 'us-west-2',
34+
records: events.map((e, i) =>
35+
({
36+
recordId: `${i}`, // "49546986683135544286507457936321625675700192471156785154",
37+
data: Buffer.from(JSON.stringify(e, compress())).toString('base64'),
38+
approximateArrivalTimestamp, // format: 1495072949453
39+
})),
40+
});
41+
42+
export const UNKNOWN_FIREHOSE_EVENT_TYPE = toFirehoseRecords([{ type: 'unknown-type' }]);

src/utils/handler.js

+18
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,21 @@ export const toPromise = (s) => {
3939
});
4040
}
4141
};
42+
43+
export const toFirehose = (s) => new Promise((resolve, reject) => {
44+
const records = [];
45+
s.consume((err, x, push, next) => {
46+
/* istanbul ignore if */
47+
if (err) {
48+
reject(err);
49+
} else if (x === _.nil) {
50+
resolve({
51+
records,
52+
});
53+
} else {
54+
records.push(x);
55+
next();
56+
}
57+
})
58+
.resume();
59+
});

test/unit/flavors/firehose.test.js

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
5+
import {
6+
initialize, initializeFrom,
7+
} from '../../../src';
8+
9+
import { toFirehoseRecords, fromFirehose } from '../../../src/from/firehose';
10+
import { firehoseTransform, firehoseDrop } from '../../../src/flavors/firehose';
11+
12+
describe('flavors/firehose.js', () => {
13+
beforeEach(() => {
14+
});
15+
16+
afterEach(sinon.restore);
17+
18+
it('should execute', (done) => {
19+
const events = toFirehoseRecords([
20+
{
21+
type: 't1',
22+
timestamp: 1734754684001,
23+
thing: {
24+
id: '1',
25+
name: 'Thing One',
26+
description: 'This is thing one',
27+
},
28+
},
29+
{
30+
type: 't2',
31+
timestamp: 1734754684001,
32+
thing: {
33+
id: '2',
34+
name: 'Thing Two',
35+
description: 'This is thing two',
36+
},
37+
},
38+
{
39+
type: 't3', // should be dropped
40+
timestamp: 1734754684001,
41+
},
42+
]);
43+
44+
initialize({
45+
...initializeFrom(rules),
46+
drop: firehoseDrop(rules),
47+
})
48+
.assemble(fromFirehose(events), false)
49+
.collect()
50+
// .tap((records) => console.log(JSON.stringify(records, null, 2)))
51+
.tap((records) => {
52+
expect(records.length).to.equal(3);
53+
expect(records[0].pipeline).to.equal('ft1');
54+
expect(records[0].event.type).to.equal('t1');
55+
expect(records[0].transformed).to.deep.equal({
56+
ID: '1',
57+
NM: 'Thing One',
58+
DESC: 'This is thing one',
59+
});
60+
expect(records[0].metadata).to.deep.equal({
61+
partitionKeys: {
62+
table: 'T1',
63+
year: '2024',
64+
month: '12',
65+
day: '20',
66+
hour: '23',
67+
minute: '18',
68+
},
69+
});
70+
expect(records[0].data).to.equal('eyJJRCI6IjEiLCJOTSI6IlRoaW5nIE9uZSIsIkRFU0MiOiJUaGlzIGlzIHRoaW5nIG9uZSJ9');
71+
expect(records[0].result).to.equal('Ok');
72+
expect(records[1].result).to.equal('Ok');
73+
expect(records[2].result).to.equal('Dropped');
74+
})
75+
.done(done);
76+
});
77+
});
78+
79+
const transform = (uow) => ({
80+
ID: uow.event.thing.id,
81+
NM: uow.event.thing.name,
82+
DESC: uow.event.thing.description,
83+
});
84+
85+
const rules = [
86+
{
87+
id: 'ft1',
88+
flavor: firehoseTransform,
89+
eventType: 't1',
90+
tableName: 'T1',
91+
transform,
92+
},
93+
{
94+
id: 'ft2',
95+
flavor: firehoseTransform,
96+
eventType: 't2',
97+
filters: [() => true],
98+
tableName: 'T2',
99+
transform,
100+
},
101+
{
102+
id: 'other1',
103+
eventType: 'x9',
104+
flavor: firehoseTransform,
105+
},
106+
];

test/unit/from/firehose.test.js

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
4+
import { fromFirehose, toFirehoseRecords } from '../../../src/from/firehose';
5+
6+
describe('from/firehose.js', () => {
7+
it('should parse records', (done) => {
8+
const event = toFirehoseRecords([
9+
{
10+
id: '1',
11+
type: 't1',
12+
partitionKey: '1',
13+
},
14+
{
15+
id: 'x',
16+
type: 't1',
17+
partitionKey: '1',
18+
tags: {
19+
skip: true,
20+
},
21+
},
22+
]);
23+
24+
// console.log(JSON.stringify({ event }, null, 2));
25+
26+
fromFirehose(event)
27+
.collect()
28+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
29+
.tap((collected) => {
30+
expect(collected.length).to.equal(1);
31+
expect(collected[0]).to.deep.equal({
32+
record: {
33+
recordId: '0',
34+
approximateArrivalTimestamp: undefined,
35+
data: 'eyJpZCI6IjEiLCJ0eXBlIjoidDEiLCJwYXJ0aXRpb25LZXkiOiIxIn0=',
36+
},
37+
event: {
38+
id: '1',
39+
type: 't1',
40+
partitionKey: '1',
41+
},
42+
recordId: '0',
43+
result: 'Dropped',
44+
});
45+
})
46+
.done(done);
47+
});
48+
});

test/unit/utils/handler.test.js

+15-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import sinon from 'sinon';
55
import _ from 'highland';
66
import Promise from 'bluebird';
77

8-
import { mw, toPromise } from '../../../src/utils/handler';
8+
import { mw, toPromise, toFirehose } from '../../../src/utils/handler';
99

1010
describe('utils/handler.js', () => {
1111
afterEach(sinon.restore);
@@ -78,4 +78,18 @@ describe('utils/handler.js', () => {
7878
.then(() => Promise.reject(new Error('failed')))
7979
.catch((err) => ('Caught'));
8080
});
81+
82+
it('should return firehose payload', async () => {
83+
const handlerWithPromise = async (event, context) =>
84+
_(event.records)
85+
.through(toFirehose);
86+
87+
const result = await handlerWithPromise({
88+
records: [{ data: 'r11' }, { data: 'r12' }],
89+
}, {});
90+
91+
expect(result).to.deep.equal({
92+
records: [{ data: 'r11' }, { data: 'r12' }],
93+
});
94+
});
8195
});

0 commit comments

Comments
 (0)