-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy patheventbridge.js
94 lines (81 loc) · 3.11 KB
/
eventbridge.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import _ from 'highland';
import Connector from '../connectors/eventbridge';
import { toBatchUow, unBatchUow, batchWithSize } from '../utils/batch';
import { rejectWithFault, FAULT_COMPRESSION_IGNORE } from '../utils/faults';
import { debug as d } from '../utils/print';
import { adornStandardTags } from '../utils/tags';
import { compress } from '../utils/compression';
import { ratelimit } from '../utils/ratelimit';
import { storeClaimcheck } from './claimcheck';
export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export
id: pipelineId,
debug = d('eventbridge'),
busName = process.env.BUS_NAME || 'undefined',
source = process.env.BUS_SRC || 'custom', // could change this to internal vs external/ingress/egress
eventField = 'event', // is often named emit
publishRequestEntryField = 'publishRequestEntry',
publishRequestField = 'publishRequest', // was inputParams
maxPublishRequestSize = Number(process.env.PUBLISH_MAX_REQ_SIZE) || Number(process.env.MAX_REQ_SIZE) || 256 * 1024,
batchSize = Number(process.env.PUBLISH_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10,
parallel = Number(process.env.PUBLISH_PARALLEL) || Number(process.env.PARALLEL) || 8,
endpointId = process.env.BUS_ENDPOINT_ID,
handleErrors = true,
retryConfig,
step = 'publish',
...opt
} = {}) => {
const connector = new Connector({
pipelineId, debug, retryConfig, ...opt,
});
const toPublishRequestEntry = (uow) => ({
...uow,
[publishRequestEntryField]: uow[eventField] ? {
EventBusName: busName,
Source: source,
DetailType: uow[eventField].type,
Detail: JSON.stringify(uow[eventField],
compress(uow[eventField].type !== 'fault' ? opt : { ...opt, compressionIgnore: FAULT_COMPRESSION_IGNORE })),
} : undefined,
});
const toPublishRequest = (batchUow) => {
const Entries = batchUow.batch
.filter((uow) => uow[publishRequestEntryField])
.map((uow) => uow[publishRequestEntryField]);
return {
...batchUow,
[publishRequestField]: endpointId ? /* istanbul ignore next */ {
Entries,
EndpointId: endpointId,
} : {
Entries,
},
};
};
const putEvents = (batchUow) => {
if (!batchUow[publishRequestField].Entries.length) {
return _(Promise.resolve(batchUow));
}
const p = () => connector.putEvents(batchUow[publishRequestField], batchUow)
.catch(rejectWithFault(batchUow, !handleErrors))
.then((publishResponse) => ({ ...batchUow, publishResponse }));
return _(batchUow.batch[0].metrics?.w(p, step) || p()); // wrap promise in a stream
};
return (s) => s
.map(adornStandardTags(eventField))
.through(ratelimit(opt))
.map(toPublishRequestEntry)
.consume(batchWithSize({
...opt,
batchSize,
maxRequestSize: maxPublishRequestSize,
requestEntryField: publishRequestEntryField,
requestField: publishRequestField,
debug,
}))
.through(storeClaimcheck(opt))
.map(toBatchUow)
.map(toPublishRequest)
.map(putEvents)
.parallel(parallel)
.flatMap(unBatchUow); // for cleaner logging and testing
};