Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;

repeated WorkerCapability capabilities = 10;
WorkItemFilters workItemFilters = 11;
}

enum WorkerCapability {
Expand All @@ -844,6 +845,26 @@ enum WorkerCapability {
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}

message WorkItemFilters {
repeated OrchestrationFilter orchestrations = 1;
repeated ActivityFilter activities = 2;
repeated EntityFilter entities = 3;
}

message OrchestrationFilter {
string name = 1;
repeated string versions = 2;
}

message ActivityFilter {
string name = 1;
repeated string versions = 2;
}

message EntityFilter {
string name = 1;
}

message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,45 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask
builder.Services.AddSingleton(filter);
return builder;
}

/// <summary>
/// Adds <see cref="DurableTaskWorkerWorkItemFilters"/> to the specified <see cref="IDurableTaskWorkerBuilder"/>.
/// </summary>
/// <param name="builder">The builder to set the builder target for.</param>
/// <param name="workItemFilters">The instance of a <see cref="DurableTaskWorkerWorkItemFilters"/> to use.</param>
/// <returns>The same <see cref="IDurableTaskWorkerBuilder"/> instance, allowing for method chaining.</returns>
/// <remarks>If this is called without specified filters, the filters will be constructed from the registered orchestrations, activities, and entities.</remarks>
public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters = null)
{
Check.NotNull(builder);
if (workItemFilters != null)
{
// Use the options pattern with the builder's name to support named builders
builder.Services.AddOptions<DurableTaskWorkerWorkItemFilters>(builder.Name)
.Configure(opts =>
{
opts.Orchestrations = workItemFilters.Orchestrations;
opts.Activities = workItemFilters.Activities;
opts.Entities = workItemFilters.Entities;
});
}
else
{
// Auto-generate the filters from registered orchestrations, activities, and entities.
builder.Services.AddOptions<DurableTaskWorkerWorkItemFilters>(builder.Name)
.Configure<IOptionsMonitor<DurableTaskRegistry>, IOptionsMonitor<DurableTaskWorkerOptions>>(
(opts, registryMonitor, workerOptionsMonitor) =>
{
DurableTaskRegistry registry = registryMonitor.Get(builder.Name);
DurableTaskWorkerOptions workerOptions = workerOptionsMonitor.Get(builder.Name);
DurableTaskWorkerWorkItemFilters generated =
DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, workerOptions);
opts.Orchestrations = generated.Orchestrations;
opts.Activities = generated.Activities;
opts.Entities = generated.Entities;
});
}

return builder;
}
}
104 changes: 104 additions & 0 deletions src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Worker;

/// <summary>
/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend
/// and only work items matching the filters will be processed by the worker. If no filters are provided,
/// the worker will process all work items.
/// </summary>
public class DurableTaskWorkerWorkItemFilters
{
/// <summary>
/// Gets or sets the orchestration filters.
/// </summary>
public IReadOnlyList<OrchestrationFilter> Orchestrations { get; set; } = [];

/// <summary>
/// Gets or sets the activity filters.
/// </summary>
public IReadOnlyList<ActivityFilter> Activities { get; set; } = [];

/// <summary>
/// Gets or sets the entity filters.
/// </summary>
public IReadOnlyList<EntityFilter> Entities { get; set; } = [];

/// <summary>
/// Creates a new instance of the <see cref="DurableTaskWorkerWorkItemFilters"/> class.
/// </summary>
/// <param name="registry"><see cref="DurableTaskRegistry"/> to construct the filter from.</param>
/// <param name="workerOptions"><see cref="DurableTaskWorkerOptions"/> that optionally provides versioning information.</param>
/// <returns>A new instance of <see cref="DurableTaskWorkerWorkItemFilters"/> constructed from the provided registry.</returns>
internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions)
{
// TODO: Support multiple versions per orchestration/activity. For now, grab the worker version from the options.
return new DurableTaskWorkerWorkItemFilters
{
Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter
{
Name = orchestration.Key,
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.Version] : [],
}).ToList(),
Activities = registry.Activities.Select(activity => new ActivityFilter
{
Name = activity.Key,
Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.Version] : [],
}).ToList(),
Entities = registry.Entities.Select(entity => new EntityFilter
{
// Entity names are normalized to lowercase in the backend.
Name = entity.Key.ToString(),
}).ToList(),
};
}

/// <summary>
/// Specifies an orchestration filter.
/// </summary>
/// <param name="name">The name of the orchestration.</param>
/// <param name="versions">The optional versions of the orchestration.</param>
public readonly struct OrchestrationFilter(string name, IReadOnlyList<string>? versions)
{
/// <summary>
/// Gets or initializes the name of the orchestration to filter.
/// </summary>
public string Name { get; init; } = name;

/// <summary>
/// Gets or initializes the versions of the orchestration to filter.
/// </summary>
public IReadOnlyList<string> Versions { get; init; } = versions ?? [];
}

/// <summary>
/// Specifies an activity filter.
/// </summary>
/// <param name="name">The name of the activity.</param>
/// <param name="versions">The optional versions of the activity.</param>
public readonly struct ActivityFilter(string name, IReadOnlyList<string>? versions)
{
/// <summary>
/// Gets or initializes the name of the activity to filter.
/// </summary>
public string Name { get; init; } = name;

/// <summary>
/// Gets or initializes the versions of the activity to filter.
/// </summary>
public IReadOnlyList<string> Versions { get; init; } = versions ?? [];
}

/// <summary>
/// Specifies an entity filter.
/// </summary>
/// <param name="name">The name of the entity.</param>
public readonly struct EntityFilter(string name)
{
/// <summary>
/// Gets or initializes the name of the entity to filter.
/// </summary>
public string Name { get; init; } = name;
}
}
2 changes: 2 additions & 0 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.DurableTask.Abstractions;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Tracing;
using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -257,6 +258,7 @@ async ValueTask<OrchestrationRuntimeState> BuildRuntimeStateAsync(
MaxConcurrentEntityWorkItems =
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
Capabilities = { this.worker.grpcOptions.Capabilities },
WorkItemFilters = this.worker.workItemFilters?.ToGrpcWorkItemFilters(),
},
cancellationToken: cancellation);
}
Expand Down
6 changes: 5 additions & 1 deletion src/Worker/Grpc/GrpcDurableTaskWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
readonly ILoggerFactory loggerFactory;
readonly ILogger logger;
readonly IOrchestrationFilter? orchestrationFilter;
readonly DurableTaskWorkerWorkItemFilters? workItemFilters;

/// <summary>
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
Expand All @@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
/// <param name="loggerFactory">The logger.</param>
/// <param name="orchestrationFilter">The optional <see cref="IOrchestrationFilter"/> used to filter orchestration execution.</param>
/// <param name="exceptionPropertiesProvider">The custom exception properties provider that help build failure details.</param>
/// <param name="workItemFiltersMonitor">The optional <see cref="IOptionsMonitor{DurableTaskWorkerWorkItemFilters}"/> used to filter work items in the backend.</param>
public GrpcDurableTaskWorker(
string name,
IDurableTaskFactory factory,
Expand All @@ -38,7 +40,8 @@ public GrpcDurableTaskWorker(
IServiceProvider services,
ILoggerFactory loggerFactory,
IOrchestrationFilter? orchestrationFilter = null,
IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
IExceptionPropertiesProvider? exceptionPropertiesProvider = null,
IOptionsMonitor<DurableTaskWorkerWorkItemFilters>? workItemFiltersMonitor = null)
: base(name, factory)
{
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
Expand All @@ -48,6 +51,7 @@ public GrpcDurableTaskWorker(
this.logger = CreateLogger(loggerFactory, this.workerOptions);
this.orchestrationFilter = orchestrationFilter;
this.ExceptionPropertiesProvider = exceptionPropertiesProvider;
this.workItemFilters = workItemFiltersMonitor?.Get(name);
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.Worker.Grpc.Internal;

/// <summary>
/// Extension for <see cref="DurableTaskWorkerWorkItemFilters"/> to convert to gRPC types.
/// </summary>
public static class DurableTaskWorkerWorkItemFiltersExtension
{
/// <summary>
/// Converts a <see cref="DurableTaskWorkerWorkItemFilters"/> to a gRPC <see cref="P.WorkItemFilters"/>.
/// </summary>
/// <param name="workItemFilter">The <see cref="DurableTaskWorkerWorkItemFilters"/> to convert.</param>
/// <returns>A gRPC <see cref="P.WorkItemFilters"/>.</returns>
public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter)
{
Check.NotNull(workItemFilter);
var grpcWorkItemFilters = new P.WorkItemFilters();
foreach (var orchestrationFilter in workItemFilter.Orchestrations)
{
var grpcOrchestrationFilter = new P.OrchestrationFilter
{
Name = orchestrationFilter.Name,
};
grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions);
grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter);
}

foreach (var activityFilter in workItemFilter.Activities)
{
var grpcActivityFilter = new P.ActivityFilter
{
Name = activityFilter.Name,
};
grpcActivityFilter.Versions.AddRange(activityFilter.Versions);
grpcWorkItemFilters.Activities.Add(grpcActivityFilter);
}

foreach (var entityFilter in workItemFilter.Entities)
{
var grpcEntityFilter = new P.EntityFilter
{
Name = entityFilter.Name,
};
grpcWorkItemFilters.Entities.Add(grpcEntityFilter);
}

return grpcWorkItemFilters;
}
}
Loading
Loading