Skip to content

Commit 83a862b

Browse files
committed
allow overriding the cdc event field for cdc short circuiting
1 parent d20f4a5 commit 83a862b

File tree

2 files changed

+60
-17
lines changed

2 files changed

+60
-17
lines changed

src/flavors/cdc.js

+10-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ export const cdc = (rule) => (s) => s // eslint-disable-line import/prefer-defau
2929
.map(toEvent(rule))
3030
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
3131

32-
.through(encryptEvent(rule))
32+
.through(encryptEvent({
33+
sourceField: rule.eventField || 'event',
34+
targetField: rule.eventField || 'event',
35+
...rule,
36+
}))
3337
.through(rule.publish(rule))
3438

3539
.tap(printEndPipeline);
@@ -55,12 +59,11 @@ const toGetRequest = (rule) => faulty((uow) => ({
5559
: undefined,
5660
}));
5761

58-
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent
59-
? uow
60-
: ({
62+
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent // eslint-disable-line no-nested-ternary
63+
? uow : ({
6164
...uow,
62-
event: {
63-
...uow.event,
65+
[rule.eventField || 'event']: ({
66+
...(!rule.eventField && uow.event),
6467
...await faultify(rule.toEvent)(uow, rule),
65-
},
68+
}),
6669
})));

test/unit/flavors/cdc.test.js

+50-10
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,22 @@ describe('flavors/cdc.js', () => {
6161
timestamp: 1548967022000,
6262
},
6363
},
64+
{
65+
timestamp: 1572832690,
66+
keys: {
67+
pk: '1',
68+
sk: 'override',
69+
},
70+
newImage: {
71+
pk: '1',
72+
sk: 'override',
73+
discriminator: 'override',
74+
name: 'Override One',
75+
description: 'This is override one',
76+
ttl: 1549053422,
77+
timestamp: 1548967022000,
78+
},
79+
},
6480
]);
6581

6682
initialize({
@@ -70,26 +86,40 @@ describe('flavors/cdc.js', () => {
7086
.collect()
7187
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
7288
.tap((collected) => {
73-
expect(collected.length).to.equal(2);
74-
expect(collected[1].pipeline).to.equal('cdc1');
75-
expect(collected[1].event.type).to.equal('thing-created');
76-
expect(collected[1].event.thing).to.deep.equal({
89+
expect(collected.length).to.equal(3);
90+
expect(collected[0].pipeline).to.equal('cdc1');
91+
expect(collected[0].event.type).to.equal('thing-created');
92+
expect(collected[0].event.thing).to.deep.equal({
7793
id: '1',
7894
name: 'IlRoaW5nIE9uZSI=', // 'Thing One',
7995
description: 'This is thing one',
8096
});
81-
expect(collected[1].event.tags).to.deep.equal({
97+
expect(collected[0].event.tags).to.deep.equal({
8298
region: 'us-west-2',
8399
field1: 'v1',
84100
...envTags('cdc1'),
85101
...skipTag(),
86102
});
87-
expect(collected[1].queryRequest).to.be.undefined;
103+
expect(collected[0].queryRequest).to.be.undefined;
104+
105+
expect(collected[1].pipeline).to.equal('cdc2');
106+
expect(collected[1].queryRequest).to.not.be.undefined;
107+
expect(collected[1].queryResponse).to.not.be.undefined;
88108

89-
// this pipeline speeds ahead since it does less async
90-
expect(collected[0].pipeline).to.equal('cdc2');
91-
expect(collected[0].queryRequest).to.not.be.undefined;
92-
expect(collected[0].queryResponse).to.not.be.undefined;
109+
expect(collected[2].pipeline).to.equal('cdc3');
110+
expect(collected[2].event.type).to.equal('override-created');
111+
expect(collected[2].event.thing).to.be.undefined;
112+
expect(collected[2].emit.thing).to.deep.equal({
113+
id: '1',
114+
name: 'Ik92ZXJyaWRlIE9uZSI=', // 'Override One'
115+
description: 'This is override one',
116+
});
117+
expect(collected[2].emit.tags).to.deep.equal({
118+
region: 'us-west-2',
119+
field1: 'v1',
120+
...envTags('cdc3'),
121+
...skipTag(),
122+
});
93123
})
94124
.done(done);
95125
});
@@ -129,4 +159,14 @@ const rules = [
129159
flavor: cdc,
130160
eventType: 'x9',
131161
},
162+
{
163+
id: 'cdc3',
164+
flavor: cdc,
165+
toEvent,
166+
eventField: 'emit',
167+
eventType: /override-*/,
168+
eem: {
169+
fields: ['name'],
170+
},
171+
},
132172
];

0 commit comments

Comments
 (0)