Implementing an event store on NATS: Introduction

An event store is a database optimized for reading and transacting sequences of events. Unlike a relational database, which has tables with a fixed schema storing homogeneous records, each sequence in an event store can contain heterogeneous events.

The term event can be interpreted a few different ways, but the basis is a "record of something that happened." What happened, in what context, and how it is modeled completely depends on the domain.

There are two common ways of modeling an event: in a specific context of the domain/business (typically called a domain event), or as a literal change data capture/snapshot/patch representation (I will call this a change event).

A domain event may look like this.

type PrescriptionOrdered struct {
  PatientID string
  OrderTime time.Time
  DrugName  string
  Quantity  int
  Unit      string
  // etc..
}
Note that what happened is obvious: a prescription was ordered.

Whereas a change event may look like this.

type RecordChanged struct {
  ID      string
  Added   map[string]any
  Removed map[string]any
  Changed map[string]any
}
This event contains what changed, but the reason for the change is not apparent. The added, removed, and changed maps correspond to fields on some value of state that has changed.

A common class of systems that produce change events is known as change data capture (CDC). Debezium is one example of a CDC tool that captures changes from a variety of database systems. Depending on how deeply you can integrate with source systems, you may be stuck with diffing snapshots yourself.
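
If you do end up diffing snapshots yourself, the idea can be sketched in a few lines of Go. This is only a minimal sketch and not how Debezium or any other CDC tool works: diffSnapshots is a hypothetical helper, it assumes snapshots are flat maps of field name to value, and for a changed field it records only the new value.

```go
package main

import "reflect"

// RecordChanged mirrors the change event shown above.
type RecordChanged struct {
	ID      string
	Added   map[string]any
	Removed map[string]any
	Changed map[string]any
}

// diffSnapshots is a hypothetical helper that compares two flat snapshots
// of the same record and reports which fields were added, removed, or changed.
func diffSnapshots(id string, before, after map[string]any) RecordChanged {
	ev := RecordChanged{
		ID:      id,
		Added:   map[string]any{},
		Removed: map[string]any{},
		Changed: map[string]any{},
	}
	for k, v := range after {
		old, ok := before[k]
		switch {
		case !ok:
			ev.Added[k] = v // field did not exist before
		case !reflect.DeepEqual(old, v):
			ev.Changed[k] = v // new value only; the old value is dropped
		}
	}
	for k, v := range before {
		if _, ok := after[k]; !ok {
			ev.Removed[k] = v // field no longer exists
		}
	}
	return ev
}
```

Even with a helper like this, the resulting event still only says what changed, not why.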

How does an event store differ from other databases?

Event stores are designed to be used when applying event sourcing, which is an alternate strategy for data modeling and persistence.

Most databases are optimized for storing point-in-time snapshots of state (tuples, records, documents, etc.), each identified by a key and free to change over time. For example, a PostgreSQL record, a MongoDB document, or a Redis key-value pair will typically change over time in an online system.

There are two key differences between an event-centric and state-centric database:

  • when a state-centric database value is updated, the previous state is no longer accessible
  • there is no information for why the state has changed (or a way to capture it)

One exception is Datomic, which maintains all state transitions (essentially change events) for every entity over time. It supports querying the state at any point in time, either by wall clock time or transaction.

That said, Datomic is still state-centric, and the reason/event underlying the state change is still not present by default (transactions in Datomic can themselves be reified with metadata, so some representation of the reason for the change could be stored, but it's secondary to the core data model).

Why can't we use existing databases for an event store?

You can. In fact, it is pretty straightforward and common to do so on relational, document, or key-value databases. However, an event store comes with the strongly desired requirement of making these events easily and quickly consumable by what are generically called read models (more on this later).

Since the primary unit stored is an event, there isn't any state natively stored as a source of truth. When you read from an event store, you literally get events, not any form of state. You need to derive the state.

This concept of deriving state from events as of a point in time is arguably the most compelling feature. Since these are domain events, we know why (or at least how) the state transition occurred in some context. The state transitions of the system are obvious to both developers and domain/business stakeholders.
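
To make "deriving the state" concrete, here is a minimal sketch of folding a sequence of events into state. The PrescriptionFilled and PrescriptionCancelled events, the Prescription state, and the apply and derive functions are all assumptions made up for illustration; none of this is a NATS API.

```go
package main

// Event is a marker interface for the (hypothetical) prescription events.
type Event interface{ isEvent() }

type PrescriptionOrdered struct {
	PatientID string
	DrugName  string
	Quantity  int
}

// PrescriptionFilled and PrescriptionCancelled are invented for this sketch.
type PrescriptionFilled struct{ Quantity int }
type PrescriptionCancelled struct{ Reason string }

func (PrescriptionOrdered) isEvent()   {}
func (PrescriptionFilled) isEvent()    {}
func (PrescriptionCancelled) isEvent() {}

// Prescription is the derived state. It is never stored as the source of truth.
type Prescription struct {
	Status    string
	DrugName  string
	Remaining int
}

// apply computes the next state from the current state and a single event.
func apply(s Prescription, e Event) Prescription {
	switch ev := e.(type) {
	case PrescriptionOrdered:
		s.Status = "ordered"
		s.DrugName = ev.DrugName
		s.Remaining = ev.Quantity
	case PrescriptionFilled:
		s.Remaining -= ev.Quantity
		if s.Remaining <= 0 {
			s.Status = "filled"
		}
	case PrescriptionCancelled:
		s.Status = "cancelled"
	}
	return s
}

// derive folds the events, in order, into the state they imply.
func derive(events []Event) Prescription {
	var s Prescription
	for _, e := range events {
		s = apply(s, e)
	}
	return s
}
```

Passing a prefix of the sequence, such as derive(events[:2]), yields the state as of that point in the history.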

Building an event store on databases that support updating records could lead to undesirable practices by less disciplined teams causing all sorts of downstream confusion and effects.

This brings us to two core features of NATS which are desirable for an event store:

  • Immutable and ordered streams (via JetStream)
  • Native (and optimized) support for distribution of events to consumers

Although an event store can be implemented on existing databases, these two features are either not core to them or not supported at all.

The one notable exception is EventStoreDB, which was originally developed by Greg Young, who has been an advocate and practitioner of event sourcing for years. It is arguably the most well-known and production-grade database for event sourcing.

That said, its subscription mechanism (and general support for broadcast and scale) is not as powerful as NATS. To be fair, I wouldn't expect it to be since it was not designed as a messaging-first system from its inception.

What are the base requirements for an event store?

There are four key requirements of an event store:

  • Indefinite persistence (with optional snapshots)
  • Granular event sequences
  • Optimistic concurrency control on sequences
  • Ordered replay and subscriptions

There are other features that may be desirable or make it easier to develop event-sourced models, but these are fundamental. These also, of course, don't speak to the production/operational concerns of an event store.

Indefinite persistence

The first requirement of indefinite persistence is satisfied by defining a file-based stream without any limits. As a bonus, Raft-based replication and mirroring come for free.

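js.AddStream(&nats.StreamConfig{
  // Stream names cannot contain "." or ">", so the wildcard goes in Subjects.
  Name:     "sales",
  Subjects: []string{"sales.>"},
  Storage:  nats.FileStorage,
  Replicas: 3,
})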
The default retention policy is limits-based without any limits set.

Since each stream has its own subject space that can be bound to it, multiple event store streams can be created for different services/bounded contexts in your system, depending on scale requirements. This ability to create separate streams, each with its own geographic placement and replication within the cluster, is unique to NATS compared to other event store implementations.

The second half of the first requirement is snapshots. Snapshots are often used as an optimization for loading the derived state needed to decide whether a command should be accepted or rejected.

A snapshot is simply a cache of a derived value. It is valid until new events are produced or the logic for deriving the snapshot has changed.

Fortunately, NATS has a native key-value layer built on top of JetStream that can be used for storing snapshots. The flow then is to read the events, derive the state, decide the next events and store them, compute the new snapshot, and then store that. Importantly, if the new snapshot fails to be stored for any reason (or is out of date), we can simply re-derive the state.
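
The reason a failed or stale snapshot is harmless can be sketched as follows. This is an in-memory illustration under my own assumptions: the Event and Snapshot types and the load function are hypothetical, and in a real implementation the snapshot would presumably live in a key-value bucket and the events would be read from the stream.

```go
package main

// Event is a stored event together with its position in the sequence.
type Event struct {
	Seq    uint64
	Amount int
}

// Snapshot caches a derived value along with the last sequence it reflects.
type Snapshot struct {
	LastSeq uint64
	Total   int
}

// load derives the current state. It starts from the snapshot when one is
// available and applies only the events written after it. With no snapshot,
// it falls back to deriving from the full history.
func load(snap *Snapshot, events []Event) Snapshot {
	var s Snapshot
	if snap != nil {
		s = *snap
	}
	for _, e := range events {
		if e.Seq <= s.LastSeq {
			continue // already reflected in the snapshot
		}
		s.Total += e.Amount
		s.LastSeq = e.Seq
	}
	return s
}
```

Whether the snapshot is missing, out of date, or current, load arrives at the same state. The snapshot only changes how many events need to be applied.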

Granular event sequences

In other event stores and content about event sourcing, the most common term is event streams. This is fine, but I don't want to conflate this with a NATS stream, so I am just calling them "granular event sequences."

In any case, these nicely map to NATS subjects. Since each stream binds a subject space, it is trivial to publish (append) events to a specific subject which corresponds to some entity/aggregate/state machine/workflow in your domain.

js.Publish(
  "sales.orders.1",
  []byte("..."),
)
Append an event to the sequence identified by "orders.1"

Another common feature in event stores that is already supported in NATS is declaring the category and the identifier within that category of the entity. For example, orders.1 as "the order identified by the ID 1". This categorization is useful when building read models (views) from the event store that are cross-cutting, such as all of the orders. However, since subjects are hierarchical, we are not limited to one categorical level if more granularity is desired.
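
As a small illustration of this naming scheme, here is a sketch of a helper that maps between an entity and its subject. EntityRef, ParseSubject, and the three-part context.category.id layout are my own assumptions for this example, not something NATS prescribes.

```go
package main

import (
	"fmt"
	"strings"
)

// EntityRef is a hypothetical way to name one event sequence.
type EntityRef struct {
	Context  string // e.g. "sales", the prefix bound to the stream
	Category string // e.g. "orders"
	ID       string // e.g. "1"; may itself contain further tokens
}

// Subject returns the subject that events for this entity are published to.
func (r EntityRef) Subject() string {
	return strings.Join([]string{r.Context, r.Category, r.ID}, ".")
}

// CategoryFilter returns a wildcard subject that matches every entity in
// the category, which is what a cross-cutting read model would consume.
func (r EntityRef) CategoryFilter() string {
	return r.Context + "." + r.Category + ".>"
}

// ParseSubject splits "context.category.id" into its parts. Anything after
// the second dot is kept as the ID, so deeper hierarchies still parse.
func ParseSubject(subject string) (EntityRef, error) {
	parts := strings.SplitN(subject, ".", 3)
	if len(parts) != 3 || parts[0] == "" || parts[1] == "" || parts[2] == "" {
		return EntityRef{}, fmt.Errorf("subject %q is not context.category.id", subject)
	}
	return EntityRef{Context: parts[0], Category: parts[1], ID: parts[2]}, nil
}
```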

Optimistic concurrency control

In basic terms, optimistic concurrency control (OCC) is the ability for a client to specify an expected version of some keyed value; the operation is accepted only if that version matches what the server has. If the version differs, the operation is rejected, and the client can then refresh its state and retry, or not.

Recall that each key (orders.1 in the sales context shown above) is its own sequence of events. Most of the time these sequences are fairly short (on the order of tens of events). Often they even have a finite lifetime.

NATS supports this OCC natively via the ExpectLastSequencePerSubject publish option. This takes the expected sequence for the subject and ensures it matches the one on the server before the event is written to the stream.

js.Publish(
  "sales.orders.1",
  []byte("..."),
  nats.ExpectLastSequencePerSubject(2),
)
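This can be translated to "publish this event only if the last event on this subject has sequence 2." The sequence here is the stream sequence of that last message, not a per-subject counter.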

Note: there is the more general ExpectLastSequence option which is at the stream level, but this is generally not necessary or desired since each entity sequence (per subject) has its own consistency boundary and thus can be appended to or read concurrently with other entity sequences.
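
To show the semantics of the per-subject check without needing a server, here is a small in-memory stand-in. The stream type, its publish method, and ErrWrongSequence are hypothetical names for this sketch and are not the nats.go API. It assumes that an expected value of 0 means "no event exists on this subject yet."

```go
package main

import "errors"

// ErrWrongSequence is returned when the expected sequence does not match.
var ErrWrongSequence = errors.New("wrong last sequence for subject")

type message struct {
	subject string
	data    []byte
}

// stream is an in-memory stand-in for a stream holding many subjects.
type stream struct {
	msgs    []message         // stream sequence = index + 1
	lastSeq map[string]uint64 // last stream sequence written per subject
}

func newStream() *stream {
	return &stream{lastSeq: map[string]uint64{}}
}

// publish appends the event only if expected equals the stream sequence of
// the last event on the same subject. Other subjects are never consulted,
// so unrelated entities do not contend with each other.
func (s *stream) publish(subject string, data []byte, expected uint64) (uint64, error) {
	if s.lastSeq[subject] != expected {
		return 0, ErrWrongSequence
	}
	s.msgs = append(s.msgs, message{subject: subject, data: data})
	seq := uint64(len(s.msgs))
	s.lastSeq[subject] = seq
	return seq, nil
}
```

A writer that gets ErrWrongSequence has lost a race on that one entity. It would re-read the events for the subject, re-derive the state, decide again, and retry with the new sequence.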

Aside: see this article (among many out there) as to why Apache Kafka is not well-suited for event sourcing, despite Confluent producing a course on the topic. It is possible that communities have different definitions of the concept 🤷.

Ordered replay and subscriptions

The final requirement of an event store is "ordered replay and subscriptions." Since events are being appended in a specific order, of course we want to be able to read them back in order. This is necessary for deriving the state of the entity for making the next decision or determining the next state transition (each of which would yield an event) within our domain model.

Although modeling and storing events is desirable for transacting domain/business decisions, it is not a particularly good data model for aggregating or querying the data in those events. This leads to the desire to get the events out of the store into other database types or specialized indexes in order to serve queries.

Since the events in the stream are immutable, they can be broadcast to interested consumers in order to build whatever read models are needed. This is, of course, where NATS shines. Any number of consumers can be created for the stream, whether for one-off derived views or as durable consumers for long-lived ingesting/indexing into a relational database, for example.

js.PullSubscribe(
  "sales.orders.>",
  "relational-indexer",
)
Dedicated pull consumer for indexing event data into a relational model. This may be to support analytical queries, for example.

A key benefit of having a log of events as the source of truth is that these read models (and their consumers) can be thrown away and rebuilt at any time.
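
Why a read model is disposable can be sketched without any NATS calls. The OrderEvent type, the OrderStatusView read model, and the Project and Rebuild functions below are hypothetical. In practice the events would arrive from a consumer, in order, rather than from a slice.

```go
package main

// OrderEvent is a hypothetical decoded event as a consumer might see it.
type OrderEvent struct {
	OrderID string
	Type    string // e.g. "placed", "paid", "shipped"
}

// OrderStatusView is a read model holding the latest status of every order.
type OrderStatusView map[string]string

// Project applies one event to the view. It is called once per event,
// in the order the events were delivered.
func (v OrderStatusView) Project(e OrderEvent) {
	v[e.OrderID] = e.Type
}

// Rebuild starts from an empty view and replays the full history.
func Rebuild(history []OrderEvent) OrderStatusView {
	v := OrderStatusView{}
	for _, e := range history {
		v.Project(e)
	}
	return v
}
```

Because the view is a pure function of the ordered events, a view that was maintained live and one rebuilt from scratch end up identical.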

Takeaways and going forward

The goal of this first post was to highlight the core requirements of an event store and show that NATS can satisfy all of them today. In my opinion, I am underselling how well NATS can satisfy these requirements, both in terms of simplicity and scale.

As a quick recap on the core NATS features we can leverage for an event store implementation:

  • persistent streams without limits via JetStream
  • subjects used as keys for granular event sequences
  • the "expected last sequence for subject" header for fine-grained OCC
  • JetStream consumers for a variety of point-in-time or live event consumption

In the next post, I will be introducing a few API options for creating, managing, and using event stores that are implemented on NATS. This API will add semantics on the NATS-provided primitives optimized for event sourcing.