Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/SlimMessageBus.Host.Outbox/Services/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ async internal Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxM
{
const int defaultBatchSize = 50;

var runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize;
var runAgain = outboxMessages.Count == _outboxSettings.PollBatchSize;
var failed = false;
var count = 0;

var aborted = new List<TOutboxMessage>(_outboxSettings.PollBatchSize);
Expand Down Expand Up @@ -288,21 +289,21 @@ async internal Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxM
.Where(x => x != null)
.Batch(bulkProducer.MaxMessagesPerTransaction ?? defaultBatchSize);

foreach (var batch in batches)
{
var (success, published) = await DispatchBatch(outboxRepository, bulkProducer, messageBusTarget, batch, busName, path, cancellationToken);
runAgain |= !success;
count += published;
}
}
}
foreach (var batch in batches)
{
var (success, published) = await DispatchBatch(outboxRepository, bulkProducer, messageBusTarget, batch, busName, path, cancellationToken);
failed |= !success;
count += published;
}
}
}

if (aborted.Count > 0)
{
await outboxRepository.AbortDelivery(aborted, cancellationToken);
}

return (runAgain, count);
return (!failed && runAgain, count);
}

async internal Task<(bool Success, int Published)> DispatchBatch(IOutboxMessageRepository<TOutboxMessage> outboxRepository, ITransportBulkProducer producer, IMessageBusTarget messageBusTarget, IReadOnlyCollection<OutboxBulkMessage> batch, string busName, string path, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.Sql.DbContext.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.PostgreSql.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.Sql.DbContext.Test" />
<InternalsVisibleTo Include="SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test" />
<InternalsVisibleTo Include="DynamicProxyGenAssembly2" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,76 @@ public async Task SentMessages_AreNotIncluded()
}
}

public class SendMessagesTests(PostgreSqlFixture postgreSqlFixture) : BasePostgreSqlOutboxRepositoryTest(postgreSqlFixture)
{
private const string InstanceId = "outbox-sender";

[Fact]
public async Task FailedBatch_IsNotRetriedImmediatelyWithSameLockInstance()
{
// arrange
const int messageCount = 5;
const int maxDeliveryAttempts = 3;

_settings.PollBatchSize = messageCount;
_settings.MaxDeliveryAttempts = maxDeliveryAttempts;
_settings.LockExpiration = TimeSpan.FromSeconds(5);

await SeedOutbox(messageCount, (i, message) =>
{
message.MessageType = typeof(TestOutboxMessage).AssemblyQualifiedName;
message.MessagePayload = JsonSerializer.SerializeToUtf8Bytes(new TestOutboxMessage(i));
});

var serializer = new Mock<IMessageSerializer>();
serializer
.Setup(x => x.Deserialize(typeof(TestOutboxMessage), It.IsAny<IReadOnlyDictionary<string, object>>(), It.IsAny<byte[]>(), null))
.Returns((Type _, IReadOnlyDictionary<string, object> _, byte[] payload, object _) => JsonSerializer.Deserialize<TestOutboxMessage>(payload));

var serializerProvider = new Mock<IMessageSerializerProvider>();
serializerProvider.Setup(x => x.GetSerializer(It.IsAny<string>())).Returns(serializer.Object);

var masterBus = new Mock<IMasterMessageBus>();
masterBus.Setup(x => x.SerializerProvider).Returns(serializerProvider.Object);

var bulkProducer = masterBus.As<ITransportBulkProducer>();
bulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns((int?)null);
bulkProducer
.Setup(x => x.ProduceToTransportBulk(It.IsAny<IReadOnlyCollection<OutboxSendingTask<PostgreSqlOutboxMessage>.OutboxBulkMessage>>(), It.IsAny<string>(), It.IsAny<IMessageBusTarget>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new ProduceToTransportBulkResult<OutboxSendingTask<PostgreSqlOutboxMessage>.OutboxBulkMessage>([], new ProducerMessageBusException("Broker unavailable")));

var messageBus = new Mock<IMessageBusTarget>();
messageBus.Setup(x => x.Target).Returns(masterBus.Object);

var lockRenewalTimer = new Mock<IOutboxLockRenewalTimer>();
lockRenewalTimer.Setup(x => x.InstanceId).Returns(InstanceId);
lockRenewalTimer.Setup(x => x.LockDuration).Returns(_settings.LockExpiration);

var lockRenewalTimerFactory = new Mock<IOutboxLockRenewalTimerFactory>();
lockRenewalTimerFactory
.Setup(x => x.CreateRenewalTimer(It.IsAny<TimeSpan>(), It.IsAny<TimeSpan>(), It.IsAny<Action<Exception>>(), It.IsAny<CancellationToken>()))
.Returns(lockRenewalTimer.Object);

var services = new ServiceCollection()
.AddSingleton<IMessageBus>(messageBus.Object)
.AddSingleton(lockRenewalTimerFactory.Object)
.BuildServiceProvider();

var target = new OutboxSendingTask<PostgreSqlOutboxMessage>(NullLoggerFactory.Instance, _settings, services);

// act
var published = await target.SendMessages(services, _target, CancellationToken.None);
var messages = await _target.GetAllMessages(CancellationToken.None);

// assert
published.Should().Be(0);
messages.Should().OnlyContain(x => x.DeliveryAttempt == 1);
messages.Should().OnlyContain(x => !x.DeliveryAborted);
}

private sealed record TestOutboxMessage(int Value);
}

public class IncrementDeliveryAttemptTests(PostgreSqlFixture postgreSqlFixture) : BasePostgreSqlOutboxRepositoryTest(postgreSqlFixture)
{
[Fact]
Expand Down Expand Up @@ -298,4 +368,4 @@ public async Task HasNoLockedItemsToRenew_ReturnsFalse()
actual.Should().BeFalse();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@

global using AwesomeAssertions;

global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Logging.Abstractions;
global using Microsoft.Extensions.Time.Testing;

global using Moq;

global using Npgsql;

global using SlimMessageBus.Host.Outbox.Services;
global using SlimMessageBus.Host.Outbox.PostgreSql.Configuration;
global using SlimMessageBus.Host.Outbox.PostgreSql.Repositories;
global using SlimMessageBus.Host.Outbox.PostgreSql.Services;
global using SlimMessageBus.Host.Outbox.PostgreSql.Transactions;
global using SlimMessageBus.Host.Serialization;

global using Xunit;
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ public async Task ProcessMessages_ShouldReturnCorrectValues_WhenOutboxMessagesPr
}

[Fact]
public async Task ProcessMessages_ShouldReturnRunAgainTrue_WhenOutboxMessagesCountEqualsPollBatchSize()
{
public async Task ProcessMessages_ShouldReturnRunAgainTrue_WhenOutboxMessagesCountEqualsPollBatchSize()
{
// Arrange
var outboxMessages = CreateOutboxMessages(50);
var cancellationToken = CancellationToken.None;
Expand All @@ -170,11 +170,36 @@ public async Task ProcessMessages_ShouldReturnRunAgainTrue_WhenOutboxMessagesCou

// Assert
result.RunAgain.Should().BeTrue();
result.Count.Should().Be(50);
}

[Fact]
public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised()
result.Count.Should().Be(50);
}

[Fact]
public async Task ProcessMessages_ShouldReturnRunAgainFalse_WhenBatchFailsAndOutboxMessagesCountEqualsPollBatchSize()
{
// Arrange
var outboxMessages = CreateOutboxMessages(50);
var cancellationToken = CancellationToken.None;

_mockCompositeMessageBus.Setup(x => x.GetChildBus(It.IsAny<string>())).Returns(_mockMasterMessageBus.Object);
_mockMessageBusBulkProducer.Setup(x => x.MaxMessagesPerTransaction).Returns(10);

_mockMessageBusBulkProducer.Setup(x => x.ProduceToTransportBulk(It.IsAny<IReadOnlyCollection<OutboxBulkMessage>>(), It.IsAny<string>(), It.IsAny<IMessageBusTarget>(), It.IsAny<CancellationToken>()))
.ReturnsAsync((IReadOnlyCollection<OutboxBulkMessage> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken) => new ProduceToTransportBulkResult<OutboxBulkMessage>([], new ProducerMessageBusException("Broker unavailable")));

var mockMessageTypeResolver = new Mock<IMessageTypeResolver>();
mockMessageTypeResolver.Setup(x => x.ToType(It.IsAny<string>())).Returns(typeof(object));
_outboxSettings.MessageTypeResolver = mockMessageTypeResolver.Object;

// Act
var result = await _sut.ProcessMessages(_mockOutboxRepository.Object, outboxMessages, _mockCompositeMessageBus.Object, _mockMessageBusTarget.Object, cancellationToken);

// Assert
result.RunAgain.Should().BeFalse();
result.Count.Should().Be(0);
}

[Fact]
public async Task ProcessMessages_ShouldAbortDelivery_WhenBusIsNotRecognised()
{
// Arrange
const int MessageCount = 10;
Expand Down Expand Up @@ -255,4 +280,4 @@ private static List<OutboxMessage> CreateOutboxMessages(int count)
.ToList();
}
}
}
}
Loading