1
+ import { DynamoDBClient , paginateScan } from "@aws-sdk/client-dynamodb" ;
2
+ import { LambdaClient , InvokeCommand } from "@aws-sdk/client-lambda" ;
3
+ import _ from 'lodash' ;
4
+
5
+ /*
6
+ Script to replay dynamodb documents to the ddbToES lambda. Best effort to only send the last version
7
+ This script is useful when an OpenSearch cluster has been corrupted, has failed, or is being migrated to a new cluster.
8
+
9
+ The script is relatively limited in that it's a single process, does not support resumption and can be logically
10
+ inaccurate if the system is receiving real workloads. So, this is useful for small to medium size implementations
11
+ DR/migration scenarios. For a point of reference, we've seen a codebuild small ARM instance replay ~1m dynamodb
12
+ documents to ElasticSearch per hour with a SEGMENT_COUNT=15 and SCAN_PAGE_LIMIT=300 in a FWoA `dev` stage deployment.
13
+ The throughput of the script is initially bound by the write throughput of the ElasticSearch cluster. However, at some
14
+ point that will switch from ElasticSearch write throughput to DynamoDB read throughput.
15
+
16
+ Low hanging fruit improvements would be to decouple the dynamodb reads from the ElasticSearch writes. An in-memory queue
17
+ could be used to let the readers buffer dynamodb documents until there is back pressure from the ElasticSearch writers.
18
+ Care would need to be taken to make sure there's a steady state of memory so we don't throw an OOM error and crash.
19
+
20
+ Long term, support for large datasets will require being able to horizontally scale the synchronization process. The
21
+ hard challenge here will be finding a way to guarantee that the last vid per id is written last. This script does the
22
+ guaranteeing of the last vid is always written last by using a shared idToGreatestVid and tossing any documents with a
23
+ smaller vid than the greatest known vid per id. To the best of our knowledge, there are no guarantees in dynamodb paging
24
+ segments to receive all shard keys in a single segment. Therefore, some shared state would be needed across nodes to synchronize
25
+ the greatest vid.
26
+
27
+ setup
28
+ ------
29
+ dirName=$RANDOM
30
+ dirPath="/tmp/${dirName}/"
31
+ mkdir -p "${dirPath}"
32
+ cp scripts/replay-ddbToES.js "${dirPath}"
33
+ cd "${dirPath}"
34
+ npm init -y
35
+ echo $(cat package.json | jq '. += {"type":"module"}') > package.json
36
+ npm install @aws-sdk/[email protected]
37
+ npm install @aws-sdk/[email protected]
38
+
39
+
40
+ run
41
+ -------
42
+ AWS_REGION="<my-region>" \
43
+ AWS_PROFILE="<my-profile>" \
44
+ SEGMENT_COUNT="3" \
45
+ SCAN_PAGE_LIMIT="90" \
46
+ TABLE_NAME="resource-db-dev" \
47
+ DDB_TO_ES_LAMBDA_ARN="<my-ddbToES-arn>" \
48
+ node --max-old-space-size=8192 replay-ddbToES.js
49
+ */
50
+
51
// AWS clients resolve region/credentials from the environment (AWS_REGION / AWS_PROFILE).
const dynamodbClient = new DynamoDBClient();
const lambdaClient = new LambdaClient();

// Number of parallel DynamoDB scan segments; one worker per segment.
const SEGMENT_COUNT = Number.parseInt(process.env.SEGMENT_COUNT, 10);
// DynamoDB table to replay documents from.
const TABLE_NAME = process.env.TABLE_NAME;
// ARN of the ddbToES lambda that writes documents into OpenSearch/ElasticSearch.
const DDB_TO_ES_LAMBDA_ARN = process.env.DDB_TO_ES_LAMBDA_ARN;
const SCAN_PAGE_LIMIT = Number.parseInt(process.env.SCAN_PAGE_LIMIT, 10); // 6x concurrent invokes/per segment
// Shared map of document id -> greatest version (vid) seen so far, used to
// best-effort drop stale versions before replaying them.
const idToGreatestVid = new Map();
58
+
59
/**
 * Entry point. Scans TABLE_NAME in SEGMENT_COUNT parallel segments and replays
 * each page of documents to the ddbToES lambda, best-effort sending only the
 * greatest vid per document id. Exits with code 1 on any worker failure.
 */
(async () => {
    try {
        const workerPromises = _.range(0, SEGMENT_COUNT).map(async (segment) => {
            try {
                const paginator = paginateScan(
                    {
                        client: dynamodbClient,
                        pageSize: SCAN_PAGE_LIMIT,
                    },
                    {
                        TableName: TABLE_NAME,
                        Segment: segment,
                        TotalSegments: SEGMENT_COUNT,
                        Limit: SCAN_PAGE_LIMIT,
                    }
                );

                // Cumulative count of scanned items in this segment; used for
                // periodic progress logging roughly every `logInterval` items.
                let processed = 0;
                const logInterval = SCAN_PAGE_LIMIT * 100;

                for await (const page of paginator) {
                    // Guard against pages with no Items key (e.g. empty pages).
                    const pageItems = page.Items ?? [];

                    // Update the shared greatest-vid-per-id map. NOTE: the
                    // previous implementation sorted ascending and took index 0,
                    // which recorded the SMALLEST vid; we must keep the maximum
                    // so stale versions can be filtered out below.
                    for (const item of pageItems) {
                        const id = item.id.S;
                        const vid = Number.parseInt(item.vid.N, 10);
                        const known = idToGreatestVid.get(id);
                        if (known === undefined || vid > known) {
                            idToGreatestVid.set(id, vid);
                        }
                    }

                    const items = pageItems
                        .filter((item) => {
                            // best effort to filter out any writes < the last known vid
                            return Number.parseInt(item.vid.N, 10) >= idToGreatestVid.get(item.id.S);
                        })
                        .map((item) => {
                            // project the item to the shape of a ddb stream record
                            return {
                                eventName: 'INSERT',
                                dynamodb: {
                                    Keys: {
                                        id: {
                                            S: item.id.S,
                                        },
                                        vid: {
                                            N: item.vid.N,
                                        },
                                    },
                                    NewImage: item,
                                },
                            };
                        });

                    // chunk by 15 since that's the max number of items for ddbToES lambda trigger
                    const chunks = _.chunk(items, 15);

                    // invoke the ddbToES lambda, one synchronous invoke per chunk, in parallel
                    const sendPromises = chunks.map((chunk) => {
                        const invokeRequest = new InvokeCommand({
                            Payload: JSON.stringify({ Records: chunk }),
                            FunctionName: DDB_TO_ES_LAMBDA_ARN,
                            InvocationType: 'RequestResponse',
                        });

                        // bombs away
                        return lambdaClient.send(invokeRequest);
                    });

                    await Promise.all(sendPromises);

                    // Progress log whenever the cumulative count crosses a
                    // logInterval boundary. (The previous counter was never
                    // incremented, so it logged on every single page.)
                    const prevBucket = Math.floor(processed / logInterval);
                    processed += pageItems.length;
                    if (Math.floor(processed / logInterval) > prevBucket) {
                        console.log(`\tfinished ${processed} documents in segment ${segment}`);
                    }
                }
            } catch (err) {
                console.log('error processing a page of scanned records');
                console.error(err);

                // rethrow so Promise.all below rejects and the script exits non-zero
                throw err;
            }
        });

        console.log('starting to sync data from dynamodb to opensearch');

        // wait for every segment worker to drain its scan
        await Promise.all(workerPromises);

        console.log('successfully synced data from dynamodb to opensearch');
    } catch (err) {
        console.log('Errors gumming up the works.');
        console.error(err);
        process.exit(1);
    }
})();