Skip to content

Fix AsyncRx GroupByUntil double OnCompletedAsync bug #2201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 75 additions & 36 deletions AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -613,29 +613,20 @@ public partial class AsyncObserver

var nullGate = new object();
var nullGroup = default(IAsyncSubject<TElement>);
bool observerComplete = false;

async ValueTask OnErrorAsync(Exception ex)
{
var nullGroupLocal = default(IAsyncSubject<TElement>);

lock (nullGate)
{
nullGroupLocal = nullGroup;
}

if (nullGroupLocal != null)
{
await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
}

foreach (var group in groups.Values)
{
await group.OnErrorAsync(ex).ConfigureAwait(false);
}
await ErrorAndRemoveNullGroupIfPresentAsync(ex);
await ErrorAndRemoveAllGroupsIfPresentAsync(ex);

using (await gate.LockAsync().ConfigureAwait(false))
{
await observer.OnErrorAsync(ex).ConfigureAwait(false);
if (!observerComplete)
{
observerComplete = true;
await observer.OnErrorAsync(ex).ConfigureAwait(false);
}
}
}

Expand Down Expand Up @@ -714,7 +705,10 @@ async ValueTask OnErrorAsync(Exception ex)

using (await gate.LockAsync().ConfigureAwait(false))
{
await observer.OnNextAsync(g).ConfigureAwait(false);
if (!observerComplete)
{
await observer.OnNextAsync(g).ConfigureAwait(false);
}
}

var durationSubscription = new SingleAssignmentAsyncDisposable();
Expand All @@ -723,18 +717,7 @@ async ValueTask Expire()
{
if (key == null)
{
var oldNullGroup = default(IAsyncSubject<TElement>);

lock (nullGate)
{
oldNullGroup = nullGroup;
nullGroup = null;
}

if (oldNullGroup != null)
{
await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
}
await CompleteAndRemoveNullGroupIfPresentAsync();
}
else
{
Expand Down Expand Up @@ -778,22 +761,78 @@ async ValueTask Expire()
{
if (nullGroup != null)
{
await nullGroup.OnCompletedAsync().ConfigureAwait(false);
await CompleteAndRemoveNullGroupIfPresentAsync();
}

foreach (var group in groups.Values)
{
await group.OnCompletedAsync().ConfigureAwait(false);
}
await CompleteAndRemoveAllGroupsIfPresentAsync();

using (await gate.LockAsync().ConfigureAwait(false))
{
await observer.OnCompletedAsync().ConfigureAwait(false);
if (!observerComplete)
{
observerComplete = true;
await observer.OnCompletedAsync().ConfigureAwait(false);
}
}
}
),
refCount
);

ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null);
ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x);
async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x)
{
var oldNullGroup = default(IAsyncSubject<TElement>);

lock (nullGate)
{
oldNullGroup = nullGroup;
nullGroup = null;
}

if (oldNullGroup != null)
{
if (x is null)
{
await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
}
else
{
await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false);
}
}
}

ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null);
ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x);
async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x)
{
foreach (var key in groups.Keys)
{
// The ConcurrentDictionary's Keys property is a snapshot, so
// although this TryRemove should always succeed for the first
// key in the dictionary (as long as our upstream observable is
// obeying the rules, and not making multiple concurrent calls
// to our observer) each await in this loop offers an opportunity
// for one of the group duration observables to complete, which
// will cause the Expire method above to run, meaning that an
// entry that was present when we retrieved Keys at the start of
// this loop might already have been completed and removed by the
// time this loop reaches it.
if (groups.TryRemove(key, out var group))
{
if (x is null)
{
await group.OnCompletedAsync().ConfigureAwait(false);
}
else
{
await group.OnErrorAsync(x).ConfigureAwait(false);
}
}
}
}
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions azure-pipelines.asyncrx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,27 @@ stages:
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true

steps:
- task: UseDotNet@2
displayName: Use .NET 8.0.x SDK
inputs:
version: 8.0.x
performMultiLevelLookup: true

# We need .NET 7.0 and 6.0 to be able to run all tests.
# For .NET 7.0, the runtime package is sufficient because we don't need to build anything.
# That doesn't work for 6.0, because we need the desktop framework, and the only way to
# get that into a build agent seems to be to install the SDK.
- task: UseDotNet@2
displayName: Use .NET Core 7.0.x SDK
inputs:
version: 7.0.x
performMultiLevelLookup: true

- task: UseDotNet@2
displayName: Use .NET 6.0 SDK
inputs:
version: '6.0.x'

- task: DotNetCoreCLI@2
inputs:
command: custom
Expand Down