Skip to content

Commit 473c283

Browse files
committed
wip - support streamable
1 parent bf2385a commit 473c283

File tree

1 file changed

+38
-15
lines changed

1 file changed

+38
-15
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -243,22 +243,36 @@ internal struct LambdaHTTPServer {
243243
requestHead = head
244244

245245
case .body(let body):
246-
requestBody.setOrWriteImmutableBuffer(body)
246+
precondition(requestHead != nil, "Received .body without .head")
247+
248+
// if this is a request from a Streaming Lambda Handler,
249+
// stream the response instead of buffering it
250+
if self.isStreamingResponse(requestHead) {
251+
// we are receiving a chunked body,
252+
// we can stream the response and not accumulate the chunks
253+
print(String(buffer: body))
254+
} else {
255+
requestBody.setOrWriteImmutableBuffer(body)
256+
}
247257

248258
case .end:
249259
precondition(requestHead != nil, "Received .end without .head")
250-
// process the request
251-
let response = try await self.processRequest(
252-
head: requestHead,
253-
body: requestBody,
254-
logger: logger
255-
)
256-
// send the responses
257-
try await self.sendResponse(
258-
response: response,
259-
outbound: outbound,
260-
logger: logger
261-
)
260+
261+
// process the buffered response for non streaming requests
262+
if !self.isStreamingResponse(requestHead) {
263+
// process the complete request
264+
let response = try await self.processCompleteRequest(
265+
head: requestHead,
266+
body: requestBody,
267+
logger: logger
268+
)
269+
// send the responses
270+
try await self.sendCompleteResponse(
271+
response: response,
272+
outbound: outbound,
273+
logger: logger
274+
)
275+
}
262276

263277
requestHead = nil
264278
requestBody = nil
@@ -273,6 +287,15 @@ internal struct LambdaHTTPServer {
273287
}
274288
}
275289

290+
/// This function checks if the request is a streaming response request
291+
/// verb = POST, uri = :requestID/response, HTTP Header contains "Transfer-Encoding: chunked"
292+
private func isStreamingResponse(_ requestHead: HTTPRequestHead) -> Bool {
293+
requestHead.method == .POST &&
294+
requestHead.uri.hasSuffix(Consts.postResponseURLSuffix) &&
295+
requestHead.headers.contains(name: "Transfer-Encoding") &&
296+
requestHead.headers["Transfer-Encoding"].contains("chunked")
297+
}
298+
276299
/// This function process the URI request sent by the client and by the Lambda function
277300
///
278301
/// It enqueues the client invocation and iterate over the invocation queue when the Lambda function sends /next request
@@ -283,7 +306,7 @@ internal struct LambdaHTTPServer {
283306
/// - body: the HTTP request body
284307
/// - Throws:
285308
/// - Returns: the response to send back to the client or the Lambda function
286-
private func processRequest(
309+
private func processCompleteRequest(
287310
head: HTTPRequestHead,
288311
body: ByteBuffer?,
289312
logger: Logger
@@ -406,7 +429,7 @@ internal struct LambdaHTTPServer {
406429
}
407430
}
408431

409-
private func sendResponse(
432+
private func sendCompleteResponse(
410433
response: LocalServerResponse,
411434
outbound: NIOAsyncChannelOutboundWriter<HTTPServerResponsePart>,
412435
logger: Logger

0 commit comments

Comments
 (0)