Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add target-based scaling support for Azure Storage #2452

Merged
merged 27 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9faa500
initial commit
bachuv Apr 19, 2023
64a5484
first draft TBS
davidmrdavid Apr 21, 2023
9e56855
add no op scaler
davidmrdavid Apr 21, 2023
cd2ebe0
fix identation of dependency in csproj
davidmrdavid Apr 21, 2023
841be56
add error message to no-op target scaler
davidmrdavid Apr 21, 2023
adbe80a
add private build suffix
davidmrdavid Apr 21, 2023
5f79abe
change preview suffix
davidmrdavid Apr 21, 2023
7bc0077
introduce more conditional compilation to pass smoke tests
davidmrdavid Apr 25, 2023
f534ce5
pass stylecop
davidmrdavid Apr 25, 2023
0ad2d8e
add conditional compilation
davidmrdavid Apr 25, 2023
dc2c71a
patch conditional compilation exceptions
davidmrdavid Apr 25, 2023
706aa00
add unit test
davidmrdavid Apr 28, 2023
39b29d7
Merge branch 'main' of https://github.com/Azure/azure-functions-durab…
davidmrdavid Apr 28, 2023
78f9490
Merge branch 'dev' of https://github.com/Azure/azure-functions-durabl…
davidmrdavid Apr 28, 2023
70b6ebe
add comments
davidmrdavid Apr 28, 2023
bac5605
add unit tests
davidmrdavid May 1, 2023
776e9b6
Update src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvi…
davidmrdavid May 11, 2023
22541c2
add logs and comments
davidmrdavid May 11, 2023
d94fa7f
Remove extra line
davidmrdavid May 11, 2023
5da5ec4
Merge branch 'dev' into dajusto/tbs
davidmrdavid Sep 20, 2023
4490d11
incorporate PR feedback
davidmrdavid Sep 21, 2023
c192695
remove DTFx.Listener improts
davidmrdavid Sep 21, 2023
2e6a317
remove old GetPerformanceMonitor implementation
davidmrdavid Sep 21, 2023
e251a81
pass stylecop
davidmrdavid Sep 21, 2023
7912e7a
add comments to explain when code runs in the ScaleController process
davidmrdavid Sep 21, 2023
8d03b9a
pass stylecop
davidmrdavid Sep 21, 2023
731d809
Add ScaleController V3 integration (#2462)
davidmrdavid Oct 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the target-based scaling support still based on the old deprecated AS backend?

using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
Expand Down Expand Up @@ -200,6 +201,16 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
}

#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
public override bool TryGetScaleMonitor(
string functionId,
Expand All @@ -208,12 +219,30 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(),
this.logger);
storageAccount,
this.logger,
metricsProvider);
return true;
}

#endif
#if FUNCTIONS_V3_OR_GREATER
public override bool TryGetTargetScaler(
string functionId,
string functionName,
string hubName,
string connectionName,
out ITargetScaler targetScaler)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
}
#endif
Expand Down
22 changes: 22 additions & 0 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -565,5 +565,27 @@ public virtual bool TryGetScaleMonitor(
return false;
}
#endif

#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Tries to obtain a scaler for target based scaling.
/// </summary>
/// <param name="functionId">Function id.</param>
/// <param name="functionName">Function name.</param>
/// <param name="hubName">Task hub name.</param>
/// <param name="connectionName">The name of the storage-specific connection settings.</param>
/// <param name="targetScaler">The target-based scaler.</param>
/// <returns>True if target-based scaling is supported, false otherwise.</returns>
public virtual bool TryGetTargetScaler(
string functionId,
string functionName,
string hubName,
string connectionName,
out ITargetScaler targetScaler)
{
targetScaler = null;
return false;
}
#endif
}
}
53 changes: 51 additions & 2 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1589,10 +1589,52 @@ internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionN
{
// the durability provider does not support runtime scaling.
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower());
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower(), functionId);
}
}
#endif
#if FUNCTIONS_V3_OR_GREATER

internal ITargetScaler GetTargetScaler(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetTargetScaler(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out ITargetScaler targetScaler))
{
return targetScaler;
}
else
{
// the durability provider does not support target-based scaling.
// Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on).
return new NoOpTargetScaler(functionId);
}
}

private sealed class NoOpTargetScaler : ITargetScaler
{
/// <summary>
/// Construct a placeholder target scaler.
/// </summary>
/// <param name="functionId">The function ID.</param>
public NoOpTargetScaler(string functionId)
{
this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId);
}

public TargetScalerDescriptor TargetScalerDescriptor { get; }

public Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
throw new NotImplementedException("The current DurableTask backend configuration does not support target-based scaling");
}
}
#endif

#if !FUNCTIONS_V1
/// <summary>
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
/// This is required to allow operation of those providers even if runtime scaling is turned off
Expand All @@ -1604,9 +1646,16 @@ private sealed class NoOpScaleMonitor : IScaleMonitor
/// Construct a placeholder scale monitor.
/// </summary>
/// <param name="name">A descriptive name.</param>
public NoOpScaleMonitor(string name)
/// <param name="functionId">The function ID.</param>
public NoOpScaleMonitor(string name, string functionId)
{
#if FUNCTIONS_V3_OR_GREATER
this.Descriptor = new ScaleMonitorDescriptor(name, functionId);
#else
#pragma warning disable CS0618 // Type or member is obsolete
this.Descriptor = new ScaleMonitorDescriptor(name);
#pragma warning restore CS0618 // Type or member is obsolete
#endif
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Net.Http;
using System.Threading;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
Expand Down
29 changes: 24 additions & 5 deletions src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
#endif

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
#if !FUNCTIONS_V1
#if FUNCTIONS_V3_OR_GREATER
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
#elif FUNCTIONS_V2_OR_GREATER
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider
#else
internal sealed class DurableTaskListener : IListener
Expand All @@ -26,10 +24,15 @@ internal sealed class DurableTaskListener : IListener
private readonly FunctionName functionName;
private readonly FunctionType functionType;
private readonly string connectionName;

#if !FUNCTIONS_V1
private readonly Lazy<IScaleMonitor> scaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private readonly Lazy<ITargetScaler> targetScaler;
#endif

public DurableTaskListener(
DurableTaskExtension config,
string functionId,
Expand All @@ -49,11 +52,20 @@ public DurableTaskListener(
this.functionType = functionType;
this.connectionName = connectionName;
#if !FUNCTIONS_V1

this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
this.config.GetScaleMonitor(
this.functionId,
this.functionName,
this.connectionName));

#endif
#if FUNCTIONS_V3_OR_GREATER
this.targetScaler = new Lazy<ITargetScaler>(() =>
this.config.GetTargetScaler(
this.functionId,
this.functionName,
this.connectionName));
#endif
}

Expand Down Expand Up @@ -98,5 +110,12 @@ public IScaleMonitor GetMonitor()
return this.scaleMonitor.Value;
}
#endif

#if FUNCTIONS_V3_OR_GREATER
public ITargetScaler GetTargetScaler()
{
return this.targetScaler.Value;
}
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

#if !FUNCTIONS_V1
using System;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class DurableTaskMetricsProvider
{
private readonly string functionName;
private readonly string hubName;
private readonly ILogger logger;
private readonly CloudStorageAccount storageAccount;

private DisconnectedPerformanceMonitor performanceMonitor;

public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
{
this.functionName = functionName;
this.hubName = hubName;
this.logger = logger;
this.performanceMonitor = performanceMonitor;
this.storageAccount = storageAccount;
}

public virtual async Task<DurableTaskTriggerMetrics> GetMetricsAsync()
{
DurableTaskTriggerMetrics metrics = new DurableTaskTriggerMetrics();

// Durable stores its own metrics, so we just collect them here
PerformanceHeartbeat heartbeat = null;
try
{
DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor();
heartbeat = await performanceMonitor.PulseAsync();
}
catch (StorageException e)
{
this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
}

if (heartbeat != null)
{
metrics.PartitionCount = heartbeat.PartitionCount;
metrics.ControlQueueLengths = JsonConvert.SerializeObject(heartbeat.ControlQueueLengths);
metrics.ControlQueueLatencies = JsonConvert.SerializeObject(heartbeat.ControlQueueLatencies);
metrics.WorkItemQueueLength = heartbeat.WorkItemQueueLength;
if (heartbeat.WorkItemQueueLatency > TimeSpan.Zero)
{
metrics.WorkItemQueueLatency = heartbeat.WorkItemQueueLatency.ToString();
}
}

return metrics;
}

internal DisconnectedPerformanceMonitor GetPerformanceMonitor()
{
if (this.performanceMonitor == null)
{
if (this.storageAccount == null)
{
throw new ArgumentNullException(nameof(this.storageAccount));
}

this.performanceMonitor = new DisconnectedPerformanceMonitor(this.storageAccount, this.hubName);
}

return this.performanceMonitor;
}
}
}
#endif
Loading