Skip to content

Commit 43c654f

Browse files
authored
(#298) Conflict Resolver implementation. (#325)
* (#298) Conflict resolver.
1 parent 55a1712 commit 43c654f

File tree

9 files changed

+852
-9
lines changed

9 files changed

+852
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace CommunityToolkit.Datasync.Client.Offline;
6+
7+
/// <summary>
8+
/// An abstract class that provides a mechanism for resolving conflicts between client and server objects of a specified
9+
/// type asynchronously. The object edition of the conflict resolver just calls the typed version.
10+
/// </summary>
11+
/// <typeparam name="TEntity">The type of entity being resolved.</typeparam>
12+
public abstract class AbstractConflictResolver<TEntity> : IConflictResolver<TEntity>
13+
{
14+
/// <inheritdoc />
15+
public abstract Task<ConflictResolution> ResolveConflictAsync(TEntity? clientObject, TEntity? serverObject, CancellationToken cancellationToken = default);
16+
17+
/// <summary>
18+
/// The object version of the resolver calls the typed version.
19+
/// </summary>
20+
/// <param name="clientObject"></param>
21+
/// <param name="serverObject"></param>
22+
/// <param name="cancellationToken"></param>
23+
/// <returns></returns>
24+
public virtual async Task<ConflictResolution> ResolveConflictAsync(object? clientObject, object? serverObject, CancellationToken cancellationToken = default)
25+
=> await ResolveConflictAsync((TEntity?)clientObject, (TEntity?)serverObject, cancellationToken);
26+
}
27+
28+
/// <summary>
29+
/// A conflict resolver where the client object always wins.
30+
/// </summary>
31+
public class ClientWinsConflictResolver : IConflictResolver
32+
{
33+
/// <inheritdoc />
34+
public Task<ConflictResolution> ResolveConflictAsync(object? clientObject, object? serverObject, CancellationToken cancellationToken = default)
35+
=> Task.FromResult(new ConflictResolution
36+
{
37+
Result = ConflictResolutionResult.Client,
38+
Entity = clientObject
39+
});
40+
}
41+
42+
/// <summary>
43+
/// A conflict resolver where the server object always wins.
44+
/// </summary>
45+
public class ServerWinsConflictResolver : IConflictResolver
46+
{
47+
/// <inheritdoc />
48+
public Task<ConflictResolution> ResolveConflictAsync(object? clientObject, object? serverObject, CancellationToken cancellationToken = default)
49+
=> Task.FromResult(new ConflictResolution
50+
{
51+
Result = ConflictResolutionResult.Server,
52+
Entity = serverObject
53+
});
54+
}
55+

src/CommunityToolkit.Datasync.Client/Offline/DatasyncOfflineOptionsBuilder.cs

+12-1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public DatasyncOfflineOptionsBuilder Entity<TEntity>(Action<EntityOfflineOptions
9595
EntityOfflineOptions<TEntity> entity = new();
9696
configure(entity);
9797
options.ClientName = entity.ClientName;
98+
options.ConflictResolver = entity.ConflictResolver;
9899
options.Endpoint = entity.Endpoint;
99100
options.QueryDescription = new QueryTranslator<TEntity>(entity.Query).Translate();
100101
return this;
@@ -137,7 +138,7 @@ internal OfflineOptions Build()
137138

138139
foreach (EntityOfflineOptions entity in this._entities.Values)
139140
{
140-
result.AddEntity(entity.EntityType, entity.ClientName, entity.Endpoint, entity.QueryDescription);
141+
result.AddEntity(entity.EntityType, entity.ClientName, entity.ConflictResolver, entity.Endpoint, entity.QueryDescription);
141142
}
142143

143144
return result;
@@ -164,6 +165,11 @@ public class EntityOfflineOptions(Type entityType)
164165
/// </summary>
165166
public Uri Endpoint { get; set; } = new Uri($"/tables/{entityType.Name.ToLowerInvariant()}", UriKind.Relative);
166167

168+
/// <summary>
169+
/// The conflict resolver for this entity.
170+
/// </summary>
171+
public IConflictResolver? ConflictResolver { get; set; }
172+
167173
/// <summary>
168174
/// The query description for the entity type - may be null (to mean "pull everything").
169175
/// </summary>
@@ -186,6 +192,11 @@ public class EntityOfflineOptions<TEntity>() where TEntity : class
186192
/// </summary>
187193
public string ClientName { get; set; } = string.Empty;
188194

195+
/// <summary>
196+
/// The conflict resolver for this entity.
197+
/// </summary>
198+
public IConflictResolver? ConflictResolver { get; set; }
199+
189200
/// <summary>
190201
/// The endpoint for the entity type.
191202
/// </summary>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace CommunityToolkit.Datasync.Client.Offline;
6+
7+
/// <summary>
8+
/// Definition of a conflict resolver. This is used in push situations where
9+
/// the server returns a 409 or 412 status code indicating that the client is
10+
/// out of step with the server.
11+
/// </summary>
12+
public interface IConflictResolver
13+
{
14+
/// <summary>
15+
/// Resolves the conflict between two objects - client side and server side.
16+
/// </summary>
17+
/// <param name="clientObject">The client object.</param>
18+
/// <param name="serverObject">The server object.</param>
19+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
20+
/// <returns>The conflict resolution.</returns>
21+
Task<ConflictResolution> ResolveConflictAsync(object? clientObject, object? serverObject, CancellationToken cancellationToken = default);
22+
}
23+
24+
/// <summary>
25+
/// Definition of a conflict resolver. This is used in push situations where
26+
/// the server returns a 409 or 412 status code indicating that the client is
27+
/// out of step with the server.
28+
/// </summary>
29+
/// <typeparam name="TEntity">The type of the entity.</typeparam>
30+
public interface IConflictResolver<TEntity> : IConflictResolver
31+
{
32+
/// <summary>
33+
/// Resolves the conflict between two objects - client side and server side.
34+
/// </summary>
35+
/// <param name="clientObject">The client object.</param>
36+
/// <param name="serverObject">The server object.</param>
37+
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to observe.</param>
38+
/// <returns>The conflict resolution.</returns>
39+
Task<ConflictResolution> ResolveConflictAsync(TEntity? clientObject, TEntity? serverObject, CancellationToken cancellationToken = default);
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
namespace CommunityToolkit.Datasync.Client.Offline;
6+
7+
/// <summary>
8+
/// The possible results of a conflict resolution.
9+
/// </summary>
10+
public enum ConflictResolutionResult
11+
{
12+
/// <summary>
13+
/// The default resolution, which is to do nothing and re-queue the operation.
14+
/// </summary>
15+
Default,
16+
17+
/// <summary>
18+
/// The provided client object should be used. This results in a new "force" submission
19+
/// to the server to over-write the server entity.
20+
/// </summary>
21+
Client,
22+
23+
/// <summary>
24+
/// The server object should be used. This results in the client object being updated
25+
/// with whatever the server object was provided.
26+
/// </summary>
27+
Server
28+
}
29+
30+
/// <summary>
31+
/// The model class returned by a conflict resolver to indicate the resolution of the conflict.
32+
/// </summary>
33+
public class ConflictResolution
34+
{
35+
/// <summary>
36+
/// The conflict resolution result.
37+
/// </summary>
38+
public ConflictResolutionResult Result { get; set; } = ConflictResolutionResult.Default;
39+
40+
/// <summary>
41+
/// The entity, if required.
42+
/// </summary>
43+
public object? Entity { get; set; }
44+
}

src/CommunityToolkit.Datasync.Client/Offline/Models/EntityDatasyncOptions.cs

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ namespace CommunityToolkit.Datasync.Client.Offline.Models;
1111
/// </summary>
1212
internal class EntityDatasyncOptions
1313
{
14+
/// <summary>
15+
/// The conflict resolver for the entity.
16+
/// </summary>
17+
internal IConflictResolver? ConflictResolver { get; init; }
18+
1419
/// <summary>
1520
/// The endpoint for the entity type.
1621
/// </summary>

src/CommunityToolkit.Datasync.Client/Offline/Models/OfflineOptions.cs

+16-2
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,18 @@ internal class OfflineOptions()
2525
/// </summary>
2626
/// <param name="entityType">The type of the entity being stored.</param>
2727
/// <param name="clientName">The name of the client.</param>
28+
/// <param name="conflictResolver">The conflict resolver to use.</param>
2829
/// <param name="endpoint">The endpoint serving the datasync services.</param>
2930
/// <param name="queryDescription">The optional query description to describe what entities need to be pulled.</param>
30-
public void AddEntity(Type entityType, string clientName, Uri endpoint, QueryDescription? queryDescription = null)
31+
public void AddEntity(Type entityType, string clientName, IConflictResolver? conflictResolver, Uri endpoint, QueryDescription? queryDescription = null)
3132
{
32-
this._cache.Add(entityType, new EntityOptions { ClientName = clientName, Endpoint = endpoint, QueryDescription = queryDescription });
33+
this._cache.Add(entityType, new EntityOptions
34+
{
35+
ClientName = clientName,
36+
ConflictResolver = conflictResolver,
37+
Endpoint = endpoint,
38+
QueryDescription = queryDescription
39+
});
3340
}
3441

3542
/// <summary>
@@ -43,6 +50,7 @@ public EntityDatasyncOptions GetOptions(Type entityType)
4350
{
4451
return new()
4552
{
53+
ConflictResolver = options.ConflictResolver,
4654
Endpoint = options.Endpoint,
4755
HttpClient = HttpClientFactory.CreateClient(options.ClientName),
4856
QueryDescription = options.QueryDescription ?? new QueryDescription()
@@ -52,6 +60,7 @@ public EntityDatasyncOptions GetOptions(Type entityType)
5260
{
5361
return new()
5462
{
63+
ConflictResolver = null,
5564
Endpoint = new Uri($"tables/{entityType.Name.ToLowerInvariant()}", UriKind.Relative),
5665
HttpClient = HttpClientFactory.CreateClient(),
5766
QueryDescription = new QueryDescription()
@@ -69,6 +78,11 @@ internal class EntityOptions
6978
/// </summary>
7079
public required string ClientName { get; set; }
7180

81+
/// <summary>
82+
/// The conflict resolver for the entity options.
83+
/// </summary>
84+
internal IConflictResolver? ConflictResolver { get; set; }
85+
7286
/// <summary>
7387
/// The endpoint for the entity type.
7488
/// </summary>

src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,19 @@ public async Task<PullResult> ExecuteAsync(IEnumerable<PullRequest> requests, Pu
5959
{
6060
_ = context.Add(item);
6161
result.IncrementAdditions();
62-
}
62+
}
6363
else if (originalEntity is not null && metadata.Deleted)
6464
{
6565
_ = context.Remove(originalEntity);
6666
result.IncrementDeletions();
67-
}
67+
}
6868
else if (originalEntity is not null && !metadata.Deleted)
6969
{
7070
context.Entry(originalEntity).CurrentValues.SetValues(item);
7171
result.IncrementReplacements();
7272
}
7373

74-
if (metadata.UpdatedAt.HasValue && metadata.UpdatedAt.Value > lastSynchronization)
74+
if (metadata.UpdatedAt > lastSynchronization)
7575
{
7676
lastSynchronization = metadata.UpdatedAt.Value;
7777
bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);

src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs

+38-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using CommunityToolkit.Datasync.Client.Threading;
99
using Microsoft.EntityFrameworkCore;
1010
using Microsoft.EntityFrameworkCore.ChangeTracking;
11+
using System.Net;
1112
using System.Reflection;
1213
using System.Text.Json;
1314

@@ -77,7 +78,7 @@ internal List<EntityEntry> GetChangedEntitiesInScope()
7778
/// </summary>
7879
/// <remarks>
7980
/// An entity is "synchronization ready" if:
80-
///
81+
///
8182
/// * It is a property on this context
8283
/// * The property is public and a <see cref="DbSet{TEntity}"/>.
8384
/// * The property does not have a <see cref="DoNotSynchronizeAttribute"/> specified.
@@ -215,7 +216,7 @@ internal IEnumerable<Type> GetSynchronizableEntityTypes(IEnumerable<Type> allowe
215216
/// </summary>
216217
/// <remarks>
217218
/// An entity is "synchronization ready" if:
218-
///
219+
///
219220
/// * It is a property on this context
220221
/// * The property is public and a <see cref="DbSet{TEntity}"/>.
221222
/// * The property does not have a <see cref="DoNotSynchronizeAttribute"/> specified.
@@ -299,7 +300,40 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
299300
ExecutableOperation op = await ExecutableOperation.CreateAsync(operation, cancellationToken).ConfigureAwait(false);
300301
ServiceResponse response = await op.ExecuteAsync(options, cancellationToken).ConfigureAwait(false);
301302

302-
if (!response.IsSuccessful)
303+
bool isSuccessful = response.IsSuccessful;
304+
if (response.IsConflictStatusCode && options.ConflictResolver is not null)
305+
{
306+
object? serverEntity = JsonSerializer.Deserialize(response.ContentStream, entityType, DatasyncSerializer.JsonSerializerOptions);
307+
object? clientEntity = JsonSerializer.Deserialize(operation.Item, entityType, DatasyncSerializer.JsonSerializerOptions);
308+
ConflictResolution resolution = await options.ConflictResolver.ResolveConflictAsync(clientEntity, serverEntity, cancellationToken).ConfigureAwait(false);
309+
310+
if (resolution.Result is ConflictResolutionResult.Client)
311+
{
312+
operation.Item = JsonSerializer.Serialize(resolution.Entity, entityType, DatasyncSerializer.JsonSerializerOptions);
313+
operation.State = OperationState.Pending;
314+
operation.LastAttempt = DateTimeOffset.UtcNow;
315+
operation.HttpStatusCode = response.StatusCode;
316+
operation.EntityVersion = string.Empty; // Force the push
317+
operation.Version++;
318+
_ = this._context.Update(operation);
319+
ExecutableOperation resolvedOp = await ExecutableOperation.CreateAsync(operation, cancellationToken).ConfigureAwait(false);
320+
response = await resolvedOp.ExecuteAsync(options, cancellationToken).ConfigureAwait(false);
321+
isSuccessful = response.IsSuccessful;
322+
}
323+
else if (resolution.Result is ConflictResolutionResult.Server)
324+
{
325+
lock (this.pushlock)
326+
{
327+
operation.State = OperationState.Completed; // Make it successful
328+
operation.LastAttempt = DateTimeOffset.UtcNow;
329+
operation.HttpStatusCode = 200;
330+
isSuccessful = true;
331+
_ = this._context.Update(operation);
332+
}
333+
}
334+
}
335+
336+
if (!isSuccessful)
303337
{
304338
lock (this.pushlock)
305339
{
@@ -315,6 +349,7 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
315349
// If the operation is a success, then the content may need to be updated.
316350
if (operation.Kind != OperationKind.Delete)
317351
{
352+
_ = response.ContentStream.Seek(0L, SeekOrigin.Begin); // Reset the memory stream to the beginning.
318353
object? newValue = JsonSerializer.Deserialize(response.ContentStream, entityType, DatasyncSerializer.JsonSerializerOptions);
319354
object? oldValue = await this._context.FindAsync(entityType, [operation.ItemId], cancellationToken).ConfigureAwait(false);
320355
ReplaceDatabaseValue(oldValue, newValue);

0 commit comments

Comments
 (0)