From 6e521982c1c3fbef551aa3a41473506354f6d1e2 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Fri, 31 Jan 2025 00:16:25 +0100 Subject: [PATCH 1/2] =?UTF-8?q?fix(Correlator):=20Fixed=20the=20`Correlati?= =?UTF-8?q?onHandler`=20to=20set=20correlated=20events=20as=20the=20input?= =?UTF-8?q?=20of=20the=20workflow=20instance=20to=20create=20when=20the=20?= =?UTF-8?q?=C3=B2utcome`=20has=20been=20set=20to=20instantiate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Charles d'Avernas --- Synapse.sln | 3 --- src/api/Synapse.Api.Server/appsettings.Development.json | 2 +- .../Synapse.Correlator/Services/CorrelationHandler.cs | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Synapse.sln b/Synapse.sln index bb95ed2d9..5bd417f82 100644 --- a/Synapse.sln +++ b/Synapse.sln @@ -134,8 +134,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Kubernetes" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Core.Infrastructure.Containers.Docker", "src\core\Synapse.Core.Infrastructure.Containers.Docker\Synapse.Core.Infrastructure.Containers.Docker.csproj", "{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kubernetes", "kubernetes", "{B3F3DB1B-23E7-45FA-8934-448BFFB294E8}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Core.Infrastructure.Containers.Kubernetes", "src\core\Synapse.Core.Infrastructure.Containers.Kubernetes\Synapse.Core.Infrastructure.Containers.Kubernetes.csproj", "{41C99069-BD99-4FD2-BF33-984CF03B53E8}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{35D495F4-D267-4A84-9479-DB3C1BE85434}" @@ -292,7 +290,6 @@ Global {8FF58403-9E13-4F58-864F-E6FBC877BF37} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {9B37AA4A-A342-4A41-A2A1-C8466825A70A} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {DD6381BD-2C8B-4CE1-99B2-EC585DD818FA} = {9E296C8A-4D78-4592-B046-11A3A953FD25} - {B3F3DB1B-23E7-45FA-8934-448BFFB294E8} = {562C91A3-6E91-4489-9D9D-064E7436D900} {41C99069-BD99-4FD2-BF33-984CF03B53E8} = {9E296C8A-4D78-4592-B046-11A3A953FD25} {AB30A91B-0158-411D-9BD3-36FFA441B3A2} = {35D495F4-D267-4A84-9479-DB3C1BE85434} {06404855-A5BE-4556-91BC-064630E95737} = {35D495F4-D267-4A84-9479-DB3C1BE85434} diff --git a/src/api/Synapse.Api.Server/appsettings.Development.json b/src/api/Synapse.Api.Server/appsettings.Development.json index 774c813cc..17aacb0c2 100644 --- a/src/api/Synapse.Api.Server/appsettings.Development.json +++ b/src/api/Synapse.Api.Server/appsettings.Development.json @@ -19,6 +19,6 @@ } }, "CloudEvents": { - "Endpoint": "https://webhook.site/a4aff725-0711-48b2-a9d2-5d1b806d04d0" + "Endpoint": "http://localhost:5151/api/events/pub" } } diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index 852608349..6540fe4de 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -361,7 +361,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, patch), workflowInstance.GetName(), workflowInstance.GetNamespace(), null, false, cancellationToken).ConfigureAwait(false); break; case CorrelationOutcomeType.Start: - var input = this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? [] : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false); + var input = (this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? new() { { "events", context.Events.Values } } : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false)); workflowInstance = new() { Metadata = new() From e149870c113965a194d8bd8520c4fe7aa960d681 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Fri, 31 Jan 2025 17:07:24 +0100 Subject: [PATCH 2/2] fix(Runner): Fixed the `ConnectedWorkflowExecutionContext` to remove handled correlation contexts from the `WorkflowInstance` they are associated to fix(Runner): Fixed the `ConnectedWorkflowExecutionContext` to populate `WorkfowInstance` correlation keys set by handled correlation contexts fix(Runner): Fixed a feature-breaking bug with the `AsyncApiCallExecutor` disposing inline of the IAsyncApiSubscribeOperationResult, thus irreversibly breaking the message stream fix(Runner): Fixed the `AsyncApiCallExecutor`, which was not properly evaluating `while` and `until` message consumption conditions fix(Runner): Fixed the `AsyncApiCallExecutor`, which was not passing the `item` and `index` arguments to the `output.as` and `export.as` expressions Signed-off-by: Charles d'Avernas --- .../IServiceCollectionExtensions.cs | 2 + .../Synapse.Core/Resources/CorrelationSpec.cs | 10 +++- .../Services/CorrelationHandler.cs | 14 ++++- .../ConnectedWorkflowExecutionContext.cs | 12 ++++ .../Executors/AsyncApiCallExecutor.cs | 59 ++++++++++++------- 5 files changed, 71 insertions(+), 26 deletions(-) diff --git a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs index 113c709a7..58ac491d7 100644 --- a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs +++ b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs @@ -20,6 +20,7 @@ using Synapse.Api.Http.Controllers; using Synapse.Core.Api.Services; using System.Text.Json; +using System.Text.Json.Serialization; namespace Synapse.Api.Http; @@ -44,6 +45,7 @@ public static IServiceCollection AddSynapseHttpApi(this IServiceCollection servi .AddJsonOptions(options => { options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase; + options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault; }) .AddApplicationPart(typeof(WorkflowsController).Assembly); services.AddIdentityServer(options => diff --git a/src/core/Synapse.Core/Resources/CorrelationSpec.cs b/src/core/Synapse.Core/Resources/CorrelationSpec.cs index 544ce215f..a74b66472 100644 --- a/src/core/Synapse.Core/Resources/CorrelationSpec.cs +++ b/src/core/Synapse.Core/Resources/CorrelationSpec.cs @@ -46,16 +46,22 @@ public record CorrelationSpec [DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)] public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!; + /// + /// Gets/sets a key/value mapping, if any, of the keys to use to correlate events + /// + [DataMember(Name = "keys", Order = 5), JsonPropertyName("keys"), JsonPropertyOrder(5), YamlMember(Alias = "keys", Order = 5)] + public virtual EquatableDictionary? Keys { get; set; } + /// /// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete /// - [DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)] + [DataMember(Name = "stream", Order = 6), JsonPropertyName("stream"), JsonPropertyOrder(6), YamlMember(Alias = "stream", Order = 6)] public virtual bool Stream { get; set; } /// /// Gets/sets an object used to configure the correlation's outcome /// - [DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)] + [DataMember(Name = "outcome", Order = 7), JsonPropertyName("outcome"), JsonPropertyOrder(7), YamlMember(Alias = "outcome", Order = 7)] public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!; } diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index 6540fe4de..4c430342a 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -123,7 +123,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken { Id = Guid.NewGuid().ToString("N")[..12], Events = [new(filter.Key, e)], - Keys = CorrelationKeys == null ? new() : new(CorrelationKeys) + Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys) }; this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id); this.Logger.LogInformation("Event successfully correlated to context with id '{contextId}'", context.Id); @@ -152,7 +152,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken { Id = Guid.NewGuid().ToString("N")[..12], Events = [new(filter.Key, e)], - Keys = CorrelationKeys == null ? new() : new(CorrelationKeys) + Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys) }; await this.CreateOrUpdateContextAsync(context, cancellationToken).ConfigureAwait(false); this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id); @@ -289,7 +289,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil protected virtual async Task<(bool Succeeded, IDictionary? CorrelationKeys)> TryExtractCorrelationKeysAsync(CloudEvent e, IDictionary? keyDefinitions, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(e); - var correlationKeys = new Dictionary(); + var correlationKeys = this.Correlation.Resource.Spec.Keys ?? []; if (keyDefinitions == null || keyDefinitions.Count < 1) return (true, correlationKeys); foreach (var keyDefinition in keyDefinitions) { @@ -305,6 +305,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil } else if (!keyDefinition.Value.Expect.Equals(correlationTerm, StringComparison.OrdinalIgnoreCase)) return (false, null); } + if (correlationKeys.ContainsKey(keyDefinition.Key) && correlationTerm != correlationKeys[keyDefinition.Key]) return (false, null); correlationKeys[keyDefinition.Key] = correlationTerm; } return (true, correlationKeys); @@ -373,6 +374,13 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte { Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow, Input = input + }, + Status = new() + { + Correlation = new() + { + Keys = context.Keys + } } }; await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs index 6f79b5ef1..1c5df3660 100644 --- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs @@ -449,6 +449,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()), Lifetime = CorrelationLifetime.Ephemeral, Events = listenTask.Listen.To, + Keys = this.Instance.Status?.Correlation?.Keys, Expressions = task.Workflow.Definition.Evaluate ?? new(), Outcome = new() { @@ -511,6 +512,17 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte CompletedAt = DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false); + var originalInstance = this.Instance.Clone(); + foreach(var correlationKey in correlationContext.Keys) + { + this.Instance.Status!.Correlation!.Keys ??= []; + this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value; + } + this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString); + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance); + this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false); return correlationContext; } diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs index 3159cca90..5d4e71c7d 100644 --- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -18,6 +18,7 @@ using Neuroglia.AsyncApi.IO; using Neuroglia.AsyncApi.v3; using Neuroglia.Data.Expressions; +using System.Threading; namespace Synapse.Runner.Services.Executors; @@ -94,6 +95,11 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger protected uint? Offset { get; set; } + /// + /// Gets/sets a boolean indicating whether or not to keep consuming incoming messages + /// + protected bool KeepConsume { get; set; } = true; + /// /// Gets the path for the specified message /// @@ -234,7 +240,7 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol); - await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); + var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation"); if(result.Messages == null) { @@ -244,24 +250,24 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken var observable = result.Messages; if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan())); if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value); - else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () => - { - var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false); - return (message, keepGoing); - })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); - else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () => - { - var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)); - return (message, keepGoing); - })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); if (this.AsyncApi.Subscription.Foreach == null) { - var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false); + var messages = await observable.ToAsyncEnumerable().TakeWhileAwait(async m => + { + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) return await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) return !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + return true; + }).ToListAsync(cancellationToken).ConfigureAwait(false); await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } else { - this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync); + //todo: fix + this.Subscription = observable.TakeWhile(_ => this.KeepConsume).SelectMany(m => + { + OnStreamingMessageAsync(m).GetAwaiter().GetResult(); + return Observable.Return(m); + }).SubscribeAsync(_ => System.Threading.Tasks.Task.CompletedTask, OnStreamingErrorAsync, OnStreamingCompletedAsync); } } @@ -274,6 +280,11 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) { if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While) && !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false)) + { + this.KeepConsume = false; + return; + } if (this.AsyncApi.Subscription.Foreach?.Do != null) { var taskDefinition = new DoTaskDefinition() @@ -284,10 +295,15 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false) ] }; - var arguments = this.GetExpressionEvaluationArguments(); var messageData = message as object; + var offset = this.Offset ?? 0; + if (!this.Offset.HasValue) this.Offset = 0; + var arguments = this.GetExpressionEvaluationArguments(); + arguments ??= new Dictionary(); + arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!; + arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset; if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(fromExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); - else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Subscription.Foreach.Output.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Subscription.Foreach.Output.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); if (this.AsyncApi.Subscription.Foreach.Export?.As is string toExpression) { var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(toExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; @@ -295,19 +311,20 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) } else if (this.AsyncApi.Subscription.Foreach.Export?.As != null) { - var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.AsyncApi.Subscription.Foreach.Export.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.AsyncApi.Subscription.Foreach.Export.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); } - var offset = this.Offset ?? 0; - if (!this.Offset.HasValue) this.Offset = 0; - arguments ??= new Dictionary(); - arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!; - arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset; var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, this.GetPathFor(offset), this.Task.Input, null, this.Task, false, this.CancellationTokenSource!.Token).ConfigureAwait(false); var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + if (this.Task.ContextData != taskExecutor.Task.ContextData) await this.Task.SetContextDataAsync(taskExecutor.Task.ContextData, this.CancellationTokenSource!.Token).ConfigureAwait(false); this.Offset++; } + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until) && await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false)) + { + this.KeepConsume = false; + return; + } } ///