@@ -238,7 +238,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
238
238
guard case . sentResponse( requestID) = self . lambdaState else {
239
239
fatalError ( " Invalid state: \( self . lambdaState) " )
240
240
}
241
- try await handler. finishResponseRequest ( finalData: buffer, requestID: requestID)
241
+ try await handler. finishResponseRequest ( userHeaders : self . userHeaders , finalData: buffer, requestID: requestID)
242
242
guard case . sentResponse( requestID) = self . lambdaState else {
243
243
fatalError ( " Invalid state: \( self . lambdaState) " )
244
244
}
@@ -496,9 +496,11 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
496
496
" lambda-runtime-function-error-type " : " Unhandled " ,
497
497
]
498
498
self . streamingHeaders = [
499
- " host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
499
+ " Host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
500
500
" user-agent " : . userAgent,
501
- " transfer-encoding " : " chunked " ,
501
+ // "Content-type": "application/vnd.awslambda.http-integration-response",
502
+ // "Transfer-encoding": "chunked",
503
+ // "Lambda-Runtime-Function-Response-Mode": "streaming",
502
504
]
503
505
}
504
506
@@ -596,6 +598,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
596
598
597
599
func finishResponseRequest(
598
600
isolation: isolated ( any Actor ) ? = #isolation,
601
+ userHeaders: HTTPHeaders ,
599
602
finalData: ByteBuffer ? ,
600
603
requestID: String
601
604
) async throws {
@@ -607,13 +610,13 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
607
610
case . connected( let context, . waitingForResponse) :
608
611
try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Void , any Error > ) in
609
612
self . state = . connected( context, . sentResponse( continuation) )
610
- self . sendResponseFinish ( finalData, sendHeadWithRequestID: requestID, context: context)
613
+ self . sendResponseFinish ( userHeaders , finalData, sendHeadWithRequestID: requestID, context: context)
611
614
}
612
615
613
616
case . connected( let context, . sendingResponse) :
614
617
try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Void , any Error > ) in
615
618
self . state = . connected( context, . sentResponse( continuation) )
616
- self . sendResponseFinish ( finalData, sendHeadWithRequestID: nil , context: context)
619
+ self . sendResponseFinish ( userHeaders , finalData, sendHeadWithRequestID: nil , context: context)
617
620
}
618
621
619
622
case . connected( _, . sentResponse) :
@@ -639,14 +642,15 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
639
642
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
640
643
let url = Consts . invocationURLPrefix + " / " + requestID + Consts. postResponseURLSuffix
641
644
642
- var responseHeaders = self . streamingHeaders
643
- responseHeaders. add ( contentsOf: userHeaders)
644
- logger. trace ( " sendResponseBodyPart : ========== Sending response headers: \( responseHeaders) " )
645
+ var headers = HTTPHeaders ( )
646
+ headers. add ( contentsOf: userHeaders)
647
+ headers. add ( contentsOf: self . streamingHeaders)
648
+ logger. trace ( " sendResponseBodyPart : ========== Sending response headers: \( headers) " )
645
649
let httpRequest = HTTPRequestHead (
646
650
version: . http1_1,
647
651
method: . POST,
648
652
uri: url,
649
- headers: responseHeaders
653
+ headers: headers // FIXME these are the headers returned to the control plane. I'm not sure if we should use the streaming headers here
650
654
)
651
655
652
656
context. write ( self . wrapOutboundOut ( . head( httpRequest) ) , promise: nil )
@@ -659,6 +663,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
659
663
660
664
private func sendResponseFinish(
661
665
isolation: isolated ( any Actor ) ? = #isolation,
666
+ _ userHeaders: HTTPHeaders ,
662
667
_ byteBuffer: ByteBuffer ? ,
663
668
sendHeadWithRequestID: String ? ,
664
669
context: ChannelHandlerContext
@@ -669,7 +674,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
669
674
670
675
// If we have less than 6MB, we don't want to use the streaming API. If we have more
671
676
// than 6MB we must use the streaming mode.
672
- let headers : HTTPHeaders =
677
+ var headers : HTTPHeaders =
673
678
if byteBuffer? . readableBytes ?? 0 < 6_000_000 {
674
679
[
675
680
" host " : " \( self . configuration. ip) : \( self . configuration. port) " ,
@@ -679,7 +684,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
679
684
} else {
680
685
self . streamingHeaders
681
686
}
682
-
687
+ headers . add ( contentsOf : userHeaders )
683
688
logger. trace ( " sendResponseFinish : ========== Sending response headers: \( headers) " )
684
689
let httpRequest = HTTPRequestHead (
685
690
version: . http1_1,
0 commit comments