Skip to content

Commit 7295286

Browse files
authored
Re-wrote .ToObservableChangeSet() operators for both Cache and List, to eliminate a deadlocking issue. (#1017)
See #998
1 parent 758ef92 commit 7295286

22 files changed

+3381
-2300
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
using System;
2+
using System.Linq;
3+
using System.Reactive;
4+
using System.Reactive.Linq;
5+
using System.Reactive.Subjects;
6+
using System.Threading.Tasks;
7+
8+
using DynamicData.Kernel;
9+
using DynamicData.Tests.Utilities;
10+
11+
using FluentAssertions;
12+
13+
using Xunit;
14+
15+
namespace DynamicData.Tests.Cache;
16+
17+
public static partial class AsyncDisposeManyFixture
18+
{
19+
public class IntegrationTests
20+
: IntegrationTestFixtureBase
21+
{
22+
[Theory(Timeout = 5_000)]
23+
[InlineData(ItemType.Disposable)]
24+
[InlineData(ItemType.AsyncDisposable)]
25+
[InlineData(ItemType.ImmediateAsyncDisposable)]
26+
public async Task ItemDisposalErrors_ErrorPropagatesToDisposalsCompleted(ItemType itemType)
27+
{
28+
using var source = new SourceCache<ItemBase, int>(static item => item.Id);
29+
using var sourceCompletionSource = new Subject<Unit>();
30+
31+
ValueRecordingObserver<Unit>? disposalsCompletedResults = null;
32+
33+
using var subscription = source
34+
.Connect()
35+
.TakeUntil(sourceCompletionSource)
36+
.AsyncDisposeMany(disposalsCompleted =>
37+
{
38+
disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription");
39+
disposalsCompleted.RecordValues(out disposalsCompletedResults);
40+
})
41+
.ValidateSynchronization()
42+
.ValidateChangeSets(static item => item.Id)
43+
.RecordCacheItems(out var results);
44+
45+
disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked");
46+
47+
48+
source.AddOrUpdate(new[]
49+
{
50+
ItemBase.Create(type: itemType, id: 1, version: 1),
51+
ItemBase.Create(type: itemType, id: 2, version: 1),
52+
ItemBase.Create(type: itemType, id: 3, version: 1)
53+
});
54+
55+
results.Error.Should().BeNull();
56+
results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed");
57+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added");
58+
results.HasCompleted.Should().BeFalse();
59+
60+
disposalsCompletedResults.Error.Should().BeNull();
61+
disposalsCompletedResults.RecordedValues.Should().BeEmpty("no disposals should have occurred");
62+
disposalsCompletedResults.HasCompleted.Should().BeFalse("no disposals should have occurred");
63+
64+
65+
var error = new Exception("Test");
66+
source.Items.ElementAt(1).FailDisposal(error);
67+
68+
sourceCompletionSource.OnNext(Unit.Default);
69+
70+
// RX and TPL don't guarantee Task continuations run synchronously with antecedent completion
71+
await disposalsCompletedResults.WhenFinalized;
72+
73+
results.Error.Should().BeNull("disposal errors should be propagated on disposalsCompleted");
74+
results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed");
75+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed");
76+
results.HasCompleted.Should().BeTrue();
77+
78+
disposalsCompletedResults.Error.Should().Be(error, "disposal errors should be caught and propagated on disposalsCompleted");
79+
}
80+
81+
[Theory(Timeout = 5_000)]
82+
[InlineData(ItemType.Plain)]
83+
[InlineData(ItemType.Disposable)]
84+
[InlineData(ItemType.AsyncDisposable)]
85+
[InlineData(ItemType.ImmediateAsyncDisposable)]
86+
public async Task ItemDisposalsComplete_DisposalsCompletedOccursAndCompletes(ItemType itemType)
87+
{
88+
using var source = new SourceCache<ItemBase, int>(static item => item.Id);
89+
using var sourceCompletionSource = new Subject<Unit>();
90+
91+
ValueRecordingObserver<Unit>? disposalsCompletedResults = null;
92+
93+
using var subscription = source
94+
.Connect()
95+
.TakeUntil(sourceCompletionSource)
96+
.AsyncDisposeMany(disposalsCompleted =>
97+
{
98+
disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription");
99+
disposalsCompleted.RecordValues(out disposalsCompletedResults);
100+
})
101+
.ValidateSynchronization()
102+
.ValidateChangeSets(static item => item.Id)
103+
.RecordCacheItems(out var results);
104+
105+
disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked");
106+
107+
108+
source.AddOrUpdate(new[]
109+
{
110+
ItemBase.Create(type: itemType, id: 1, version: 1),
111+
ItemBase.Create(type: itemType, id: 2, version: 1),
112+
ItemBase.Create(type: itemType, id: 3, version: 1)
113+
});
114+
115+
results.Error.Should().BeNull();
116+
results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed");
117+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "3 items were added");
118+
results.HasCompleted.Should().BeFalse();
119+
120+
disposalsCompletedResults.Error.Should().BeNull();
121+
disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed");
122+
disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed");
123+
124+
125+
sourceCompletionSource.OnNext(Unit.Default);
126+
foreach (var item in source.Items)
127+
item.CompleteDisposal();
128+
129+
// RX and TPL don't guarantee Task continuations run synchronously with antecedent completion
130+
await disposalsCompletedResults.WhenFinalized;
131+
132+
results.Error.Should().BeNull();
133+
results.RecordedChangeSets.Skip(1).Should().BeEmpty("no source operations were performed");
134+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items were changed");
135+
results.HasCompleted.Should().BeTrue();
136+
137+
disposalsCompletedResults.Error.Should().BeNull();
138+
disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "the source and all disposals have completed");
139+
disposalsCompletedResults.HasCompleted.Should().BeTrue("the source and all disposals have completed");
140+
}
141+
142+
[Fact(Timeout = 5_000)]
143+
public async Task ItemDisposalsOccurOnMultipleThreads_DisposalIsThreadSafe()
144+
{
145+
using var source = new SourceCache<AsyncDisposableItem, int>(static item => item.Id);
146+
using var sourceCompletionSource = new Subject<Unit>();
147+
148+
ValueRecordingObserver<Unit>? disposalsCompletedResults = null;
149+
150+
using var subscription = source
151+
.Connect()
152+
.TakeUntil(sourceCompletionSource)
153+
.AsyncDisposeMany(disposalsCompleted =>
154+
{
155+
disposalsCompletedResults.Should().BeNull("disposalsCompletedAccessor should only be invoked once per subscription");
156+
disposalsCompleted.RecordValues(out disposalsCompletedResults);
157+
})
158+
.ValidateSynchronization()
159+
.ValidateChangeSets(static item => item.Id)
160+
.RecordCacheItems(out var results);
161+
162+
disposalsCompletedResults.Should().NotBeNull("disposalsCompletedAccessor should have been invoked");
163+
164+
165+
var items = Enumerable.Range(1, 100_000)
166+
.Select(id => new AsyncDisposableItem()
167+
{
168+
Id = id,
169+
Version = 1
170+
})
171+
.ToArray();
172+
173+
source.AddOrUpdate(items);
174+
175+
results.Error.Should().BeNull();
176+
results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed");
177+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "items were added");
178+
results.HasCompleted.Should().BeFalse();
179+
180+
disposalsCompletedResults.Error.Should().BeNull();
181+
disposalsCompletedResults.RecordedValues.Should().BeEmpty("the source has not completed");
182+
disposalsCompletedResults.HasCompleted.Should().BeFalse("the source has not completed");
183+
184+
185+
sourceCompletionSource.OnNext();
186+
await Task.WhenAll(items
187+
.GroupBy(item => item.Id % 4)
188+
.Select(group => Task.Run(() =>
189+
{
190+
foreach (var item in group)
191+
item.CompleteDisposal();
192+
})));
193+
194+
// RX and TPL don't guarantee Task continuations run synchronously with antecedent completion
195+
await disposalsCompletedResults.WhenFinalized;
196+
197+
results.Error.Should().BeNull();
198+
results.RecordedChangeSets.Count.Should().Be(1, "1 source operation was performed");
199+
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(items, "no items were removed");
200+
results.HasCompleted.Should().BeTrue();
201+
202+
items.Should().AllSatisfy(item => item.HasBeenDisposed.Should().BeTrue(), "disposable items should be disposed upon source completion");
203+
204+
disposalsCompletedResults.Error.Should().BeNull();
205+
disposalsCompletedResults.RecordedValues.Count.Should().Be(1, "the source and all disposals have completed");
206+
disposalsCompletedResults.HasCompleted.Should().BeTrue("the source and all disposals have completed");
207+
}
208+
}
209+
}

0 commit comments

Comments
 (0)