Implementando CQRS e Event Sourcing com RavenDB

Tempo de leitura: 7 minutos

Se você não sabe o que é CQRS, recomendo a leitura de uma pequena revisão que escrevi há bem pouco tempo. O mesmo vale para Event Sourcing


Sobre RavenDB

Antes de começar a falar sobre CQRS e Event Sourcing, vamos introduzir alguns conceitos fundamentais sobre RavenDB: Document, Index e Transformer.

Document

São objetos JSON, como o apresentado abaixo, representando o que estamos persistindo no banco de dados.

{
    "Name": {
        "GivenName": "John",
        "Surname": "Doe"
    },
    "Salary": 256,
    "HomeAddress": {
        "$type": "Payroll.Domain.Model.BrazilianAddress, Payroll.Domain",
        "StreetName": "Main avenue",
        "Number": 42,
        "AdditionalInfo": null,
        "Neighborhood": "Good Ville",
        "City": "MyCity",
        "State": "XX",
        "PostalCode": "91234-123",
        "Country": "Brazil"
    }
}

Esses objetos não precisam seguir nenhuma especificação de esquema e geralmente estão agrupados em "coleções".

Em um primeiro momento, podemos pensar em documentos como registros e coleções como tabelas em um banco de dados relacional. Entretanto, no RavenDB cada documento poderá adotar a estrutura que for mais apropriada.

RavenDB oferece suporte a transações para manutenção de documentos. Por isso, é considerado um banco que suporta ACID.

Index

São funções que são executadas no lado do servidor e que determinam que campos (e quais valores) podem ser utilizados em buscas. Essas funções geralmente criam estruturas semelhantes a materialized views que aceleram a obtenção de resultados.

Todo o processo de indexação acontece em background e é disparado sempre que um dado é adicionado ou modificado. Essa abordagem, permite que o servidor responda rapidamente mesmo quando grandes porções de dados foram modificadas e evita buscas mais pesadas. Ema compartida, não há garantias de que os dados retornados/armazenados nos índices estejam atualizados.

Índices no RavenDB são eventualmente consistentes.

Transformações

São funções que são executadas no lado do servidor, capazes de fazer projeções, aceitando parâmetros, refinando resultados e mais.

Diferente dos índices, transformações não criam estruturas adicionais de dados no servidor sendo utilizadas apenas durante consultas em que são invocadas.


RavenDB e CQRS

Por ser um banco de documentos, livre de esquema, RavenDB é uma excelente opção para banco de dados Query Stack. Mas, não precisamos parar por aí.

Vejamos:

cqrs_raven

Como a figura sugere, podemos pensar em índices e transformações como fontes excelentes de dados para a Query Stack. De forma análoga, os documentos, pela sua natureza ACID, são perfeitas para a Command Stack.


Persistência adequadas ao domínio com RavenDB

Seguindo DDD, buscamos adotar linguagem ubíqua nos serviços de domínio. Não é diferente com o repositório.

public interface IEmployeeRepository
{
    bool IsRegistered(EmployeeId id);
    Employee Load(EmployeeId id);

    void CreateEmployee(
        EmployeeId id,
        FullName name,
        decimal initialSalary);

    void RaiseSalary(EmployeeId id, decimal amount);
    void UpdateHomeAddress(EmployeeId id, Address homeAddress);
}

Salvando e recuperando documentos

A implementação em RavenDB para essa interface não oferece grandes dificuldades.

public class EmployeeRepository : IEmployeeRepository, IDisposable
{
    private readonly IDocumentStore _store;
    private JsonSerializer _serializer;

    public EmployeeRepository() { /* implementation omitted */ }

    public bool IsRegistered(EmployeeId id)
    {
        return _store.DatabaseCommands.Head($"employees/{id}") != null;
    }

    public Employee Load(EmployeeId id)
    {
        Employee result;
        using (var session = _store.OpenSession()) { result = session.Load<Employee>($"employees/{id}"); }
        return result;
    }

    public void CreateEmployee(EmployeeId id, FullName name, decimal initialSalary)
    {
        using (var session = _store.OpenSession())
        {
            var employee = new Employee(id, name, Address.NotInformed, initialSalary);
            session.Store(employee);
            session.SaveChanges();
        }
    }

    public void RaiseSalary(EmployeeId id, decimal amount)  { /* implementation omitted * }

    public void UpdateHomeAddress(EmployeeId id, Address homeAddress) { /* implementation omitted * }

    public void Dispose() { _store.Dispose(); }
}

Salvar documentos é trivial. Você pode encontrar mais informações sobre como isso ocorre nesse post.

Suportando polimorfismo

Diferentes países tem diferentes formatos de endereço. Meu domínio reconhece esse fato e possui diferentes implementações, uma para cada localidade..

public abstract class Address
{
    public abstract string Country { get; }
}

public sealed class BrazilianAddress : Address
{
    public string StreetName { get; private set; }
    public int Number { get; private set; }
    public string AdditionalInfo { get; private set; }
    public string Neighborhood { get; private set; }
    public string City { get; private set; }
    public string State { get; private set; }
    public string PostalCode { get; private set; }
    public override string Country => "Brazil";

    public override bool Equals(object obj) { /* implementation omitted /* }
    public override int GetHashCode() { /* implementation omitted /* }

    public static class Factory { /* implementation omitted /* }
}

RavenDB, com seu esquema livre, não cria dificuldades para que eu faça a persistência.

public void UpdateHomeAddress(EmployeeId id, Address homeAddress)
{
    _store.DatabaseCommands.Patch($"employees/{id}", new[]
    {
        new PatchRequest
        {
            Type = PatchCommandType.Set,
            Name = "HomeAddress",
            Value = RavenJObject.FromObject(homeAddress, _serializer)
        }
    });
}

O serializador adiciona meta-atributos que garantem que, na carga do objeto, o tipo adequado será utilizado.

Executando atualizações com scripts

Outro ponto a destacar no repositório é o método que trata do aumento do salário:

public void RaiseSalary(EmployeeId id, decimal amount)
{
    _store.DatabaseCommands.Patch($"employees/{id}", new ScriptedPatchRequest
    {
        Script = $"this.Salary += {amount.ToInvariantString()};"
    });
}

Não há necessidade de recuperar informações do banco. O Script enviado é um Javascript.


RavenDB e Event Sourcing

A ideia fundamental de ES é armazenar os diversos eventos gerados pelo domínio como fonte primária de dados.

events2

Essa abordagem permite recuperar o estado de qualquer entidade/agregado simplesmente "refazendo" as modificações registradas nos eventos. O grande ganho é que podemos saber o estado de qualquer entidade/agregado ao longo do tempo.

A implementação de ES com RavenDB é bem simples. Caso o sistema já emita eventos basta enviar os objetos correspondentes para armazenamento como documentos.

public class EmployeeEventStore :
    IMessageHandler<EmployeeRegisteredEvent>,
    IMessageHandler<EmployeeHomeAddressUpdatedEvent>,
    IMessageHandler<EmployeeSalaryRaisedEvent>,
    IDisposable
{
    private readonly DocumentStore _store;

    public EmployeeEventStore()
    {
        _store = new DocumentStore
        {
            Url = "http://localhost:8080/", // server URL
            DefaultDatabase = "RegularDb"
        };

        _store.Conventions.CustomizeJsonSerializer = 
            serializer => serializer.Converters.Add(new EmployeeIdJsonConverter());
        
        _store.Initialize();

        _store.Conventions.FindTypeTagName = t => "EmployeeEvents";
    }

    public void HandleInternal(Message message)
    {
        using (var session = _store.OpenSession())
        {
            session.Store(message);
            session.SaveChanges();
        }
    }

    public void Handle(EmployeeRegisteredEvent message)
    { HandleInternal(message); }

    public void Handle(EmployeeHomeAddressUpdatedEvent message)
    { HandleInternal(message); }

    public void Handle(EmployeeSalaryRaisedEvent message)
    { HandleInternal(message); }

    public void Dispose()
    { _store.Dispose();  }

    public class EmployeeIdJsonConverter : JsonConverter { /* implementation omitted */ }
}

O que fizemos aqui foi manifestar (para posterior registro no BUS) que temos interesse em todos os eventos relacionados a Employee. Além disso, para fins de organização, configurei para que todos os documentos fossem adicionados em uma mesma coleção.


Usando Map/Reduce para projetar uma ViewModel a partir dos eventos (número de eventos por funcionário)

Armazenar eventos, na Command Stack abre possibilidades e desafios técnicos interessantes. Como produzir ViewModels para a Query Stack? Se estivermos utilizando RavenDB podemos frequentemente recorrer aos índices.

Comecemos com um exemplo bem simples:

public class EmployeeEventsSummaryResult
{
    public EmployeeId EmployeeId { get; set; }
    public int NumberOfEvents { get; set; }
}

class EmployeeEventsSummaryIndex 
    : AbstractIndexCreationTask<EmployeeEvent, EmployeeEventsSummaryResult>
{
    public override string IndexName => "EmployeeEvents/Summary";

    public EmployeeEventsSummaryIndex()
    {
        Map = (events) =>
            from e in events
            select new EmployeeEventsSummaryResult
            {
                EmployeeId = e.EmployeeId,
                NumberOfEvents = 1
            };

        Reduce = (inputs) =>
            from input in inputs
            group input by input.EmployeeId into g
            select new EmployeeEventsSummaryResult
            {
                EmployeeId = g.Key,
                NumberOfEvents = g.Sum(x => x.NumberOfEvents)
            };
    }
}

O que esse índice responde é o total de eventos associados a cada entidade Employee. Ele faz isso através da aplicação da técnica Map Reduce.

Durante o Map todos os eventos registrados são convertidos (mapeados) para uma representação comum.

map

Depois, essas representações são agrupadas atendendo a um critério.

group

Por fim, ocorre a redução:

reduce

A beleza é que pela natureza do algoritmo, a atualização do índice ocorre muito rapidamente (um map, um reduce).


Usando Map/Reduce para projetar uma ViewModel a partir dos eventos (Os mais bem pagos)

Usando uma ideia análoga a anterior, vamos construir um outro índice.

public class EmployeeSalary
{
    public EmployeeId EmployeeId { get; set; }
    public string FullName { get; set; }
    public decimal Salary { get; set; }
}

public class EmployeeEventsSalaryPerEmployee : AbstractMultiMapIndexCreationTask<EmployeeSalary>
{
    public override string IndexName => "EmployeeEvents/SalaryPerEmployee";

    public EmployeeEventsSalaryPerEmployee()
    {
        AddMap<EmployeeSalaryRaisedEvent>(events => 
            from e in events
            where e.MessageType == "EmployeeSalaryRaisedEvent"
            select new 
            {
                e.EmployeeId,
                FullName = "",
                Salary = e.Amount
            });

        AddMap<EmployeeRegisteredEvent>(events =>
            from e in events
            where e.MessageType == "EmployeeRegisteredEvent"
            select new
            {
                e.EmployeeId,
                FullName = e.Name.GivenName + "  " + e.Name.Surname,
                Salary = e.InitialSalary
            });

        Reduce = inputs =>
            from input in inputs
            group input by input.EmployeeId
            into g
            select new EmployeeSalary()
            {
                EmployeeId = g.Key,
                FullName = g.Aggregate("", (a, b) => b.FullName != "" ? b.FullName : a),
                Salary = g.Sum(x => x.Salary)
            };
    }
}

Este índice, diferente do anterior, utiliza duas fontes par mapping. Não há problema com isso, desde que a saída seja comum.

Perceba que durante o map usamos as propriedades InitialSalary para o evento de funcionário adicionado e Amount para o evento de aumento.

Atendendo a filosofia de entregar um modelo "pronto para o uso", estou gerando um FullName concatenando as partes no evento de funcionário adicionado. Deixo o nome em branco para o evento de aumento pois esse registro não possui a informação.

O Reduce é quase auto explicativo. Basicamente, estou somando os salários e pegando o nome.

O consumo dessa informação é muito fácil também.

public IEnumerable<EmployeeSalary> TopSalaries()
{
    IEnumerable<EmployeeSalary> results;

    using (var session = _store.OpenSession())
    {
        results = session
            .Advanced
            .DocumentQuery<EmployeeSalary, EmployeeEventsSalaryPerEmployee>()
            .OrderByDescending(es => es.Salary)
            .ToList();
    }

    return results;
}

Concluindo

Phew!

Nesse post vimos como RavenDB possui fit natural com CQRS e, até mesmo, com Event Sourcing.

Para começar, temos a facilidade da estrutura de documentos com esquema livre. Ela é ideal para armazenamento de objetos prontos para consumir (ideais para Query Stack) e permite fácil adoção de abordagem polimórficas. Além disso, por fornecer transações, é sólida o suficiente para ser utilizada como base de documentos.

A característica de permitir esquema livre também facilita muito a criação de um repositório de eventos, caso desejemos trabalhar com Event Sourcing. Eventos são fáceis de registrar e consultar.

Por fim, o suporte a índices permite a criação de mecanismos avançados de projeção.

Era isso.

Você pode saber mais sobre RavenDB acessando o site oficial

Deixe aqui seu comentário... 10 Comentários

  • 1-Tem esse fonte no seu github?
    2-como funciona o Raven Index? É possível efetuar boost por um campo? Consigo especificar um analyzer?

  • Nelson Júnior disse:

    Muito bacana o artigo Elemar. Parabéns. Eu tenho estudado bastante sobre CQRS e confesso que até então o RavenDB era uma das últimas opções pra que eu usasse como persistência de eventos. Talvez porque eu tenha ficado com um pré-conceito.
    Um detalhe que venho acompanhando e tem um amigo meu que tem estudado muito pesado F#, é que o Event Sourcing combina demais com o paradigma funcional.
    Pra finalizar, gostaria de compartilhar contigo e com seus leitores um projeto que eu comecei uma arquitetura baseado nos conceitos do CQRS & Event Sourcing. Como é um projeto de estudo junto, pode ter pontos equivocados e ficarei grato se puder aponta-los. 🙂
    Segue o endereço do projeto no github https://github.com/ircnelson/mycqrs

    Abraço.

    • elemarjr disse:

      Olá Nelson,
      Obrigado por comentar e compartilhar sua experiência. Vou clonar seu repositório hoje e dar uma bola olhada.
      Venho usando F# há tempos e sou entusiasta do paradigma funcional. Concordo com você.

  • Anatoliy 'TLK' Kolesnick disse:

    You probably didn't use this in production. RavenDB does not work very well as event storage. It just stops to return the data after you have millions of events. We tried to use it for more than 3 years and gave up. Now we use EventStore which is much more suitable as event storage.

    • elemarjr disse:

      Hello Anatoliy,
      Thanks for commenting.
      This is not a production code. But, I did some stress testing with millions of records and I didn't have any problem running it.
      For RavenDB team, event storage isn't the core thing, but that is still a fairly common usage.
      I really would love to know more details about your issues. Feel free to share your experience with me (or with RavenDB support) here or using email.

  • CLEBER AZEVEDO disse:

    Elemar não há suporte do RavenDB ainda para linux, procede?

  • wilson disse:

    Fala Elemar ,cara você chegou usar o Reven no .net core , você considera o reven uma opção noSQl para .net core visando em multiplataforma?

    • elemarjr disse:

      Opa!

      Não. Ainda não cheguei a usar. Mas, é importante dizer que a próxima versão do server (versão 4) está sendo desenvolvida com .NET Core.

      Em tempo, minha opinião é um pouco suspeita pois trabalho com o time. Mas, sim, considero uma ótima opção.

Deixe uma Resposta