Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide batch position and size information inside entity operations #1511

Merged
merged 1 commit into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ internal enum StateAccess

EntityId IDurableEntityContext.EntityId => this.self;

int IDurableEntityContext.BatchPosition => this.shim.BatchPosition;

int IDurableEntityContext.BatchSize => this.shim.OperationBatch.Count;

internal List<RequestMessage> OperationBatch => this.shim.OperationBatch;

internal ExceptionDispatchInfo InternalError { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public interface IDurableEntityContext
/// </summary>
bool HasState { get; }

/// <summary>
/// The size of the current batch of operations.
/// </summary>
int BatchSize { get; }

/// <summary>
/// The position of the currently executing operation within the current batch of operations.
/// </summary>
int BatchPosition { get; }

/// <summary>
/// Gets the current state of this entity, for reading and/or updating.
/// If this entity has no state yet, creates it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public TaskEntityShim(DurableTaskExtension config, string schedulerId)

internal List<RequestMessage> OperationBatch => this.operationBatch;

internal int BatchPosition { get; private set; }

public bool RollbackFailedOperations => this.context.Config.Options.RollbackEntityOperationsOnExceptions;

public void AddOperationToBatch(RequestMessage operationMessage)
Expand Down Expand Up @@ -251,8 +253,9 @@ public async Task ExecuteBatch()
else
{
// call the function once per operation in the batch
foreach (var request in this.operationBatch)
for (this.BatchPosition = 0; this.BatchPosition < this.operationBatch.Count; this.BatchPosition++)
{
var request = this.operationBatch[this.BatchPosition];
await this.ProcessOperationRequestAsync(request);
}
}
Expand Down
62 changes: 62 additions & 0 deletions test/Common/DurableTaskEndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,68 @@ public async Task DurableEntity_StringStoreWithCreateDelete(bool extendedSession
}
}

/// <summary>
/// End-to-end test which validates batching of entity signals.
/// </summary>
[Theory]
[Trait("Category", PlatformSpecificHelpers.TestCategory)]
[InlineData(true)]
[InlineData(false)]
public async Task DurableEntity_BatchedSignals(bool extendedSessions)
{
using (var host = TestHelpers.GetJobHost(
this.loggerProvider,
nameof(this.DurableEntity_BatchedSignals),
extendedSessions))
{
await host.StartAsync();

int numIterations = 100;
var entityId = new EntityId(nameof(TestEntities.BatchEntity), Guid.NewGuid().ToString());
var client = await host.GetEntityClientAsync(entityId, this.output);

// send a number of signals immediately after each other
List<Task> tasks = new List<Task>();
for (int i = 0; i < numIterations; i++)
{
tasks.Add(client.SignalEntity(this.output, i.ToString()));
}

await Task.WhenAll(tasks);

var result = await client.WaitForEntityState<List<(int, int)>>(
this.output,
timeout: Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromSeconds(20),
list => list.Count == numIterations ? null : $"waiting for {numIterations - list.Count} signals");

// validate the batching positions and sizes
int? cursize = null;
int curpos = 0;
int numBatches = 0;
foreach (var (position, size) in result)
{
if (cursize == null)
{
cursize = size;
curpos = 0;
numBatches++;
}

Assert.Equal(curpos, position);

if (++curpos == cursize)
{
cursize = null;
}
}

// there should always be some batching going on
Assert.True(numBatches < numIterations);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates exception handling in entity operations.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions test/Common/TestEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ public static void SchedulerEntity(
state.Add(context.OperationName);
}

//-------------- An entity that records all batch positions and batch sizes -----------------

public static void BatchEntity(
[EntityTrigger(EntityName = "BatchEntity")] IDurableEntityContext context,
ILogger logger)
{
var state = context.GetState(() => new List<(int, int)>());
state.Add((context.BatchPosition, context.BatchSize));
}

//-------------- an entity that stores text, and whose state is
// saved/restored to/from storage when the entity is deactivated/activated -----------------
//
Expand Down