Skip to content

Commit 0de9a12

Browse files
committed
better close support
1 parent 02ed4e8 commit 0de9a12

File tree

1 file changed

+53
-44
lines changed

1 file changed

+53
-44
lines changed

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift

+53-44
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
390390
enum State {
391391
case disconnected
392392
case connected(ChannelHandlerContext, LambdaState)
393+
case closing
393394

394395
enum LambdaState {
395396
/// this is the "normal" state. Transitions to `waitingForNextInvocation`
@@ -402,7 +403,6 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
402403
case waitingForResponse
403404
case sendingResponse
404405
case sentResponse(CheckedContinuation<Void, any Error>)
405-
case closing
406406
}
407407
}
408408

@@ -426,11 +426,11 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
426426
self.sendNextRequest(context: context)
427427
}
428428

429-
case .connected(_, .closing),
430-
.connected(_, .sendingResponse),
429+
case .connected(_, .sendingResponse),
431430
.connected(_, .sentResponse),
432431
.connected(_, .waitingForNextInvocation),
433-
.connected(_, .waitingForResponse):
432+
.connected(_, .waitingForResponse),
433+
.closing:
434434
fatalError("Invalid state: \(self.state)")
435435

436436
case .disconnected:
@@ -475,7 +475,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
475475
case .disconnected:
476476
throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost)
477477

478-
case .connected(_, .closing):
478+
case .closing:
479479
throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway)
480480
}
481481
}
@@ -503,7 +503,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
503503
case .disconnected:
504504
throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost)
505505

506-
case .connected(_, .closing):
506+
case .closing:
507507
throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway)
508508
}
509509
}
@@ -536,7 +536,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
536536
case .disconnected:
537537
throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost)
538538

539-
case .connected(_, .closing):
539+
case .closing:
540540
throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway)
541541
}
542542
}
@@ -681,20 +681,61 @@ extension LambdaChannelHandler: ChannelInboundHandler {
681681
self.state = .connected(context, .idle)
682682
case .connected:
683683
break
684+
case .closing:
685+
fatalError("Invalid state: \(self.state)")
684686
}
685687
}
686688

687689
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
688690
let response = unwrapInboundIn(data)
689691

692+
// As defined in RFC 7230 Section 6.3:
693+
// HTTP/1.1 defaults to the use of "persistent connections", allowing
694+
// multiple requests and responses to be carried over a single
695+
// connection. The "close" connection option is used to signal that a
696+
// connection will not persist after the current request/response. HTTP
697+
// implementations SHOULD support persistent connections.
698+
//
699+
// That's why we only assume the connection shall be closed if we receive
700+
// a "connection = close" header.
701+
let serverCloseConnection =
702+
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
703+
704+
let closeConnection = serverCloseConnection || response.head.version != .http1_1
705+
706+
if closeConnection {
707+
// If we were succeeding the request promise here directly and closing the connection
708+
// after succeeding the promise we may run into a race condition:
709+
//
710+
// The lambda runtime will ask for the next work item directly after a succeeded post
711+
// response request. The desire for the next work item might be faster than the attempt
712+
// to close the connection. This will lead to a situation where we try to the connection
713+
// but the next request has already been scheduled on the connection that we want to
714+
// close. For this reason we postpone succeeding the promise until the connection has
715+
// been closed. This codepath will only be hit in the very, very unlikely event of the
716+
// Lambda control plane demanding to close connection. (It's more or less only
717+
// implemented to support http1.1 correctly.) This behavior is ensured with the test
718+
// `LambdaTest.testNoKeepAliveServer`.
719+
self.state = .closing
720+
self.delegate.connectionWillClose(channel: context.channel)
721+
context.close(promise: nil)
722+
} else {
723+
self.state = .connected(context, .idle)
724+
}
725+
726+
// handle response content
727+
690728
switch self.state {
691729
case .connected(let context, .waitingForNextInvocation(let continuation)):
692730
do {
693731
let metadata = try InvocationMetadata(headers: response.head.headers)
694732
self.state = .connected(context, .waitingForResponse)
695733
continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer()))
696734
} catch {
697-
self.state = .connected(context, .closing)
735+
self.state = .closing
736+
737+
self.delegate.connectionWillClose(channel: context.channel)
738+
context.close(promise: nil)
698739
continuation.resume(
699740
throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error)
700741
)
@@ -704,46 +745,14 @@ extension LambdaChannelHandler: ChannelInboundHandler {
704745
if response.head.status == .accepted {
705746
self.state = .connected(context, .idle)
706747
continuation.resume()
748+
} else {
749+
self.state = .connected(context, .idle)
750+
continuation.resume(throwing: NewLambdaRuntimeError(code: .unexpectedStatusCodeForRequest))
707751
}
708752

709-
case .disconnected, .connected(_, _):
753+
case .disconnected, .closing, .connected(_, _):
710754
break
711755
}
712-
713-
// // As defined in RFC 7230 Section 6.3:
714-
// // HTTP/1.1 defaults to the use of "persistent connections", allowing
715-
// // multiple requests and responses to be carried over a single
716-
// // connection. The "close" connection option is used to signal that a
717-
// // connection will not persist after the current request/response. HTTP
718-
// // implementations SHOULD support persistent connections.
719-
// //
720-
// // That's why we only assume the connection shall be closed if we receive
721-
// // a "connection = close" header.
722-
// let serverCloseConnection =
723-
// response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
724-
//
725-
// let closeConnection = serverCloseConnection || response.head.version != .http1_1
726-
//
727-
// if closeConnection {
728-
// // If we were succeeding the request promise here directly and closing the connection
729-
// // after succeeding the promise we may run into a race condition:
730-
// //
731-
// // The lambda runtime will ask for the next work item directly after a succeeded post
732-
// // response request. The desire for the next work item might be faster than the attempt
733-
// // to close the connection. This will lead to a situation where we try to the connection
734-
// // but the next request has already been scheduled on the connection that we want to
735-
// // close. For this reason we postpone succeeding the promise until the connection has
736-
// // been closed. This codepath will only be hit in the very, very unlikely event of the
737-
// // Lambda control plane demanding to close connection. (It's more or less only
738-
// // implemented to support http1.1 correctly.) This behavior is ensured with the test
739-
// // `LambdaTest.testNoKeepAliveServer`.
740-
// self.state = .waitForConnectionClose(httpResponse, promise)
741-
// _ = context.channel.close()
742-
// return
743-
// } else {
744-
// self.state = .idle
745-
// promise.succeed(httpResponse)
746-
// }
747756
}
748757

749758
func errorCaught(context: ChannelHandlerContext, error: Error) {

0 commit comments

Comments
 (0)