Overview

CQRS and Event Sourcing with Lagom

No Comments

Lagom is the new microservices framework from Lightbend (formerly Typesafe, the company behind Scala and Akka). The framework and the concepts behind it are heavily based on CQRS (Command Query Responsibility Segregation) and ES (Event Sourcing). This dictates how state is handled and persisted internally.

In this article I will describe the basics of Lagom and then look more closely at the concepts of CQRS and ES in combination with the framework.

Lagom, The Framework

The philosophy behind Lagom is that it

  • has to be distributed
  • has to have asynchronous communication
  • has to support high development productivity

These ideas dictate how the framework is built. The goal is to develop services on top of Lagom which are very small (in lines of code) and compact. Certain conventions make it straightforward to let the services communicate asynchronously. To give an example of this:

ServiceCall<CreateCustomerMessage, Done> createCustomer();
ServiceCall<NotUsed, Customer> getCustomerByEmail(String email);
ServiceCall<NotUsed, String> getCustomerAverageAge();
 
@Override
default Descriptor descriptor() {
   return named("customer-store").withCalls(
           pathCall("/api/customer/average-age", this::getCustomerAverageAge),
           restCall(Method.POST, "/api/customer", this::createCustomer),
           restCall(Method.GET, "/api/customer/:email", this::getCustomerByEmail)
   ).withAutoAcl(true).withCircuitBreaker(CircuitBreaker.perNode());
}

Three interfaces are being defined here. Because getCustomerAverageAge is a ServiceCall with NotUsed as first generic parameter, it will be automatically generated as an HTTP GET request. A ServiceCall with an object as first parameter and Done as second type will turn this automatically into a POST (even though the type doesn’t have to be explicit within the restCall method. This shows it’s possible with minimal code to define RESTful interfaces that internally are handled asynchronously.
Besided CQRS and ES some other important concepts are applied, such as immutability of objects, design-driven APIs and polyglot programming. Java as well as Scala are supported by the framework APIs, but by using RESTful APIs with JSON data, communication with other services has been made easy.
As the Lagom framework is developed by Lightbend, the technology it is based on should not come as a surprise. Akka, together with Akka Streams, Akka Persistence and Akka Cluster constitute the fundamentals and take care of communication and storage of data. Play is integrated for creation of the RESTful interfaces and for configuration of the framework. Slick is used as ORM framework, where SQL calls are also handled asynchronously. Lastly, ConductR takes care of deploying and scaling the application in production environments.

Some other noteworthy libraries are Logback (logging), Jackson (JSON serialization), Guice (dependency injection), Dropwizard (metrics) and Immutables (immutable objects).
The focus on immutability, non-blocking APIs and a strong presence of the CQRS and Event Sourcing concepts makes the biggest difference when comparing it to frameworks like Spring Boot. Moreover, Lagom is a much compacter framework and offers less functionality. For example, interfaces for queueing are not there and would need work to add and configure. In general Lagom prevents you from having to touch the underlying layers of the framework, but for any more advanced requirements, it will be essential to know and learn about these layers.

Persistence in Lagom

By default Lagom uses the Cassandra key-value store for persistency. As of version 1.2 it is also possible to use a JDBC store, where the principles and APIs are more or less comparable. Later we will dive into using a JDBC store more specifically.
Storing of data works by implementing the PersistentEntity abstract class (a code example will follow later). The PersistentEntity corresponds with the Aggregate Root from the Domain Driven Design concepts.

Every PersistentEntity has a fixed identifier (primary key) that can be used to fetch the current state and at any time only one instance (as a “singleton”) is kept in memory. This is in constrast to JPA, where multiple instances with the same identifier can exist in memory. To add to that, with JPA only the current state is usually stored in the database, whereas Lagom stores a PersistentEntity with its history and all events leading to the current states.
In alignment with the CQRS ‘flow’ a PersistentEntity needs a Command, Event and State. All interaction proceeds by sending Commands to the entity, followed by either an update being executed, or by a response that contains the requested data. So even the querying of the current state is handled by sending Commands.
In case of a change, the Command will lead to an Event that will be persisted. The Event then again results in the State being modified.
CQRS Command, Event, State flow
Fig 1: CQRS Command, Event, State flow

The next listing shows an example Command for adding a new customer.

public interface CustomerCommand extends Jsonable {
 
   @Immutable
   @JsonDeserialize
   public final class AddCustomer implements CustomerCommand, CompressedJsonable, PersistentEntity.ReplyType<Done> {
       public final String firstName;
       public final String lastName;
       public final Date birthDate;
       public final Optional<String> comment;
 
       @JsonCreator
       public AddCustomer(String firstName, String lastName, Date birthDate, Optional<String> comment) {
           this.firstName = Preconditions.checkNotNull(firstName, "firstName");
           this.lastName = Preconditions.checkNotNull(lastName, "lastName");
           this.birthDate = Preconditions.checkNotNull(birthDate, "birthDate");
           this.comment = Preconditions.checkNotNull(comment, "comment");
       }
   }
 
}

How to implement a service (the interface of which we saw in the first listing) and send a Command to an entity is shown in the next listing.

@Override
public ServiceCall<CreateCustomerMessage, Done> createCustomer() {
   return request -> {
       log.info("===> Create or update customer {}", request.toString());
       PersistentEntityRef<CustomerCommand> ref = persistentEntityRegistry.refFor(CustomerEntity.class, request.userEmail);
       return ref.ask(new CustomerCommand.AddCustomer(request.firstName, request.lastName, request.birthDate, request.comment));
   };
}

As you can see, the PersistentEntityRef is fetched by using a combination of the type and the identity / primary key. The reference is an instance that you can interact with by sending Commands.

The CreateCustomerMessage implementation (not shown in any listing) is comparable to the AddCustomer implementation from the second source code listing, but also conains the email address from the user as primary key.
To process Commands it is necessary to define so-called ‘Command Handlers’ in Lagom. These determine the Behavior for your PersistentEntity and always start with a clean State. The following listing shows the implementation for the CustomerEntity with its Behavior:

public class CustomerEntity extends PersistentEntity<CustomerCommand, CustomerEvent, CustomerState> {
 
   @Override
   public Behavior initialBehavior(Optional<CustomerState> snapshotState) {
 
      /*
       * The BehaviorBuilder always starts with a State, which can be initially empty
       */
       BehaviorBuilder b = newBehaviorBuilder(
               snapshotState.orElse(new CustomerState.EMPTY));
 
      /*
       * Command handler for the AddCustomer command.
       */
       b.setCommandHandler(CustomerCommand.AddCustomer.class, (cmd, ctx) ->
               // First we create an event and persist it
               // {@code entityId() } gives you automatically the 'primary key', in our case the email
               ctx.thenPersist(new CustomerEvent.AddedCustomerEvent(entityId(), cmd.firstName, cmd.lastName, cmd.birthDate, cmd.comment),
                       // if this succeeds, we return 'Done'
                       evt -> ctx.reply(Done.getInstance())));
 
      /*
       * Event handler for the AddedCustomerEvent event, where we update the status for real
       */
       b.setEventHandler(CustomerEvent.AddedCustomerEvent.class,
               evt -> {
                   return new CustomerState(Optional.of(evt.email), Optional.of(evt.firstName), Optional.of(evt.lastName), Optional.of(evt
                           .birthDate), evt.comment);
               });
 
      /*
       * Command handler to query all data of a customer (String representation of our customer)
       */
       b.setReadOnlyCommandHandler(CustomerCommand.CustomerInfo.class,
               (cmd, ctx) -> ctx.reply(state().toString()));
 
       return b.build();
   }
 
}

Finally a handler definition in the code listing, a ‘read only command handler’ is being created. You are not allowed to mutate any state through this handler, but it can be used to query the current state of the entity.

The BehaviorBuilder can also contain business logic, for example to mutate state differently when a customer already exists and as such has to be updated instead of created. The AddedCustomerEvent is identical to the AddCustomerCommand except for having the e-mail address, because we’ll need it later on.
Missing until now from the code listings is the CustomerState, which you can see below. The fields are all of type Optional because the initial state for a certain customer is ’empty’.

public final class CustomerState implements Jsonable {
 
   public static final CustomerState EMPTY = new CustomerState(Optional.empty(), Optional.empty, Optional.empty, Optional.empty, Optional.empty);
 
   private final Optional<String> email;
   private final Optional<String> firstName;
   private final Optional<String> lastName;
   private final Optional<Date> birthDate;
   private final Optional<String> comment;
 
   @JsonCreator
   public BlogState(Optional<String> email, Optional<String> firstName, Optional<String> lastName, Optional<Date> birthDate, Optional<String> comment) {
       this.email = email;
       this.firstName = firstName;
       this.lastName = lastName;
       this.birthDate = birthDate;
       this.comment = comment;
   }
 
   @JsonIgnore
   public boolean isEmpty() {
       return !email.isPresent();
   }
}

Read-side with JDBC in Lagom

In a CQRS (Command Query Responsibility Segregation) architecture the manipulation of data is separated from the querying of data. One of the more interesting aspects about this separation is that the read-side can be optimized for querying. Specifically by using denormalized tables on the read-side, grouping data in the most efficient way and by duplicating data where needed. This keeps queries simple and fast.

Additionally this will prevent so-called ORM impedance mismatch; the conceptual and technical difficulties of translating object structures to relational tables, for example translation of inheritance and encapsulation to relational schemas.
As I have shown above Lagom will automatically take care of storage and processing of events in the same way the framework supports storing of data on the read-side inside denormalized tables, shown in Figure 2.
CQRS Separate Read Write
Fig 2: Separated ‘read’ and ‘write’ side in line with CQRS
© Microsoft – CQRS Journey

Within Lagom you can define “ReadSideProcessor”s that can receive and process events and thereby store the data in a different form. The next listing shows an example of a ReadSideProcessor.

public class CustomerEventProcessor extends ReadSideProcessor<CustomerEvent> {
 
   private final JdbcReadSide readSide;
 
   @Inject
   public CustomerEventProcessor(JdbcReadSide readSide) {
       this.readSide = readSide;
   }
 
   @Override
   public ReadSideHandler<CustomerEvent> buildHandler() {
       JdbcReadSide.ReadSideHandlerBuilder<CustomerEvent> builder = readSide.builder("votesoffset");
 
       builder.setGlobalPrepare(this::createTable);
       builder.setEventHandler(CustomerEvent.AddedCustomerEvent.class, this::processCustomerAdded);
 
       return builder.build();
   }
 
   private void createTable(Connection connection) throws SQLException {
       connection.prepareStatement(
               "CREATE TABLE IF NOT EXISTS customers ( "
                       + "id MEDIUMINT NOT NULL AUTO_INCREMENT, "
                       + "email VARCHAR(64) NOT NULL, "
                       + "firstname VARCHAR(64) NOT NULL, "
                       + "lastname VARCHAR(64) NOT NULL, "
                       + "birthdate DATETIME NOT NULL, "
                       + "comment VARCHAR(256), "
                       + "dt_created DATETIME DEFAULT CURRENT_TIMESTAMP, "
                       + " PRIMARY KEY (id))").execute();
   }
 
   private void processCustomerAdded(Connection connection, CustomerEvent.AddedCustomerEvent event) throws SQLException {
       PreparedStatement statement = connection.prepareStatement(
               "INSERT INTO customers (email, firstname, lastname, birthdate, comment) VALUES (?, ?, ?, ?, ?)");
       statement.setString(1, event.email);
       statement.setString(2, event.firstName);
       statement.setString(3, event.lastName);
       statement.setDate(4, event.birthDate);
       statement.setString(5, event.comment.orElse(""));
       statement.execute();
   }
 
   @Override
   public PSequence<AggregateEventTag<CustomerEvent>> aggregateTags() {
       return TreePVector.singleton(CustomerEvent.CUSTOMER_EVENT_TAG);
   }
}

Now the ReadSideProcessor can be registered in the service implementation as follows (showing the full constructor for the sake of completeness):

@Inject
public CustomerServiceImpl(PersistentEntityRegistry persistentEntityRegistry, JdbcSession jdbcSession, ReadSide readSide) {
   this.persistentEntityRegistry = persistentEntityRegistry;
   this.persistentEntityRegistry.register(CustomerEntity.class);
   this.jdbcSession = jdbcSession;
   readSide.register(CustomerEventProcessor.class);
}

For the Event class a ‘tag’ needs to be defined as shown in the following listing, so Lagom can keep track of which events have been processed. This is important particularly for restarts or crashes, so that the data can be kept consistent between write- and read-side.

AggregateEventTag<CustomerEvent> CUSTOMER_EVENT_TAG = AggregateEventTag.of(CustomerEvent.class);
 
@Override
default AggregateEventTag<CustomerEvent> aggregateTag() {
   return CUSTOMER_EVENT_TAG;
}

Now that the processing of events is implemented and data is stored in denormalized tables, it can be easily queried using SQL queries. For example the next listing shows a simple query for the average age of customers in the system, added to the service implementation.

@Override
public ServiceCall<NotUsed, String> getCustomerAverageAge() {
   return request -> jdbcSession.withConnection(connection -> {
       ResultSet rsCount = connection.prepareStatement("SELECT COUNT(*) FROM customers").executeQuery();
       ResultSet rsAverage = connection.prepareStatement("SELECT AVG(TIMESTAMPDIFF(YEAR,birthDate,CURDATE())) FROM customers").executeQuery();
 
       if (rsCount.next() && rsAverage.next() && rsCount.getInt(1) > 0) {
           return String.format("# %s customers resulted in average age; %s", rsCount.getString(1), rsAverage.getString(1));
       } else {
           return "No customers yet";
       }
   });
}

Conclusion

CQRS and Event Sourcing are a powerful means to optimize the write- and read-side for a service separately. And while a NoSQL store certainly has its advantages, a relational database is highly suitable for querying over multiple object structures.
I hope to have shown you how Lagom supports this architecture perfectly and supports different solutions for persistence. With the principle of ‘convention over configuration’ developers can focus on implementing business logic instead of typing boilerplate code.

Lagom recently arrived at version 1.2.x and you will sometimes note this is still a young framework in some minor issues. Partly because of this I advise to take some caution and thoroughly evaluate whether Lagom is suitable for your production use-cases. But it certainly is a framework to keep an eye on.

Comment

Your email address will not be published. Required fields are marked *