Skip to content

Commit c097047

Browse files
committed
expose NpgsqlDataSource along with connection string - close #16
1 parent eadba73 commit c097047

13 files changed

+69
-51
lines changed

src/Blumchen.DependencyInjection/Configuration/DatabaseOptions.cs

-2
This file was deleted.

src/Blumchen.DependencyInjection/Workers/Worker.cs

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
using System.Collections.Concurrent;
22
using System.Text.Json.Serialization;
3-
using Blumchen.Configuration;
43
using Blumchen.Serialization;
54
using Blumchen.Subscriptions;
65
using Blumchen.Subscriptions.Management;
76
using Microsoft.Extensions.Hosting;
87
using Microsoft.Extensions.Logging;
8+
using Npgsql;
99
using Polly;
1010

1111

1212
namespace Blumchen.Workers;
1313

1414
public abstract class Worker<T>(
15-
DatabaseOptions databaseOptions,
15+
NpgsqlDataSource dataSource,
16+
string connectionString,
1617
IHandler<T> handler,
1718
JsonSerializerContext jsonSerializerContext,
1819
IErrorProcessor errorProcessor,
@@ -21,9 +22,8 @@ public abstract class Worker<T>(
2122
PublicationManagement.PublicationSetupOptions publicationSetupOptions,
2223
ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions,
2324
Func<TableDescriptorBuilder,TableDescriptorBuilder> tableDescriptorBuilder,
24-
ILoggerFactory loggerFactory): BackgroundService where T : class
25+
ILogger logger): BackgroundService where T : class
2526
{
26-
private readonly ILogger<Worker<T>> _logger = loggerFactory.CreateLogger<Worker<T>>();
2727
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
2828
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
2929
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
@@ -33,9 +33,9 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)
3333
{
3434
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
3535
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
36-
(_, _) => (_, __, ___) => { }
36+
(_, _) => (_, _, _) => { }
3737
};
38-
LoggingActions.GetOrAdd(template,s => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
38+
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
3939
}
4040

4141
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -45,21 +45,22 @@ await pipeline.ExecuteAsync(async token =>
4545
await using var subscription = new Subscription();
4646
await using var cursor = subscription.Subscribe(builder =>
4747
builder
48-
.ConnectionString(databaseOptions.ConnectionString)
48+
.DataSource(dataSource)
49+
.ConnectionString(connectionString)
4950
.WithTable(tableDescriptorBuilder)
5051
.WithErrorProcessor(errorProcessor)
5152
.Handles<T, IHandler<T>>(handler)
5253
.NamingPolicy(namingPolicy)
5354
.JsonContext(jsonSerializerContext)
5455
.WithPublicationOptions(publicationSetupOptions)
5556
.WithReplicationOptions(replicationSlotSetupOptions)
56-
, ct: token, loggerFactory: loggerFactory).GetAsyncEnumerator(token);
57-
Notify(_logger, LogLevel.Information,"{WorkerName} started", WorkerName);
57+
, ct: token).GetAsyncEnumerator(token);
58+
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
5859
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
59-
Notify(_logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
60+
Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
6061

6162
}, stoppingToken).ConfigureAwait(false);
62-
Notify(_logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
63+
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
6364
return;
6465
}
6566

src/Blumchen/Serialization/ITypeResolver.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ internal sealed class JsonTypeResolver(
2424
internal void WhiteList(Type type)
2525
{
2626
var typeInfo = SerializationContext.GetTypeInfo(type) ?? throw new NotSupportedException(type.FullName);
27-
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (s,t) =>typeInfo.Type);
28-
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,__)=> typeInfo);
27+
_typeDictionary.AddOrUpdate(_namingPolicy.Bind(typeInfo.Type), _ => typeInfo.Type, (_,_) =>typeInfo.Type);
28+
_typeInfoDictionary.AddOrUpdate(typeInfo.Type, _ => typeInfo, (_,_)=> typeInfo);
2929
}
3030

3131
public (string, JsonTypeInfo) Resolve(Type type) =>

src/Blumchen/Subscriptions/ISubscriptionOptions.cs

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
using Blumchen.Subscriptions.Replication;
22
using JetBrains.Annotations;
3+
using Npgsql;
34
using static Blumchen.Subscriptions.Management.PublicationManagement;
45
using static Blumchen.Subscriptions.Management.ReplicationSlotManagement;
56

67
namespace Blumchen.Subscriptions;
78

89
internal interface ISubscriptionOptions
910
{
10-
[UsedImplicitly] string ConnectionString { get; }
11+
[UsedImplicitly] NpgsqlDataSource DataSource { get; }
12+
[UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; }
1113
IReplicationDataMapper DataMapper { get; }
1214
[UsedImplicitly] PublicationSetupOptions PublicationOptions { get; }
1315
[UsedImplicitly] ReplicationSlotSetupOptions ReplicationOptions { get; }
1416
[UsedImplicitly] IErrorProcessor ErrorProcessor { get; }
1517

1618
void Deconstruct(
17-
out string connectionString,
19+
out NpgsqlDataSource dataSource,
20+
out NpgsqlConnectionStringBuilder connectionStringBuilder,
1821
out PublicationSetupOptions publicationSetupOptions,
1922
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
2023
out IErrorProcessor errorProcessor,
@@ -23,7 +26,8 @@ void Deconstruct(
2326
}
2427

2528
internal record SubscriptionOptions(
26-
string ConnectionString,
29+
NpgsqlDataSource DataSource,
30+
NpgsqlConnectionStringBuilder ConnectionStringBuilder,
2731
PublicationSetupOptions PublicationOptions,
2832
ReplicationSlotSetupOptions ReplicationOptions,
2933
IErrorProcessor ErrorProcessor,

src/Blumchen/Subscriptions/Subscription.cs

+3-8
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,17 @@ public enum CreateStyle
3030
private ISubscriptionOptions? _options;
3131
public async IAsyncEnumerable<IEnvelope> Subscribe(
3232
Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder> builder,
33-
ILoggerFactory? loggerFactory = null,
3433
[EnumeratorCancellation] CancellationToken ct = default
3534
)
3635
{
3736
_options = builder(_builder).Build();
38-
var (connectionString, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;
39-
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
40-
dataSourceBuilder.UseLoggerFactory(loggerFactory);
41-
42-
var dataSource = dataSourceBuilder.Build();
37+
var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;
38+
4339
await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);
4440

45-
_connection = new LogicalReplicationConnection(connectionString);
41+
_connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString);
4642
await _connection.Open(ct).ConfigureAwait(false);
4743

48-
4944
await dataSource.SetupPublication(publicationSetupOptions, ct).ConfigureAwait(false);
5045
var result = await dataSource.SetupReplicationSlot(_connection, replicationSlotSetupOptions, ct).ConfigureAwait(false);
5146

src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs

+16-5
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
using Blumchen.Subscriptions.Management;
33
using Blumchen.Subscriptions.Replication;
44
using JetBrains.Annotations;
5+
using Npgsql;
56
using System.Text.Json.Serialization;
67

78
namespace Blumchen.Subscriptions;
89

910
public sealed class SubscriptionOptionsBuilder
1011
{
11-
private static string? _connectionString;
12+
private static NpgsqlConnectionStringBuilder? _connectionStringBuilder;
13+
private static NpgsqlDataSource? _dataSource;
1214
private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions;
1315
private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
1416
private static IReplicationDataMapper? _dataMapper;
@@ -22,7 +24,7 @@ public sealed class SubscriptionOptionsBuilder
2224

2325
static SubscriptionOptionsBuilder()
2426
{
25-
_connectionString = null;
27+
_connectionStringBuilder = default;
2628
_publicationSetupOptions = new();
2729
_replicationSlotSetupOptions = default;
2830
_dataMapper = default;
@@ -40,7 +42,14 @@ public SubscriptionOptionsBuilder WithTable(
4042
[UsedImplicitly]
4143
public SubscriptionOptionsBuilder ConnectionString(string connectionString)
4244
{
43-
_connectionString = connectionString;
45+
_connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString);
46+
return this;
47+
}
48+
49+
[UsedImplicitly]
50+
public SubscriptionOptionsBuilder DataSource(NpgsqlDataSource dataSource)
51+
{
52+
_dataSource = dataSource;
4453
return this;
4554
}
4655

@@ -91,7 +100,8 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce
91100
internal ISubscriptionOptions Build()
92101
{
93102
_messageTable ??= TableDescriptorBuilder.Build();
94-
ArgumentNullException.ThrowIfNull(_connectionString);
103+
ArgumentNullException.ThrowIfNull(_connectionStringBuilder);
104+
ArgumentNullException.ThrowIfNull(_dataSource);
95105
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
96106

97107
var typeResolver = new JsonTypeResolver(_jsonSerializerContext, _namingPolicy);
@@ -104,7 +114,8 @@ internal ISubscriptionOptions Build()
104114
if (_registry.Count == 0)_registry.Add(typeof(object), new ObjectTracingConsumer());
105115

106116
return new SubscriptionOptions(
107-
_connectionString,
117+
_dataSource,
118+
_connectionStringBuilder,
108119
_publicationSetupOptions,
109120
_replicationSlotSetupOptions ?? new ReplicationSlotManagement.ReplicationSlotSetupOptions(),
110121
_errorProcessor ?? new ConsoleOutErrorProcessor(),

src/Subscriber/Program.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Blumchen.Subscriptions;
33
using Commons;
44
using Microsoft.Extensions.Logging;
5+
using Npgsql;
56
using Subscriber;
67

78
#pragma warning disable CS8601 // Possible null reference assignment.
@@ -20,8 +21,11 @@
2021

2122
try
2223
{
24+
var dataSourceBuilder = new NpgsqlDataSourceBuilder(Settings.ConnectionString)
25+
.UseLoggerFactory(LoggerFactory.Create(builder => builder.AddConsole()));
2326
var cursor = subscription.Subscribe(
2427
builder => builder
28+
.DataSource(dataSourceBuilder.Build())
2529
.ConnectionString(Settings.ConnectionString)
2630
.WithTable(options => options
2731
.Id("id")
@@ -31,7 +35,7 @@
3135
.NamingPolicy(new AttributeNamingPolicy())
3236
.JsonContext(SourceGenerationContext.Default)
3337
.Handles<UserCreatedContract, Consumer>(consumer)
34-
.Handles<UserDeletedContract, Consumer>(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct
38+
.Handles<UserDeletedContract, Consumer>(consumer), ct:ct
3539
).GetAsyncEnumerator(ct);
3640
await using var cursor1 = cursor.ConfigureAwait(false);
3741
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested);

src/SubscriberWorker/Program.cs

+13-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Text.Json.Serialization;
2-
using Blumchen.Configuration;
32
using Blumchen.Serialization;
43
using Blumchen.Subscriptions;
54
using Blumchen.Workers;
@@ -10,6 +9,7 @@
109
using Polly.Retry;
1110
using Polly;
1211
using SubscriberWorker;
12+
using Npgsql;
1313

1414

1515
#pragma warning disable CS8601 // Possible null reference assignment.
@@ -29,19 +29,21 @@
2929
.AddSingleton<IHandler<UserCreatedContract>, Handler<UserCreatedContract>>()
3030
.AddBlumchen<SubscriberWorker<UserDeletedContract>, UserDeletedContract>()
3131
.AddSingleton<IHandler<UserDeletedContract>, Handler<UserDeletedContract>>()
32-
32+
.AddSingleton(Settings.ConnectionString)
33+
.AddTransient(sp =>
34+
new NpgsqlDataSourceBuilder(Settings.ConnectionString)
35+
.UseLoggerFactory(sp.GetRequiredService<ILoggerFactory>()).Build())
3336
.AddSingleton<INamingPolicy, AttributeNamingPolicy>()
3437
.AddSingleton<IErrorProcessor, ConsoleOutErrorProcessor>()
3538
.AddSingleton<JsonSerializerContext, SourceGenerationContext>()
36-
.AddSingleton(new DatabaseOptions(Settings.ConnectionString))
37-
.AddResiliencePipeline("default",(pipelineBuilder,context) =>
39+
.AddResiliencePipeline("default", (pipelineBuilder, _) =>
3840
pipelineBuilder
39-
.AddRetry(new RetryStrategyOptions
40-
{
41-
BackoffType = DelayBackoffType.Constant,
42-
Delay = TimeSpan.FromSeconds(5),
43-
MaxRetryAttempts = int.MaxValue
44-
}).Build())
41+
.AddRetry(new RetryStrategyOptions
42+
{
43+
BackoffType = DelayBackoffType.Constant,
44+
Delay = TimeSpan.FromSeconds(5),
45+
MaxRetryAttempts = int.MaxValue
46+
}).Build())
4547
.AddLogging(loggingBuilder =>
4648
{
4749
loggingBuilder
@@ -51,7 +53,7 @@
5153
.AddFilter("Blumchen", LogLevel.Debug)
5254
.AddFilter("SubscriberWorker", LogLevel.Debug)
5355
.AddSimpleConsole();
54-
});
56+
}).AddSingleton<ILogger>(sp => sp.GetRequiredService<ILoggerFactory>().CreateLogger<ILogger>());
5557

5658
await builder
5759
.Build()
+7-5
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
using System.Text.Json.Serialization;
2-
using Blumchen.Configuration;
32
using Blumchen.Serialization;
43
using Blumchen.Subscriptions;
54
using Blumchen.Subscriptions.Management;
65
using Blumchen.Workers;
76
using Microsoft.Extensions.Logging;
7+
using Npgsql;
88
using Polly.Registry;
99
// ReSharper disable ClassNeverInstantiated.Global
1010

1111
namespace SubscriberWorker;
1212
public class SubscriberWorker<T>(
13-
DatabaseOptions databaseOptions,
13+
NpgsqlDataSource dataSource,
14+
string connectionString,
1415
IHandler<T> handler,
1516
JsonSerializerContext jsonSerializerContext,
1617
ResiliencePipelineProvider<string> pipelineProvider,
1718
INamingPolicy namingPolicy,
1819
IErrorProcessor errorProcessor,
19-
ILoggerFactory loggerFactory
20-
): Worker<T>(databaseOptions
20+
ILogger logger
21+
): Worker<T>(dataSource
22+
, connectionString
2123
, handler
2224
, jsonSerializerContext
2325
, errorProcessor
@@ -26,4 +28,4 @@ ILoggerFactory loggerFactory
2628
, new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub")
2729
, new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot")
2830
, tableDescriptorBuilder => tableDescriptorBuilder.UseDefaults()
29-
, loggerFactory) where T : class;
31+
, logger) where T : class;

src/Tests/DatabaseFixture.cs

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri
8181
var consumer = new TestHandler<T>(log, jsonTypeInfo);
8282
var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder()
8383
.WithErrorProcessor(new TestOutErrorProcessor(Output))
84+
.DataSource(new NpgsqlDataSourceBuilder(connectionString).Build())
8485
.ConnectionString(connectionString)
8586
.JsonContext(info)
8687
.NamingPolicy(namingPolicy)

src/Tests/When_Subscription_Already_Exists.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ await MessageAppender.AppendAsync(
4646

4747
var subscription = new Subscription();
4848
await using var subscription1 = subscription.ConfigureAwait(false);
49-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
49+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
5050
{
5151
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
5252
return;

src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Empty.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
3838
SubscriberContext.Default, sharedNamingPolicy, Output.WriteLine);
3939
var subscription = new Subscription();
4040
await using var subscription1 = subscription.ConfigureAwait(false);
41-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
41+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
4242
{
4343
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
4444
return;

src/Tests/When_Subscription_Does_Not_Exist_And_Table_Is_Not_Empty.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public async Task Read_from_table_using_named_transaction_snapshot()
4040
var subscription = new Subscription();
4141
await using var subscription1 = subscription.ConfigureAwait(false);
4242

43-
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, null, ct).ConfigureAwait(false))
43+
await foreach (var envelope in subscription.Subscribe(_ => subscriptionOptions, ct).ConfigureAwait(false))
4444
{
4545
Assert.Equal(@expected, ((OkEnvelope)envelope).Value);
4646
return;

0 commit comments

Comments
 (0)