diff --git a/Synapse.sln b/Synapse.sln
index bb95ed2d9..5bd417f82 100644
--- a/Synapse.sln
+++ b/Synapse.sln
@@ -134,8 +134,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Kubernetes"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Core.Infrastructure.Containers.Docker", "src\core\Synapse.Core.Infrastructure.Containers.Docker\Synapse.Core.Infrastructure.Containers.Docker.csproj", "{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA}"
EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kubernetes", "kubernetes", "{B3F3DB1B-23E7-45FA-8934-448BFFB294E8}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Core.Infrastructure.Containers.Kubernetes", "src\core\Synapse.Core.Infrastructure.Containers.Kubernetes\Synapse.Core.Infrastructure.Containers.Kubernetes.csproj", "{41C99069-BD99-4FD2-BF33-984CF03B53E8}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{35D495F4-D267-4A84-9479-DB3C1BE85434}"
@@ -292,7 +290,6 @@ Global
{8FF58403-9E13-4F58-864F-E6FBC877BF37} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
{9B37AA4A-A342-4A41-A2A1-C8466825A70A} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA} = {9E296C8A-4D78-4592-B046-11A3A953FD25}
- {B3F3DB1B-23E7-45FA-8934-448BFFB294E8} = {562C91A3-6E91-4489-9D9D-064E7436D900}
{41C99069-BD99-4FD2-BF33-984CF03B53E8} = {9E296C8A-4D78-4592-B046-11A3A953FD25}
{AB30A91B-0158-411D-9BD3-36FFA441B3A2} = {35D495F4-D267-4A84-9479-DB3C1BE85434}
{06404855-A5BE-4556-91BC-064630E95737} = {35D495F4-D267-4A84-9479-DB3C1BE85434}
diff --git a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs
index 113c709a7..58ac491d7 100644
--- a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs
+++ b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs
@@ -20,6 +20,7 @@
using Synapse.Api.Http.Controllers;
using Synapse.Core.Api.Services;
using System.Text.Json;
+using System.Text.Json.Serialization;
namespace Synapse.Api.Http;
@@ -44,6 +45,7 @@ public static IServiceCollection AddSynapseHttpApi(this IServiceCollection servi
.AddJsonOptions(options =>
{
options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
+ options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault;
})
.AddApplicationPart(typeof(WorkflowsController).Assembly);
services.AddIdentityServer(options =>
diff --git a/src/api/Synapse.Api.Server/appsettings.Development.json b/src/api/Synapse.Api.Server/appsettings.Development.json
index 774c813cc..17aacb0c2 100644
--- a/src/api/Synapse.Api.Server/appsettings.Development.json
+++ b/src/api/Synapse.Api.Server/appsettings.Development.json
@@ -19,6 +19,6 @@
}
},
"CloudEvents": {
- "Endpoint": "https://webhook.site/a4aff725-0711-48b2-a9d2-5d1b806d04d0"
+ "Endpoint": "http://localhost:5151/api/events/pub"
}
}
diff --git a/src/core/Synapse.Core/Resources/CorrelationSpec.cs b/src/core/Synapse.Core/Resources/CorrelationSpec.cs
index 544ce215f..a74b66472 100644
--- a/src/core/Synapse.Core/Resources/CorrelationSpec.cs
+++ b/src/core/Synapse.Core/Resources/CorrelationSpec.cs
@@ -46,16 +46,22 @@ public record CorrelationSpec
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!;
+ ///
+ /// Gets/sets a key/value mapping, if any, of the keys to use to correlate events
+ ///
+ [DataMember(Name = "keys", Order = 5), JsonPropertyName("keys"), JsonPropertyOrder(5), YamlMember(Alias = "keys", Order = 5)]
+ public virtual EquatableDictionary? Keys { get; set; }
+
///
/// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete
///
- [DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)]
+ [DataMember(Name = "stream", Order = 6), JsonPropertyName("stream"), JsonPropertyOrder(6), YamlMember(Alias = "stream", Order = 6)]
public virtual bool Stream { get; set; }
///
/// Gets/sets an object used to configure the correlation's outcome
///
- [DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)]
+ [DataMember(Name = "outcome", Order = 7), JsonPropertyName("outcome"), JsonPropertyOrder(7), YamlMember(Alias = "outcome", Order = 7)]
public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!;
}
diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
index 852608349..4c430342a 100644
--- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
+++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs
@@ -123,7 +123,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
{
Id = Guid.NewGuid().ToString("N")[..12],
Events = [new(filter.Key, e)],
- Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
+ Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
};
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
this.Logger.LogInformation("Event successfully correlated to context with id '{contextId}'", context.Id);
@@ -152,7 +152,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
{
Id = Guid.NewGuid().ToString("N")[..12],
Events = [new(filter.Key, e)],
- Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
+ Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
};
await this.CreateOrUpdateContextAsync(context, cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
@@ -289,7 +289,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil
protected virtual async Task<(bool Succeeded, IDictionary? CorrelationKeys)> TryExtractCorrelationKeysAsync(CloudEvent e, IDictionary? keyDefinitions, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(e);
- var correlationKeys = new Dictionary();
+ var correlationKeys = this.Correlation.Resource.Spec.Keys ?? [];
if (keyDefinitions == null || keyDefinitions.Count < 1) return (true, correlationKeys);
foreach (var keyDefinition in keyDefinitions)
{
@@ -305,6 +305,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil
}
else if (!keyDefinition.Value.Expect.Equals(correlationTerm, StringComparison.OrdinalIgnoreCase)) return (false, null);
}
+ if (correlationKeys.ContainsKey(keyDefinition.Key) && correlationTerm != correlationKeys[keyDefinition.Key]) return (false, null);
correlationKeys[keyDefinition.Key] = correlationTerm;
}
return (true, correlationKeys);
@@ -361,7 +362,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, patch), workflowInstance.GetName(), workflowInstance.GetNamespace(), null, false, cancellationToken).ConfigureAwait(false);
break;
case CorrelationOutcomeType.Start:
- var input = this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? [] : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false);
+ var input = (this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? new() { { "events", context.Events.Values } } : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false));
workflowInstance = new()
{
Metadata = new()
@@ -373,6 +374,13 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
{
Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow,
Input = input
+ },
+ Status = new()
+ {
+ Correlation = new()
+ {
+ Keys = context.Keys
+ }
}
};
await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false);
diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
index 6f79b5ef1..1c5df3660 100644
--- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
+++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
@@ -449,6 +449,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte
Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
Lifetime = CorrelationLifetime.Ephemeral,
Events = listenTask.Listen.To,
+ Keys = this.Instance.Status?.Correlation?.Keys,
Expressions = task.Workflow.Definition.Evaluate ?? new(),
Outcome = new()
{
@@ -511,6 +512,17 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte
CompletedAt = DateTimeOffset.Now
}
}, cancellationToken).ConfigureAwait(false);
+ using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
+ this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
+ var originalInstance = this.Instance.Clone();
+ foreach(var correlationKey in correlationContext.Keys)
+ {
+ this.Instance.Status!.Correlation!.Keys ??= [];
+ this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value;
+ }
+ this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString);
+ var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance);
+ this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false);
return correlationContext;
}
diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
index 3159cca90..5d4e71c7d 100644
--- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
+++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
@@ -18,6 +18,7 @@
using Neuroglia.AsyncApi.IO;
using Neuroglia.AsyncApi.v3;
using Neuroglia.Data.Expressions;
+using System.Threading;
namespace Synapse.Runner.Services.Executors;
@@ -94,6 +95,11 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger
protected uint? Offset { get; set; }
+ ///
+ /// Gets/sets a boolean indicating whether or not to keep consuming incoming messages
+ ///
+ protected bool KeepConsume { get; set; } = true;
+
///
/// Gets the path for the specified message
///
@@ -234,7 +240,7 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol);
- await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
+ var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation");
if(result.Messages == null)
{
@@ -244,24 +250,24 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
var observable = result.Messages;
if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan()));
if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value);
- else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () =>
- {
- var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false);
- return (message, keepGoing);
- })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
- else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () =>
- {
- var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false));
- return (message, keepGoing);
- })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
if (this.AsyncApi.Subscription.Foreach == null)
{
- var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
+ var messages = await observable.ToAsyncEnumerable().TakeWhileAwait(async m =>
+ {
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) return await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) return !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ return true;
+ }).ToListAsync(cancellationToken).ConfigureAwait(false);
await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}
else
{
- this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync);
+ //todo: fix
+ this.Subscription = observable.TakeWhile(_ => this.KeepConsume).SelectMany(m =>
+ {
+ OnStreamingMessageAsync(m).GetAwaiter().GetResult();
+ return Observable.Return(m);
+ }).SubscribeAsync(_ => System.Threading.Tasks.Task.CompletedTask, OnStreamingErrorAsync, OnStreamingCompletedAsync);
}
}
@@ -274,6 +280,11 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While) && !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false))
+ {
+ this.KeepConsume = false;
+ return;
+ }
if (this.AsyncApi.Subscription.Foreach?.Do != null)
{
var taskDefinition = new DoTaskDefinition()
@@ -284,10 +295,15 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false)
]
};
- var arguments = this.GetExpressionEvaluationArguments();
var messageData = message as object;
+ var offset = this.Offset ?? 0;
+ if (!this.Offset.HasValue) this.Offset = 0;
+ var arguments = this.GetExpressionEvaluationArguments();
+ arguments ??= new Dictionary();
+ arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!;
+ arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset;
if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync