Skip to content

Commit 0220ee5

Browse files
authored
Add target-based scaling support for Azure Storage (#2452)
1 parent 7dbbc91 commit 0220ee5

19 files changed

+829
-171
lines changed

src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs

+32-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using DurableTask.AzureStorage.Tracking;
1111
using DurableTask.Core;
1212
using Microsoft.Extensions.Logging;
13+
using Microsoft.WindowsAzure.Storage;
1314
using Newtonsoft.Json;
1415
using Newtonsoft.Json.Converters;
1516
using Newtonsoft.Json.Linq;
@@ -200,6 +201,16 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
200201
}
201202

202203
#if !FUNCTIONS_V1
204+
205+
internal DurableTaskMetricsProvider GetMetricsProvider(
206+
string functionName,
207+
string hubName,
208+
CloudStorageAccount storageAccount,
209+
ILogger logger)
210+
{
211+
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
212+
}
213+
203214
/// <inheritdoc/>
204215
public override bool TryGetScaleMonitor(
205216
string functionId,
@@ -208,12 +219,31 @@ public override bool TryGetScaleMonitor(
208219
string connectionName,
209220
out IScaleMonitor scaleMonitor)
210221
{
222+
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
223+
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
211224
scaleMonitor = new DurableTaskScaleMonitor(
212225
functionId,
213226
functionName,
214227
hubName,
215-
this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(),
216-
this.logger);
228+
storageAccount,
229+
this.logger,
230+
metricsProvider);
231+
return true;
232+
}
233+
234+
#endif
235+
#if FUNCTIONS_V3_OR_GREATER
236+
public override bool TryGetTargetScaler(
237+
string functionId,
238+
string functionName,
239+
string hubName,
240+
string connectionName,
241+
out ITargetScaler targetScaler)
242+
{
243+
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
244+
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
245+
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
246+
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
217247
return true;
218248
}
219249
#endif

src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs

+22
Original file line numberDiff line numberDiff line change
@@ -565,5 +565,27 @@ public virtual bool TryGetScaleMonitor(
565565
return false;
566566
}
567567
#endif
568+
569+
#if FUNCTIONS_V3_OR_GREATER
570+
/// <summary>
571+
/// Tries to obtain a scaler for target based scaling.
572+
/// </summary>
573+
/// <param name="functionId">Function id.</param>
574+
/// <param name="functionName">Function name.</param>
575+
/// <param name="hubName">Task hub name.</param>
576+
/// <param name="connectionName">The name of the storage-specific connection settings.</param>
577+
/// <param name="targetScaler">The target-based scaler.</param>
578+
/// <returns>True if target-based scaling is supported, false otherwise.</returns>
579+
public virtual bool TryGetTargetScaler(
580+
string functionId,
581+
string functionName,
582+
string hubName,
583+
string connectionName,
584+
out ITargetScaler targetScaler)
585+
{
586+
targetScaler = null;
587+
return false;
588+
}
589+
#endif
568590
}
569591
}

src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

+6-76
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,13 @@ public DurableTaskExtension(
150150
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
151151
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
152152
this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
153-
this.ResolveAppSettingOptions();
153+
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
154154

155155
ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);
156156

157157
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
158158
this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
159-
this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
159+
this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
160160
this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
161161
this.isOptionsConfigured = true;
162162

@@ -249,6 +249,8 @@ public string HubName
249249

250250
internal DurableTaskOptions Options { get; }
251251

252+
internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider;
253+
252254
internal HttpApiHandler HttpApiHandler { get; private set; }
253255

254256
internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
@@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
296298
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
297299
}
298300

299-
private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
301+
internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
300302
{
301303
bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType);
302304

@@ -578,32 +580,13 @@ private void StopLocalGrpcServer()
578580
}
579581
#endif
580582

581-
private void ResolveAppSettingOptions()
582-
{
583-
if (this.Options == null)
584-
{
585-
throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
586-
}
587-
588-
if (this.nameResolver == null)
589-
{
590-
throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
591-
}
592-
593-
if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName))
594-
{
595-
// use the resolved task hub name
596-
this.Options.HubName = taskHubName;
597-
}
598-
}
599-
600583
private void InitializeForFunctionsV1(ExtensionConfigContext context)
601584
{
602585
#if FUNCTIONS_V1
603586
context.ApplyConfig(this.Options, "DurableTask");
604587
this.nameResolver = context.Config.NameResolver;
605588
this.loggerFactory = context.Config.LoggerFactory;
606-
this.ResolveAppSettingOptions();
589+
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
607590
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
608591
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
609592
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
@@ -1573,59 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
15731556
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
15741557
}
15751558
}
1576-
1577-
internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName)
1578-
{
1579-
if (this.defaultDurabilityProvider.TryGetScaleMonitor(
1580-
functionId,
1581-
functionName.Name,
1582-
this.Options.HubName,
1583-
connectionName,
1584-
out IScaleMonitor scaleMonitor))
1585-
{
1586-
return scaleMonitor;
1587-
}
1588-
else
1589-
{
1590-
// the durability provider does not support runtime scaling.
1591-
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
1592-
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower());
1593-
}
1594-
}
1595-
1596-
/// <summary>
1597-
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
1598-
/// This is required to allow operation of those providers even if runtime scaling is turned off
1599-
/// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
1600-
/// </summary>
1601-
private sealed class NoOpScaleMonitor : IScaleMonitor
1602-
{
1603-
/// <summary>
1604-
/// Construct a placeholder scale monitor.
1605-
/// </summary>
1606-
/// <param name="name">A descriptive name.</param>
1607-
public NoOpScaleMonitor(string name)
1608-
{
1609-
this.Descriptor = new ScaleMonitorDescriptor(name);
1610-
}
1611-
1612-
/// <summary>
1613-
/// A descriptive name.
1614-
/// </summary>
1615-
public ScaleMonitorDescriptor Descriptor { get; private set; }
1616-
1617-
/// <inheritdoc/>
1618-
Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
1619-
{
1620-
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
1621-
}
1622-
1623-
/// <inheritdoc/>
1624-
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
1625-
{
1626-
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
1627-
}
1628-
}
16291559
#endif
16301560
}
16311561
}

src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs

+26-2
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22
// Licensed under the MIT License. See LICENSE in the project root for license information.
33

44
using System;
5-
using System.Net.Http;
6-
using System.Threading;
5+
using System.Collections.Generic;
6+
using System.Linq;
77
#if !FUNCTIONS_V1
88
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
99
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
1010
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
1111
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
12+
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
13+
using Microsoft.Azure.WebJobs.Host.Scale;
1214
using Microsoft.Extensions.Azure;
1315
using Microsoft.Extensions.DependencyInjection;
1416
using Microsoft.Extensions.DependencyInjection.Extensions;
17+
using Microsoft.Extensions.Logging;
1518
using Microsoft.Extensions.Options;
1619
#else
1720
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
@@ -109,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
109112
return builder;
110113
}
111114

115+
#if FUNCTIONS_V3_OR_GREATER
116+
/// <summary>
117+
/// Adds the <see cref="IScaleMonitor"/> and <see cref="ITargetScaler"/> providers for the Durable Triggers.
118+
/// </summary>
119+
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to configure.</param>
120+
/// <returns>Returns the provided <see cref="IWebJobsBuilder"/>.</returns>
121+
internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
122+
{
123+
// this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756
124+
DurableTaskTriggersScaleProvider provider = null;
125+
builder.Services.AddSingleton(serviceProvider =>
126+
{
127+
provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService<IOptions<DurableTaskOptions>>(), serviceProvider.GetService<INameResolver>(), serviceProvider.GetService<ILoggerFactory>(), serviceProvider.GetService<IEnumerable<IDurabilityProviderFactory>>(), triggerMetadata);
128+
return provider;
129+
});
130+
builder.Services.AddSingleton<IScaleMonitorProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
131+
builder.Services.AddSingleton<ITargetScalerProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
132+
return builder;
133+
}
134+
#endif
135+
112136
/// <summary>
113137
/// Adds the Durable Task extension to the provided <see cref="IWebJobsBuilder"/>.
114138
/// </summary>

src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs

+31-7
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,19 @@
22
// Licensed under the MIT License. See LICENSE in the project root for license information.
33

44
using System;
5-
using System.Collections.Generic;
6-
using System.Linq;
75
using System.Threading;
86
using System.Threading.Tasks;
9-
using DurableTask.AzureStorage.Monitoring;
10-
using Microsoft.Azure.WebJobs.Host.Executors;
7+
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
118
using Microsoft.Azure.WebJobs.Host.Listeners;
129
#if !FUNCTIONS_V1
1310
using Microsoft.Azure.WebJobs.Host.Scale;
1411
#endif
1512

1613
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
1714
{
18-
#if !FUNCTIONS_V1
15+
#if FUNCTIONS_V3_OR_GREATER
16+
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
17+
#elif FUNCTIONS_V2_OR_GREATER
1918
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider
2019
#else
2120
internal sealed class DurableTaskListener : IListener
@@ -26,10 +25,15 @@ internal sealed class DurableTaskListener : IListener
2625
private readonly FunctionName functionName;
2726
private readonly FunctionType functionType;
2827
private readonly string connectionName;
28+
2929
#if !FUNCTIONS_V1
3030
private readonly Lazy<IScaleMonitor> scaleMonitor;
3131
#endif
3232

33+
#if FUNCTIONS_V3_OR_GREATER
34+
private readonly Lazy<ITargetScaler> targetScaler;
35+
#endif
36+
3337
public DurableTaskListener(
3438
DurableTaskExtension config,
3539
string functionId,
@@ -48,12 +52,25 @@ public DurableTaskListener(
4852
this.functionName = functionName;
4953
this.functionType = functionType;
5054
this.connectionName = connectionName;
55+
5156
#if !FUNCTIONS_V1
5257
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
53-
this.config.GetScaleMonitor(
58+
ScaleUtils.GetScaleMonitor(
59+
this.config.DefaultDurabilityProvider,
5460
this.functionId,
5561
this.functionName,
56-
this.connectionName));
62+
this.connectionName,
63+
this.config.Options.HubName));
64+
65+
#endif
66+
#if FUNCTIONS_V3_OR_GREATER
67+
this.targetScaler = new Lazy<ITargetScaler>(() =>
68+
ScaleUtils.GetTargetScaler(
69+
this.config.DefaultDurabilityProvider,
70+
this.functionId,
71+
this.functionName,
72+
this.connectionName,
73+
this.config.Options.HubName));
5774
#endif
5875
}
5976

@@ -98,5 +115,12 @@ public IScaleMonitor GetMonitor()
98115
return this.scaleMonitor.Value;
99116
}
100117
#endif
118+
119+
#if FUNCTIONS_V3_OR_GREATER
120+
public ITargetScaler GetTargetScaler()
121+
{
122+
return this.targetScaler.Value;
123+
}
124+
#endif
101125
}
102126
}

0 commit comments

Comments
 (0)