Skip to content

Commit 26ca46f

Browse files
committed
Unsubscribe from redis topic async
1 parent c4dcfea commit 26ca46f

File tree

7 files changed

+28
-22
lines changed

7 files changed

+28
-22
lines changed

src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public NatsTopic(
2323
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
2424
}
2525

26-
protected override async ValueTask<IDisposable> OnConnectAsync(
26+
protected override async ValueTask<IAsyncDisposable> OnConnectAsync(
2727
CancellationToken cancellationToken)
2828
{
2929
// We ensure that the processing is not started before the context is fully initialized.
@@ -39,7 +39,7 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3939
return new Session(Name, natsSession, DiagnosticEvents);
4040
}
4141

42-
private sealed class Session : IDisposable
42+
private sealed class Session : IAsyncDisposable
4343
{
4444
private readonly string _name;
4545
private readonly IDisposable _natsSession;
@@ -56,14 +56,16 @@ public Session(
5656
_diagnosticEvents = diagnosticEvents;
5757
}
5858

59-
public void Dispose()
59+
public ValueTask DisposeAsync()
6060
{
6161
if (!_disposed)
6262
{
6363
_natsSession.Dispose();
6464
_diagnosticEvents.ProviderTopicInfo(_name, Session_Dispose_UnsubscribedFromNats);
6565
_disposed = true;
6666
}
67+
68+
return ValueTask.CompletedTask;
6769
}
6870
}
6971
}

src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannel.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public async ValueTask EnsureInitialized(CancellationToken cancellationToken)
3939
}
4040
}
4141

42-
public IDisposable Subscribe(PostgresChannelObserver observer)
42+
public IAsyncDisposable Subscribe(PostgresChannelObserver observer)
4343
{
4444
_observers.Add(observer);
4545

@@ -124,7 +124,7 @@ private void OnNotification(object sender, NpgsqlNotificationEventArgs eventArgs
124124
}
125125
}
126126

127-
private sealed class Unsubscriber : IDisposable
127+
private sealed class Unsubscriber : IAsyncDisposable
128128
{
129129
private readonly PostgresChannel _channel;
130130
private readonly PostgresChannelObserver _observer;
@@ -136,9 +136,10 @@ public Unsubscriber(PostgresChannel channel, PostgresChannelObserver observer)
136136
}
137137

138138
/// <inheritdoc />
139-
public void Dispose()
139+
public ValueTask DisposeAsync()
140140
{
141141
_channel.Unsubscribe(_observer);
142+
return ValueTask.CompletedTask;
142143
}
143144
}
144145

src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresTopic.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public PostgresTopic(
2222
}
2323

2424
/// <inheritdoc />
25-
protected override async ValueTask<IDisposable> OnConnectAsync(
25+
protected override async ValueTask<IAsyncDisposable> OnConnectAsync(
2626
CancellationToken cancellationToken)
2727
{
2828
await _channel.EnsureInitialized(cancellationToken);

src/HotChocolate/Core/src/Subscriptions.RabbitMQ/RabbitMQTopic.cs

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public RabbitMQTopic(
2525
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
2626
}
2727

28-
protected override async ValueTask<IDisposable> OnConnectAsync(
28+
protected override async ValueTask<IAsyncDisposable> OnConnectAsync(
2929
CancellationToken cancellationToken)
3030
{
3131
// We ensure that the processing is not started before the context is fully initialized.
@@ -85,7 +85,7 @@ private AsyncEventingBasicConsumer CreateConsumer(IModel channel, string queueNa
8585
return new AsyncEventingBasicConsumer(channel);
8686
}
8787

88-
private sealed class Subscription : IDisposable
88+
private sealed class Subscription : IAsyncDisposable
8989
{
9090
private readonly Action _unsubscribe;
9191
private bool _disposed;
@@ -95,15 +95,17 @@ public Subscription(Action unsubscribe)
9595
_unsubscribe = unsubscribe;
9696
}
9797

98-
public void Dispose()
98+
public ValueTask DisposeAsync()
9999
{
100100
if (_disposed)
101101
{
102-
return;
102+
return ValueTask.CompletedTask;
103103
}
104104

105105
_unsubscribe();
106106
_disposed = true;
107+
108+
return ValueTask.CompletedTask;
107109
}
108110
}
109111
}

src/HotChocolate/Core/src/Subscriptions.Redis/RedisTopic.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public RedisTopic(
2323
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
2424
}
2525

26-
protected override async ValueTask<IDisposable> OnConnectAsync(
26+
protected override async ValueTask<IAsyncDisposable> OnConnectAsync(
2727
CancellationToken cancellationToken)
2828
{
2929
// We ensure that the processing is not started before the context is fully initialized.
@@ -37,7 +37,7 @@ protected override async ValueTask<IDisposable> OnConnectAsync(
3737
return new Session(Name, _connection, DiagnosticEvents);
3838
}
3939

40-
private sealed class Session : IDisposable
40+
private sealed class Session : IAsyncDisposable
4141
{
4242
private readonly string _name;
4343
private readonly IConnectionMultiplexer _connection;
@@ -54,11 +54,11 @@ public Session(
5454
_diagnosticEvents = diagnosticEvents;
5555
}
5656

57-
public void Dispose()
57+
public async ValueTask DisposeAsync()
5858
{
5959
if (!_disposed)
6060
{
61-
_connection.GetSubscriber().Unsubscribe(_name);
61+
await _connection.GetSubscriber().UnsubscribeAsync(_name).ConfigureAwait(false);
6262
_diagnosticEvents.ProviderTopicInfo(_name, RedisTopic_UnsubscribedFromRedis);
6363
_disposed = true;
6464
}

src/HotChocolate/Core/src/Subscriptions/DefaultTopic.cs

+7-6
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ internal async ValueTask ConnectAsync(CancellationToken ct = default)
193193
}
194194
}
195195

196-
private void BeginProcessing(IDisposable session)
196+
private void BeginProcessing(IAsyncDisposable session)
197197
=> ProcessMessagesSessionAsync(session).FireAndForget();
198198

199-
private async Task ProcessMessagesSessionAsync(IDisposable session)
199+
private async Task ProcessMessagesSessionAsync(IAsyncDisposable session)
200200
{
201201
try
202202
{
@@ -208,7 +208,7 @@ private async Task ProcessMessagesSessionAsync(IDisposable session)
208208
}
209209
finally
210210
{
211-
session.Dispose();
211+
await session.DisposeAsync().ConfigureAwait(false);
212212
DiagnosticEvents.Disconnected(Name);
213213
}
214214
}
@@ -372,7 +372,7 @@ private MessageEnvelope<TMessage> DeserializeMessage(IMessageSerializer serializ
372372
/// <returns>
373373
/// Returns a session to dispose the subscription session.
374374
/// </returns>
375-
protected virtual ValueTask<IDisposable> OnConnectAsync(
375+
protected virtual ValueTask<IAsyncDisposable> OnConnectAsync(
376376
CancellationToken cancellationToken)
377377
=> new(DefaultSession.Instance);
378378

@@ -407,10 +407,11 @@ protected virtual void Dispose(bool disposing)
407407
}
408408
}
409409

410-
private sealed class DefaultSession : IDisposable
410+
private sealed class DefaultSession : IAsyncDisposable
411411
{
412412
private DefaultSession() { }
413-
public void Dispose() { }
413+
414+
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
414415

415416
public static readonly DefaultSession Instance = new();
416417
}

src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public async Task Usubscribe_Should_StopListeningToMessages()
265265
SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1));
266266

267267
// Act
268-
disposable.Dispose();
268+
await disposable.DisposeAsync();
269269
await testChannel.SendMessageAsync("aaaaaaaaaaaaaaaaaaaaaaaa:dGVzdA==:foobar");
270270

271271
// Assert

0 commit comments

Comments
 (0)