2.8 Communication channels

Marjan Nikolovski edited this page May 13, 2022 · 2 revisions

Purpose

An enterprise service bus implements a communication channel between mutually interacting software applications in a service-oriented architecture. It represents a software architecture for distributed computing and is a special variant of the more general client-server model in which any application may behave as server or client. An enterprise service bus promotes agility and flexibility in high-level protocol communication between applications. Its primary use is enterprise application integration across heterogeneous and complex service landscapes.

Signals offers multiple implementations for the Communication channels aspect out of the box:

  • Signals.Aspects.CommunicationChannels.MsSql
  • Signals.Aspects.CommunicationChannels.ServiceBus

Only one implementation can be active at a time.

How to use

When we implement a distributed process, a communication channel is used automatically. When the API request finishes executing, the request and response are mapped into transient data, which is then sent via the communication channel to the background application. The communication channel name is built automatically from the distributed process name, so we must make sure each distributed process name is unique.

public class MyDistributedProcess : DistributedProcess<MyTransientData, MyApiData, MethodResult<MyApiResult>>
{

    #region Executed in web application

    /// <summary>
    /// Authenticate process
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    public override MethodResult<MyApiResult> Auth(MyApiData request)
    {
        return Ok();
    }

    /// <summary>
    /// Validate process
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    public override MethodResult<MyApiResult> Validate(MyApiData request)
    {
        return Ok();
    }

    /// <summary>
    /// Handle process
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    public override MethodResult<MyApiResult> Handle(MyApiData request)
    {
        return Ok();
    }

    /// <summary>
    /// Map request and response to transient data
    /// </summary>
    /// <param name="request"></param>
    /// <param name="response"></param>
    /// <returns></returns>
    public override MyTransientData Map(MyApiData request, MethodResult<MyApiResult> response)
    {
        return new MyTransientData();
    }

    #endregion

    #region Executed in background application

    /// <summary>
    /// Execute background logic
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    public override VoidResult Work(MyTransientData request)
    {
        return Ok();
    }

    #endregion
}

Another, more manual way of using communication channels is to subscribe and publish to channels directly through an IMessageChannel instance.

public class MyClass
{
    public void PublishMyData()
    {
        var data = new MyData();
        var messageChannel = SystemBootstrapper.GetInstance<IMessageChannel>();

        messageChannel.Publish<MyData>("my_channel", data);
    }

    public void SubscribeToMyChannel()
    {
        var messageChannel = SystemBootstrapper.GetInstance<IMessageChannel>();

        messageChannel.Subscribe<MyData>("my_channel", (data) => {
            // do something with data
        });
    }
}

Every process exposes an IMessageChannel instance through this.Context.Channel, but using it directly is not recommended.

Configuration

We configure the communication channel aspect with an instance of Signals.Aspects.CommunicationChannels.Configurations.IChannelConfiguration, which we pass in through the ApplicationBootstrapConfiguration instance (web or background) at startup.

Properties explanation

ServiceBusChannelConfiguration

  • ConnectionString: Azure Service Bus connection string
  • ChannelPrefix: Prefix of each channel for our application. Helps if multiple applications are using the same bus.
  • MaxConcurrentCalls: Number of concurrently processed event messages. Default 1

MsSqlChannelConfiguration

  • ConnectionString: MsSql connection string
  • DbTableName: Name of table that the events are stored in. Default "EventMessage"
  • MessageListeningStrategy: How the channel listens for new messages:
      • MessageListeningStrategy.Broker: The channel listens for changes published by the SQL Server Service Broker
      • MessageListeningStrategy.LongPolling: A polling SQL query is executed periodically to check for changes
  • LongPollingTimeout: If the strategy is LongPolling, the delay between consecutive poll requests. Default 100 milliseconds
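
To illustrate what the LongPolling strategy and LongPollingTimeout amount to, here is a minimal sketch of a polling loop. It is not the Signals implementation: PollingSketch, PollAsync and the fetchNewMessages delegate (a stand-in for the SQL query against the event table) are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Repeatedly asks for new messages, hands each one to the handler,
    // then waits pollInterval before asking again (compare LongPollingTimeout).
    public static async Task PollAsync<T>(
        Func<IReadOnlyList<T>> fetchNewMessages, // hypothetical stand-in for the poll query
        Action<T> handle,
        TimeSpan pollInterval,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            foreach (var message in fetchNewMessages())
            {
                handle(message);
            }

            try
            {
                await Task.Delay(pollInterval, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation requested while waiting
            }
        }
    }
}
```

A shorter interval lowers delivery latency at the cost of more queries against the database, which is the trade-off to keep in mind when changing the default of 100 milliseconds.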

When using the Broker strategy, make sure the SQL Server Service Broker is enabled for the database:

ALTER DATABASE [DATABASE NAME] SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE

Examples

Using Signals.Aspects.CommunicationChannels.ServiceBus

services
    .AddSignals(config =>
    {
        config.ChannelConfiguration = new ServiceBusChannelConfiguration
        {
            ConnectionString = DomainConfiguration.Instance.NotificationConfiguration.ConnectionString,
            ChannelPrefix = "my_app_",
            MaxConcurrentCalls = 10
        };
    });

Using Signals.Aspects.CommunicationChannels.MsSql

services
    .AddSignals(config =>
    {
        config.ChannelConfiguration = new MsSqlChannelConfiguration
        {
            ConnectionString = DomainConfiguration.Instance.DatabaseConfiguration.ConnectionString,
            DbTableName = "EventMessage",
            MessageListeningStrategy = MessageListeningStrategy.LongPolling,
            LongPollingTimeout = 100
        };
    });

Extending communication channels

  1. Install package Signals.Aspects.CommunicationChannels
  2. Create class with implementation of Signals.Aspects.CommunicationChannels.Configurations.IChannelConfiguration
/// <summary>
/// Message channel configuration contract
/// </summary>
public class MyChannelConfiguration : IChannelConfiguration
{
    /// <summary>
    /// Custom property
    /// </summary>
    public string MyProperty { get; set; }
}
  3. Create class with implementation of Signals.Aspects.CommunicationChannels.IMessageChannel
using System;
using System.Threading.Tasks;
using Signals.Aspects.CommunicationChannels;

/// <summary>
/// Custom message channel implementation
/// </summary>
public class MyMessageChannel : IMessageChannel
{
    private readonly MyChannelConfiguration _configuration;

    /// <summary>
    /// CTOR
    /// </summary>
    /// <param name="configuration"></param>
    public MyMessageChannel(MyChannelConfiguration configuration)
    {
        _configuration = configuration;
    }

    /// <summary>
    /// Close the subscriber
    /// </summary>
    /// <returns></returns>
    public Task Close()
    {
        throw new NotImplementedException(); // release connections and subscriptions here
    }

    /// <summary>
    /// Publish a message
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    public Task Publish<T>(T message) where T : class
    {
        throw new NotImplementedException(); // send the message on its default channel here
    }

    /// <summary>
    /// Publish a message on a predefined channel
    /// </summary>
    /// <param name="channelName"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    public Task Publish<T>(string channelName, T message) where T : class
    {
        throw new NotImplementedException(); // send the message on the named channel here
    }

    /// <summary>
    /// Subscribe to a message type
    /// </summary>
    /// <param name="action"></param>
    /// <returns></returns>
    public Task Subscribe<T>(Action<T> action) where T : class
    {
        throw new NotImplementedException(); // register the handler for the message type here
    }

    /// <summary>
    /// Subscribe to a message type on a predefined channel
    /// </summary>
    /// <param name="channelName"></param>
    /// <param name="action"></param>
    /// <returns></returns>
    public Task Subscribe<T>(string channelName, Action<T> action) where T : class
    {
        throw new NotImplementedException(); // register the handler for the named channel here
    }
}
  4. Use our implementation of IChannelConfiguration when configuring our application
public static IServiceProvider AddSignals(this IServiceCollection services)
{
    // Assumes the configuring AddSignals overload returns the built IServiceProvider
    return services
        .AddSignals(config =>
        {
            config.ChannelConfiguration = new MyChannelConfiguration
            {
                MyProperty = "my_value"
            };
        });
}
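
To make the channel implementation step more concrete, here is a minimal in-memory sketch of a channel that delivers messages synchronously to handlers in the same process. It is not part of Signals and is only a starting point under stated assumptions: IMessageChannelSketch is a local stand-in that mirrors the five members listed above so the block compiles on its own, InMemoryMessageChannel is a hypothetical name, and using the message type name as the default channel name is an assumption rather than documented library behavior.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-in mirroring the members listed above, so the sketch compiles
// without the Signals packages. In a real project, implement
// Signals.Aspects.CommunicationChannels.IMessageChannel instead.
public interface IMessageChannelSketch
{
    Task Close();
    Task Publish<T>(T message) where T : class;
    Task Publish<T>(string channelName, T message) where T : class;
    Task Subscribe<T>(Action<T> action) where T : class;
    Task Subscribe<T>(string channelName, Action<T> action) where T : class;
}

// Hypothetical in-process channel, suitable for tests and experiments only.
public class InMemoryMessageChannel : IMessageChannelSketch
{
    // Channel name -> registered handlers (each receives the message as object)
    private readonly ConcurrentDictionary<string, List<Action<object>>> _handlers =
        new ConcurrentDictionary<string, List<Action<object>>>();

    public Task Close()
    {
        _handlers.Clear(); // drop all subscriptions
        return Task.CompletedTask;
    }

    // Assumption: without an explicit name, the type name is the channel name.
    public Task Publish<T>(T message) where T : class
        => Publish(typeof(T).Name, message);

    public Task Publish<T>(string channelName, T message) where T : class
    {
        if (_handlers.TryGetValue(channelName, out var handlers))
        {
            Action<object>[] snapshot;
            lock (handlers) { snapshot = handlers.ToArray(); }
            foreach (var handler in snapshot) handler(message);
        }
        return Task.CompletedTask;
    }

    public Task Subscribe<T>(Action<T> action) where T : class
        => Subscribe(typeof(T).Name, action);

    public Task Subscribe<T>(string channelName, Action<T> action) where T : class
    {
        var handlers = _handlers.GetOrAdd(channelName, _ => new List<Action<object>>());
        lock (handlers)
        {
            // Messages of another type on the same channel are ignored.
            handlers.Add(o => { if (o is T typed) action(typed); });
        }
        return Task.CompletedTask;
    }
}
```

A real channel would additionally serialize messages, apply settings from its configuration class, and deliver across process boundaries; the sketch only shows how the publish and subscribe members relate to each other.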