Axon replaying made easy with endpoints


The event store is at the heart of any event-sourced application. It contains every event that occurred during the lifetime of your system, and together those events describe every state change in the application. Event Sourcing is often combined with Command Query Responsibility Segregation (CQRS), which for Axon means that there is a separate read side that can be implemented as projections. In general, there are two main places where events affect the current state of the software: the aggregate and the projection.

Event Processors
If you are familiar with Axon, you may know that there are two types of processors: Subscribing Event Processors and Tracking Event Processors. The main difference between them is that Subscribing Event Processors subscribe themselves to a source of events and are invoked by the thread managed by the publishing mechanism. Tracking Event Processors, on the other hand, pull their messages from a source using a thread that they manage themselves.
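To make the difference concrete, here is a toy model in plain Java. It is not Axon's implementation: `EventBus`, `TrackingProcessor` and its `position` field are hypothetical names. The subscribed handler runs on the publisher's thread, while the tracking processor reads the stored events on a thread it owns and remembers how far it got, which is the property replays build on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model (not Axon code): subscribing vs. tracking event processing.
public class ProcessorStyles {

    /** Publisher that stores each event and pushes it to subscribers on the calling thread. */
    static final class EventBus {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        final List<String> store = new ArrayList<>(); // append-only stand-in for the event store

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void publish(String event) {
            store.add(event);
            subscribers.forEach(s -> s.accept(event)); // invoked on the publisher's thread
        }
    }

    /** Pulls events from the store, starting at its own remembered position (its "token"). */
    static final class TrackingProcessor implements Runnable {
        private final List<String> store;
        private final Consumer<String> handler;
        int position = 0; // index of the next event to read

        TrackingProcessor(List<String> store, Consumer<String> handler) {
            this.store = store;
            this.handler = handler;
        }

        @Override
        public void run() {
            while (position < store.size()) {
                handler.accept(store.get(position));
                position++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EventBus bus = new EventBus();
        List<String> subscriberThreads = new ArrayList<>();
        bus.subscribe(e -> subscriberThreads.add(Thread.currentThread().getName()));

        bus.publish("CustomerRegisteredEvent");
        bus.publish("AddressChangedEvent");

        List<String> tracked = new ArrayList<>();
        TrackingProcessor tracking = new TrackingProcessor(bus.store, tracked::add);
        Thread worker = new Thread(tracking, "tracking-worker"); // thread owned by the processor
        worker.start();
        worker.join();

        System.out.println(subscriberThreads); // [main, main]
        System.out.println(tracked + " position=" + tracking.position);
    }
}
```

Because the tracking processor only depends on the stored events and its own position, moving that position backwards is enough to process old events again.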

Projections
The current state of our software is the result of our aggregates applying those events. Projections are specific views based on these events, each serving its own purpose. For example, if you register with a new bank, an event along the lines of `CustomerRegisteredEvent` will be emitted. You can imagine that several projections are interested in this event: think of a projection that keeps track of the personal information of all customers registered with the bank, or one that tracks how far a customer has progressed in filling in all the information they need.
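A minimal sketch of that idea in plain Java, assuming a hypothetical `CustomerRegisteredEvent` with `customerId`, `name` and `email` fields and two hypothetical projections. In Axon these would be `@EventHandler` methods in separate components; here they are plain methods so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event and read models: one event feeds two projections with different purposes.
public class BankProjections {

    record CustomerRegisteredEvent(String customerId, String name, String email) {}

    /** Read model with the personal information of registered customers. */
    static final class CustomerDetailsProjection {
        final Map<String, String> namesById = new HashMap<>();

        void on(CustomerRegisteredEvent event) {
            namesById.put(event.customerId(), event.name());
        }
    }

    /** Read model with the onboarding steps each customer still has to complete. */
    static final class OnboardingProgressProjection {
        final Map<String, List<String>> openStepsById = new HashMap<>();

        void on(CustomerRegisteredEvent event) {
            List<String> steps = new ArrayList<>(List.of("address", "identity-check"));
            if (event.email() == null) {
                steps.add("email"); // missing information becomes an open step
            }
            openStepsById.put(event.customerId(), steps);
        }
    }

    public static void main(String[] args) {
        var details = new CustomerDetailsProjection();
        var progress = new OnboardingProgressProjection();
        var event = new CustomerRegisteredEvent("c-1", "Alice", null);

        // Each interested projection handles the same event for its own purpose
        details.on(event);
        progress.on(event);

        System.out.println(details.namesById);      // {c-1=Alice}
        System.out.println(progress.openStepsById); // {c-1=[address, identity-check, email]}
    }
}
```

If the onboarding requirements change (say, a new step is added), only the second projection's handler changes, and a replay rebuilds its data from the same events.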

However, as time goes by, requirements change, and you will want to update your projections in both structure and content. This is where Axon replays come in. Replays are a core concept in CQRS and Event Sourcing, and the Axon Framework supports them.

Behind the scenes
Replaying in Axon is only possible if you are using Tracking Event Processors. 

Once you have specified from which event you want to replay, Axon will:

  • Create a replay token from the provided start position and the value of the token before the reset
  • Update the token in the Token Store
  • Open the event stream from that point
  • Read the events
  • Process the events in a Unit of Work on the Tracking Event Processor
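The steps above can be modelled in a few lines of plain Java. This is a simplified sketch, not Axon's code: positions are plain `long` values, the token store is a `HashMap`, and `Token` is a hypothetical stand-in for Axon's replay and tracking tokens. The token remembers its position before the reset, marks events up to that position as replayed, and turns back into a regular token once it has caught up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the replay steps (hypothetical types, not Axon's token classes).
public class ReplaySketch {

    /** Last processed position; tokenAtReset is non-null only while a replay is in progress. */
    record Token(long position, Long tokenAtReset) {

        /** True if the event at this index was already handled before the reset. */
        boolean isReplayOf(long index) {
            return tokenAtReset != null && index <= tokenAtReset;
        }

        /** Advances the token; once it reaches the reset position it becomes a regular token again. */
        Token advancedTo(long index) {
            boolean caughtUp = tokenAtReset == null || index >= tokenAtReset;
            return new Token(index, caughtUp ? null : tokenAtReset);
        }
    }

    static final Map<String, Token> tokenStore = new HashMap<>();

    /** Steps 1 and 2: create the replay token and update it in the token store. */
    static void resetTokens(String processor, long startIndex) {
        Token before = tokenStore.get(processor);
        tokenStore.put(processor, new Token(startIndex - 1, before.position()));
    }

    /** Steps 3 to 5: open the stream after the stored position, read and process each event. */
    static List<String> process(String processor, List<String> events) {
        List<String> handled = new ArrayList<>();
        Token token = tokenStore.get(processor);
        for (long index = token.position() + 1; index < events.size(); index++) {
            String mode = token.isReplayOf(index) ? "replay " : "live ";
            handled.add(mode + events.get((int) index));
            // Axon does this inside a Unit of Work; here we just store the new position
            token = token.advancedTo(index);
            tokenStore.put(processor, token);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e0", "e1", "e2", "e3");
        tokenStore.put("projection", new Token(2, null)); // e0..e2 were processed before
        resetTokens("projection", 1);                      // replay from e1 onwards
        System.out.println(process("projection", events)); // [replay e1, replay e2, live e3]
        System.out.println(tokenStore.get("projection"));  // Token[position=3, tokenAtReset=null]
    }
}
```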

Spring Rest API to the rescue!
So how do you initiate a replay? There are multiple ways to do this. Since Axon 4, it is possible to replay from Axon Server, but that is out of scope for this blog post. Another way is to reset the Tracking Event Processor to a certain point in time; the caveat is that you have to stop the Tracking Event Processor manually before resetting it. A third option is to manually update the tracking token in the token store so that past events are reprocessed. Finally, our preferred solution is to use the Replay API provided by Axon.

We created a secured REST API in Spring that uses this Replay API. The only information we had to provide was the position from which we wanted to replay and the name of the Tracking Event Processor. Before resetting the processor, we checked whether it was actually running on this instance and, if so, shut it down through code. Axon then created a replay token containing the value of the token at the reset and stored it in the token store. Once all events had been replayed, Axon converted the replay token back into a regular tracking token and continued where it left off.

To give an idea of what our TrackingEventProcessorService looked like:

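import java.util.Collections;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.GapAwareTrackingToken;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.springframework.stereotype.Service;

@Service
@Slf4j // Lombok generates a static SLF4J logger field named "log"
public class TrackingEventProcessorService {

   private final EventProcessingConfiguration eventProcessingConfiguration;

   public TrackingEventProcessorService(EventProcessingConfiguration eventProcessingConfiguration) {
      this.eventProcessingConfiguration = eventProcessingConfiguration;
   }

   private TrackingEventProcessor getTrackingEventProcessor(String name) {
      // TrackingEventProcessorNotFoundException is our own unchecked exception
      return this.eventProcessingConfiguration
            .eventProcessor(name, TrackingEventProcessor.class)
            .orElseThrow(TrackingEventProcessorNotFoundException::new);
   }

   /**
    * Replays the given processor from the event at {@code index} onwards.
    * Returns false if the processor is not running on this instance or its token could not be claimed.
    */
   public boolean replay(String trackingEventProcessorName, long index) {
      TrackingEventProcessor trackingEventProcessor = this.getTrackingEventProcessor(trackingEventProcessorName);
      if (!trackingEventProcessor.isRunning()) {
         log.warn(
               "Tracking event processor {} is not running in current instance or not running at all",
               trackingEventProcessorName
         );
         return false;
      }

      // Tokens can only be reset while the processor is stopped
      trackingEventProcessor.shutDown();

      try {
         // A token positioned at index - 1 makes the event at index the first one to be processed again
         trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(index - 1, Collections.emptySortedSet()));
         log.info("Starting replay for trackingEventProcessor {} on id {}", trackingEventProcessorName, index - 1);
      } catch (UnableToClaimTokenException e) {
         // Another instance holds the token; let the caller know setting up the replay failed
         log.warn("Unable to claim token for trackingEventProcessor {} on id {}", trackingEventProcessorName, index - 1, e);
         return false;
      } finally {
         // Restart the processor whether or not the reset succeeded
         trackingEventProcessor.start();
      }
      return true;
   }
}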

This service would use the Axon configuration to fetch the Tracking Event Processor and start the replay.

Key findings

After using replay management for over a year, we have some findings that we would like to share with the world.

Obstruction in event replays
With Tracking Event Processors, events are processed in separate threads, which makes error handling more complicated. Our error handling was not correct, and whenever an error occurred, the replay token got stuck: the token in the token store lost its owner, and we had to redeploy the service for Axon to assign a new owner.
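One possible policy, sketched here in plain Java under our own assumptions, is to record a failing event and keep advancing the token, so a single bad event cannot stall the whole replay. `Processor` and `Failure` are hypothetical names; in Axon the corresponding hooks to look at are the `ListenerInvocationErrorHandler` and `ErrorHandler` of the processor. Whether skipping is acceptable depends on the projection: for some read models, silently missing an event is worse than stopping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical "record and continue" policy: failures are collected, the token keeps moving.
public class ReplayErrorPolicy {

    record Failure(long index, String event, String reason) {}

    static final class Processor {
        long position = -1; // last processed index
        final List<Failure> failures = new ArrayList<>();
        private final Consumer<String> handler;

        Processor(Consumer<String> handler) {
            this.handler = handler;
        }

        void process(List<String> events) {
            for (int i = (int) position + 1; i < events.size(); i++) {
                try {
                    handler.accept(events.get(i));
                } catch (RuntimeException e) {
                    // Keep the failing event for later inspection instead of rethrowing
                    failures.add(new Failure(i, events.get(i), e.getMessage()));
                }
                position = i; // the token advances whether or not the handler succeeded
            }
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        Processor processor = new Processor(event -> {
            if (event.startsWith("bad")) {
                throw new IllegalStateException("cannot map " + event);
            }
            handled.add(event);
        });

        processor.process(List.of("e0", "bad1", "e2"));
        System.out.println(handled);            // [e0, e2]
        System.out.println(processor.position); // 2
        System.out.println(processor.failures);
    }
}
```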

Sticky sessions for the replay API
We ran into a situation where we used sticky sessions on our load balancers and the Tracking Event Processor was not running on the instance we were routed to, so the replay was never initiated. When your instances are load balanced, it is best to distribute the calls round robin and repeat them until an instance accepts the replay.
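A minimal client-side sketch of that retry loop, assuming each instance's replay endpoint can be called directly (bypassing the sticky session) and returns the boolean from our service. `ReplayEndpoint`, `replayOnAnyInstance` and `maxRounds` are hypothetical names; in practice each endpoint would wrap an HTTP call.

```java
import java.util.List;

// Hypothetical helper: cycle through all instances until one reports that it started the replay.
public class RoundRobinReplay {

    /** Stand-in for a call to one instance's replay endpoint. */
    interface ReplayEndpoint {
        boolean replay(String processorName, long index);
    }

    /** Returns the position of the instance that accepted the replay, or -1 if none did. */
    static int replayOnAnyInstance(List<ReplayEndpoint> instances, String processorName,
                                   long index, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            for (int i = 0; i < instances.size(); i++) {
                // false means the processor does not run there (or the token could not be claimed)
                if (instances.get(i).replay(processorName, index)) {
                    return i;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Instance 0 does not run the processor, instance 1 does
        List<ReplayEndpoint> instances = List.of(
                (name, index) -> false,
                (name, index) -> true
        );
        System.out.println(replayOnAnyInstance(instances, "customer-projection", 42, 3)); // 1
    }
}
```

Repeating the rounds covers the case where no instance accepts the call at first, for example while a processor is still starting up.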

Replay batch size
Processing speed is a very important factor when replaying. Depending on the complexity of your projections, the number of events to replay and the memory they use, a replay can take anywhere from minutes to several days. To tune your replays, you can set the batch size of your Tracking Event Processor. To understand how this helps, it is useful to know what Axon does when processing events: for each batch, it updates the tracking token and, if you are using a transactional store, begins and commits a database transaction. This adds a lot of overhead when events are handled one at a time, so a larger batch size can help. The opposite can also happen, however: for a projection with a very high memory footprint, a large batch size (e.g. more than 1000) can increase the total processing time, because the application spends many resources reading the events, leaving fewer resources to actually process them.
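The per-batch overhead can be estimated with simple arithmetic: the number of token updates (and transactions) equals the number of events divided by the batch size, rounded up. The sketch below assumes hypothetical costs of 1 ms per event and 5 ms per batch purely for illustration; measure your own.

```java
// Back-of-the-envelope model: each batch costs one token update (and one transaction
// with a transactional store). The per-event and per-batch costs are assumed inputs.
public class BatchOverhead {

    /** Number of batches, i.e. token updates/transactions, needed to process the events. */
    static long batches(long events, int batchSize) {
        return (events + batchSize - 1) / batchSize; // ceiling division
    }

    /** Estimated total time in ms: handling cost per event plus a fixed cost per batch. */
    static long estimatedMillis(long events, int batchSize, long msPerEvent, long msPerBatch) {
        return events * msPerEvent + batches(events, batchSize) * msPerBatch;
    }

    public static void main(String[] args) {
        long events = 1_000_000;
        for (int batchSize : new int[] {1, 100, 1000}) {
            System.out.printf("batch size %4d -> %,d transactions, ~%,d ms with the assumed costs%n",
                    batchSize, batches(events, batchSize), estimatedMillis(events, batchSize, 1, 5));
        }
    }
}
```

Note that this model only captures the fixed cost per batch; it deliberately ignores memory pressure, which is exactly why a very large batch can still be slower for memory-hungry projections.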

Event replay health metrics
Sometimes Tracking Event Processors are unable to catch up with new events. There can be multiple causes, but because events are handled sequentially, the time it takes to handle a single event matters a lot and is something you need to tune. On several occasions we spent a full day optimizing RDBMS queries to get them fast enough for replays. Once a token has reached the head of the event stream, processing a single subsequent event should ideally take no more than 100 ms for the simpler, CRUD-like events, although that is only a rough estimate. An event table can easily contain millions of events, and to get the full benefit of replays, replaying the event stream should take hours, not days. You can log metrics on the difference between the token position and the head of the event stream, and use these metrics in your logging and alerting tools to monitor token health.
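A minimal sketch of such a health metric, assuming you can read both the global index of the newest event and the index stored in the processor's token (how you obtain them depends on your event and token store). `TokenLag`, its methods and the `maxLag` threshold are hypothetical. It also shows the arithmetic behind "hours, not days": one million events at 100 ms each is 100,000 seconds, roughly 27.8 hours.

```java
// Hypothetical token health check: lag = head index - token index,
// catch-up time = lag * average handling time per event.
public class TokenLag {

    static long lag(long headIndex, long tokenIndex) {
        return Math.max(0, headIndex - tokenIndex);
    }

    static double catchUpHours(long lag, double avgMillisPerEvent) {
        return lag * avgMillisPerEvent / 3_600_000.0; // ms per hour
    }

    static boolean healthy(long lag, long maxLag) {
        return lag <= maxLag;
    }

    public static void main(String[] args) {
        long lag = lag(1_000_000, 0); // a full replay of one million events
        // 1,000,000 * 100 ms is about 27.8 hours; at 10 ms per event it is about 2.8 hours
        System.out.printf("%.1f h at 100 ms/event%n", catchUpHours(lag, 100));
        System.out.printf("%.1f h at 10 ms/event%n", catchUpHours(lag, 10));
        System.out.println(healthy(lag, 1_000)); // false
    }
}
```

Exporting `lag` as a gauge lets your alerting fire when a processor falls behind, and the catch-up estimate tells you whether a planned replay fits in a maintenance window.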
