Skip to content

Commit 3700d19

Browse files
authored
Extended session support (Azure#289)
- Added support for extended sessions - End-to-end test reliability and diagnostic improvements - Refactored Event Grid tests - Updated DurableTask.AzureStorage dependency to 1.2.0 (which brings in DurableTask.Core 2.0.0.5)
1 parent 8f3ebfe commit 3700d19

22 files changed

+977
-606
lines changed

Diff for: .stylecop/GlobalSuppressions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// Project-level suppressions either have no target or are given
77
// a specific target and scoped to a namespace, type, member, etc.
88

9+
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1611:Element parameters should be documented", Justification = "Test code does not require detailed documentation")]
910
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1623:Property summary documentation should match accessors", Justification = "In some cases we would rather prefix descriptions with 'Optional'")]
1011
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1642:Constructor summary documentation should begin with standard text", Justification = "Not enforcing")]
1112
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.LayoutRules", "SA1502:Element should not be on a single line", Justification = "This is more concise")]

Diff for: src/WebJobs.Extensions.DurableTask/DurableOrchestrationContext.cs

+12-12
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public sealed class DurableOrchestrationContext : DurableOrchestrationContextBas
3737
private string serializedInput;
3838
private string serializedOutput;
3939
private string serializedCustomStatus;
40-
private int owningThreadId;
4140

4241
internal DurableOrchestrationContext(
4342
DurableTaskExtension config,
@@ -48,7 +47,6 @@ internal DurableOrchestrationContext(
4847
this.deferredTasks = new List<Func<Task>>();
4948
this.orchestrationName = functionName;
5049
this.orchestrationVersion = functionVersion;
51-
this.owningThreadId = -1;
5250
}
5351

5452
/// <inheritdoc />
@@ -77,11 +75,6 @@ internal DurableOrchestrationContext(
7775

7876
internal IList<HistoryEvent> History { get; set; }
7977

80-
internal void AssignToCurrentThread()
81-
{
82-
this.owningThreadId = Thread.CurrentThread.ManagedThreadId;
83-
}
84-
8578
/// <summary>
8679
/// Returns the orchestrator function input as a raw JSON string value.
8780
/// </summary>
@@ -387,6 +380,16 @@ private async Task<TResult> CallDurableTaskFunctionAsync<TResult>(
387380
e.InnerException?.Message);
388381
throw new FunctionFailedException(message, e.InnerException);
389382
}
383+
catch (SubOrchestrationFailedException e)
384+
{
385+
exception = e;
386+
string message = string.Format(
387+
"The {0} function '{1}' failed: \"{2}\". See the function execution logs for additional details.",
388+
functionType.ToString().ToLowerInvariant(),
389+
functionName,
390+
e.InnerException?.Message);
391+
throw new FunctionFailedException(message, e.InnerException);
392+
}
390393
catch (Exception e)
391394
{
392395
exception = e;
@@ -457,13 +460,10 @@ private void ThrowIfInvalidAccess()
457460
throw new InvalidOperationException("The inner context has not been initialized.");
458461
}
459462

460-
// TODO: This should be considered best effort because it's possible that async work
461-
// was scheduled and the CLR decided to run it on the same thread. The only guaranteed
462-
// way to detect cross-thread access is to do it in the Durable Task Framework directly.
463-
if (this.owningThreadId != -1 && this.owningThreadId != Thread.CurrentThread.ManagedThreadId)
463+
if (!OrchestrationContext.IsOrchestratorThread)
464464
{
465465
throw new InvalidOperationException(
466-
"Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints .");
466+
"Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.");
467467
}
468468
}
469469

Diff for: src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs

+47-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System;
55
using System.Collections.Concurrent;
6-
using System.Net;
76
using System.Net.Http;
87
using System.Reflection;
98
using System.Text;
@@ -155,6 +154,36 @@ public class DurableTaskExtension :
155154
/// </summary>
156155
public string EventGridKeySettingName { get; set; }
157156

157+
/// <summary>
158+
/// Gets or sets a flag indicating whether to enable extended sessions.
159+
/// </summary>
160+
/// <remarks>
161+
/// <para>Extended sessions can improve the performance of orchestrator functions by allowing them to skip
162+
/// replays when new messages are received within short periods of time.</para>
163+
/// <para>Note that orchestrator functions which are extended this way will continue to count against the
164+
/// <see cref="MaxConcurrentOrchestratorFunctions"/> limit. To avoid starvation, only half of the maximum
165+
/// number of allowed concurrent orchestrator functions can be concurrently extended at any given time.
166+
/// The <see cref="ExtendedSessionIdleTimeoutInSeconds"/> property can also be used to control how long an idle
167+
/// orchestrator function is allowed to be extended.</para>
168+
/// <para>It is recommended that this property be set to <c>false</c> during development to help
169+
/// ensure that the orchestrator code correctly obeys the idempotency rules.</para>
170+
/// </remarks>
171+
/// <value>
172+
/// <c>true</c> to enable extended sessions; otherwise <c>false</c>.
173+
/// </value>
174+
public bool ExtendedSessionsEnabled { get; set; }
175+
176+
/// <summary>
177+
/// Gets or sets the amount of time in seconds before an idle session times out. The default value is 30 seconds.
178+
/// </summary>
179+
/// <remarks>
180+
/// This setting is applicable when <see cref="ExtendedSessionsEnabled"/> is set to <c>true</c>.
181+
/// </remarks>
182+
/// <value>
183+
/// The number of seconds before an idle session times out.
184+
/// </value>
185+
public int ExtendedSessionIdleTimeoutInSeconds { get; set; } = 30;
186+
158187
internal LifeCycleNotificationHelper LifeCycleNotificationHelper => this.lifeCycleNotificationHelper;
159188

160189
internal EndToEndTraceHelper TraceHelper => this.traceHelper;
@@ -206,6 +235,17 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
206235
this.orchestrationService = new AzureStorageOrchestrationService(settings);
207236
this.taskHubWorker = new TaskHubWorker(this.orchestrationService, this, this);
208237
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.OrchestrationMiddleware);
238+
239+
context.Config.AddService<IOrchestrationService>(this.orchestrationService);
240+
}
241+
242+
/// <summary>
243+
/// Deletes all data stored in the current task hub.
244+
/// </summary>
245+
/// <returns>A task representing the async delete operation.</returns>
246+
public Task DeleteTaskHubAsync()
247+
{
248+
return this.orchestrationService.DeleteAsync();
209249
}
210250

211251
/// <summary>
@@ -374,15 +414,21 @@ internal AzureStorageOrchestrationServiceSettings GetOrchestrationServiceSetting
374414
throw new InvalidOperationException("Unable to find an Azure Storage connection string to use for this binding.");
375415
}
376416

417+
TimeSpan extendedSessionTimeout = TimeSpan.FromSeconds(
418+
Math.Max(this.ExtendedSessionIdleTimeoutInSeconds, 0));
419+
377420
return new AzureStorageOrchestrationServiceSettings
378421
{
379422
StorageConnectionString = resolvedStorageConnectionString,
380423
TaskHubName = taskHubNameOverride ?? this.HubName,
381424
PartitionCount = this.PartitionCount,
425+
ControlQueueBatchSize = this.ControlQueueBatchSize,
382426
ControlQueueVisibilityTimeout = this.ControlQueueVisibilityTimeout,
383427
WorkItemQueueVisibilityTimeout = this.WorkItemQueueVisibilityTimeout,
384428
MaxConcurrentTaskOrchestrationWorkItems = this.MaxConcurrentOrchestratorFunctions,
385429
MaxConcurrentTaskActivityWorkItems = this.MaxConcurrentActivityFunctions,
430+
ExtendedSessionsEnabled = this.ExtendedSessionsEnabled,
431+
ExtendedSessionIdleTimeout = extendedSessionTimeout,
386432
};
387433
}
388434

Diff for: src/WebJobs.Extensions.DurableTask/LifeCycleNotificationHelper.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public EventGridEvent() { }
251251
public string Subject { get; set; }
252252

253253
[JsonProperty(PropertyName = "data")]
254-
public object Data { get; set; }
254+
public EventGridPayload Data { get; set; }
255255

256256
[JsonProperty(PropertyName = "eventType")]
257257
public string EventType { get; set; }

Diff for: src/WebJobs.Extensions.DurableTask/Listener/TaskOrchestrationShim.cs

-6
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ private enum AsyncActionType
4747

4848
public void SetFunctionInvocationCallback(Func<Task> callback)
4949
{
50-
if (this.functionInvocationCallback != null)
51-
{
52-
throw new InvalidOperationException($"{nameof(this.SetFunctionInvocationCallback)} must be called only once.");
53-
}
54-
5550
this.functionInvocationCallback = callback ?? throw new ArgumentNullException(nameof(callback));
5651
}
5752

@@ -62,7 +57,6 @@ public override async Task<string> Execute(OrchestrationContext innerContext, st
6257
throw new InvalidOperationException($"The {nameof(this.functionInvocationCallback)} has not been assigned!");
6358
}
6459

65-
this.context.AssignToCurrentThread();
6660
this.context.SetInnerContext(innerContext);
6761
this.context.SetInput(serializedInput);
6862

Diff for: src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
<AssemblyName>Microsoft.Azure.WebJobs.Extensions.DurableTask</AssemblyName>
66
<RootNamespace>Microsoft.Azure.WebJobs.Extensions.DurableTask</RootNamespace>
77
<DocumentationFile>Microsoft.Azure.WebJobs.Extensions.DurableTask.xml</DocumentationFile>
8-
<AssemblyVersion>1.3.1.0</AssemblyVersion>
9-
<FileVersion>1.3.1.0</FileVersion>
10-
<Version>1.3.1-rc</Version>
8+
<AssemblyVersion>1.3.3.0</AssemblyVersion>
9+
<FileVersion>1.3.3.0</FileVersion>
10+
<Version>1.3.3-rc</Version>
1111
<Company>Microsoft Corporation</Company>
1212
</PropertyGroup>
1313

1414
<ItemGroup>
15-
<PackageReference Include="DurableTask.AzureStorage" Version="1.1.7-beta" />
15+
<PackageReference Include="DurableTask.AzureStorage" Version="1.2.0" />
1616
</ItemGroup>
1717

1818
<ItemGroup Condition="'$(Configuration)' == 'Debug'">

Diff for: test/BindingTests.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public BindingTests(ITestOutputHelper output)
3232
[Fact]
3333
public async Task ActivityTriggerAsJObject()
3434
{
35-
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsJObject)))
35+
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsJObject), false))
3636
{
3737
await host.StartAsync();
3838

@@ -48,7 +48,7 @@ public async Task ActivityTriggerAsJObject()
4848
// The function checks to see if there is a property called "Foo" which is set to a value
4949
// called "Bar" and returns true if this is the case. Otherwise returns false.
5050
Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
51-
Assert.Equal(true, status?.Output);
51+
Assert.True((bool)status?.Output);
5252

5353
await host.StopAsync();
5454
}
@@ -57,7 +57,7 @@ public async Task ActivityTriggerAsJObject()
5757
[Fact]
5858
public async Task ActivityTriggerAsPOCO()
5959
{
60-
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsPOCO)))
60+
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsPOCO), false))
6161
{
6262
await host.StartAsync();
6363

@@ -83,7 +83,7 @@ public async Task ActivityTriggerAsPOCO()
8383
[Fact]
8484
public async Task ActivityTriggerAsNumber()
8585
{
86-
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
86+
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
8787
{
8888
await host.StartAsync();
8989

@@ -107,7 +107,7 @@ public async Task ActivityTriggerAsNumber()
107107
[Fact]
108108
public async Task BindToBlobViaParameterName()
109109
{
110-
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
110+
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
111111
{
112112
await host.StartAsync();
113113

@@ -157,7 +157,7 @@ public async Task BindToBlobViaParameterName()
157157
[Fact]
158158
public async Task BindToBlobViaPOCO()
159159
{
160-
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber)))
160+
using (JobHost host = TestHelpers.GetJobHost(this.loggerFactory, nameof(this.ActivityTriggerAsNumber), false))
161161
{
162162
await host.StartAsync();
163163

Diff for: test/DurableOrchestrationClientBaseTests.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
using System;
55
using System.Threading.Tasks;
66
using FluentAssertions;
7-
using Microsoft.Azure.WebJobs;
87
using Moq;
98
using Xunit;
109

11-
namespace WebJobs.Extensions.DurableTask.Tests
10+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
1211
{
1312
public class DurableOrchestrationClientBaseTests
1413
{

Diff for: test/DurableOrchestrationClientMock.cs

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
using System;
55
using System.Threading.Tasks;
66
using DurableTask.Core;
7-
using Microsoft.Azure.WebJobs;
8-
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
97

10-
namespace WebJobs.Extensions.DurableTask.Tests
8+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
119
{
1210
internal class DurableOrchestrationClientMock : DurableOrchestrationClient
1311
{

Diff for: test/DurableOrchestrationContextBaseTests.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using FluentAssertions;
8-
using Microsoft.Azure.WebJobs;
98
using Moq;
109
using Xunit;
1110

12-
namespace WebJobs.Extensions.DurableTask.Tests
11+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests
1312
{
1413
public class DurableOrchestrationContextBaseTests
1514
{

0 commit comments

Comments
 (0)