Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Bindings/RabbitMQAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger)

public Task AddAsync(byte[] message, CancellationToken cancellationToken = default)
{
_batch.Add(exchange: string.Empty, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message);
_batch.Add(exchange: _context.ResolvedAttribute.ExchangeName, routingKey: _context.ResolvedAttribute.QueueName, mandatory: false, properties: null, body: message);
_logger.LogDebug($"Adding message to batch for publishing...");

return Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions src/Config/DefaultRabbitMQServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
internal class DefaultRabbitMQServiceFactory : IRabbitMQServiceFactory
{
public IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port)
public IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
{
return new RabbitMQService(connectionString, hostName, queueName, userName, password, port);
return new RabbitMQService(connectionString, queueName, exchangeName, hostName, userName, password, port);
}

public IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port)
Expand Down
2 changes: 1 addition & 1 deletion src/Config/IRabbitMQServiceFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
public interface IRabbitMQServiceFactory
{
IRabbitMQService CreateService(string connectionString, string hostName, string queueName, string userName, string password, int port);
IRabbitMQService CreateService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port);

IRabbitMQService CreateService(string connectionString, string hostName, string userName, string password, int port);
}
Expand Down
18 changes: 14 additions & 4 deletions src/Config/RabbitMQExtensionConfigProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,22 @@ public void ValidateBinding(RabbitMQAttribute attribute, Type type)
{
throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost");
}

string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName);
string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName);
_logger.LogInformation($"Queue: {queueName} and Exchange {exchangeName}");
if (string.IsNullOrEmpty(queueName) && string.IsNullOrEmpty(exchangeName))
{
throw new InvalidOperationException("One of queueName or exchangeName should be provided");
}
}

internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
{
string connectionString = Utility.FirstOrDefault(attribute.ConnectionStringSetting, _options.Value.ConnectionString);
string hostName = Utility.FirstOrDefault(attribute.HostName, _options.Value.HostName);
string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName);
string queueName = Utility.FirstOrDefault(attribute.QueueName, _options.Value.QueueName) ?? string.Empty;
string exchangeName = Utility.FirstOrDefault(attribute.ExchangeName, _options.Value.ExchangeName) ?? string.Empty;
string userName = Utility.FirstOrDefault(attribute.UserName, _options.Value.UserName);
string password = Utility.FirstOrDefault(attribute.Password, _options.Value.Password);
int port = Utility.FirstOrDefault(attribute.Port, _options.Value.Port);
Expand All @@ -92,12 +101,13 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
ConnectionStringSetting = connectionString,
HostName = hostName,
QueueName = queueName,
ExchangeName = exchangeName,
UserName = userName,
Password = password,
Port = port,
};

service = GetService(connectionString, hostName, queueName, userName, password, port);
service = GetService(connectionString, queueName, exchangeName, hostName, userName, password, port);

return new RabbitMQContext
{
Expand All @@ -106,9 +116,9 @@ internal RabbitMQContext CreateContext(RabbitMQAttribute attribute)
};
}

internal IRabbitMQService GetService(string connectionString, string hostName, string queueName, string userName, string password, int port)
internal IRabbitMQService GetService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
{
return _rabbitMQServiceFactory.CreateService(connectionString, hostName, queueName, userName, password, port);
return _rabbitMQServiceFactory.CreateService(connectionString, queueName, exchangeName, hostName, userName, password, port);
}

// Overloaded method used only for getting the RabbitMQ client
Expand Down
6 changes: 6 additions & 0 deletions src/Config/RabbitMQOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public RabbitMQOptions()
/// </summary>
public string QueueName { get; set; }

/// <summary>
/// Gets or sets the ExchangeName to enqueue messages to.
/// </summary>
public string ExchangeName { get; set; }

/// <summary>
/// Gets or sets the UserName used to authenticate with RabbitMQ.
/// </summary>
Expand Down Expand Up @@ -58,6 +63,7 @@ public string Format()
{
{ nameof(HostName), HostName },
{ nameof(QueueName), QueueName },
{ nameof(ExchangeName), ExchangeName },
{ nameof(Port), Port },
{ nameof(PrefetchCount), PrefetchCount },
};
Expand Down
8 changes: 7 additions & 1 deletion src/RabbitMQAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,11 @@ public sealed class RabbitMQAttribute : Attribute
/// </summary>
[ConnectionString]
public string ConnectionStringSetting { get; set; }

/// <summary>
/// Gets or sets the ExchangeName to send messages to.
/// </summary>
[AutoResolve]
public string ExchangeName { get; set; }
}
}
}
15 changes: 13 additions & 2 deletions src/Services/RabbitMQService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal sealed class RabbitMQService : IRabbitMQService
private readonly string _connectionString;
private readonly string _hostName;
private readonly string _queueName;
private readonly string _exchangeName;
private readonly string _userName;
private readonly string _password;
private readonly int _port;
Expand All @@ -31,13 +32,23 @@ public RabbitMQService(string connectionString, string hostName, string userName
_model = connectionFactory.CreateConnection().CreateModel();
}

public RabbitMQService(string connectionString, string hostName, string queueName, string userName, string password, int port)
public RabbitMQService(string connectionString, string queueName, string exchangeName, string hostName, string userName, string password, int port)
: this(connectionString, hostName, userName, password, port)
{
_rabbitMQModel = new RabbitMQModel(_model);
_queueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
_exchangeName = exchangeName ?? throw new ArgumentNullException(nameof(exchangeName));

if (!string.IsNullOrEmpty(_queueName))
{
_model.QueueDeclarePassive(_queueName);
}

if (!string.IsNullOrEmpty(_exchangeName))
{
_model.ExchangeDeclarePassive(_exchangeName); // Throws exception if exchange doesn't exist
}

_model.QueueDeclarePassive(_queueName); // Throws exception if queue doesn't exist
_batch = _model.CreateBasicPublishBatch();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
throw new InvalidOperationException("RabbitMQ username and password required if not connecting to localhost");
}

IRabbitMQService service = _provider.GetService(connectionString, hostName, queueName, userName, password, port);
IRabbitMQService service = _provider.GetService(connectionString, queueName, string.Empty, hostName, userName, password, port);

return Task.FromResult<ITriggerBinding>(new RabbitMQTriggerBinding(service, hostName, queueName, _logger, parameter.ParameterType, _options.Value.PrefetchCount));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ private string GetFormattedOption(RabbitMQOptions option)
{
{ nameof(option.HostName), option.HostName },
{ nameof(option.QueueName), option.QueueName },
{ nameof(option.ExchangeName), option.ExchangeName },
{ nameof(option.Port), option.Port },
{ nameof(option.PrefetchCount), option.PrefetchCount },
};
Expand Down Expand Up @@ -50,6 +51,7 @@ public void TestConfiguredRabbitMQOptions()
int expectedPort = 8080;
string expectedHostName = "someHostName";
string expectedQueueName = "someQueueName";
string expectedExchangeName = "someExchangeName";
string expectedUserName = "someUserName";
string expectedPassword = "somePassword";
string expectedConnectionString = "someConnectionString";
Expand All @@ -58,6 +60,7 @@ public void TestConfiguredRabbitMQOptions()
Port = expectedPort,
HostName = expectedHostName,
QueueName = expectedQueueName,
ExchangeName = expectedExchangeName,
UserName = expectedUserName,
Password = expectedPassword,
ConnectionString = expectedConnectionString,
Expand All @@ -68,6 +71,7 @@ public void TestConfiguredRabbitMQOptions()
Assert.Equal(expectedPort, options.Port);
Assert.Equal(expectedHostName, options.HostName);
Assert.Equal(expectedQueueName, options.QueueName);
Assert.Equal(expectedExchangeName, options.ExchangeName);
Assert.Equal(expectedUserName, options.UserName);
Assert.Equal(expectedPassword, options.Password);
Assert.Equal(expectedConnectionString, options.ConnectionString);
Expand Down