Skip to content

Add target based scaling support for Netherite #265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@

public override string EventSourceName => "DurableTask-Netherite";

// Shared by all target scalers created via TryGetTargetScaler (one scaler per function id),
// so that all scalers query load information through a single provider instance.
// Both are created lazily on the first TryGetTargetScaler call.
NetheriteMetricsProvider metricsProvider;
ILoadPublisherService loadPublisher;

/// <inheritdoc/>
public async override Task<string> RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings)
{
Expand Down Expand Up @@ -130,6 +133,25 @@
}
}

/// <summary>
/// Creates a target scaler for the given function.
/// </summary>
/// <param name="functionId">The unique identifier of the function.</param>
/// <param name="functionName">The name of the function.</param>
/// <param name="hubName">The name of the task hub.</param>
/// <param name="connectionName">The name of the connection.</param>
/// <param name="targetScaler">The created target scaler.</param>
/// <returns>Always true; a target scaler is always available.</returns>
public override bool TryGetTargetScaler(
    string functionId,
    string functionName,
    string hubName,
    string connectionName,
    out ITargetScaler targetScaler)
{
    // A target scaler is created per function id, but all scalers share the same
    // NetheriteMetricsProvider (and its load publisher), so the load information
    // is fetched through a single provider rather than once per scaler.
    // NOTE(review): this lazy initialization is not synchronized — confirm that the
    // host never calls TryGetTargetScaler concurrently for multiple functions.
    if (this.metricsProvider == null)
    {
        this.loadPublisher ??= this.Service.GetLoadPublisher();
        this.metricsProvider = this.Service.GetNetheriteMetricsProvider(this.loadPublisher, this.Settings.EventHubsConnection);
    }

    targetScaler = new NetheriteTargetScaler(functionId, this.metricsProvider, this);

    return true;
}

public class NetheriteScaleMetrics : ScaleMetrics
{
public byte[] Metrics { get; set; }
Expand All @@ -146,7 +168,7 @@
{
this.scalingMonitor = scalingMonitor;
string descriptorString = $"DurableTaskTrigger-Netherite-{uniqueIdentifier}".ToLower();
this.descriptor = new ScaleMonitorDescriptor(descriptorString);

Check warning on line 171 in src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'ScaleMonitorDescriptor.ScaleMonitorDescriptor(string)' is obsolete: 'This constructor is obsolete. Use the version that takes function id instead.'
}

public ScaleMonitorDescriptor Descriptor => this.descriptor;
Expand Down
112 changes: 112 additions & 0 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#if !NETSTANDARD
#if !NETCOREAPP2_2
namespace DurableTask.Netherite.AzureFunctions
{
using System;
using System.Linq;
using System.Threading.Tasks;
using DurableTask.Netherite.Scaling;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Host.Scale;
using static DurableTask.Netherite.Scaling.ScalingMonitor;

/// <summary>
/// Implements target-based scaling for Netherite: computes the desired worker count
/// from the published partition load information and the configured concurrency limits.
/// </summary>
public class NetheriteTargetScaler : ITargetScaler
{
    readonly NetheriteMetricsProvider metricsProvider;
    readonly DurabilityProvider durabilityProvider;

    /// <summary>
    /// Creates a target scaler for a single function.
    /// </summary>
    /// <param name="functionId">The unique function id; used to construct the scaler descriptor.</param>
    /// <param name="metricsProvider">The (shared) provider used to obtain current task hub metrics.</param>
    /// <param name="durabilityProvider">The durability provider, used for concurrency limit settings.</param>
    public NetheriteTargetScaler(
        string functionId,
        NetheriteMetricsProvider metricsProvider,
        DurabilityProvider durabilityProvider)
    {
        this.metricsProvider = metricsProvider;
        this.durabilityProvider = durabilityProvider;
        this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId);
    }

    public TargetScalerDescriptor TargetScalerDescriptor { get; private set; }

    /// <summary>
    /// Computes the recommended target worker count based on the current metrics.
    /// </summary>
    /// <param name="context">The target scaler context provided by the host (not used).</param>
    /// <returns>A result whose <c>TargetWorkerCount</c> is the recommended number of workers.</returns>
    public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
    {
        Metrics metrics = await this.metricsProvider.GetMetricsAsync();

        int maxConcurrentActivities = this.durabilityProvider.MaxConcurrentTaskActivityWorkItems;
        int maxConcurrentWorkItems = this.durabilityProvider.MaxConcurrentTaskOrchestrationWorkItems;

        if (metrics.TaskHubIsIdle)
        {
            // we need no workers; return a fresh result instance so that results
            // previously handed to the caller are never mutated afterwards
            return new TargetScalerResult() { TargetWorkerCount = 0 };
        }

        int target = 1; // always need at least one worker when we are not idle

        // if there is a backlog of activities, ask for enough workers to process them
        int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities);
        if (activities > 0)
        {
            int requestedWorkers = (activities + (maxConcurrentActivities - 1)) / maxConcurrentActivities; // rounded-up integer division
            requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions
            target = Math.Max(target, requestedWorkers);
        }

        // if there are load-challenged partitions, ask for a worker for each of them
        int numberOfChallengedPartitions = metrics.LoadInformation.Values
            .Count(info => info.IsLoaded() || info.WorkItems > maxConcurrentWorkItems);
        target = Math.Max(target, numberOfChallengedPartitions);

        // Determine how many different workers are currently running
        int current = metrics.LoadInformation.Values.Select(info => info.WorkerId).Distinct().Count();

        if (target < current)
        {
            // the target is lower than our current scale. However, before
            // scaling in, we check some more things to avoid
            // over-aggressive scale-in that could impact performance negatively.

            int numberOfNonIdlePartitions = metrics.LoadInformation.Values.Count(info => !PartitionLoadInfo.IsLongIdle(info.LatencyTrend));
            if (current > numberOfNonIdlePartitions)
            {
                // if we have more workers than non-idle partitions, don't immediately go lower than
                // the number of non-idle partitions.
                target = Math.Max(target, numberOfNonIdlePartitions);
            }
            else
            {
                // All partitions are busy, so we don't want to reduce the worker count unless load is very low.
                // Even if all partitions are running efficiently, it can be hard to know whether it is wise to reduce the worker count.
                // We want to avoid scaling in unnecessarily when we've reached optimal scale-out.
                // But we also want to avoid the case where a constant trickle of load after a big scale-out prevents scaling back in.
                // To balance these goals, we vote to scale down only by one worker at a time when we see this situation.
                bool allPartitionsAreFast = !metrics.LoadInformation.Values.Any(info =>
                    info.LatencyTrend.Length != PartitionLoadInfo.LatencyTrendLength
                    || info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency));

                if (allPartitionsAreFast)
                {
                    // don't go lower than 1 below current
                    target = Math.Max(target, current - 1);
                }
                else
                {
                    // don't go lower than current
                    target = Math.Max(target, current);
                }
            }
        }

        return new TargetScalerResult() { TargetWorkerCount = target };
    }
}
}
#endif
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,17 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor)
new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName, this.Settings.TaskhubParametersFilePath)
: new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName);

NetheriteMetricsProvider netheriteMetricsProvider = this.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection);

monitor = new ScalingMonitor(
loadPublisher,
this.Settings.EventHubsConnection,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.TraceProgress,
this.TraceHelper.TraceError);
this.TraceHelper.TraceError,
netheriteMetricsProvider);

return true;
}
Expand All @@ -249,6 +252,17 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor)
return false;
}

/// <summary>
/// Constructs the load publisher service through which partition load information is queried.
/// </summary>
/// <returns>A blob-based publisher when no Azure table name is configured; otherwise a table-based publisher.</returns>
internal ILoadPublisherService GetLoadPublisher()
{
    if (string.IsNullOrEmpty(this.Settings.LoadInformationAzureTableName))
    {
        return new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName, this.Settings.TaskhubParametersFilePath);
    }

    return new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName);
}

/// <summary>
/// Creates a metrics provider that reads load information via the given publisher
/// and the given event hubs connection.
/// </summary>
/// <param name="loadPublisher">The load publisher service to query partition load from.</param>
/// <param name="eventHubsConnection">The event hubs connection used to inspect queue positions.</param>
/// <returns>A new <see cref="NetheriteMetricsProvider"/>.</returns>
internal NetheriteMetricsProvider GetNetheriteMetricsProvider(ILoadPublisherService loadPublisher, ConnectionInfo eventHubsConnection)
    => new NetheriteMetricsProvider(loadPublisher, eventHubsConnection);

public void WatchThreads(object _)
{
Expand Down
107 changes: 107 additions & 0 deletions src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite.Scaling
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Netherite.EventHubsTransport;
using static DurableTask.Netherite.Scaling.ScalingMonitor;

/// <summary>
/// Provides task hub metrics (partition load information and idle state) for scaling decisions,
/// caching the collected metrics for 5 seconds to avoid excessive polling.
/// </summary>
public class NetheriteMetricsProvider
{
    readonly ILoadPublisherService loadPublisher;
    readonly ConnectionInfo eventHubsConnection;

    // cache of the most recently collected metrics, refreshed at most every 5 seconds
    DateTime lastMetricsQueryTime = DateTime.MinValue;
    Metrics metrics = default;

    /// <summary>
    /// Creates a metrics provider.
    /// </summary>
    /// <param name="loadPublisher">The load publisher service to query partition load from.</param>
    /// <param name="eventHubsConnection">The event hubs connection used to inspect queue positions.</param>
    public NetheriteMetricsProvider(
        ILoadPublisherService loadPublisher,
        ConnectionInfo eventHubsConnection)
    {
        this.loadPublisher = loadPublisher;
        this.eventHubsConnection = eventHubsConnection;
    }

    /// <summary>
    /// Returns the current task hub metrics, refreshing them at most once every 5 seconds.
    /// </summary>
    /// <returns>The (possibly cached) metrics.</returns>
    public virtual async Task<Metrics> GetMetricsAsync()
    {
        DateTime now = DateTime.UtcNow;

        // Collect the metrics at most every 5 seconds to avoid excessive polling.
        // If this method is called more frequently, return the cached metrics.
        if (now >= this.lastMetricsQueryTime.AddSeconds(5))
        {
            var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false);
            var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false);

            this.lastMetricsQueryTime = now;
            this.metrics = new Metrics()
            {
                LoadInformation = loadInformation,
                Busy = busy,
                Timestamp = now,
            };
        }

        return this.metrics;
    }

    /// <summary>
    /// Determines if a taskhub is busy, based on load information for the partitions and on the eventhubs queue positions
    /// </summary>
    /// <param name="loadInformation">The published load information for each partition.</param>
    /// <returns>null if the hub is idle, or a string describing the current non-idle state</returns>
    public async Task<string> TaskHubIsIdleAsync(Dictionary<uint, PartitionLoadInfo> loadInformation)
    {
        // first, check if any of the partitions have queued work or are scheduled to wake up
        foreach (var kvp in loadInformation)
        {
            string busy = kvp.Value.IsBusy();
            if (!string.IsNullOrEmpty(busy))
            {
                return $"P{kvp.Key:D2} {busy}";
            }
        }

        // next, check if any of the entries are not current, in the sense that their input queue position
        // does not match the latest queue position
        List<long> positions = await Netherite.EventHubsTransport.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnection, EventHubsTransport.PartitionHub).ConfigureAwait(false);

        if (positions == null)
        {
            return "eventhubs is missing";
        }

        for (int i = 0; i < positions.Count; i++)
        {
            if (!loadInformation.TryGetValue((uint)i, out var loadInfo))
            {
                return $"P{i:D2} has no load information published yet";
            }
            if (positions[i] > loadInfo.InputQueuePosition)
            {
                return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}";
            }
        }

        // finally, check if we have waited long enough
        foreach (var kvp in loadInformation)
        {
            string latencyTrend = kvp.Value.LatencyTrend;

            if (!PartitionLoadInfo.IsLongIdle(latencyTrend))
            {
                return $"P{kvp.Key:D2} had some activity recently, latency trend is {latencyTrend}";
            }
        }

        // we have concluded that there are no pending work items, timers, or unprocessed input queue entries
        return null;
    }
}
}
Loading
Loading