Skip to content
This repository was archived by the owner on Jul 16, 2024. It is now read-only.

Commit af490f5

Browse files
rubenhgawsvgkowski
andauthored
feat: Additional tasks for batchReplayer to enable reactive pipelines (#629)
* batchReplayer: additional tasks for reactive pipelines * updated e2e test with additionalStepFunctionTasks * Update unit tests for batch replayer additional tasks --------- Co-authored-by: Vincent Gromakowski <[email protected]>
1 parent 7b1dc17 commit af490f5

File tree

10 files changed

+1362
-1036
lines changed

10 files changed

+1362
-1036
lines changed

core/API.md

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/package.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/data-generator/batch-replayer.ts

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { SfnStateMachine } from 'aws-cdk-lib/aws-events-targets';
77
import { PolicyStatement } from 'aws-cdk-lib/aws-iam';
88
import { LayerVersion, Runtime } from 'aws-cdk-lib/aws-lambda';
99
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
10-
import { JsonPath, LogLevel, Map, StateMachine, TaskInput } from 'aws-cdk-lib/aws-stepfunctions';
10+
import { JsonPath, LogLevel, Map, StateMachine,
11+
TaskInput, INextable, IChainable } from 'aws-cdk-lib/aws-stepfunctions';
1112
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
1213
import { Construct } from 'constructs';
1314
import { PreBundledFunction } from '../common/pre-bundled-function';
@@ -66,6 +67,34 @@ export interface BatchReplayerProps {
6667
* VPC for the WriteInBatch Lambda function
6768
*/
6869
readonly vpc?: IVpc;
70+
/**
71+
* Additional Step Functions Tasks to run sequentially after the BatchReplayer finishes
72+
* @default - The BatchReplayer does not have additional Tasks
73+
*
74+
* The expected input for the first Task in this sequence is:
75+
*
76+
* input = [
77+
* {
78+
* "processedRecords": Int,
79+
* "outputPaths": String [],
80+
* "startTimeinIso": String,
81+
* "endTimeinIso": String
82+
* }
83+
* ]
84+
*
85+
* Each element in input represents the output of each lambda iterator that replays the data.
86+
*
87+
* param: processedRecords -> Number of records processed
88+
* param: outputPaths -> List of files created in S3
89+
* ** eg. "s3://<sinkBucket name>/<s3ObjectKeySink prefix, if any>/<dataset name>/ingestion_start=<timestamp>/ingestion_end=<timestamp>/<s3 filename>.csv",
90+
91+
* param: startTimeinIso -> Start Timestamp on original dataset
92+
* param: endTimeinIso -> End Timestamp on original dataset
93+
*
94+
* *outputPaths* can be used to extract and aggregate new partitions on data and
95+
* trigger additional Tasks.
96+
*/
97+
readonly additionalStepFunctionTasks?: IChainable [];
6998
}
7099

71100
/**
@@ -142,6 +171,13 @@ export class BatchReplayer extends Construct {
142171
*/
143172
public readonly vpc?: IVpc;
144173

174+
/**
175+
* Optional Sequence of additional Tasks to append at the end of the Step Function
176+
* that replays data; they execute after the data has been replayed
177+
*/
178+
private readonly additionalStepFunctionTasks?: IChainable [];
179+
180+
145181
/**
146182
* Constructs a new instance of the BatchReplayer construct
147183
* @param {Construct} scope the Scope of the CDK Construct
@@ -153,6 +189,7 @@ export class BatchReplayer extends Construct {
153189

154190
this.dataset = props.dataset;
155191
this.frequency = props.frequency?.toSeconds() || 60;
192+
this.additionalStepFunctionTasks = props.additionalStepFunctionTasks;
156193

157194
// Properties for S3 target
158195
if (props.s3Props) {
@@ -337,7 +374,9 @@ export class BatchReplayer extends Construct {
337374

338375
// Overarching Step Function StateMachine
339376
const batchReplayStepFn = new StateMachine(this, 'BatchReplayStepFn', {
340-
definition: findFilePathsFnTask.next(writeInBatchMapTask),
377+
definition: this.chainStepFunctionTasks(
378+
findFilePathsFnTask.next(writeInBatchMapTask)
379+
),
341380
timeout: Duration.minutes(20),
342381
logs: {
343382
destination: new LogGroup(this, 'LogGroup', {
@@ -354,4 +393,18 @@ export class BatchReplayer extends Construct {
354393
targets: [new SfnStateMachine(batchReplayStepFn, {})],
355394
});
356395
}
396+
397+
/**
 * Appends the optional additional tasks, in the order they were provided,
 * after the tasks that every BatchReplayer state machine runs.
 * @param {IChainable & INextable} requiredTasks the chain of states that is always executed
 * @returns `requiredTasks` itself when no additional tasks were configured,
 * otherwise the result of the last `next()` call
 */
private chainStepFunctionTasks(requiredTasks: IChainable & INextable) {
  const extraTasks = this.additionalStepFunctionTasks;

  // Nothing to append: hand back the required chain as-is
  if (!extraTasks) {
    return requiredTasks;
  }

  // Fold each extra task onto the end of the chain built so far
  return extraTasks.reduce<IChainable & INextable>(
    (chainSoFar, extraTask) => chainSoFar.next(extraTask),
    requiredTasks,
  );
}
409+
357410
}

core/test/e2e/batch-replayer.test.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
*/
99

1010
import { Bucket } from 'aws-cdk-lib/aws-s3';
11-
import { App, Stack, aws_ec2, aws_rds, aws_dynamodb, RemovalPolicy, CfnOutput } from 'aws-cdk-lib';
11+
import { App, Stack, aws_ec2, aws_rds, aws_stepfunctions_tasks,
12+
aws_dynamodb, RemovalPolicy, CfnOutput, Duration } from 'aws-cdk-lib';
1213
import { Cluster } from '@aws-cdk/aws-redshift-alpha';
1314
import { deployStack, destroyStack } from './utils';
1415

1516
import { BatchReplayer } from '../../src/data-generator/batch-replayer';
1617
import { IS3Sink, DynamoDbSink, DbSink } from '../../src/data-generator/batch-replayer-helpers';
1718
import { PreparedDataset } from '../../src/data-generator/prepared-dataset';
19+
import { PreBundledFunction } from '../../src/common/pre-bundled-function';
20+
import { Runtime } from 'aws-cdk-lib/aws-lambda';
21+
import { RetentionDays } from 'aws-cdk-lib/aws-logs';
1822

1923
jest.setTimeout(3000000);
2024
// GIVEN
@@ -66,6 +70,29 @@ const rdsPostgres = new aws_rds.DatabaseInstance(stack, 'PostgreSQL', {
6670
const rdsPostgresCreds = rdsPostgres.secret ? rdsPostgres.secret.secretArn : '';
6771
let rdsProps: DbSink = { table: defaultName, connection: rdsPostgresCreds, schema: defaultName, type: 'postgresql' };
6872

73+
/**
74+
* Lambda to be used as additional task
75+
*/
76+
const summaryOutputState = new PreBundledFunction(stack, 'WriteInBatch', {
77+
memorySize: 128,
78+
codePath: 'data-generator/resources/lambdas/summary-additional-task',
79+
runtime: Runtime.PYTHON_3_9,
80+
handler: 'summary-additional-task.handler',
81+
logRetention: RetentionDays.ONE_WEEK,
82+
timeout: Duration.minutes(1),
83+
});
84+
85+
/**
86+
* Additional Task
87+
*/
88+
const additionalTask = new aws_stepfunctions_tasks
89+
.LambdaInvoke(stack, "SummarizeOutput", {
90+
lambdaFunction: summaryOutputState,
91+
outputPath: "$",
92+
retryOnServiceExceptions: true
93+
});
94+
95+
6996
const batchReplayer = new BatchReplayer(stack, 'BatchReplay', {
7097
dataset: PreparedDataset.RETAIL_1_GB_STORE_SALE,
7198
s3Props: s3Props,
@@ -75,6 +102,7 @@ const batchReplayer = new BatchReplayer(stack, 'BatchReplay', {
75102
rdsProps: rdsProps,
76103
vpc: vpc,
77104
secGroup: secGroup,
105+
additionalStepFunctionTasks: [additionalTask]
78106
});
79107

80108
new BatchReplayer(stack, 'BatchReplay2', {
@@ -86,6 +114,7 @@ new BatchReplayer(stack, 'BatchReplay2', {
86114
rdsProps: rdsProps,
87115
vpc: vpc,
88116
secGroup: secGroup,
117+
additionalStepFunctionTasks: [additionalTask]
89118
});
90119

91120
new CfnOutput(stack, 'DatasetName', {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
os
2+
json
3+
logging
4+
typing
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
2+
import os
3+
import json
4+
import logging
5+
from typing import Set, Any, Dict
6+
7+
logging.basicConfig(level=logging.INFO)
8+
log = logging.getLogger()
9+
log.setLevel(os.getenv('LOG_LEVEL', logging.INFO))
10+
11+
default_value = 'NOTFOUND'
12+
13+
def get_dict_value(dictionary: dict, key: str, nullable:bool = False):
    """Look up ``key`` in ``dictionary`` and log the value that was found.

    Presence is decided by key membership, so a stored value that happens to
    equal the module-level ``default_value`` placeholder is returned like any
    other value instead of being reported as missing.

    Args:
        dictionary: mapping to read from.
        key: name of the entry to fetch.
        nullable: when True, a missing key yields the module-level
            ``default_value`` placeholder instead of raising.

    Returns:
        The stored value, or ``default_value`` when the key is absent and
        ``nullable`` is True.

    Raises:
        Exception: if the key is absent and ``nullable`` is False.
    """
    if key in dictionary:
        val = dictionary[key]
    elif nullable:
        # Callers that tolerate a missing key receive the placeholder
        val = default_value
    else:
        raise Exception(f"Key: {key} not found in {dictionary}")

    log.info(f"Got parameter: {key}:{val}")

    return val
21+
22+
def parseLambdaIterationOutput(event: dict)->Dict[str,Any]:
    """Extract the summary fields from the output of one replay iteration.

    Args:
        event: output of a single iteration; any other keys it carries are
            ignored.

    Returns:
        A dict with ``processedRecords`` (0 when absent) and
        ``startTimeinIso`` / ``endTimeinIso`` ("" when absent).
    """
    return {
        "processedRecords": event.get('processedRecords', 0),
        "startTimeinIso": event.get('startTimeinIso', ""),
        "endTimeinIso": event.get('endTimeinIso', ""),
    }


def handler(outputList, ctx):
    """Aggregate the per-iteration outputs into a single summary.

    Args:
        outputList: iterable of per-iteration output dicts; may be empty.
        ctx: Lambda context object (unused).

    Returns:
        A dict with the total ``processedRecords``, the smallest
        ``startTimeinIso`` and the largest ``endTimeinIso`` seen. Timestamps
        are compared as strings. A timestamp is "" when no iteration
        reported one.
    """
    totalRecords: int = 0
    startTimes: Set[str] = set()
    endTimes: Set[str] = set()

    for event in outputList:
        parsed = parseLambdaIterationOutput(event)
        totalRecords += parsed["processedRecords"]

        # Skip the "" placeholder of a missing timestamp: it sorts before
        # every real value and would otherwise always win min().
        if parsed["startTimeinIso"]:
            startTimes.add(parsed["startTimeinIso"])
        if parsed["endTimeinIso"]:
            endTimes.add(parsed["endTimeinIso"])

    return {
        "processedRecords": totalRecords,
        # default="" keeps an empty input from raising ValueError
        "startTimeinIso": min(startTimes, default=""),
        "endTimeinIso": max(endTimes, default=""),
    }
54+
55+
# Test from local machine
56+
if __name__ == '__main__':
57+
event=[
58+
{
59+
"processedRecords": 122,
60+
"startTimeinIso": "2021-01-01T00:03:34",
61+
"endTimeinIso": "2021-01-01T00:13:34"
62+
},
63+
{
64+
"processedRecords": 389,
65+
"startTimeinIso": "2021-01-01T00:03:34",
66+
"endTimeinIso": "2021-01-01T00:13:34"
67+
},
68+
{
69+
"processedRecords": 221,
70+
"startTimeinIso": "2021-01-01T00:03:34",
71+
"endTimeinIso": "2021-01-01T00:13:34"
72+
}
73+
]
74+
result = handler(event,None)
75+
log.info(f'result = {json.dumps(result, indent=2)}')

core/test/unit/data-generator/batch-replayer.test.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { Cluster } from '@aws-cdk/aws-redshift-alpha';
1111
import {
1212
aws_dynamodb,
1313
aws_ec2,
14-
aws_rds, Duration,
14+
aws_rds, aws_stepfunctions_tasks, Duration,
1515
RemovalPolicy,
1616
Stack,
1717
} from 'aws-cdk-lib';
@@ -20,6 +20,9 @@ import { Template } from 'aws-cdk-lib/assertions';
2020
import { Bucket } from 'aws-cdk-lib/aws-s3';
2121
import { BatchReplayer, DbSink, DynamoDbSink, IS3Sink, PreparedDataset } from '../../../src';
2222
import { IVpc, SecurityGroup } from 'aws-cdk-lib/aws-ec2';
23+
import { PreBundledFunction } from '../../../src/common/pre-bundled-function';
24+
import { Runtime } from 'aws-cdk-lib/aws-lambda';
25+
import { RetentionDays } from 'aws-cdk-lib/aws-logs';
2326

2427
let testStack: Stack;
2528
let s3Props: IS3Sink;
@@ -32,6 +35,7 @@ let rdsProps: DbSink;
3235
let defaultName = 'test';
3336
let vpc: IVpc;
3437
let secGroup: SecurityGroup;
38+
const expectedAdditionalTaskName = "SummarizeOutput"
3539

3640
beforeEach(() => {
3741
testStack = new Stack();
@@ -138,4 +142,65 @@ test("BatchReplayer should create a step function", () => {
138142
template.resourceCountIs("AWS::StepFunctions::StateMachine", 1);
139143
});
140144

145+
// When no additionalStepFunctionTasks are passed, no string fragment of the
// synthesized state machine definition may mention the additional task name.
test("BatchReplayer should not have additionalStepFunctionTasks", () => {
  const resources = template.toJSON().Resources;
  // The definition is rendered as an Fn::Join whose second element is the list of fragments
  const stepFnDefinition: unknown[] = resources.TestBatchReplayerBatchReplayStepFnBB59B3E9.Properties
    .DefinitionString["Fn::Join"][1];

  // Only plain string fragments can contain the task name; other fragments are skipped
  const mentionsAdditionalTask = stepFnDefinition.some(
    (fragment) => typeof fragment === 'string' && fragment.includes(expectedAdditionalTaskName),
  );

  expect(mentionsAdditionalTask).toBe(false);
});
158+
159+
// When an additional task is passed, its name must show up in at least one
// string fragment of the synthesized state machine definition.
test("BatchReplayer should have one additionalStepFunctionTask : expectedAdditionalTaskName", () => {
  const testStack2 = new Stack();

  /**
   * Lambda to be used as additional task
   */
  const summaryOutputState = new PreBundledFunction(testStack2, 'SummaryFn', {
    memorySize: 128,
    codePath: '../test/unit/data-generator/resources/lambdas/summary-additional-task',
    runtime: Runtime.PYTHON_3_9,
    handler: 'summary-additional-task.handler',
    logRetention: RetentionDays.ONE_WEEK,
    timeout: Duration.minutes(1),
  });

  /**
   * Additional Task
   */
  const additionalTask = new aws_stepfunctions_tasks.LambdaInvoke(testStack2, expectedAdditionalTaskName, {
    lambdaFunction: summaryOutputState,
    outputPath: "$",
    retryOnServiceExceptions: true,
  });

  const bucket2 = new Bucket(testStack2, 'Bucket');
  const s3Props2 = { sinkBucket: bucket2 };

  new BatchReplayer(testStack2, 'TestBatchReplayer', {
    dataset: PreparedDataset.RETAIL_1_GB_WEB_SALE,
    frequency: Duration.seconds(120),
    s3Props: s3Props2,
    additionalStepFunctionTasks: [additionalTask],
  });

  const template2 = Template.fromStack(testStack2);

  const resources = template2.toJSON().Resources;
  // The definition is rendered as an Fn::Join whose second element is the list of fragments
  const stepFnDefinition: unknown[] = resources.TestBatchReplayerBatchReplayStepFnBB59B3E9.Properties
    .DefinitionString["Fn::Join"][1];

  // Only plain string fragments can contain the task name; other fragments are skipped
  const mentionsAdditionalTask = stepFnDefinition.some(
    (fragment) => typeof fragment === 'string' && fragment.includes(expectedAdditionalTaskName),
  );

  expect(mentionsAdditionalTask).toBe(true);
});
141206

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
os
2+
json
3+
logging
4+
typing

0 commit comments

Comments
 (0)