Skip to content

Commit aa5a6b8

Browse files
committed
allow overriding the cdc event field for cdc short circuiting
1 parent d20f4a5 commit aa5a6b8

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

src/flavors/cdc.js

+12-8
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ export const cdc = (rule) => (s) => s // eslint-disable-line import/prefer-defau
2929
.map(toEvent(rule))
3030
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
3131

32-
.through(encryptEvent(rule))
32+
.through(encryptEvent({
33+
sourceField: rule.eventField || 'event',
34+
targetField: rule.eventField || 'event',
35+
...rule,
36+
}))
3337
.through(rule.publish(rule))
3438

3539
.tap(printEndPipeline);
@@ -55,12 +59,12 @@ const toGetRequest = (rule) => faulty((uow) => ({
5559
: undefined,
5660
}));
5761

58-
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent
59-
? uow
60-
: ({
62+
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent // eslint-disable-line no-nested-ternary
63+
? uow : ({
6164
...uow,
62-
event: {
63-
...uow.event,
64-
...await faultify(rule.toEvent)(uow, rule),
65-
},
65+
[rule.eventField || 'event']: rule.eventField
66+
? await faultify(rule.toEvent)(uow, rule) : {
67+
...uow.event,
68+
...await faultify(rule.toEvent)(uow, rule),
69+
},
6670
})));

test/unit/flavors/cdc.test.js

+35-5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,22 @@ describe('flavors/cdc.js', () => {
6161
timestamp: 1548967022000,
6262
},
6363
},
64+
{
65+
timestamp: 1572832690,
66+
keys: {
67+
pk: '1',
68+
sk: 'override',
69+
},
70+
newImage: {
71+
pk: '1',
72+
sk: 'override',
73+
discriminator: 'override',
74+
name: 'Override One',
75+
description: 'This is override one',
76+
ttl: 1549053422,
77+
timestamp: 1548967022000,
78+
},
79+
},
6480
]);
6581

6682
initialize({
@@ -70,7 +86,7 @@ describe('flavors/cdc.js', () => {
7086
.collect()
7187
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
7288
.tap((collected) => {
73-
expect(collected.length).to.equal(2);
89+
expect(collected.length).to.equal(3);
7490
expect(collected[1].pipeline).to.equal('cdc1');
7591
expect(collected[1].event.type).to.equal('thing-created');
7692
expect(collected[1].event.thing).to.deep.equal({
@@ -86,10 +102,17 @@ describe('flavors/cdc.js', () => {
86102
});
87103
expect(collected[1].queryRequest).to.be.undefined;
88104

89-
// this pipeline speeds ahead since it does less async
90-
expect(collected[0].pipeline).to.equal('cdc2');
91-
expect(collected[0].queryRequest).to.not.be.undefined;
92-
expect(collected[0].queryResponse).to.not.be.undefined;
105+
expect(collected[2].pipeline).to.equal('cdc2');
106+
expect(collected[2].queryRequest).to.not.be.undefined;
107+
expect(collected[2].queryResponse).to.not.be.undefined;
108+
109+
expect(collected[0].pipeline).to.equal('cdc3');
110+
expect(collected[0].event.type).to.equal('override-created');
111+
expect(collected[0].event.thing).to.be.undefined;
112+
expect(collected[0].emit).to.be.null;
113+
expect(collected[0].publishRequest).to.deep.equal({
114+
Entries: [],
115+
});
93116
})
94117
.done(done);
95118
});
@@ -129,4 +152,11 @@ const rules = [
129152
flavor: cdc,
130153
eventType: 'x9',
131154
},
155+
{
156+
id: 'cdc3',
157+
flavor: cdc,
158+
toEvent: () => null,
159+
eventField: 'emit',
160+
eventType: /override-*/,
161+
},
132162
];

0 commit comments

Comments
 (0)