Skip to content

Commit 2a24ed8

Browse files
committed
allow overriding the cdc event field
1 parent 91bd401 commit 2a24ed8

File tree

2 files changed

+61
-12
lines changed

2 files changed

+61
-12
lines changed

src/flavors/cdc.js

+18-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ export const cdc = (rule) => (s) => s // eslint-disable-line import/prefer-defau
2424
.map(toEvent(rule))
2525
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
2626

27-
.through(encryptEvent(rule))
27+
.through(encryptEvent({
28+
sourceField: rule.eventField || 'event',
29+
targetField: rule.eventField || 'event',
30+
...rule,
31+
}))
2832
.through(rule.publish(rule))
2933

3034
.tap(printEndPipeline);
@@ -50,12 +54,16 @@ const toGetRequest = (rule) => faulty((uow) => ({
5054
: undefined,
5155
}));
5256

53-
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent
54-
? uow
55-
: ({
56-
...uow,
57-
event: {
58-
...uow.event,
59-
...await faultify(rule.toEvent)(uow, rule),
60-
},
61-
})));
57+
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent // eslint-disable-line no-nested-ternary
58+
? uow : typeof rule.eventField === 'string' && rule.eventField !== 'event'
59+
? ({
60+
...uow,
61+
[rule.eventField]: await faultify(rule.toEvent)(uow, rule),
62+
})
63+
: ({
64+
...uow,
65+
event: {
66+
...uow.event,
67+
...await faultify(rule.toEvent)(uow, rule),
68+
},
69+
})));

test/unit/flavors/cdc.test.js

+43-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,22 @@ describe('flavors/cdc.js', () => {
6161
timestamp: 1548967022000,
6262
},
6363
},
64+
{
65+
timestamp: 1572832690,
66+
keys: {
67+
pk: '1',
68+
sk: 'override',
69+
},
70+
newImage: {
71+
pk: '1',
72+
sk: 'override',
73+
discriminator: 'override',
74+
name: 'Override One',
75+
description: 'This is override one',
76+
ttl: 1549053422,
77+
timestamp: 1548967022000,
78+
},
79+
},
6480
]);
6581

6682
initialize({
@@ -70,7 +86,7 @@ describe('flavors/cdc.js', () => {
7086
.collect()
7187
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
7288
.tap((collected) => {
73-
expect(collected.length).to.equal(2);
89+
expect(collected.length).to.equal(3);
7490
expect(collected[1].pipeline).to.equal('cdc1');
7591
expect(collected[1].event.type).to.equal('thing-created');
7692
expect(collected[1].event.thing).to.deep.equal({
@@ -86,7 +102,22 @@ describe('flavors/cdc.js', () => {
86102
});
87103
expect(collected[1].queryRequest).to.be.undefined;
88104

89-
// this pipeline speeds ahead since it does less async
105+
expect(collected[2].pipeline).to.equal('cdc3');
106+
expect(collected[2].event.type).to.equal('override-created');
107+
expect(collected[2].event.thing).to.be.undefined;
108+
expect(collected[2].emit.thing).to.deep.equal({
109+
id: '1',
110+
name: 'Ik92ZXJyaWRlIE9uZSI=', // 'Override One'
111+
description: 'This is override one',
112+
});
113+
expect(collected[2].emit.tags).to.deep.equal({
114+
region: 'us-west-2',
115+
field1: 'v1',
116+
...envTags('cdc3'),
117+
...skipTag(),
118+
});
119+
120+
// this pipeline speeds ahead since they do less async
90121
expect(collected[0].pipeline).to.equal('cdc2');
91122
expect(collected[0].queryRequest).to.not.be.undefined;
92123
expect(collected[0].queryResponse).to.not.be.undefined;
@@ -129,4 +160,14 @@ const rules = [
129160
flavor: cdc,
130161
eventType: 'x9',
131162
},
163+
{
164+
id: 'cdc3',
165+
flavor: cdc,
166+
toEvent,
167+
eventField: 'emit',
168+
eventType: /override-*/,
169+
eem: {
170+
fields: ['name'],
171+
},
172+
},
132173
];

0 commit comments

Comments
 (0)