Skip to content

Commit d083827

Browse files
extend IDurableEntityContext with properties that indicate current batch size and batch position. (#1511)
1 parent 8201b5c commit d083827

File tree

5 files changed

+90
-1
lines changed

5 files changed

+90
-1
lines changed

src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableEntityContext.cs

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ internal enum StateAccess
6868

6969
EntityId IDurableEntityContext.EntityId => this.self;
7070

71+
int IDurableEntityContext.BatchPosition => this.shim.BatchPosition;
72+
73+
int IDurableEntityContext.BatchSize => this.shim.OperationBatch.Count;
74+
7175
internal List<RequestMessage> OperationBatch => this.shim.OperationBatch;
7276

7377
internal ExceptionDispatchInfo InternalError { get; set; }

src/WebJobs.Extensions.DurableTask/ContextInterfaces/IDurableEntityContext.cs

+10
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ public interface IDurableEntityContext
4747
/// </summary>
4848
bool HasState { get; }
4949

50+
/// <summary>
51+
/// The size of the current batch of operations.
52+
/// </summary>
53+
int BatchSize { get; }
54+
55+
/// <summary>
56+
/// The position of the currently executing operation within the current batch of operations.
57+
/// </summary>
58+
int BatchPosition { get; }
59+
5060
/// <summary>
5161
/// Gets the current state of this entity, for reading and/or updating.
5262
/// If this entity has no state yet, creates it.

src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public TaskEntityShim(DurableTaskExtension config, DurabilityProvider durability
5454

5555
internal List<RequestMessage> OperationBatch => this.operationBatch;
5656

57+
internal int BatchPosition { get; private set; }
58+
5759
public bool RollbackFailedOperations => this.context.Config.Options.RollbackEntityOperationsOnExceptions;
5860

5961
public void AddOperationToBatch(RequestMessage operationMessage)
@@ -270,8 +272,9 @@ public async Task ExecuteBatch()
270272
else
271273
{
272274
// call the function once per operation in the batch
273-
foreach (var request in this.operationBatch)
275+
for (this.BatchPosition = 0; this.BatchPosition < this.operationBatch.Count; this.BatchPosition++)
274276
{
277+
var request = this.operationBatch[this.BatchPosition];
275278
await this.ProcessOperationRequestAsync(request);
276279
}
277280
}

test/Common/DurableTaskEndToEndTests.cs

+62
Original file line numberDiff line numberDiff line change
@@ -3005,6 +3005,68 @@ public async Task DurableEntity_StringStoreWithCreateDelete(bool extendedSession
30053005
}
30063006
}
30073007

3008+
/// <summary>
3009+
/// End-to-end test which validates batching of entity signals.
3010+
/// </summary>
3011+
[Theory]
3012+
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
3013+
[InlineData(true)]
3014+
[InlineData(false)]
3015+
public async Task DurableEntity_BatchedSignals(bool extendedSessions)
3016+
{
3017+
using (var host = TestHelpers.GetJobHost(
3018+
this.loggerProvider,
3019+
nameof(this.DurableEntity_BatchedSignals),
3020+
extendedSessions))
3021+
{
3022+
await host.StartAsync();
3023+
3024+
int numIterations = 100;
3025+
var entityId = new EntityId(nameof(TestEntities.BatchEntity), Guid.NewGuid().ToString());
3026+
var client = await host.GetEntityClientAsync(entityId, this.output);
3027+
3028+
// send a number of signals immediately after each other
3029+
List<Task> tasks = new List<Task>();
3030+
for (int i = 0; i < numIterations; i++)
3031+
{
3032+
tasks.Add(client.SignalEntity(this.output, i.ToString()));
3033+
}
3034+
3035+
await Task.WhenAll(tasks);
3036+
3037+
var result = await client.WaitForEntityState<List<(int, int)>>(
3038+
this.output,
3039+
timeout: Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(20),
3040+
list => list.Count == numIterations ? null : $"waiting for {numIterations - list.Count} signals");
3041+
3042+
// validate the batching positions and sizes
3043+
int? cursize = null;
3044+
int curpos = 0;
3045+
int numBatches = 0;
3046+
foreach (var (position, size) in result)
3047+
{
3048+
if (cursize == null)
3049+
{
3050+
cursize = size;
3051+
curpos = 0;
3052+
numBatches++;
3053+
}
3054+
3055+
Assert.Equal(curpos, position);
3056+
3057+
if (++curpos == cursize)
3058+
{
3059+
cursize = null;
3060+
}
3061+
}
3062+
3063+
// there should always be some batching going on
3064+
Assert.True(numBatches < numIterations);
3065+
3066+
await host.StopAsync();
3067+
}
3068+
}
3069+
30083070
/// <summary>
30093071
/// End-to-end test which validates exception handling in entity operations.
30103072
/// </summary>

test/Common/TestEntities.cs

+10
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,16 @@ public static void SchedulerEntity(
259259
state.Add(context.OperationName);
260260
}
261261

262+
//-------------- An entity that records all batch positions and batch sizes -----------------
263+
264+
public static void BatchEntity(
265+
[EntityTrigger(EntityName = "BatchEntity")] IDurableEntityContext context,
266+
ILogger logger)
267+
{
268+
var state = context.GetState(() => new List<(int, int)>());
269+
state.Add((context.BatchPosition, context.BatchSize));
270+
}
271+
262272
//-------------- an entity that stores text, and whose state is
263273
// saved/restored to/from storage when the entity is deactivated/activated -----------------
264274
//

0 commit comments

Comments
 (0)