Fixing history — An event sourcing journey


Introduction

Elescore, a platform I built to track elevator disruptions, integrates multiple external data sources. One of these sources is the DB FaSta API, which provides disruption information for all facilities operated by Deutsche Bahn.

In the past, this API has had several major outages during which it reported invalid facility status information. Consequently, the disruption information displayed on the platform was wrong. Using event sourcing and statistics, I was able to detect these outages and compensate for them.

This blog post will describe my journey and experiences as well as evaluate the results.

How Elescore stores disruption data

Elescore uses event sourcing to persist all of its data. It is only natural to use this persistence method since disruptions are events by definition. Early in the development of Elescore I asked myself: what is the source of truth? There are several possibilities. For example, the truth could be the disruptions that Elescore detected from incoming sources. Another possibility is to event source the primary information source, i.e. the API, itself. This might look like a mere conceptual difference at first but, as we will see later, it turned out to be essential. Elescore event sources every source separately.

At the time of writing this article, the (simplified) definition of a disruption event is as follows:

data DisruptionEvent
  = FacilityDisrupted FacilityId Reason
  | DisruptionReasonUpdated FacilityId Reason
  | FacilityRestored FacilityId

The event store is a table in an SQLite database. Its data type representation is defined like this:

data PersistedEvent a = PersistedEvent
  { evId         :: EventId
  , evType       :: EventType
  , evStream     :: StreamName
  , evOccurredOn :: DateTime
  , evPayload    :: a
  }

This is probably the simplest possible implementation of an event store. Every source inside Elescore is persisted in its own event stream, which enables us to replay every stream individually. A database row then looks like this:

Id   #  Event Type         Stream Name     Occurred On                    Payload
id1  1  FacilityDisrupted  Disruptions.DB  2018-07-22 12:55:33.882466387  json payload
id2  2  FacilityRestored   Disruptions.DB  2018-07-22 13:00:33.845970502  json payload

To plug sources and projections together I used a streaming approach with Pipes. A disruption source therefore must be some sort of event producer. Each integration implements the same data type:

data Source = Source
  { disruptionEvents :: Producer (PersistedEvent DisruptionEvent) IO ()
  -- ...
  }

Because of the conceptual difference between event sourcing the source and event sourcing the disruptions that occurred, each source replays all of its past events when the application starts. This makes it possible to easily correct events emitted from a source. But before we look into correcting data, let me explain how the mapping from the DB FaSta API to domain events works.
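To illustrate what replaying a stream buys us, here is a minimal, self-contained sketch (with simplified stand-in types, not Elescore's actual ones) of folding disruption events into a projection of currently disrupted facilities:

```haskell
import           Data.List (foldl')
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M

-- Simplified stand-ins; the real events carry richer types.
newtype FacilityId = FacilityId Int deriving (Eq, Ord, Show)
type Reason = String

data DisruptionEvent
  = FacilityDisrupted FacilityId Reason
  | DisruptionReasonUpdated FacilityId Reason
  | FacilityRestored FacilityId

-- The projection: currently disrupted facilities and their reasons.
type Disruptions = Map FacilityId Reason

-- Applying one event advances the projection by one step.
apply :: Disruptions -> DisruptionEvent -> Disruptions
apply m (FacilityDisrupted fid r)       = M.insert fid r m
apply m (DisruptionReasonUpdated fid r) = M.insert fid r m
apply m (FacilityRestored fid)          = M.delete fid m

-- Replaying a stream is just a left fold over its events.
replay :: [DisruptionEvent] -> Disruptions
replay = foldl' apply M.empty
```

Because the projection is a pure fold, rebuilding state after a restart is simply replaying the stream from the beginning.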

How Elescore maps the DB FaSta API to events

The FaSta API basically consists of one relevant endpoint.

/fasta/v2/facilities?state=INACTIVE,UNKNOWN

This endpoint returns all facilities (~3,200) operated by Deutsche Bahn. If a facility is disrupted, the key state within the JSON object will contain either INACTIVE or UNKNOWN, along with an optional reason in stateExplanation. It is possible to filter facilities by state (see the path above). You can look at the “documentation” in more detail.
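For illustration, an abbreviated facility object in the response might look roughly like this. Only state and stateExplanation are described above; the remaining field names and values are assumptions, not taken from the actual API:

```json
{
  "equipmentnumber": 10123456,
  "type": "ELEVATOR",
  "state": "INACTIVE",
  "stateExplanation": "under maintenance"
}
```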

As you can see, this has little to do with event sourcing. The API returns disrupted facilities as is, at the time the request is made. That is unfortunate, but not unfixable.

In order to derive events from that, two actions have to be performed:

  1. Poll the API regularly. Elescore, staying well within the API throttling limit, polls every 10 seconds.
  2. Calculate the difference between the previous state and the current state.

This results in CRUD-like events:

type FacilityState = Map FacilityId Facility

data Change a
  = New a
  | Updated a a
  | Deleted a

calculateChanges :: FacilityState -> FacilityState -> [Change Facility]

Using these two maps makes it possible to calculate the difference and intersection, yielding the changes that happened between two requests. With this kind of information we can derive the domain events used by Elescore.
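A possible implementation of calculateChanges using Data.Map might look like the following sketch. The Facility type here is a simplified stand-in; Elescore's real implementation may differ:

```haskell
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M

-- Simplified stand-ins for illustration.
newtype FacilityId = FacilityId Int deriving (Eq, Ord, Show)
data Facility = Facility { facilityId :: FacilityId, reason :: String }
  deriving (Eq, Show)

type FacilityState = Map FacilityId Facility

data Change a = New a | Updated a a | Deleted a deriving (Eq, Show)

-- New: in current but not previous; Deleted: in previous but not current;
-- Updated: in both, but with a changed value.
calculateChanges :: FacilityState -> FacilityState -> [Change Facility]
calculateChanges previous current =
     (New     <$> M.elems (M.difference current previous))
  ++ [ Updated old new
     | (old, new) <- M.elems (M.intersectionWith (,) previous current)
     , old /= new ]
  ++ (Deleted <$> M.elems (M.difference previous current))
```

M.difference and M.intersectionWith do the heavy lifting here, so the function stays a direct transcription of the three change cases.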

deriveEventFromChange :: Change Facility -> DisruptionEvent
deriveEventFromChange change =
  case change of
    New facility ->
      FacilityDisrupted (facilityId facility) (reason facility)
    Updated _ facility ->
      DisruptionReasonUpdated (facilityId facility) (reason facility)
    Deleted facility ->
      FacilityRestored (facilityId facility)

The events are persisted in the event store and the PersistedEvents are emitted from the Source. The fun begins when the API starts returning wrong results.

API outages

Unfortunately, the world is not perfect and, every once in a while, @dbopendata tweets about outages.


[Embedded tweets from @dbopendata: the outage announcement and the all-clear]

Roughly translated: one tweet notifies about an outage of the FaSta API, the other announces that the issue has been resolved. So outages do happen, and we also have some indication of their duration (this was a major outage). We will use these dates later (Oct. 30th to Nov. 6th) to check whether we identified the outage correctly.

Empirically, I discovered two patterns of outages: either an unusually high number of facilities is suddenly reported disrupted (with state = UNKNOWN), or an unusually high number of facilities is reported operational. Normally, about 180 facilities are reported inactive (which, given the overall count of ~3,200 facilities, is not as bad a quota as one might expect!). With that in mind, let me explain my initial approach at fixing this.

Initial attempt at compensating API outages

Something that worked really well until recently was to simply ignore all disruptions with status UNKNOWN. Basically, if a facility was disrupted with reason UnderMaintenance and the reason later changed to MonitoringDisrupted, I just assumed it was still under maintenance. I interpreted every unknown status as “they don’t know (anymore)”.

In code this looks like the following example. If you’re unfamiliar with pipes, read this as a pipeline (a stream) that consumes persisted disruption events and re-emits them, possibly deciding to drop certain events: “unless the event reason is unknown, pass it on; otherwise drop it”.

type PersistedDisruptionEvent = PersistedEvent DisruptionEvent 

filterUnknown ::
  Pipe PersistedDisruptionEvent PersistedDisruptionEvent IO ()
filterUnknown = for cat $ \ev ->
  case evPayload ev of
    FacilityDisrupted _ r       -> unless (unknownReason r) (yield ev)
    DisruptionReasonUpdated _ r -> unless (unknownReason r) (yield ev)
    _                           -> yield ev

  where
    unknownReason :: Reason -> Bool
    unknownReason MonitoringNotAvailable = True
    unknownReason MonitoringDisrupted    = True
    unknownReason _                      = False

Note how this also drops facilities that are newly reported as disrupted. Elescore would simply not display them as disrupted until they had one of the known disruption reasons. This also effectively excluded the few facilities that simply do not have monitoring installed (MonitoringNotAvailable).

That implementation has one flaw: it is vulnerable to erroneously emitted FacilityRestored events. And this is exactly what happened on November 4th.

On that day, the API suddenly returned zero facilities. Not only zero disrupted facilities, but zero facilities. The result was what you’d expect it to be: a lot of Deleted changes (as explained above) and, because of that, equally many FacilityRestored events. Consequently, there were zero disruptions on Elescore at that time.

Statistics to the rescue

This brings me to the more sophisticated method of detecting API outages: analyzing spikes in the number of disruptions. We know something fishy is going on if the number of reported disruptions drops to 0. Equally, it’s suspicious if it rises well above normal, to something like 500 (or even 3,000). It’s just very unlikely that so many facilities break within ten seconds.

After some unsatisfying attempts at coming up with a mathematical function to detect spikes, I vaguely remembered the statistics classes I took at university. And sure enough, statistics has just the thing: the 1.5 × interquartile range (IQR) rule. Statisticians refer to such spikes as residuals (more commonly, outliers), and the 1.5 × IQR rule detects them. Let me explain how it works and then apply it to the problem at hand.

1.5 × IQR

At first, sample data is required. In our case this is easy: it’s the number of disrupted facilities over time. A real sample looks like this.

sample = [278, 279, 279, 279, 280,
          280, 280, 280, 280, 280,
          281, 281, 281, 281, 281,
          281, 282, 283, 283, 285] 

From that sample, some numbers need to be calculated: specifically the median, the first and third quartiles, and the IQR.

Median:
(280 + 281) / 2 = 280.5
25th Percentile (Q1):
280
75th Percentile (Q3):
281
Interquartile Range (IQR):
Q3 − Q1 = 281 − 280 = 1

According to the 1.5 × IQR rule, everything below Q1 – 1.5 × IQR and everything above Q3 + 1.5 × IQR is considered a residual.

Lower Residuals:
Q1 − 1.5 × IQR = 280 − 1.5 = 278.5, everything below: [278]
Upper Residuals:
Q3 + 1.5 × IQR = 281 + 1.5 = 282.5, everything above: [283, 283, 285]

In order to compute this I wrote a small Haskell module that you can examine (no, this is far from optimized, but it works).
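For reference, a minimal sketch of such a module could look like the following. This is my reconstruction for illustration, the actual module may differ, e.g. in how quartiles are interpolated:

```haskell
import Data.List (sort)

-- Median of a list (assumed non-empty for this sketch).
median :: [Double] -> Double
median xs =
  let s = sort xs
      n = length s
  in if odd n
     then s !! (n `div` 2)
     else (s !! (n `div` 2 - 1) + s !! (n `div` 2)) / 2

-- Q1 and Q3 as the medians of the lower and upper halves.
quartiles :: [Double] -> (Double, Double)
quartiles xs =
  let s     = sort xs
      n     = length s
      lower = take (n `div` 2) s
      upper = drop ((n + 1) `div` 2) s
  in (median lower, median upper)

-- Everything outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR] is a residual.
residuals :: [Double] -> ([Double], [Double])
residuals xs =
  let (q1, q3) = quartiles xs
      iqr      = q3 - q1
      lo       = q1 - 1.5 * iqr
      hi       = q3 + 1.5 * iqr
  in (filter (< lo) xs, filter (> hi) xs)
```

Applied to the sample above, this yields exactly the lower residual [278] and the upper residuals [283, 283, 285].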

Now what

It is obvious that a continuous stream of FacilityDisrupted and FacilityRestored events will never produce any kind of residual. What’s needed is some kind of clustering of events. And it just so happens we can do that: it’s possible to cluster events by the second they occurred on, because new events come in clustered anyway (every 10 seconds). Technically, the optimal datum would be some sort of request id in the event metadata, but I did not implement that.

import Data.List (foldl')

chunkBy :: Eq b => (a -> b) -> [a] -> [[a]]
chunkBy _  [] = []
chunkBy f (x:xs) =
  let (_, as, res) = foldl' doChunk (f x, [x], []) xs
  in reverse (reverse as : res)
  where
    doChunk (b, as, ass) a =
      if f a == b
      then (b, a : as, ass)
      else (f a, [a], reverse as : ass)

This takes a function to chunk by and a list of as, and turns them into a list of lists of as. Applied to PersistedEvents with a function that renders each event’s DateTime as a timestamp string with second precision, we get an event stream chunked by seconds.
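As a usage sketch (repeating chunkBy from above so the example is self-contained, and with plain timestamp strings standing in for DateTime values), chunking by a second-precision prefix could look like this:

```haskell
import Data.List (foldl')

chunkBy :: Eq b => (a -> b) -> [a] -> [[a]]
chunkBy _  [] = []
chunkBy f (x:xs) =
  let (_, as, res) = foldl' doChunk (f x, [x], []) xs
  in reverse (reverse as : res)
  where
    doChunk (b, as, ass) a =
      if f a == b
      then (b, a : as, ass)
      else (f a, [a], reverse as : ass)

-- (timestamp, payload) pairs standing in for PersistedEvents.
events :: [(String, Int)]
events =
  [ ("2018-07-22 12:55:33.882", 1)
  , ("2018-07-22 12:55:33.911", 2)
  , ("2018-07-22 12:55:43.020", 3) ]

-- "2018-07-22 12:55:33" is 19 characters: second precision.
bySecond :: [[(String, Int)]]
bySecond = chunkBy (take 19 . fst) events
```

The first two events share a second and land in one chunk; the third starts a new chunk.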

Now, whether we’re replaying events from the store or processing data coming in fresh from the API, the events are chunked by the time they occurred. This is essential because now there will be spikes. By being less granular, there are larger jumps in the number of disruptions. To make this more visible, imagine these two interpretations of the same event stream:

| Time -->
|-----------------------------------------------------------|
| +1 | +1 | +1 | +1 | -1 | +1 | +1 | -1 | -1 | -1 | -1 | +1 |
|         4         |         4         |         2         |

Using the same approach to detecting changes explained in the chapter “How Elescore maps the DB FaSta API to events” above, it’s now possible to calculate the number of disrupted facilities for each chunk. I then add each of these counts to the sample and apply the 1.5 × IQR rule.

Picking the right sample

It’s high time to try out this more sophisticated method of API outage detection that will fix all of Elescore’s problems.

Starting Elescore v3
2018-07-22T13:51:49 Residual detected: 280 -> 281.0
2018-07-22T13:55:45 Back to normal: 280 -> 280.0
2018-07-22T14:31:46 Residual detected: 285 -> 286.0
2018-07-22T14:34:46 Back to normal: 285 -> 285.0
2018-07-22T15:45:50 Residual detected: 285 -> 286.0
2018-07-22T15:50:43 Back to normal: 285 -> 285.0
2018-07-22T15:51:43 Residual detected: 285 -> 286.0
2018-07-22T15:56:45 Back to normal: 285 -> 283.0
2018-07-22T17:50:49 Residual detected: 274 -> 273.0
2018-07-22T17:51:50 Back to normal: 274 -> 274.0
2018-07-22T21:48:48 Residual detected: 287 -> 288.0
2018-07-23T03:31:46 Back to normal: 287 -> 287.0
2018-07-23T03:33:46 Residual detected: 287 -> 288.0
2018-07-23T03:35:47 Back to normal: 287 -> 287.0
2018-07-23T04:23:43 Residual detected: 284 -> 290.0
2018-07-23T11:49:45 Back to normal: 284 -> 289.0
2018-07-23T11:53:46 Residual detected: 289 -> 290.0
2018-07-23T12:02:49 Back to normal: 289 -> 289.0
2018-07-23T12:03:49 Residual detected: 289 -> 290.0
2018-07-23T12:06:49 Back to normal: 289 -> 288.0
2018-07-23T14:30:49 Residual detected: 289 -> 299.0
2018-07-23T15:07:45 Back to normal: 289 -> 290.0
... this goes on forever

Whoopsie, what happened here? The change 280 -> 281 is considered a residual? Something clearly went wrong. Back when I explained the 1.5 × IQR rule, the astute reader might have noticed it already: this approach is way too sensitive for our use case. What’s happening is that in reality, the number of disruptions hovers at a stable level: 281, 280, 280, 280, 281, 280, 279, 280, and so on. Given a sample like this, 285 is a giant residual. The method isn’t wrong, the samples are.

I played around with it a bit, and what turned out to be promising was a) using a rather large sample size and b) deduplicating the sample. Using my IQR module mentioned above, I will demonstrate the difference:

sample = mkSample (278 :| [279,279,279,280,280,280,280,280,280,
                           281,281,281,281,281,281,282,283,283,285])
uniqueSample = mkSample (278 :| [279,280,281,282,283,285])

> upperResiduals sample
[283.0,283.0,285.0]
> lowerResiduals sample
[278.0]

> upperResiduals uniqueSample
[]
> lowerResiduals uniqueSample
[]

With a unique sample of size 1,000, the API outages are identified correctly. We can clearly see how the API had an outage on Oct. 30th, and that somewhere on Nov. 5th it went back to normal. Going back to the tweets above, this looks about right (Oct. 30th to Nov. 6th)!

You can even see that they restarted the API a couple of times, but that didn’t actually fix the problem. On the 4th, the number of disruptions dropped to 1. This was when the API suddenly stopped returning any facilities (technically, 200 OK with an empty JSON array payload). With the power of event sourcing, I was not only able to identify outages, but to correct them retroactively. Let that sink in for a while.

Starting Elescore v3
2018-08-05T01:44:39 Residual detected: 351 -> 785.0
2018-08-05T12:48:12 Back to normal: 351 -> 397.0
2018-08-05T13:18:43 Residual detected: 361 -> 435.0
2018-08-05T13:24:29 Back to normal: 361 -> 367.0
2018-09-28T04:51:40 Residual detected: 466 -> 508.0
2018-09-28T08:15:36 Back to normal: 466 -> 439.0
2018-09-28T09:00:06 Residual detected: 458 -> 822.0
2018-09-28T10:51:00 Back to normal: 458 -> 476.0
2018-09-29T12:36:30 Residual detected: 66 -> 0.0
2018-10-01T11:06:37 Back to normal: 66 -> 261.0

-- latest major outage lasting several days (Oct. 30th to Nov. 5th)
2018-10-30T08:48:04 Residual detected: 344 -> 510.0
2018-11-04T13:22:45 Back to normal: 344 -> 219.0
2018-11-04T13:44:07 Residual detected: 401 -> 961.0
2018-11-04T16:24:58 Back to normal: 401 -> 360.0

-- I assume they "fixed" it here, but not really ;)
2018-11-04T20:52:37 Residual detected: 298 -> 1.0
2018-11-05T15:14:30 Back to normal: 298 -> 239.0
(end of output)

All in all, I am very happy with this result. There is one more thing to do, though: emitting compensating events after an outage has ended.

Compensate with events

Compensating an outage means filtering the events that occurred during it. The interesting part is figuring out which events are actually correct. Fortunately, this is easy to do, and almost no additional code needs to be written; it’s possible to re-use what is already there.

When an outage is detected, the last known good state is kept. As the outage goes on, every event that occurs is recorded. After the outage ends, we have the next known good state. All that is left to do is to calculate the domain events that should have been emitted, by calculating the changes between the last good state and the new one. For each event that should have been emitted, a matching event is taken from the ones that actually did occur.

compensateApiDisruption
  :: FacilityState
  -> FacilityState
  -> [PersistedEvent DisruptionEvent]
  -> [PersistedEvent DisruptionEvent]
compensateApiDisruption lastGoodState nextGoodState eventsHappenedInBetween =
  let eventsThatShouldHaveHappened =
        deriveEventFromChange <$> calculateChanges lastGoodState nextGoodState
  in foldl'
      (\acc ev ->
        -- keep the first persisted event with a matching payload, if any
        acc ++ maybeToList (find (firstEventWithSamePayload ev) eventsHappenedInBetween))
      []
      eventsThatShouldHaveHappened

And that is all the magic. Let’s see if that works.

Evaluating the result

The first thing I did was boot up Elescore in the state it was in on Nov. 5th. This is, again, possible because of event sourcing. By replaying the events up until that date, I was able to see how it would have looked had I applied this fix on the 5th. Instead of showing zero disruptions, it showed what I expected: around 250 disruptions, many of them ongoing for several days. So far so good.

I then booted up an up-to-date instance to compare the number of disruptions with the production instance. It turned out the local instance with outage detection and compensation had around 10 more disruptions than production. This, I thought, could not be correct. So I looked at one randomly chosen new disruption in more detail: a disruption of one of the elevators at the train station “Kassel-Wilhelmshöhe”.

The event store for this facility looked like this:

Event Type               Occurred On          Reason
FacilityDisrupted        2018-10-08 12:44:44  NotAvailable
DisruptionReasonUpdated  2018-10-08 13:27:38  MonitoringDisrupted
FacilityRestored         2018-11-04 20:52:37  n/a
FacilityDisrupted        2018-11-05 15:14:30  MonitoringDisrupted

The last two events were caused by the outage. Prior to that, there were the original disruption and an update. If you remember, Elescore ignores disruption reasons like MonitoringDisrupted, as the actual state of the facility is unknown. Knowing that, it’s clear why the production version considered the elevator operational: its disruption was reported on Nov. 5th with reason MonitoringDisrupted.

The local version, however, had identified the outage and filtered out the last two events, making it possible to look back in time to the first reported disruption of this elevator. That was indeed on Oct. 8th, with the reason NotAvailable. And this is why the local instance of Elescore reported more disruptions than the production instance.

Summary

This was a fun challenge. Because I had event sourced the DB FaSta API, I was not only able to retroactively identify outages using the 1.5 × IQR rule, but also compensate for the outage by filtering the emitted events.

Moving Forward

The foundations for Elescore have been laid. It is rapidly evolving from a personal pet project into a useful product for both consumers and producers. When you visit Elescore today, you will see the latest iteration of the platform along with an all-new Vue/Vuetify frontend.

What’s missing now is bringing value to elevator manufacturers and service providers. To achieve this, I will research the topic of predictive maintenance, which tries to predict facility outages before they occur. The plan is to cooperate with companies to develop a platform that helps them manage their facilities.

Philipp Maier

Philipp likes domain events. So much that he stores them, sends them on journeys and folds over them. In his free time, he does exactly this but with Haskell.
