@@ -25,7 +25,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
25
25
var port : Int
26
26
}
27
27
28
- struct Writer : LambdaResponseStreamWriter {
28
+ struct Writer : LambdaRuntimeClientResponseStreamWriter {
29
29
private var runtimeClient : NewLambdaRuntimeClient
30
30
31
31
fileprivate init ( runtimeClient: NewLambdaRuntimeClient ) {
@@ -71,6 +71,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
71
71
enum ClosingState {
72
72
case notClosing
73
73
case closing( CheckedContinuation < Void , Never > )
74
+ case closed
74
75
}
75
76
76
77
private let eventLoop : any EventLoop
@@ -121,20 +122,20 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
121
122
122
123
switch self . connectionState {
123
124
case . disconnected:
124
- break
125
+ if self . closingConnections. isEmpty {
126
+ return continuation. resume ( )
127
+ }
125
128
126
129
case . connecting( let continuations) :
127
130
for continuation in continuations {
128
131
continuation. resume ( throwing: NewLambdaRuntimeError ( code: . closingRuntimeClient) )
129
132
}
130
133
self . connectionState = . connecting( [ ] )
131
134
132
- case . connected( let channel, let lambdaChannelHandler ) :
133
- channel. clo
135
+ case . connected( let channel, _ ) :
136
+ channel. close ( mode : . all , promise : nil )
134
137
}
135
138
}
136
-
137
-
138
139
}
139
140
140
141
func nextInvocation( ) async throws -> ( Invocation , Writer ) {
@@ -227,19 +228,35 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
227
228
}
228
229
229
230
private func channelClosed( _ channel: any Channel ) {
230
- switch self . connectionState {
231
- case . disconnected:
232
- break
231
+ switch ( self . connectionState, self . closingState) {
232
+ case ( . disconnected, _) ,
233
+ ( _, . closed) :
234
+ fatalError ( " Invalid state: \( self . connectionState) , \( self . closingState) " )
233
235
234
- case . connecting( let array) :
236
+ case ( . connecting( let array) , . notClosing ) :
235
237
self . connectionState = . disconnected
236
-
237
238
for continuation in array {
238
239
continuation. resume ( throwing: NewLambdaRuntimeError ( code: . lostConnectionToControlPlane) )
239
240
}
240
241
241
- case . connected:
242
+ case ( . connecting( let array) , . closing( let continuation) ) :
243
+ self . connectionState = . disconnected
244
+ precondition ( array. isEmpty, " If we are closing we should have failed all connection attempts already " )
245
+ if self . closingConnections. isEmpty {
246
+ self . closingState = . closed
247
+ continuation. resume ( )
248
+ }
249
+
250
+ case ( . connected, . notClosing) :
251
+ self . connectionState = . disconnected
252
+
253
+ case ( . connected, . closing( let continuation) ) :
242
254
self . connectionState = . disconnected
255
+
256
+ if self . closingConnections. isEmpty {
257
+ self . closingState = . closed
258
+ continuation. resume ( )
259
+ }
243
260
}
244
261
}
245
262
@@ -356,7 +373,6 @@ extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate {
356
373
357
374
isolated. connectionState = . disconnected
358
375
359
-
360
376
}
361
377
}
362
378
0 commit comments