10
10
11
11
use super :: { fixtures, matchers, * } ;
12
12
13
+ use crate :: partition:: state_machine:: tests:: matchers:: { invoked, killed, suspended} ;
13
14
use assert2:: assert;
14
15
use assert2:: let_assert;
15
16
use googletest:: any;
@@ -157,6 +158,7 @@ async fn terminate_scheduled_invocation(
157
158
response: eq( IngressResponseResult :: Failure ( match termination_flavor {
158
159
TerminationFlavor :: Kill => KILLED_INVOCATION_ERROR ,
159
160
TerminationFlavor :: Cancel => CANCELED_INVOCATION_ERROR ,
161
+ _ => panic!( "Unexpected termination flavor" ) ,
160
162
} ) )
161
163
} ) )
162
164
) ;
@@ -172,6 +174,288 @@ async fn terminate_scheduled_invocation(
172
174
Ok ( ( ) )
173
175
}
174
176
177
+ #[ rstest]
178
+ #[ case( ExperimentalFeature :: InvocationStatusKilled . into( ) , TerminationFlavor :: KillAndRestart ) ]
179
+ #[ case( EnumSet :: empty( ) , TerminationFlavor :: KillAndRestart ) ]
180
+ #[ case( EnumSet :: empty( ) , TerminationFlavor :: CancelAndRestart ) ]
181
+ #[ tokio:: test]
182
+ async fn terminate_and_restart_scheduled_invocation_has_no_effect (
183
+ #[ case] experimental_features : EnumSet < ExperimentalFeature > ,
184
+ #[ case] termination_flavor : TerminationFlavor ,
185
+ ) -> anyhow:: Result < ( ) > {
186
+ let mut test_env = TestEnv :: create_with_experimental_features ( experimental_features) . await ;
187
+
188
+ let invocation_id = InvocationId :: mock_random ( ) ;
189
+
190
+ let _ = test_env
191
+ . apply ( Command :: Invoke ( ServiceInvocation {
192
+ invocation_id,
193
+ execution_time : Some ( MillisSinceEpoch :: MAX ) ,
194
+ ..ServiceInvocation :: mock ( )
195
+ } ) )
196
+ . await ;
197
+
198
+ // assert that inboxed invocation is in invocation_status
199
+ let scheduled_invocation_status = test_env
200
+ . storage ( )
201
+ . get_invocation_status ( & invocation_id)
202
+ . await ?;
203
+ assert ! ( let InvocationStatus :: Scheduled ( _) = scheduled_invocation_status) ;
204
+
205
+ // This has no effect
206
+ let actions = test_env
207
+ . apply ( Command :: TerminateInvocation ( InvocationTermination {
208
+ invocation_id,
209
+ flavor : termination_flavor,
210
+ } ) )
211
+ . await ;
212
+ assert_that ! ( actions, empty( ) ) ;
213
+
214
+ // Invocation status didn't change at all
215
+ assert_eq ! (
216
+ scheduled_invocation_status,
217
+ test_env
218
+ . storage( )
219
+ . get_invocation_status( & invocation_id)
220
+ . await
221
+ . unwrap( )
222
+ ) ;
223
+
224
+ test_env. shutdown ( ) . await ;
225
+ Ok ( ( ) )
226
+ }
227
+
228
+ #[ rstest]
229
+ #[ case( ExperimentalFeature :: InvocationStatusKilled . into( ) , TerminationFlavor :: KillAndRestart ) ]
230
+ #[ case( EnumSet :: empty( ) , TerminationFlavor :: KillAndRestart ) ]
231
+ #[ case( EnumSet :: empty( ) , TerminationFlavor :: CancelAndRestart ) ]
232
+ #[ tokio:: test]
233
+ async fn terminate_and_restart_inboxed_invocation_has_no_effect (
234
+ #[ case] experimental_features : EnumSet < ExperimentalFeature > ,
235
+ #[ case] termination_flavor : TerminationFlavor ,
236
+ ) -> anyhow:: Result < ( ) > {
237
+ let mut test_env = TestEnv :: create_with_experimental_features ( experimental_features) . await ;
238
+
239
+ let invocation_target = InvocationTarget :: mock_virtual_object ( ) ;
240
+ let invocation_id = InvocationId :: mock_generate ( & invocation_target) ;
241
+
242
+ // First invocation takes the lock
243
+ let _ = test_env
244
+ . apply ( Command :: Invoke ( ServiceInvocation {
245
+ invocation_id : InvocationId :: mock_generate ( & invocation_target) ,
246
+ invocation_target : invocation_target. clone ( ) ,
247
+ ..ServiceInvocation :: mock ( )
248
+ } ) )
249
+ . await ;
250
+
251
+ // This invocation will be inboxed
252
+ let _ = test_env
253
+ . apply ( Command :: Invoke ( ServiceInvocation {
254
+ invocation_id,
255
+ invocation_target : invocation_target. clone ( ) ,
256
+ ..ServiceInvocation :: mock ( )
257
+ } ) )
258
+ . await ;
259
+
260
+ // assert that inboxed invocation is in invocation_status
261
+ let inboxed_invocation_status = test_env
262
+ . storage ( )
263
+ . get_invocation_status ( & invocation_id)
264
+ . await ?;
265
+ assert ! ( let InvocationStatus :: Inboxed ( _) = inboxed_invocation_status) ;
266
+
267
+ // This has no effect
268
+ let actions = test_env
269
+ . apply ( Command :: TerminateInvocation ( InvocationTermination {
270
+ invocation_id,
271
+ flavor : termination_flavor,
272
+ } ) )
273
+ . await ;
274
+ assert_that ! ( actions, empty( ) ) ;
275
+
276
+ // Invocation status didn't change at all
277
+ assert_eq ! (
278
+ inboxed_invocation_status,
279
+ test_env
280
+ . storage( )
281
+ . get_invocation_status( & invocation_id)
282
+ . await
283
+ . unwrap( )
284
+ ) ;
285
+
286
+ test_env. shutdown ( ) . await ;
287
+ Ok ( ( ) )
288
+ }
289
+
290
+ #[ tokio:: test]
291
+ async fn kill_and_restart_when_invoked ( ) -> anyhow:: Result < ( ) > {
292
+ let mut test_env = TestEnv :: create_with_experimental_features (
293
+ ExperimentalFeature :: InvocationStatusKilled . into ( ) ,
294
+ )
295
+ . await ;
296
+
297
+ let invocation_id = InvocationId :: mock_random ( ) ;
298
+
299
+ // Start invocation and pin the deployment and add one entry (doesn't matter which one)
300
+ let _ = test_env
301
+ . apply_multiple ( [
302
+ Command :: Invoke ( ServiceInvocation {
303
+ invocation_id,
304
+ ..ServiceInvocation :: mock ( )
305
+ } ) ,
306
+ Command :: InvokerEffect ( InvokerEffect {
307
+ invocation_id,
308
+ kind : InvokerEffectKind :: PinnedDeployment ( PinnedDeployment {
309
+ deployment_id : Default :: default ( ) ,
310
+ service_protocol_version : Default :: default ( ) ,
311
+ } ) ,
312
+ } ) ,
313
+ Command :: InvokerEffect ( InvokerEffect {
314
+ invocation_id,
315
+ kind : InvokerEffectKind :: JournalEntry {
316
+ entry_index : 1 ,
317
+ entry : ProtobufRawEntryCodec :: serialize_enriched ( Entry :: ClearAllState ) ,
318
+ } ,
319
+ } ) ,
320
+ ] )
321
+ . await ;
322
+ assert_that ! (
323
+ test_env
324
+ . storage( )
325
+ . get_invocation_status( & invocation_id)
326
+ . await ?,
327
+ invoked( )
328
+ ) ;
329
+
330
+ // First we should transition to killed status
331
+ let _ = test_env
332
+ . apply ( Command :: TerminateInvocation ( InvocationTermination {
333
+ invocation_id,
334
+ flavor : TerminationFlavor :: KillAndRestart ,
335
+ } ) )
336
+ . await ;
337
+ assert_that ! (
338
+ test_env
339
+ . storage( )
340
+ . get_invocation_status( & invocation_id)
341
+ . await ?,
342
+ killed( )
343
+ ) ;
344
+
345
+ // Now send the Failed invoker effect to complete the kill procedure
346
+ let actions = test_env
347
+ . apply ( Command :: InvokerEffect ( InvokerEffect {
348
+ invocation_id,
349
+ kind : InvokerEffectKind :: Failed ( KILLED_INVOCATION_ERROR ) ,
350
+ } ) )
351
+ . await ;
352
+
353
+ // Should have restarted the invocation
354
+ assert_that ! (
355
+ actions,
356
+ contains( matchers:: actions:: invoke_for_id( invocation_id) )
357
+ ) ;
358
+
359
+ // We should be back to invoked state with a reset journal and reset deployment id
360
+ assert_that ! (
361
+ test_env
362
+ . storage( )
363
+ . get_invocation_status( & invocation_id)
364
+ . await ?,
365
+ pat!( InvocationStatus :: Invoked ( pat!(
366
+ InFlightInvocationMetadata {
367
+ journal_metadata: pat!( JournalMetadata { length: eq( 1 ) } ) ,
368
+ pinned_deployment: none( ) ,
369
+ restart_when_completed: eq( false )
370
+ }
371
+ ) ) )
372
+ ) ;
373
+
374
+ test_env. shutdown ( ) . await ;
375
+ Ok ( ( ) )
376
+ }
377
+
378
+ #[ tokio:: test]
379
+ async fn kill_and_restart_when_suspended ( ) -> anyhow:: Result < ( ) > {
380
+ let mut test_env = TestEnv :: create_with_experimental_features (
381
+ ExperimentalFeature :: InvocationStatusKilled . into ( ) ,
382
+ )
383
+ . await ;
384
+
385
+ let invocation_id = InvocationId :: mock_random ( ) ;
386
+
387
+ // Start invocation, pin the deployment, add one completable entry, suspend
388
+ let _ = test_env
389
+ . apply_multiple ( [
390
+ Command :: Invoke ( ServiceInvocation {
391
+ invocation_id,
392
+ ..ServiceInvocation :: mock ( )
393
+ } ) ,
394
+ Command :: InvokerEffect ( InvokerEffect {
395
+ invocation_id,
396
+ kind : InvokerEffectKind :: PinnedDeployment ( PinnedDeployment {
397
+ deployment_id : Default :: default ( ) ,
398
+ service_protocol_version : Default :: default ( ) ,
399
+ } ) ,
400
+ } ) ,
401
+ Command :: InvokerEffect ( InvokerEffect {
402
+ invocation_id,
403
+ kind : InvokerEffectKind :: JournalEntry {
404
+ entry_index : 1 ,
405
+ entry : ProtobufRawEntryCodec :: serialize_enriched ( Entry :: Awakeable (
406
+ AwakeableEntry { result : None } ,
407
+ ) ) ,
408
+ } ,
409
+ } ) ,
410
+ Command :: InvokerEffect ( InvokerEffect {
411
+ invocation_id,
412
+ kind : InvokerEffectKind :: Suspended {
413
+ waiting_for_completed_entries : [ 1 ] . into ( ) ,
414
+ } ,
415
+ } ) ,
416
+ ] )
417
+ . await ;
418
+ assert_that ! (
419
+ test_env
420
+ . storage( )
421
+ . get_invocation_status( & invocation_id)
422
+ . await ?,
423
+ suspended( )
424
+ ) ;
425
+
426
+ // This should immediately restart
427
+ let actions = test_env
428
+ . apply ( Command :: TerminateInvocation ( InvocationTermination {
429
+ invocation_id,
430
+ flavor : TerminationFlavor :: KillAndRestart ,
431
+ } ) )
432
+ . await ;
433
+
434
+ // Should have restarted the invocation
435
+ assert_that ! (
436
+ actions,
437
+ contains( matchers:: actions:: invoke_for_id( invocation_id) )
438
+ ) ;
439
+
440
+ // We should be back to invoked state with a reset journal and reset deployment id
441
+ assert_that ! (
442
+ test_env
443
+ . storage( )
444
+ . get_invocation_status( & invocation_id)
445
+ . await ?,
446
+ pat!( InvocationStatus :: Invoked ( pat!(
447
+ InFlightInvocationMetadata {
448
+ journal_metadata: pat!( JournalMetadata { length: eq( 1 ) } ) ,
449
+ pinned_deployment: none( ) ,
450
+ restart_when_completed: eq( false )
451
+ }
452
+ ) ) )
453
+ ) ;
454
+
455
+ test_env. shutdown ( ) . await ;
456
+ Ok ( ( ) )
457
+ }
458
+
175
459
#[ rstest]
176
460
#[ case( ExperimentalFeature :: InvocationStatusKilled . into( ) ) ]
177
461
#[ case( EnumSet :: empty( ) ) ]
0 commit comments