Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow overriding the cdc event field #307

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions src/flavors/cdc.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ export const cdc = (rule) => (s) => s // eslint-disable-line import/prefer-defau
.map(toEvent(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.through(encryptEvent(rule))
.through(encryptEvent({
sourceField: rule.eventField || 'event',
targetField: rule.eventField || 'event',
...rule,
}))
.through(rule.publish(rule))

.tap(printEndPipeline);
Expand All @@ -55,12 +59,12 @@ const toGetRequest = (rule) => faulty((uow) => ({
: undefined,
}));

const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent
? uow
: ({
const toEvent = (rule) => faultyAsyncStream(async (uow) => (!rule.toEvent // eslint-disable-line no-nested-ternary
? uow : ({
...uow,
event: {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would think we just need to override this field with [rule.eventField || 'event']

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my mind the purpose of overriding the event field would be to avoid spreading the uow.event in the toEvent function like in job. I think the commit I just added has this simplification but still allows avoiding spreading uow.event

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revisiting this after an update - the purpose of this MR is to be able to override the cdc event field so that the toEvent can short circuit and not publish. currently, if you return null or undefined in toEvent, cdc will still publish. we can't add a field like shortCircuit: true to the rule because it won't be backwards compatible

...uow.event,
...await faultify(rule.toEvent)(uow, rule),
},
[rule.eventField || 'event']: rule.eventField
? await faultify(rule.toEvent)(uow, rule) : {
...uow.event,
...await faultify(rule.toEvent)(uow, rule),
},
})));
40 changes: 35 additions & 5 deletions test/unit/flavors/cdc.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ describe('flavors/cdc.js', () => {
timestamp: 1548967022000,
},
},
{
timestamp: 1572832690,
keys: {
pk: '1',
sk: 'override',
},
newImage: {
pk: '1',
sk: 'override',
discriminator: 'override',
name: 'Override One',
description: 'This is override one',
ttl: 1549053422,
timestamp: 1548967022000,
},
},
]);

initialize({
Expand All @@ -70,7 +86,7 @@ describe('flavors/cdc.js', () => {
.collect()
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
.tap((collected) => {
expect(collected.length).to.equal(2);
expect(collected.length).to.equal(3);
expect(collected[1].pipeline).to.equal('cdc1');
expect(collected[1].event.type).to.equal('thing-created');
expect(collected[1].event.thing).to.deep.equal({
Expand All @@ -86,10 +102,17 @@ describe('flavors/cdc.js', () => {
});
expect(collected[1].queryRequest).to.be.undefined;

// this pipeline speeds ahead since it does less async
expect(collected[0].pipeline).to.equal('cdc2');
expect(collected[0].queryRequest).to.not.be.undefined;
expect(collected[0].queryResponse).to.not.be.undefined;
expect(collected[2].pipeline).to.equal('cdc2');
expect(collected[2].queryRequest).to.not.be.undefined;
expect(collected[2].queryResponse).to.not.be.undefined;

expect(collected[0].pipeline).to.equal('cdc3');
expect(collected[0].event.type).to.equal('override-created');
expect(collected[0].event.thing).to.be.undefined;
expect(collected[0].emit).to.be.null;
expect(collected[0].publishRequest).to.deep.equal({
Entries: [],
});
})
.done(done);
});
Expand Down Expand Up @@ -129,4 +152,11 @@ const rules = [
flavor: cdc,
eventType: 'x9',
},
{
id: 'cdc3',
flavor: cdc,
toEvent: () => null,
eventField: 'emit',
eventType: /override-*/,
},
];
Loading