Skip to content

Commit 19590e4

Browse files
committed
initial commit
1 parent b2811a5 commit 19590e4

File tree

6 files changed

+44
-8
lines changed

6 files changed

+44
-8
lines changed

Examples/Streaming/Package.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
// swift-tools-version:6.0
1+
// swift-tools-version:6.1
22

33
import PackageDescription
4-
54
// needed for CI to test the local version of the library
65
import struct Foundation.URL
76

Examples/Streaming/Sources/main.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,24 @@
1414

1515
import AWSLambdaRuntime
1616
import NIOCore
17+
import NIOHTTP1
1718

1819
struct SendNumbersWithPause: StreamingLambdaHandler {
1920
func handle(
2021
_ event: ByteBuffer,
2122
responseWriter: some LambdaResponseStreamWriter,
2223
context: LambdaContext
2324
) async throws {
25+
context.logger.info("Received event: \(event)")
26+
try await responseWriter.writeHeaders(
27+
HTTPHeaders([
28+
("X-Example-Header", "This is an example header")
29+
])
30+
)
31+
32+
try await responseWriter.write(
33+
ByteBuffer(string: "Starting to send numbers with a pause...\n")
34+
)
2435
for i in 1...10 {
2536
// Send partial data
2637
try await responseWriter.write(ByteBuffer(string: "\(i)\n"))

Examples/Streaming/template.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ Resources:
1414
MemorySize: 128
1515
Architectures:
1616
- arm64
17+
Environment:
18+
Variables:
19+
LOG_LEVEL: trace
1720
FunctionUrlConfig:
1821
AuthType: AWS_IAM
1922
InvokeMode: RESPONSE_STREAM

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ internal struct LambdaHTTPServer {
248248
case .end:
249249
precondition(requestHead != nil, "Received .end without .head")
250250
// process the request
251+
// FIXME: this do not support response streaming
251252
let response = try await self.processRequest(
252253
head: requestHead,
253254
body: requestBody,

Sources/AWSLambdaRuntime/LambdaHandlers.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import NIOCore
16+
import NIOHTTP1
1617

1718
/// The base handler protocol that receives a `ByteBuffer` representing the incoming event and returns the response as a `ByteBuffer` too.
1819
/// This handler protocol supports response streaming. Bytes can be streamed outwards through the ``LambdaResponseStreamWriter``
@@ -46,6 +47,10 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
4647
/// A writer object to write the Lambda response stream into. The HTTP response is started lazily.
4748
/// before the first call to ``write(_:)`` or ``writeAndFinish(_:)``.
4849
public protocol LambdaResponseStreamWriter {
50+
/// Write the headers parts of the stream. This allows client to set headers before the first response part is written.
51+
/// - Parameter buffer: The buffer to write.
52+
func writeHeaders(_ headers: HTTPHeaders) async throws
53+
4954
/// Write a response part into the stream. Bytes written are streamed continually.
5055
/// - Parameter buffer: The buffer to write.
5156
func write(_ buffer: ByteBuffer) async throws

Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
4242
self.runtimeClient = runtimeClient
4343
}
4444

45+
@usableFromInline
46+
func writeHeaders(_ headers: HTTPHeaders) async throws {
47+
try await self.runtimeClient.appendHeaders(headers)
48+
}
49+
4550
@usableFromInline
4651
func write(_ buffer: NIOCore.ByteBuffer) async throws {
4752
try await self.runtimeClient.write(buffer)
@@ -188,6 +193,13 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
188193
}
189194
}
190195

196+
// we can use a var here because we are always isolated to this actor
197+
private var userHeaders = HTTPHeaders()
198+
private func appendHeaders(_ headers: HTTPHeaders) async throws {
199+
// buffer the data to send them when we will send the headers
200+
userHeaders.add(contentsOf: headers)
201+
}
202+
191203
private func write(_ buffer: NIOCore.ByteBuffer) async throws {
192204
switch self.lambdaState {
193205
case .idle, .sentResponse:
@@ -205,7 +217,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
205217
guard case .sendingResponse(requestID) = self.lambdaState else {
206218
fatalError("Invalid state: \(self.lambdaState)")
207219
}
208-
return try await handler.writeResponseBodyPart(buffer, requestID: requestID)
220+
return try await handler.writeResponseBodyPart(self.userHeaders, buffer, requestID: requestID)
209221
}
210222
}
211223

@@ -555,6 +567,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
555567

556568
func writeResponseBodyPart(
557569
isolation: isolated (any Actor)? = #isolation,
570+
_ userHeaders: HTTPHeaders,
558571
_ byteBuffer: ByteBuffer,
559572
requestID: String
560573
) async throws {
@@ -564,10 +577,10 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
564577

565578
case .connected(let context, .waitingForResponse):
566579
self.state = .connected(context, .sendingResponse)
567-
try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: requestID, context: context)
580+
try await self.sendResponseBodyPart(userHeaders, byteBuffer, sendHeadWithRequestID: requestID, context: context)
568581

569582
case .connected(let context, .sendingResponse):
570-
try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context)
583+
try await self.sendResponseBodyPart(userHeaders, byteBuffer, sendHeadWithRequestID: nil, context: context)
571584

572585
case .connected(_, .idle),
573586
.connected(_, .sentResponse):
@@ -616,6 +629,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
616629

617630
private func sendResponseBodyPart(
618631
isolation: isolated (any Actor)? = #isolation,
632+
_ userHeaders: HTTPHeaders,
619633
_ byteBuffer: ByteBuffer,
620634
sendHeadWithRequestID: String?,
621635
context: ChannelHandlerContext
@@ -625,13 +639,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
625639
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
626640
let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix
627641

642+
var responseHeaders = self.streamingHeaders
643+
responseHeaders.add(contentsOf: userHeaders)
644+
logger.trace("sendResponseBodyPart : ========== Sending response headers: \(responseHeaders)")
628645
let httpRequest = HTTPRequestHead(
629646
version: .http1_1,
630647
method: .POST,
631648
uri: url,
632-
headers: self.streamingHeaders
649+
headers: responseHeaders
633650
)
634-
651+
635652
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
636653
}
637654

@@ -663,13 +680,13 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
663680
self.streamingHeaders
664681
}
665682

683+
logger.trace("sendResponseFinish : ========== Sending response headers: \(headers)")
666684
let httpRequest = HTTPRequestHead(
667685
version: .http1_1,
668686
method: .POST,
669687
uri: url,
670688
headers: headers
671689
)
672-
673690
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
674691
}
675692

0 commit comments

Comments
 (0)