First steps for Implementing a Saga Execution Coordinator using TheFlow

In a previous post, I wrote about how to support sagas using a workflow engine as a Saga Execution Coordinator (if you want a better understanding of the Saga pattern, I recommend reading that post first). At that time, I mentioned that I am working on a lightweight .NET workflow engine called TheFlow.

TheFlow is open source, and it is available as a NuGet package.

In this post, I will explain some basic concepts that you need to understand to get TheFlow up and running in your projects (including as a Saga Execution Coordinator).

The ProcessManager

Understanding the ProcessManager is the first step to using TheFlow. It is the component responsible for creating new process instances, persisting them, and loading them whenever necessary.

You could create multiple ProcessManager instances, but in practical scenarios there is no reason to. Because of that, I recommend following the same pattern the RavenDB team adopts to manage a single instance of IDocumentStore.

public static class ProcessManagerHolder
{
    private static readonly Lazy<ProcessManager> LazyProcessManager = 
        new Lazy<ProcessManager>(() =>
        {
            var models = new InMemoryProcessModelsStore();
            var instances = new InMemoryProcessInstancesStore();

            return new ProcessManager(models, instances);
        });

    public static ProcessManager Instance =>
        LazyProcessManager.Value;
}

The ProcessManager uses two independent stores. The first one holds the models (the description of the process components, including activities, events, and other elements). The second one holds the instances (the data related to an executing process).

For long-running processes, I recommend using a persistent store for instances. An instance store that uses RavenDB as the underlying persistence mechanism is currently under development.

The ProcessModel

After creating the ProcessManager instance, we are ready to define the models for processes that we want to support.

var model = ProcessModel.Create(Guid.Parse("a12637a3-72de-4774-b60c-d98310438c26"))
    .AddEventCatcherFor<StartEvent>()
    .AddActivity<Activity1>()
    .AddActivity<CompensatingActivity1>()
    .AttachAsCompensationActivity("CompensatingActivity1", "Activity1")
    .AddActivity<Activity2>()
    .AddActivity<CompensatingActivity2>()
    .AttachAsCompensationActivity("CompensatingActivity2", "Activity2")
    .AddActivity<Activity3>()
    .AddEventThrower<EndEventThrower>()
    .AddSequenceFlow("OnStartEvent", 
        "Activity1",
        "Activity2",
        "Activity3", 
        "End");

ProcessManagerHolder.Instance.ModelsStore.Store(model);

In this example, we are defining the process for a Saga: a sequence of activities, where an activity can have a corresponding compensating activity that is executed if the process fails.

The ProcessModel class exposes a fluent interface that is easy to understand and use. Currently, TheFlow supports activities, event catchers, gateways, and more.

The Starting Event

The ProcessManager will start a new process instance (in this example, a new Saga) whenever it handles an event that matches the one specified in the model.

ProcessManagerHolder.Instance.HandleEvent(new StartEvent());

The event is a plain CLR object. You could receive it from a message broker (RabbitMQ, for example) and pass it to the ProcessManager.
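As a rough illustration of that hand-off, the sketch below pumps events from a queue into a handler. It is only a sketch under assumptions: `BlockingCollection<object>` stands in for a broker consumer, and `EventPump`, its `Run` method, and the `handle` delegate are hypothetical names that are not part of TheFlow. The `StartEvent` class here is a stand-in for the event type used in the model.

```csharp
using System;
using System.Collections.Concurrent;

// Stand-in for the event type used in the model above.
public class StartEvent { }

// Hypothetical helper (not part of TheFlow): drains a queue that stands in
// for a message broker consumer and forwards each event to a handler.
public static class EventPump
{
    // Returns the number of events forwarded to the handler.
    public static int Run(BlockingCollection<object> source, Action<object> handle)
    {
        var forwarded = 0;

        // GetConsumingEnumerable blocks until an item arrives and finishes
        // once CompleteAdding has been called and the queue is empty.
        foreach (var evt in source.GetConsumingEnumerable())
        {
            handle(evt);
            forwarded++;
        }

        return forwarded;
    }
}

public static class Program
{
    public static void Main()
    {
        var queue = new BlockingCollection<object>();
        queue.Add(new StartEvent());
        queue.Add(new StartEvent());
        queue.CompleteAdding(); // no more messages in this demo

        // Here the handler only prints; a real application would forward
        // the event to the ProcessManager instead.
        var count = EventPump.Run(queue, evt => Console.WriteLine($"Handling {evt.GetType().Name}"));
        Console.WriteLine($"Forwarded {count} event(s)");
    }
}
```

In a real application, the delegate passed to `Run` would presumably wrap the `ProcessManagerHolder.Instance.HandleEvent` call shown above, and the queue would be replaced by the consumer API of whichever broker you use.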

Activities

After starting a new process instance, the ProcessManager starts executing the activities in the defined sequence.

Activities are implemented by inheriting from the Activity class.

public class Activity1 : Activity
{
    public override void Run(ExecutionContext context)
    {
        Console.WriteLine("Running activity 1. Is it working?");
        var response = Console.ReadKey();

        // 'Y' or 'y' means the activity worked, so report completion.
        if (response.KeyChar == 'Y' || response.KeyChar == 'y')
        {
            ProcessManagerHolder.Instance.HandleActivityCompletion(
                context.Instance.Id,
                context.Token.Id,
                null
            );
        }
        // Any other key is treated as a failure.
        else
        {
            ProcessManagerHolder.Instance.HandleActivityFailure(
                context.Instance.Id,
                context.Token.Id,
                null
            );
        }
    }
}

Each activity can report success or failure to the ProcessManager. If a failure is reported, the ProcessManager will start to run the compensating flow.
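To make the idea of a compensating flow concrete, here is a minimal, self-contained sketch of the usual saga behavior: run the steps in order and, on the first failure, compensate the completed steps in reverse order. This is not TheFlow's implementation, and I am assuming reverse-order compensation for illustration. `SagaStep` and `SagaRunner` are hypothetical names that are not part of the library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical step type (not TheFlow's Activity class): an action that
// reports success or failure, paired with an optional compensation.
public class SagaStep
{
    public string Name { get; }
    public Func<bool> Run { get; }
    public Action Compensate { get; }

    public SagaStep(string name, Func<bool> run, Action compensate = null)
    {
        Name = name;
        Run = run;
        Compensate = compensate;
    }
}

public static class SagaRunner
{
    // Runs the steps in order. On the first failure, compensates the steps
    // that already completed, most recent first. Returns true only when
    // every step succeeded.
    public static bool Execute(IEnumerable<SagaStep> steps, Action<string> log)
    {
        var completed = new Stack<SagaStep>();

        foreach (var step in steps)
        {
            log($"run {step.Name}");
            if (step.Run())
            {
                completed.Push(step);
                continue;
            }

            log($"failed {step.Name}");
            while (completed.Count > 0)
            {
                var done = completed.Pop();
                if (done.Compensate == null) continue; // nothing to undo
                log($"compensate {done.Name}");
                done.Compensate();
            }
            return false;
        }

        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        // Mirrors the model above: Activity3 has no compensation and fails.
        var steps = new[]
        {
            new SagaStep("Activity1", () => true, () => { }),
            new SagaStep("Activity2", () => true, () => { }),
            new SagaStep("Activity3", () => false)
        };

        var succeeded = SagaRunner.Execute(steps, Console.WriteLine);
        Console.WriteLine($"succeeded: {succeeded}");
    }
}
```

With the steps above, the log shows Activity1, Activity2, and Activity3 running, Activity3 failing, and then the compensations for Activity2 and Activity1, in that order.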

The Log

Each process instance contains a detailed log describing all its events (start and end times for all the executed activities, for example).

The ProcessManager allows you to search for instances using the log information through the instances store.

Call to action

As you can see, TheFlow makes it easy to define, execute, and control customized processes (including Sagas). It is under active development and being improved every day. In this post, I shared just the basic concepts.

I invite you to use TheFlow, improve the code, or open new issues. I would be grateful for any feedback in the comments.

5 responses

  1. Would it make sense to create a method (or extension method) on the ProcessModel class that adds both an activity and its compensation activity? In most scenarios this seems to be the pattern: add activity + compensation.

    The example would become even easier to understand.

    I could see the usage as:

    var model = ProcessModel.Create(Guid.Parse("a12637a3-72de-4774-b60c-d98310438c26"))
       .AddEventCatcherFor<StartEvent>()
       .AddActivityWithCompensation<RequestPayment, CancelPayment>()
       … 
    

    ---

    Question 1: Using this model, would it be possible to implement different activities in different applications, so that pieces of this process can be scaled out? Maybe it wouldn't be necessary, but I'm curious.

    Question 2: I can see that the method ContinueExecutionFromTheContextPoint on the class ProcessInstance runs the activities. Would it make sense to add a resiliency strategy there, so that transient failures could be absorbed?

    Question 3: Do you have plans to support asynchronous activities? I'm thinking of activities that send messages through the bus, where the process would need to stay idle until something happens (a domain event or so...)

    1. Question 1.
      I am not sure if I got it right. Are you talking about having activities running in another process/machine? How would you implement the communication?

      Question 2.
      What would you suggest? Today I am starting the compensating flow…

      Question 3.
      Your activity can send the message through the Bus without reporting success or failure to the ProcessManager.

      Later, when you have a result, you can invoke the ProcessManager, passing the context information, and the process execution will continue.

      1. Question 1.
        Yes, that's the idea. Maybe activities that by nature listen to a queue could react to it and process parts of it. I'm not sure whether this would be compatible with the process definition. Maybe the process definition would declare an async activity that sends a message to a queue. Another process would pick this message from the queue and process it. Optionally, it could publish an event saying that the activity completed, containing its outputs. Then the main process would eventually listen to this event and proceed with the next steps.

        Question 2.
        I would suggest using Polly (or any other method) to retry a couple of times (ideally customizable). This way, TheFlow only starts the compensation flow when it is very likely that there is a bigger issue and not a transient failure.

        This is just a suggestion, but it can also be up to the developer that implements the activity to decide whether this strategy is important or not.

        Question 3.
        Cool. This actually helps, and the flow doesn't need to depend on any bus implementation. This could also be applied to my suggestion in Question 1.

  2. Hi,

    I am working on a Kubernetes microservices implementation where the end user communicates with API endpoints to access services. As one of my services will communicate with other services to accomplish a flow, I would like to implement the Saga pattern to achieve consistency across services. How can I integrate a SEC with the APIs? Is it a separate service, or will it be one of the existing services?

Elemar Júnior

I am the founder and CEO of EximiaCo and work as a tech trusted advisor, helping companies generate better results through technology.