Skip to content

Commit 039d4f3

Browse files
committed
Add mock client + test for runLoop
1 parent d496cbb commit 039d4f3

File tree

2 files changed

+302
-0
lines changed

2 files changed

+302
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import AWSLambdaRuntimeCore
16+
import Foundation
17+
import Logging
18+
import NIOCore
19+
20+
package struct LambdaMockWriter: LambdaResponseStreamWriter {
21+
var underlying: LambdaMockClient
22+
23+
package init(underlying: LambdaMockClient) {
24+
self.underlying = underlying
25+
}
26+
27+
package mutating func write(_ buffer: ByteBuffer) async throws {
28+
try await self.underlying.write(buffer)
29+
}
30+
31+
package consuming func finish() async throws {
32+
try await self.underlying.finish()
33+
}
34+
35+
package consuming func writeAndFinish(_ buffer: ByteBuffer) async throws {
36+
try await self.write(buffer)
37+
try await self.finish()
38+
}
39+
40+
package func reportError(_ error: any Error) async throws {
41+
}
42+
}
43+
44+
package struct LambdaError: Error, Equatable {
45+
private enum Code: Equatable {
46+
case cannotCallNextEndpointWhenAlreadyWaitingForEvent
47+
case cannotCallNextEndpointWhenAlreadyProcessingAnEvent
48+
case cannotReportResultWhenNoEventHasBeenProcessed
49+
}
50+
51+
private let code: Code
52+
53+
private init(code: Code) {
54+
self.code = code
55+
}
56+
57+
package func shortDescription() -> String {
58+
switch self.code {
59+
case .cannotCallNextEndpointWhenAlreadyWaitingForEvent:
60+
"Cannot call the next endpoint when already waiting for an event"
61+
case .cannotCallNextEndpointWhenAlreadyProcessingAnEvent:
62+
"Cannot call the next endpoint when an event is already being processed"
63+
case .cannotReportResultWhenNoEventHasBeenProcessed:
64+
"Cannot report a result when no event has been processed"
65+
}
66+
}
67+
68+
package static let cannotCallNextEndpointWhenAlreadyWaitingForEvent = LambdaError(
69+
code: .cannotCallNextEndpointWhenAlreadyWaitingForEvent
70+
)
71+
package static let cannotCallNextEndpointWhenAlreadyProcessingAnEvent = LambdaError(
72+
code: .cannotCallNextEndpointWhenAlreadyProcessingAnEvent
73+
)
74+
package static let cannotReportResultWhenNoEventHasBeenProcessed = LambdaError(
75+
code: .cannotReportResultWhenNoEventHasBeenProcessed
76+
)
77+
}
78+
79+
package final actor LambdaMockClient: LambdaRuntimeClientProtocol {
80+
package typealias Writer = LambdaMockWriter
81+
82+
private struct StateMachine {
83+
private enum State {
84+
// The Lambda has just started, or an event has finished processing and the runtime is ready to receive more events.
85+
// Expecting a next() call by the runtime.
86+
case initialState
87+
88+
// The next endpoint has been called but no event has arrived yet.
89+
case waitingForNextEvent(eventArrivedHandler: CheckedContinuation<Invocation, any Error>)
90+
91+
// The handler is processing the event. Buffers written to the writer are accumulated.
92+
case handlerIsProcessing(
93+
accumulatedResponse: [ByteBuffer],
94+
eventProcessedHandler: CheckedContinuation<ByteBuffer, any Error>
95+
)
96+
}
97+
98+
private var state: State = .initialState
99+
100+
// Queue incoming events if the runtime is busy handling an event.
101+
private var eventQueue = [Event]()
102+
103+
enum InvokeAction {
104+
// The next endpoint is waiting for an event. Deliver this newly arrived event to it.
105+
case readyToProcess(_ eventArrivedHandler: CheckedContinuation<Invocation, any Error>)
106+
107+
// The next endpoint has not been called yet. This event has been added to the queue.
108+
case wait
109+
}
110+
111+
enum NextAction {
112+
// There is an event available to be processed.
113+
case readyToProcess(Invocation)
114+
115+
// No events available yet. Wait for an event to arrive.
116+
case wait
117+
118+
case fail(LambdaError)
119+
}
120+
121+
enum ResultAction {
122+
case readyForMore
123+
124+
case fail(LambdaError)
125+
}
126+
127+
mutating func next(_ eventArrivedHandler: CheckedContinuation<Invocation, any Error>) -> NextAction {
128+
switch self.state {
129+
case .initialState:
130+
if self.eventQueue.isEmpty {
131+
// No event available yet -- store the continuation for the next invoke() call.
132+
self.state = .waitingForNextEvent(eventArrivedHandler: eventArrivedHandler)
133+
return .wait
134+
} else {
135+
// An event is already waiting to be processed
136+
let event = self.eventQueue.removeFirst() // TODO: use Deque
137+
138+
self.state = .handlerIsProcessing(
139+
accumulatedResponse: [],
140+
eventProcessedHandler: event.eventProcessedHandler
141+
)
142+
return .readyToProcess(event.invocation)
143+
}
144+
case .waitingForNextEvent:
145+
return .fail(.cannotCallNextEndpointWhenAlreadyWaitingForEvent)
146+
case .handlerIsProcessing:
147+
return .fail(.cannotCallNextEndpointWhenAlreadyProcessingAnEvent)
148+
}
149+
}
150+
151+
mutating func invoke(_ event: Event) -> InvokeAction {
152+
switch self.state {
153+
case .initialState, .handlerIsProcessing:
154+
// next() hasn't been called yet. Add to the event queue.
155+
self.eventQueue.append(event)
156+
return .wait
157+
case .waitingForNextEvent(let eventArrivedHandler):
158+
// The runtime is already waiting for an event
159+
self.state = .handlerIsProcessing(
160+
accumulatedResponse: [],
161+
eventProcessedHandler: event.eventProcessedHandler
162+
)
163+
return .readyToProcess(eventArrivedHandler)
164+
}
165+
}
166+
167+
mutating func writeResult(buffer: ByteBuffer) -> ResultAction {
168+
switch self.state {
169+
case .handlerIsProcessing(var accumulatedResponse, let eventProcessedHandler):
170+
accumulatedResponse.append(buffer)
171+
self.state = .handlerIsProcessing(
172+
accumulatedResponse: accumulatedResponse,
173+
eventProcessedHandler: eventProcessedHandler
174+
)
175+
return .readyForMore
176+
case .initialState, .waitingForNextEvent:
177+
return .fail(.cannotReportResultWhenNoEventHasBeenProcessed)
178+
}
179+
}
180+
181+
mutating func finish() throws {
182+
switch self.state {
183+
case .handlerIsProcessing(let accumulatedResponse, let eventProcessedHandler):
184+
let finalResult: ByteBuffer = accumulatedResponse.reduce(ByteBuffer()) { (accumulated, current) in
185+
var accumulated = accumulated
186+
accumulated.writeBytes(current.readableBytesView)
187+
return accumulated
188+
}
189+
190+
eventProcessedHandler.resume(returning: finalResult)
191+
// reset back to the initial state
192+
self.state = .initialState
193+
case .initialState, .waitingForNextEvent:
194+
throw LambdaError.cannotReportResultWhenNoEventHasBeenProcessed
195+
}
196+
}
197+
}
198+
199+
private var stateMachine: StateMachine = .init()
200+
201+
struct Event {
202+
let invocation: Invocation
203+
let eventProcessedHandler: CheckedContinuation<ByteBuffer, any Error>
204+
}
205+
206+
package func invoke(event: ByteBuffer) async throws -> ByteBuffer {
207+
try await withCheckedThrowingContinuation { eventProcessedHandler in
208+
do {
209+
let metadata = try InvocationMetadata(
210+
headers: .init([
211+
("Lambda-Runtime-Aws-Request-Id", "100"), // arbitrary values
212+
("Lambda-Runtime-Deadline-Ms", "100"),
213+
("Lambda-Runtime-Invoked-Function-Arn", "100"),
214+
])
215+
)
216+
let invocation = Invocation(metadata: metadata, event: event)
217+
218+
let invokeAction = self.stateMachine.invoke(
219+
Event(
220+
invocation: invocation,
221+
eventProcessedHandler: eventProcessedHandler
222+
)
223+
)
224+
225+
switch invokeAction {
226+
case .readyToProcess(let eventArrivedHandler):
227+
// nextInvocation had been called earlier and is currently waiting for an event; deliver
228+
eventArrivedHandler.resume(returning: invocation)
229+
case .wait:
230+
// The event has been added to the event queue; wait for it to be picked up
231+
break
232+
}
233+
} catch {
234+
eventProcessedHandler.resume(throwing: error)
235+
}
236+
}
237+
}
238+
239+
package func nextInvocation() async throws -> (Invocation, Writer) {
240+
let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in
241+
switch self.stateMachine.next(eventArrivedHandler) {
242+
case .readyToProcess(let event):
243+
eventArrivedHandler.resume(returning: event)
244+
case .fail(let error):
245+
eventArrivedHandler.resume(throwing: error)
246+
case .wait:
247+
break
248+
}
249+
}
250+
251+
return (invocation, Writer(underlying: self))
252+
}
253+
254+
package func write(_ buffer: ByteBuffer) async throws {
255+
switch self.stateMachine.writeResult(buffer: buffer) {
256+
case .readyForMore:
257+
break
258+
case .fail(let error):
259+
throw error
260+
}
261+
}
262+
263+
package func finish() async throws {
264+
try self.stateMachine.finish()
265+
}
266+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import Foundation
2+
import Logging
3+
import NIOCore
4+
import Testing
5+
6+
@testable import AWSLambdaRuntimeCore
7+
8+
struct LambdaRunLoopTests {
9+
struct MockEchoHandler: StreamingLambdaHandler {
10+
func handle(
11+
_ event: ByteBuffer,
12+
responseWriter: some LambdaResponseStreamWriter,
13+
context: NewLambdaContext
14+
) async throws {
15+
try await responseWriter.writeAndFinish(event)
16+
}
17+
}
18+
19+
let mockClient = LambdaMockClient()
20+
let mockEchoHandler = MockEchoHandler()
21+
22+
@Test func testRunLoop() async throws {
23+
_ = Task { () in
24+
try await Lambda.runLoop(
25+
runtimeClient: self.mockClient,
26+
handler: self.mockEchoHandler,
27+
logger: Logger(label: "RunLoopTest")
28+
)
29+
}
30+
31+
let inputEvent = ByteBuffer(string: "Test Invocation Event")
32+
let response = try await self.mockClient.invoke(event: inputEvent)
33+
34+
#expect(response == inputEvent)
35+
}
36+
}

0 commit comments

Comments
 (0)