Implementando um EventBus com RabbitMQ em C#

Neste post mostro como implementar um EventBus, utilizando RabbitMQ, em C#. Este código ainda está em desenvolvimento. Em breve, uma versão mais madura estará disponível no meu Github junto aos Building Blocks para construção de Microsserviços.

Se tiver interesse em entender mais sobre microsserviços, recomendo que acesse o Guia de Conteúdo para Microsserviços deste site.

Esta implementação é uma adaptação da entregue pela Microsoft no projeto eShopOnContainers (semelhanças não são mera coincidência).

O que é um EventBus?

De forma simplificada, um EventBus é um artefato de software que permite que um componente publique notificações (eventos) indicando a ocorrência de algo potencialmente relevante para outros. Então, esse “evento” pode ser “escutado” por esses outros componentes que irão realizar alguma ação correlata.

Por exemplo:

  • O “estoque” pode publicar um evento indicando que um produto chegou a seu nível mínimo. Esse evento poderia ser escutado por “compras” que iniciaria o processo para reposição;
  • O “estoque” poderia publicar um evento indicando que um produto foi reposto. Assim, “vendas” poderia iniciar um processo de envio de emails notificando clientes interessados no produto;
  • O “faturamento” poderia publicar um evento indicando que teve problemas para processar um pagamento. Isso poderia fazer com que “relacionamento” enviasse um email para o cliente indicando possíveis alternativas.

Em uma boa implementação, é irrelevante para o componente que está publicando eventos quais componentes os estão “escutando”. Da mesma forma, é irrelevante para os componentes que estão escutando qual foi o componente que realizou a publicação.

Um EventBus facilita a comunicação assíncrona e desacoplada entre os componentes de uma aplicação. Também facilita a escalabilidade do sistema.

Por que usar RabbitMQ para implementar um EventBus?

RabbitMQ é um poderoso mecanismo de mensageria. Ele permite a adoção de padrões flexíveis para troca de mensagens entre componentes de software. É extremamente sólido (escrito em Erlang) e amplamente adotado pela indústria.

Não reinvente a roda!

RabbitMQ facilita consideravelmente a troca de mensagens em um ambiente de sistema distribuído.

Usando RabbitMQ com Docker

Se você, como eu, não gosta da ideia de ficar instalando frameworks e serviços em seu computador, recomendo que utilize RabbitMQ com Docker. É simples assim:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Feito isso, você terá o RabbitMQ rodando em seu computador com a ferramenta de administração (em http://localhost:15672).

Se ainda não conhece RabbitMQ, gaste algum tempo em algum tutorial na Internet antes de continuar.

A anatomia de um evento

Um evento é uma mensagem indicando que algo ocorreu. Essas mensagens tem um nome que, geralmente, está no passado.

Na minha implementação, decidi criar uma classe abstrata para os eventos.

public abstract class Event
{
    protected Event()
    {
        Id = Guid.NewGuid();
        CreatedAt = DateTime.UtcNow;
    }

    public Guid Id { get; }
    public DateTime CreatedAt { get; }
}

Essa abordagem define um conjunto mínimo de dados que todo evento deverá possuir para poder ser processado (aqui, optei por um Id e por um registro do momento da ocorrência).

Cada evento que um componente for publicar no EventBus deverá ter uma classe especializada.

public class PaymentFailureEvent : Event
{
    public string TransactionId { get; }
    public string Description { get; }

    public PaymentFailureEvent(
        string transactionId,
        string description
        )
    {
        TransactionId = transactionId;
        Description = description;
    }
}

Como um evento é ouvido  por um componente

Para ouvir um evento que será publicado no EventBus, um componente precisa definir um handler. Optei por duas interfaces.

using System.Threading.Tasks;

namespace BuildingBlocks.Messaging.RabbitMQ.EventBus
{
    public interface IEventHandler
    {
        Task Handle(TEvent @event);
    }

    public interface IDynamicEventHandler
    {
        Task Handle(dynamic @event);
    }
}

A ideia da segunda interface é permitir que o componente que está “ouvindo” não precise ter uma classe específica para cada evento.

public class PaymentFailureEventHandler :
    IDynamicEventHandler
{
    public Task Handle(dynamic @event)
    {
        string transactionId = @event.TransactionId;

        // business logic 

        return Task.CompletedTask;
    }
}

É importante indicar que, em um ambiente de microsserviços, considero amplamente recomendável que cada microsserviço defina suas classes “evento” – isso reduz o acoplamento (se as classes tiverem o mesmo nome, e os mesmos elementos, a solução funciona).

A anatomia de um EventBus

O EventBus é um building block que deverá ser consumido por todos os componentes da aplicação (exemplo, microsserviços)

Eis sua definição básica:

public interface IEventBus
{
    void Publish(Event @event);

    void Subscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler;

    void Subscribe(string eventName) 
        where TEventHandler : IDynamicEventHandler;

    void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler;

    void Unsubscribe(string eventName) 
        where TEventHandler : IDynamicEventHandler;
}

Basicamente, tenho um método para a publicação de eventos, dois para que “consumidores” possam manifestar seu interesse por um determinado tipo e, finalmente, dois para cancelar o interesse (desassinar).

Por definição, nessa implementação, utilizo o nome do tipo para inferir o nome evento nas implementações com tipagem forte.

A implementação do EventBus

Agora, vamos a um pouco mais de código.

public class RabbitMQEventBus : IDisposable, IEventBus
{
    private readonly ILifetimeScope _autofac;
    private readonly PersisterConnection _connection;
    private readonly ILogger _logger;
    private readonly SubscriptionsManager _subscriptionManager;

    private string _queueName;
    private IModel _consumerChannel;

    public string ExchangeName { get; }

    public RabbitMQEventBus(
        PersisterConnection connection,
        ILogger logger,
        ILifetimeScope autofac,
        string exchangeName = "DefaultExchange"
        )
    {
            
        _connection = connection 
            ?? throw new ArgumentNullException(nameof(connection));

        _logger = logger 
            ?? throw new ArgumentNullException(nameof(logger));

        _autofac = autofac
            ?? throw new ArgumentNullException(nameof(autofac));

        ExchangeName = exchangeName;

        _subscriptionManager = new SubscriptionsManager();
        _subscriptionManager.OnEventRemoved += OnSubscriptionManagerEventRemoved;
        _subscriptionManager.OnEventAdded += OnSubscriptionManagerEventAdded;

        _consumerChannel = CreateConsumerChannel();
    }

    void OnSubscriptionManagerEventAdded(object _, string eventName)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        using (var channel = _connection.CreateModel())
        {
            channel.QueueBind(
                queue: _queueName,
                exchange: ExchangeName,
                routingKey: eventName
                );
        }
    }

    void OnSubscriptionManagerEventRemoved(object _, string eventName)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        using (var channel = _connection.CreateModel())
        {
            channel.QueueUnbind(
                queue: _queueName,
                exchange: ExchangeName,
                routingKey: eventName
            );

            if (!_subscriptionManager.IsEmpty) return;

            _queueName = string.Empty;
            _consumerChannel.Close();
        }
    }

    public void Publish(Event @event)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        var policy = Policy.Handle()
            .Or()
            .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
                _logger.LogWarning(ex.ToString());
            });

        using (var channel = _connection.CreateModel())
        {
            var eventName = @event.GetType()
                .Name;

            channel.ExchangeDeclare(exchange: ExchangeName,
                type: "direct");

            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);

            policy.Execute(() =>
            {
                channel.BasicPublish(exchange: ExchangeName,
                    routingKey: eventName,
                    basicProperties: null,
                    body: body);
            });
        }
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        var eventName = typeof(TEvent).Name;
        _subscriptionManager.AddSubscription<TEvent, TEventHandler>();
    }

    public void Subscribe(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        _subscriptionManager.AddSubscription(eventName);
    }


    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        _subscriptionManager.RemoveSubscription<TEvent, TEventHandler>();
    }

    public void Unsubscribe(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        _subscriptionManager.RemoveSubscription(eventName);
    }

    private IModel CreateConsumerChannel()
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        var channel = _connection.CreateModel();
        channel.ExchangeDeclare(exchange: ExchangeName, type: "direct");
        _queueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var eventName = ea.RoutingKey;
            var message = Encoding.UTF8.GetString(ea.Body);

            await HandleEvent(eventName, message);
        };

        channel.BasicConsume(queue: _queueName,
            autoAck: false,
            consumer: consumer);

        channel.CallbackException += (sender, ea) =>
        {
            _consumerChannel.Dispose();
            _consumerChannel = CreateConsumerChannel();
        };

        return channel;
    }

    private async Task HandleEvent(string eventName, string message)
    {
        if (!_subscriptionManager.HasSubscriptionsForEvent(eventName))
        {
            return;
        }

        using (var scope = _autofac.BeginLifetimeScope("RabbitMQEventBus"))
        {
            var subscriptions = _subscriptionManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                await subscription.Handle(message, scope);
            }
        }
    }

    public void Dispose()
    {
        _consumerChannel?.Dispose();
    }
}

Algumas notas:

  • Optei por manter uma topologia simples. Tenho um único Exchange fazendo encaminhamento direct para as queues dos assinantes.  Cada assinante possui sua Queue. Há um Binding para cada tipo de evento que a Queue assina.
  • Utilizei o modelo de conexão persistente indicado em um post anterior
  • Quando o processo se encerra (fechando o canal), a queue também é encerrada (eventos só são processados quando o consumer está em operação).
  • Utilzo AutoFac para fazer a carga do handler. Gosto muito da abordagem por permitir fácil injeção de dependências.

As assinaturas no EventBus são mantidas por um objeto auxiliar (SubscriptionManager)

public class SubscriptionsManager
{
    private readonly IDictionary<string, IList> _handlers
        = new Dictionary<string, IList>();

    public bool IsEmpty => !_handlers.Keys.Any();

    public event EventHandler OnEventRemoved;
    public event EventHandler OnEventAdded;

    public void AddSubscription(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        AddSubscription(
            typeof(TEventHandler),
            eventName
            );
    }

    public void AddSubscription<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        AddSubscription(
            typeof(TEventHandler),
            typeof(TEvent).Name,
            typeof(TEvent)
            );
    }

    private void AddSubscription(Type handlerType, string eventName, Type eventType = null)
    {
        if (!HasSubscriptionsForEvent(eventName))
        {
            _handlers.Add(eventName, new List());
            OnEventAdded?.Invoke(this, eventName);
        }

        if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
        {
            throw new ArgumentException(
                $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
        }

        _handlers[eventName].Add(Subscription.New(
            handlerType, eventType)
            );
    }

    public void RemoveSubscription(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        var handlerToRemove = FindSubscriptionToRemove(eventName);
        RemoveSubscription(eventName, handlerToRemove);
    }


    public void RemoveSubscription<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {

        var eventName = typeof(TEvent).Name;
        var handlerToRemove = FindSubscriptionToRemove(eventName);
        RemoveSubscription(eventName, handlerToRemove);
    }

    Subscription FindSubscriptionToRemove(string eventName)
    {
        if (!HasSubscriptionsForEvent(eventName))
        {
            return null;
        }

        return _handlers[eventName].SingleOrDefault(s => s.HandlerType == typeof(TEventHandler));
    }

    private void RemoveSubscription(
        string eventName, 
        Subscription subsToRemove
        )
    {
        if (subsToRemove == null) return;
        _handlers[eventName].Remove(subsToRemove);

        if (_handlers[eventName].Any()) return;

        _handlers.Remove(eventName);
        OnEventRemoved?.Invoke(this, eventName);
    }

    public bool HasSubscriptionsForEvent(string eventName) => 
        _handlers.ContainsKey(eventName);

    public IEnumerable GetHandlersForEvent() 
        where TEvent : Event
    {
        var key = typeof(TEvent).Name;
        return GetHandlersForEvent(key);
    }

    public IEnumerable GetHandlersForEvent(string eventName) 
        => _handlers[eventName];

    public Type GetEventTypeByName(string eventName) => _handlers[eventName]
        ?.FirstOrDefault(handler => !handler.IsDynamic)
        ?.EventType;
}

Cada assinatura é mantida em um objeto especialista com acesso ao Handler

public class Subscription
{
    public bool IsDynamic => EventType == null;
    public Type HandlerType { get; }
    public Type EventType { get;  }

    private Subscription(Type handlerType, Type eventType = null)
    {
        HandlerType = handlerType;
        EventType = eventType;
    }

    public async Task Handle(string message, ILifetimeScope scope)
    {
        if (IsDynamic)
        {
            dynamic eventData = JObject.Parse(message);
            if (scope.ResolveOptional(HandlerType) is IDynamicEventHandler handler)
                await handler.Handle(eventData);
        }
        else
        {
            var eventData = JsonConvert.DeserializeObject(message, EventType);
            var handler = scope.ResolveOptional(HandlerType);
            var concreteType = typeof(IEventHandler<>).MakeGenericType(EventType);
            await(Task) concreteType.GetMethod("Handle")
                .Invoke(handler, new[] { eventData });
        }
    }

    public static Subscription New(Type handlerType, Type eventType) =>
        new Subscription(handlerType, eventType);
}

Concluindo

Em um ambiente de múltiplos componentes, sobretudo Microsserviços, o EventBus é um componente essencial para facilitar escalabilidade e resiliência. RabbitMQ é uma excelente opção para implementação do EventBus.

É importante atentar que a má utilização do EventBus pode gerar algumas dores de cabeça, sobretudo em cenários onde os microsserviços precisem colaborar para completar uma atividade atômica… Mas, sobre SAGAS falaremos em outra oportunidade.

Capa: Joanna Kosinska

Compartilhe este insight:

Elemar Júnior

Sou fundador e CEO da EximiaCo e atuo como tech trusted advisor ajudando diversas empresas a gerar mais resultados através da tecnologia.

Elemar Júnior

Sou fundador e CEO da EximiaCo e atuo como tech trusted advisor ajudando diversas empresas a gerar mais resultados através da tecnologia.

Mais insights para o seu negócio

Veja mais alguns estudos e reflexões que podem gerar alguns insights para o seu negócio:

Software em funcionamento é mais relevante que documentação abrangente. Concordo com esse princípio expresso no manifesto ágil. Entretanto, acho que...
O desenvolvimento de uma aplicação com ótima performance só é possível mediante considerações desde sua arquitetura. Otimizações pontuais de código,...
Autorização, em qualquer aplicação não é processo simples. Quando estamos implementando Microsserviços, o desafio pode ser um pouco maior. Neste...
[tweet]Transformação Digital é sobre como o negócio será impactado (transformado) pela adoção de recursos digitais.[/tweet] Portanto, começando uma nova série...
Microsserviços podem se transformar, rapidamente, em um pesadelo para a área de operações. Diferente do que ocorre com um monolítico,...
Quando pensamos sobre o código-fonte do Roslyn, deveríamos pensar em performance! Eu gostaria de compartilhar algumas técnicas de performance e...

Crie sua conta

Preencha os dados para iniciar o seu cadastro no plano anual do Clube de Estudos:

Crie sua conta

Preencha os dados para iniciar o seu cadastro no plano mensal do Clube de Estudos:

× Precisa de ajuda?