
Implementing an event store on NATS: Design


In the first post, I introduced the primary requirements of an event store and the specific capabilities in NATS that satisfy them. To recap, these include:

  • persistent streams via JetStream
  • subjects used to model granular event sequences
  • the "expected last sequence for subject" header for fine-grained optimistic concurrency control
  • JetStream consumers for point-in-time or live event consumption

The goal of this post is to explore the initial design that went into a (work-in-progress) Go module called Rita. The scope of the module goes beyond a bare-bones event store; however, we need to get this in place first to build up the desired programming model.

I welcome all feedback, including alternate designs, strong opinions, or use cases that I did not consider, but first let's explore the current design.

Intro to Rita

We set up a Rita instance by calling the constructor and passing a NATS connection.

r, err := rita.New(nc)

There are additional functional options, but the NATS connection is the only required argument. Now we get a handle to an event store by name and create it.

orders := r.EventStore("orders")
// err was already declared by rita.New above, so assign rather than redeclare.
err = orders.Create(nil)

The argument to Create is a *nats.StreamConfig, which can be specified to override the defaults. Create delegates to js.AddStream with one new default: the bound subject is set to orders.> rather than just the stream name (orders in this case). This default can be overridden as well.

Since an event store should support many aggregates/entities, it assumes we want to encode at least the identifier in the subject, e.g. orders.1, but additional subject tokens could be used. In fact, depending on the scale needs of your domain, all subdomains, bounded contexts, and aggregates could be handled by one stream using the subject hierarchy.
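
To make the subject binding concrete, here is a minimal sketch of how a pattern like orders.> relates to concrete subjects. It assumes the standard NATS wildcard rules (* matches exactly one token, > matches one or more trailing tokens). subjectMatches is a hypothetical helper written for this post, not part of Rita or the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a NATS-style
// pattern. "*" matches exactly one token and ">" matches one or more
// trailing tokens. Hypothetical helper for illustration only.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" must be the last pattern token and needs at least one token left.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			return false
		}
		if p != "*" && p != st[i] {
			return false
		}
	}
	return len(pt) == len(st)
}

func main() {
	for _, s := range []string{"orders.1", "orders.1.shipping", "orders", "payments.1"} {
		fmt.Println(s, subjectMatches("orders.>", s))
	}
}
```

Note that orders.> does not match the bare subject orders, which is why the identifier token is expected to be present.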

Now that our event store is created, there are two fundamental operations that need to be supported: appending events and loading events.

We append like this:

seq, err := orders.Append(ctx, "orders.1", []*rita.Event{
  {Type: "order-placed", Data: []byte("...")},
})

seq is the stream sequence number of the last event appended to the subject. We can then load the events back again:

events, seq, err := orders.Load(ctx, "orders.1")

Although this bare-bones API is (hopefully) simple to understand, there is quite a bit to unpack here in the implementation.

Event model

The rita.Event type has a handful of fields, most of which do not need to be provided when appending. By default, the only required fields are Type and Data, which must be a string and a byte slice, respectively (more on this in the next section).

Fields including ID and Time can be provided, but are added automatically if not specified. The ID is encoded as the Nats-Msg-Id header for automatic de-duplication if an append retry occurs. The optional Meta field is a map for application-defined metadata about the event.

An event is packed into a NATS message by leveraging headers. For example, given an event to append:

rita.Event{
  Type: "order-placed",
  Data: []byte(`...`),
}

The missing optional fields will be filled in:

rita.Event{
  ID:   "ECY04xVdGe8SGsNIsCMIZU",
  Time: time.Date(2022, time.May, 19, 7, 18, 12, 707985190, time.Local),
  Type: "order-placed",
  Data: []byte(`...`),
  Meta: map[string]string{},
}

And then it will be converted to a NATS message.

nats.Msg{
  Subject: "orders.1",
  Header: nats.Header{
    "Nats-Expected-Stream": {"orders"},
    "Nats-Msg-Id": {"ECY04xVdGe8SGsNIsCMIZU"},
    // We will come back to this expected sequence header..
    "Nats-Expected-Last-Subject-Sequence": {"1"},
    "rita-time": {"2022-05-19T07:18:12.70798519-04:00"},
    "rita-type": {"order-placed"},
    "rita-codec": {"binary"},
  },
  Data: []byte(`...`),
}

On load, the event will be unpacked from the NATS message and will have the Subject and Sequence fields set for reference.

rita.Event{
  ID: "ECY04xVdGe8SGsNIsCMIZU",
  Time: time.Date(2022, time.May, 19, 7, 18, 12, 707985190, time.Local),
  Type: "order-placed",
  Data: []byte(`...`),
  Meta: map[string]string{},
  Subject: "orders.1",
  Sequence: 2,
}
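
The round trip between an event and a message can be sketched roughly as follows. This is not Rita's actual code: the event, message, and header types are trimmed-down stand-ins for rita.Event, nats.Msg, and nats.Header, the Meta field and the expected-sequence headers are left out, and I am assuming the rita-time header is formatted as RFC 3339 with nanoseconds, which is what the example above suggests.

```go
package main

import (
	"fmt"
	"time"
)

// header stands in for nats.Header, which has the same underlying type.
type header map[string][]string

// event is a trimmed-down stand-in for rita.Event.
type event struct {
	ID   string
	Time time.Time
	Type string
	Data []byte
}

// message is a trimmed-down stand-in for nats.Msg.
type message struct {
	Subject string
	Header  header
	Data    []byte
}

// pack copies the event fields into headers, leaving Data as the payload.
func pack(subject string, e event) message {
	return message{
		Subject: subject,
		Header: header{
			"Nats-Msg-Id": {e.ID},
			"rita-time":   {e.Time.Format(time.RFC3339Nano)},
			"rita-type":   {e.Type},
			"rita-codec":  {"binary"},
		},
		Data: e.Data,
	}
}

// first returns the first value for a header key, or "" if absent.
func first(h header, key string) string {
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// unpack reverses pack, reading the event fields back out of the headers.
func unpack(m message) (event, error) {
	t, err := time.Parse(time.RFC3339Nano, first(m.Header, "rita-time"))
	if err != nil {
		return event{}, fmt.Errorf("parse rita-time: %w", err)
	}
	return event{
		ID:   first(m.Header, "Nats-Msg-Id"),
		Time: t,
		Type: first(m.Header, "rita-type"),
		Data: m.Data,
	}, nil
}

func main() {
	in := event{
		ID:   "ECY04xVdGe8SGsNIsCMIZU",
		Time: time.Date(2022, time.May, 19, 7, 18, 12, 707985190, time.UTC),
		Type: "order-placed",
		Data: []byte("..."),
	}
	out, err := unpack(pack("orders.1", in))
	if err != nil {
		panic(err)
	}
	fmt.Println(out.ID, out.Type, out.Time.Equal(in.Time))
}
```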

Type registry

It is a common need to serialize and deserialize native values to bytes and vice versa. While serialized, the Type field effectively describes what Data models, e.g. com.example.order-shipped. However, when deserialized, ideally, we want a reference to a native type value, e.g. &OrderShipped{...}. The boilerplate of serializing and deserializing values is a straightforward process that can be abstracted away.

This is where the types.Registry type fits in (types is a sub-package of rita). We can define a type registry and then pass it as an option to rita.New(...) to get transparent support for serializing and deserializing various type values (the current scope is events, but it could be expanded to commands, queries, and state).

tr, err := types.NewRegistry(map[string]*types.Type{
  "com.example.order-shipped": {
    Init: func() any { return &OrderShipped{} },
  },
  "com.example.order-placed": {
    Init: func() any { return &OrderPlaced{} },
  },
})
// handle error

r, err := rita.New(nc, rita.TypeRegistry(tr))

A future feature of a type definition may be to associate a schema.

For the event store append and load operations above, this simplifies two parts.

  • Only the Data field needs to be defined, without the Type, and it can be the struct value, e.g. &OrderShipped{}, rather than a byte slice. The Append call will delegate to the type registry to look up the associated type and serialize it on your behalf. The rita-codec value above would be set to the name of the codec that is in use, e.g. json.
  • When Load is called, it will auto-deserialize the value based on the type name and will assign it to the Data field. When an event is used, a type assertion can be used on event.Data based on the native type, e.g. event.Data.(*OrderShipped).
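
To give a feel for what the registry does on append and load, here is a minimal sketch. It is not the types.Registry implementation: the registry type, its marshal and unmarshal methods, the reverse lookup via reflect, and the hard-wired JSON codec are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type OrderShipped struct {
	OrderID string `json:"order_id"`
}

// registry maps type names to constructors, and Go types back to names.
type registry struct {
	inits map[string]func() any
	names map[reflect.Type]string
}

func newRegistry(inits map[string]func() any) *registry {
	r := &registry{inits: inits, names: map[reflect.Type]string{}}
	for name, newValue := range inits {
		r.names[reflect.TypeOf(newValue())] = name
	}
	return r
}

// marshal looks up the type name for v and serializes v as JSON.
func (r *registry) marshal(v any) (string, []byte, error) {
	name, ok := r.names[reflect.TypeOf(v)]
	if !ok {
		return "", nil, fmt.Errorf("type %T is not registered", v)
	}
	b, err := json.Marshal(v)
	return name, b, err
}

// unmarshal allocates a value for the type name and decodes data into it.
func (r *registry) unmarshal(name string, data []byte) (any, error) {
	newValue, ok := r.inits[name]
	if !ok {
		return nil, fmt.Errorf("type %q is not registered", name)
	}
	v := newValue()
	if err := json.Unmarshal(data, v); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	r := newRegistry(map[string]func() any{
		"com.example.order-shipped": func() any { return &OrderShipped{} },
	})

	// Append side: derive the type name and bytes from the native value.
	name, data, err := r.marshal(&OrderShipped{OrderID: "1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(name, string(data))

	// Load side: rebuild the native value, then type-assert it.
	v, err := r.unmarshal(name, data)
	if err != nil {
		panic(err)
	}
	fmt.Println(v.(*OrderShipped).OrderID)
}
```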

Going forward we are going to assume a type registry is in use.

Appending.. multiple events

In event sourcing, events model state transitions; however, they don't get created in a vacuum. There is another message type called a command that is modeled as an imperative action that a client is requesting to perform, such as place-order.

A command has intention, but that does not mean it will be accepted. It is accepted only if the state transition it would yield is valid. For example, the place-order command could be rejected if it had an invalid shipping address (validation) or if any of the items in the order were no longer available (violating an invariant).

This logic, and the trade-offs of what is enforced up front, are very domain/business specific. That said, what comes out the other end is one event.. or multiple?

A debatable topic is whether a command should result in only one event or may result in more than one. The most common examples I have seen are cases where a state change occurs that would then result in another implicit state change.

Here are a couple of examples to get a mental model:

  • Move a piece in a game of chess that would result in the Moved event normally, but if the move results in a checkmate or stalemate, this would be detected and two events would be emitted, e.g. Moved and StaleMate.
  • A todo list that when the last item is checked, the todo list itself is marked completed as a second emitted event.

Supporting multiple events is a generalization, so I think people feel better about this, "I know this second thing is going to result, so let's just emit it together, right?" However, to me it feels like a leaky design or an over-optimization.

The alternative way of handling this chain reaction of events is to have listeners on those events that would then emit the resulting event. In other words, it's not the player that decides that they have checkmate; it is the result of the board being in a state where the opponent's king can no longer move (which happens only after the move is performed). This may sound like one and the same, but it boils down to decoupling decision events from consequential events.

All that being said.. as shown in the Append example above, Rita does indeed support appending multiple events in case there is a valid need for it. Alternatively, one could think of it as a shortcut to append the decision followed by its known consequences, if that is your preferred way of modeling.

It is worth pointing out that the events are not appended transactionally, since each event translates to one NATS message and NATS does not currently support atomic multi-message appends. This could be designed around by packing multiple events into a single message and transparently unpacking them on Load.
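
As a rough illustration of that workaround (which Rita does not implement as far as this post describes), several events could be encoded into one payload so that a single publish stores all of them or none. The packedEvent envelope and the JSON array encoding are hypothetical choices.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// packedEvent is a hypothetical envelope for one event inside a batch.
type packedEvent struct {
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// packBatch encodes several events as one payload, so a single publish
// either stores all of them or none of them.
func packBatch(events []packedEvent) ([]byte, error) {
	return json.Marshal(events)
}

// unpackBatch restores the individual events from a batched payload.
func unpackBatch(payload []byte) ([]packedEvent, error) {
	var events []packedEvent
	err := json.Unmarshal(payload, &events)
	return events, err
}

func main() {
	payload, err := packBatch([]packedEvent{
		{Type: "Moved", Data: json.RawMessage(`{"to":"h5"}`)},
		{Type: "StaleMate", Data: json.RawMessage(`{}`)},
	})
	if err != nil {
		panic(err)
	}
	events, err := unpackBatch(payload)
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(e.Type, string(e.Data))
	}
}
```

One trade-off of this approach is that a stream sequence number no longer identifies exactly one event, so anything that relies on per-event sequences would need to account for the position within the batch.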

So then what are the guarantees of the multi-event Append? Optimistic concurrency control can be applied using the rita.ExpectSequence option which applies to the first event in the slice. This ensures the primary decision event can be appended in lock-step with any other concurrent actors handling commands for the same subject (this is a whole other topic that will be discussed in the future.. subjects as names for aggregates).

If you choose to include more than one event, the subsequent events do not get the "expected sequence" header, since there could be interleaving events if decisions on the same subject are being handled concurrently. Interleaved events would be extremely rare, but they could happen, and we don't want the follow-up events to be rejected due to the wrong sequence once the primary event has already been committed.
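
The guarantee described here can be sketched with an in-memory stand-in for a stream. This is only a model of the behavior, not Rita's code: stream, publish, and appendEvents are hypothetical names, and the nil pointer plays the role of "no expected sequence header".

```go
package main

import (
	"errors"
	"fmt"
)

var errWrongSequence = errors.New("wrong last sequence for subject")

// stream is an in-memory stand-in for a JetStream stream.
type stream struct {
	seq  uint64            // last stream sequence
	last map[string]uint64 // last sequence per subject
}

// publish stores one message. If expect is non-nil, the message is rejected
// unless it equals the last sequence recorded for the subject.
func (s *stream) publish(subject string, expect *uint64) (uint64, error) {
	if expect != nil && *expect != s.last[subject] {
		return 0, errWrongSequence
	}
	s.seq++
	s.last[subject] = s.seq
	return s.seq, nil
}

// appendEvents publishes n events to the subject. Only the first one carries
// the expected sequence; the rest are published unconditionally.
func appendEvents(s *stream, subject string, n int, expect uint64) (uint64, error) {
	var seq uint64
	for i := 0; i < n; i++ {
		var exp *uint64
		if i == 0 {
			exp = &expect
		}
		var err error
		if seq, err = s.publish(subject, exp); err != nil {
			return 0, err
		}
	}
	return seq, nil
}

func main() {
	s := &stream{last: map[string]uint64{}}

	seq, err := appendEvents(s, "orders.1", 2, 0) // 0 means "no events yet"
	fmt.Println(seq, err)                         // 2 <nil>

	_, err = appendEvents(s, "orders.1", 1, 1) // stale: the last sequence is 2
	fmt.Println(err)

	seq, err = appendEvents(s, "orders.1", 1, 2)
	fmt.Println(seq, err) // 3 <nil>
}
```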

In a future post, I will discuss correlation and causation metadata that could be used for tracking relationships along with a way to detect when concurrent appends do occur.

Loading events

The Load method is fairly straightforward, however, since NATS is inherently always streaming messages, it is worth discussing how the end-of-stream is determined.

NATS has an API to get the last message for a given subject.

nc.Request(
  "$JS.API.STREAM.MSG.GET.orders", // <- last token is the stream name
  []byte(`{"last_by_subj": "orders.1"}`),
  time.Second,
)

The JSON-encoded reply data includes the message sequence. This is used as the cut-off sequence for an individual Load operation to prevent it from waiting for new events (although a long-lived object with an active subscription may be something of interest).
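
A minimal sketch of pulling the sequence out of that reply is shown below. The struct models only the fields needed here, and the sample payloads in main are illustrative values I wrote by hand rather than captured server output. lastSequence is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// msgGetResponse models only the reply fields needed here.
type msgGetResponse struct {
	Error *struct {
		Code        int    `json:"code"`
		Description string `json:"description"`
	} `json:"error,omitempty"`
	Message *struct {
		Subject string `json:"subject"`
		Seq     uint64 `json:"seq"`
	} `json:"message,omitempty"`
}

// lastSequence extracts the stream sequence from a STREAM.MSG.GET reply.
func lastSequence(reply []byte) (uint64, error) {
	var r msgGetResponse
	if err := json.Unmarshal(reply, &r); err != nil {
		return 0, err
	}
	if r.Error != nil {
		return 0, fmt.Errorf("jetstream api: %d %s", r.Error.Code, r.Error.Description)
	}
	if r.Message == nil {
		return 0, fmt.Errorf("reply has no message")
	}
	return r.Message.Seq, nil
}

func main() {
	// Illustrative replies: one for an existing subject, one for a miss.
	found := []byte(`{"message":{"subject":"orders.1","seq":2,"data":"Li4u"}}`)
	missing := []byte(`{"error":{"code":404,"description":"no message found"}}`)

	fmt.Println(lastSequence(found))
	fmt.Println(lastSequence(missing))
}
```

A "not found" error is a useful signal in its own right: it means the subject has no events yet, so Load can return an empty slice without creating a consumer.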

Internally, an ephemeral OrderedConsumer is used to get the events on that subject as fast as possible in a loop, with a break once the latest sequence is observed. The slice of events is returned along with the last (known) sequence.
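
The shape of that loop can be sketched without a server by letting a channel stand in for the consumer's message delivery. msg and loadUntil are hypothetical names, and the real implementation would also need timeouts and context cancellation.

```go
package main

import "fmt"

// msg is a stand-in for a delivered NATS message with its stream sequence.
type msg struct {
	Seq  uint64
	Data string
}

// loadUntil drains msgs until the cut-off sequence has been observed, then
// returns what it collected along with the last sequence seen.
func loadUntil(msgs <-chan msg, cutoff uint64) ([]msg, uint64) {
	var out []msg
	var last uint64
	if cutoff == 0 {
		return out, last // no events on the subject yet
	}
	for m := range msgs {
		out = append(out, m)
		last = m.Seq
		if m.Seq >= cutoff {
			break
		}
	}
	return out, last
}

func main() {
	msgs := make(chan msg, 4)
	// Sequences are stream-wide, so a single subject's events may have gaps.
	msgs <- msg{Seq: 2, Data: "order-placed"}
	msgs <- msg{Seq: 5, Data: "order-shipped"}
	msgs <- msg{Seq: 9, Data: "appended after the cut-off was read"}

	events, last := loadUntil(msgs, 5)
	fmt.Println(len(events), last) // 2 5
}
```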

Aside: Stale decision making

I want to draw a parallel with databases. When you run a query (e.g. select ... in a relational database), assuming the database is still accepting writes, the data should be considered immediately stale once returned.

Most decisions are inherently based on stale information. Preventing new writes, reducing latency, and/or providing user interfaces that receive updates in real-time will reduce this staleness factor. The impact of staleness on decision making will vary based on the decision being made and, importantly, whether it can be compensated for.

BUT.. what if we could know precisely how stale the data is that we are basing a decision on?

This is an inherent feature of event sourcing where each subject corresponds to a model of data that is a consistency boundary for state changes. Every new event can be asserted using the last sequence as the expected sequence when attempting to append. If that does not match up, then the event store will reject the append.

If rejected, there are two options:

  • fetch the new events and re-handle the command to see if it would still result in a valid transition.. if so prepare the new event(s) and try appending again
  • simply reject it back to the client to require an explicit refresh of the view with a new command issued
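
The first option can be sketched as a bounded retry loop. Everything here is a simplified model rather than Rita's API: store keeps one subject in memory, events are plain balance deltas, the sequence is just the event count for that subject, and beforeAppend is a hook that exists only to simulate a concurrent writer.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errConflict     = errors.New("wrong last sequence")
	errInsufficient = errors.New("insufficient funds")
)

// store is an in-memory stand-in for a single subject in an event store.
type store struct {
	deltas []int
}

// load returns a copy of the events and the last sequence (the event count).
func (s *store) load() ([]int, uint64) {
	return append([]int(nil), s.deltas...), uint64(len(s.deltas))
}

// appendEvent rejects the event if the subject changed since expect was read.
func (s *store) appendEvent(delta int, expect uint64) error {
	if expect != uint64(len(s.deltas)) {
		return errConflict
	}
	s.deltas = append(s.deltas, delta)
	return nil
}

// withdraw re-handles the command against freshly loaded events each time
// the append is rejected, and gives up after maxAttempts.
func withdraw(s *store, amount, maxAttempts int, beforeAppend func()) error {
	for i := 0; i < maxAttempts; i++ {
		deltas, seq := s.load()
		balance := 0
		for _, d := range deltas {
			balance += d
		}
		if amount > balance {
			return errInsufficient // the transition is not (or no longer) valid
		}
		if beforeAppend != nil {
			beforeAppend() // simulation hook for a concurrent writer
		}
		err := s.appendEvent(-amount, seq)
		if !errors.Is(err, errConflict) {
			return err // success or an unrelated failure
		}
	}
	return errConflict
}

func main() {
	s := &store{deltas: []int{50}}
	raced := false
	concurrentWithdraw := func() {
		if !raced {
			raced = true
			s.deltas = append(s.deltas, -30)
		}
	}
	// The first attempt conflicts, the second succeeds against the new state.
	fmt.Println(withdraw(s, 10, 3, concurrentWithdraw))
	// The balance is now 10, so this decision is rejected outright.
	fmt.Println(withdraw(s, 30, 3, nil))
}
```

The second option is the same loop with maxAttempts set to 1: the conflict is surfaced to the client instead of being retried.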

One final note is that much of the event sourcing literature uses the term aggregate which has its roots in domain-driven design (DDD). At the time Eric Evans wrote the first DDD book and coined the term aggregate in this context, it was described in terms of objects and entities. However, these are programming concepts rather than implying each aggregate corresponds to some object in the world.

In fact, aggregates are often the opposite. They are a set of related data that requires a consistency boundary and are often a subset of a real entity as one would typically think of it. Apart from good design, the decision of how granular your aggregates are comes down to scale requirements (since good design is hard and takes time).

The above example of orders.1 is a poor example of an aggregate! Reading it makes it seem like all possible attributes of an order are shoved into one type. I mean, you could do this, but there are plenty of reasons not to, one being the open-closed principle, which is a strength of event sourcing.

Drawing another analogy from databases, when you perform an update, you generally target specific columns or fields (for document stores), rather than the whole row or document. Transactional updates should be as small and scoped as possible.

An aggregate is the group of fields that are designed for a specific set of commands to provide that transactional isolation. If we want a "big picture" of an entity, e.g. the full order details, we can derive a view (sometimes called a projection) from across multiple aggregates about that order (views will be discussed in a future post).

😅 Alright, moving on..

Demo application

Of course you are welcome to check out Rita and play around with it, but I also wanted to create a demo application to showcase how the library is used in this nascent stage.

It is called Kids Money Manager (KMM) and provides a way for kids to learn how to manage money. Money can be (logically) deposited to the account by parents or guardians, and kids can withdraw up to a certain amount per day/week/month that their parent(s) define via a budget. For ease of demo purposes, there is also a budget period of one minute so one can observe the behavior without needing to wait an entire day.

There are four commands:

  • deposit (parent)
  • withdraw (child)
  • set-budget (parent)
  • remove-budget (parent)

two queries:

  • balance
  • last-budget-period

and one stream:

  • ledger

(In this initial revision, no authentication or authorization is implemented to differentiate parent from child. Also some of these names could probably be better.)

KMM comes with a CLI that contains both the server implementation (services and streams) as well as the client commands. I will first show the usage as a user using the client commands.

We can deposit some funds to accounts by a unique name/id and provide an optional description.

$ kmm deposit bobby 50 'birthday money'
$ kmm deposit susie 20 'grass mowing'
$ kmm deposit susie 10 'allowance'

We can then get the current funds in the account.

$ kmm balance susie
30

Of course funds can be withdrawn:

$ kmm withdraw susie 7.50 'baseball cards'

We can also see the ledger, which creates an ephemeral consumer filtered to the account (in this case susie) and streams it. If you open this in a separate shell and continue depositing and withdrawing, you will see the ledger continue to stream.

$ kmm ledger susie
+20 | Mon May 30 13:41:53 2022 | grass mowing
+10 | Mon May 30 13:42:02 2022 | allowance
-7.5 | Mon May 30 13:43:13 2022 | baseball cards
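
Since the ledger is the source of truth, a balance is just a fold over its entries. The sketch below is not KMM's implementation; entry and balance are hypothetical names, and I am assuming amounts are kept as integer cents to avoid floating-point drift.

```go
package main

import "fmt"

// entry is a hypothetical ledger event: positive for deposits, negative
// for withdrawals. Amounts are in cents to avoid floating-point drift.
type entry struct {
	Cents int64
	Note  string
}

// balance folds the ledger into the current funds.
func balance(ledger []entry) int64 {
	var total int64
	for _, e := range ledger {
		total += e.Cents
	}
	return total
}

func main() {
	ledger := []entry{
		{Cents: 2000, Note: "grass mowing"},
		{Cents: 1000, Note: "allowance"},
		{Cents: -750, Note: "baseball cards"},
	}
	b := balance(ledger)
	fmt.Printf("%d.%02d\n", b/100, b%100) // 22.50
}
```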

A primary feature of this (example) app is to provide a budget for kids, based on some period of time (as opposed to a category of spending). In real life, the budget would likely be daily, weekly, or monthly, but for the purpose of this demo app, it also supports a per-minute period that I call minutely (yes, I know "minutely" is a word and it doesn't mean "per minute", but for consistency..).

For example, this will set a budget of $20 (or whatever currency is applicable) per minute.

$ kmm set-budget susie 20 minutely

During this period, withdrawals are tracked, and if a withdraw request would exceed the budget of 20, it is rejected. set-budget can be called at any time to change the amount or frequency dynamically.
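
A rough sketch of such a budget check follows. It is an assumption about how the rule could be expressed, not KMM's code: withdrawal, periodStart, allowed, and the at helper are hypothetical, amounts are in cents, and truncating by a duration only makes sense for fixed-length periods such as a minute (calendar weeks and months would need different handling).

```go
package main

import (
	"fmt"
	"time"
)

// minutely is the demo-friendly budget period.
const minutely = time.Minute

type withdrawal struct {
	At    time.Time
	Cents int64
}

// at builds a timestamp on the demo day, to keep the example short.
func at(h, m, s int) time.Time {
	return time.Date(2022, time.May, 30, h, m, s, 0, time.UTC)
}

// periodStart truncates t to the start of its budget period.
func periodStart(t time.Time, period time.Duration) time.Time {
	return t.Truncate(period)
}

// allowed reports whether a new withdrawal at time now fits in the budget
// for the current period, given the withdrawals recorded so far.
func allowed(history []withdrawal, now time.Time, cents, budget int64, period time.Duration) bool {
	start := periodStart(now, period)
	var spent int64
	for _, w := range history {
		if !w.At.Before(start) {
			spent += w.Cents
		}
	}
	return spent+cents <= budget
}

func main() {
	history := []withdrawal{
		{At: at(13, 44, 50), Cents: 1000}, // previous period, not counted
		{At: at(13, 45, 5), Cents: 500},
		{At: at(13, 45, 20), Cents: 675},
	}
	now := at(13, 45, 40)

	fmt.Println(allowed(history, now, 800, 2000, minutely)) // true
	fmt.Println(allowed(history, now, 900, 2000, minutely)) // false
}
```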

A parent can get a budget summary for an account.

$ kmm last-budget-period susie
period start: Mon May 30 13:45:00 2022
period end: Mon May 30 13:46:00 2022
withdrawals: 2
total withdrawn: 11.75

This shows the last period that had at least one withdrawal, including its start and end time, the number of withdrawals, and the total amount withdrawn (a future feature could just as easily list all known periods to see how the withdrawals changed over time).

If the parent wants to remove the budget for some special occasion (or to assess trust), then it can be removed.

$ kmm remove-budget susie

Live deployment

There is a live deployment of the application hosted on Fly.io, a modern platform as a service, and leveraging NGS, a global, multi-cloud, multi-geo NATS deployment by Synadia. Both platforms provide free tiers to get started.

If you want to interact with the deployment, sign up for a free NGS account (if you don't already have one) and follow the instructions to set up your local environment (which configures the nsc and nats CLIs).

At this point, you can import the public streams and services into your NGS account from the account used by the KMM application.

$ nsc add import --name kmm-services \
  --service \
  --remote-subject "kmm.services.>" \
  --src-account ACKGPTACAQFG2HT377VHTZM2TXLOOI3R5PBMWZCUBU5G4R63LUM6SHCY

$ nsc add import --name kmm-streams \
  --remote-subject "kmm.streams.*" \
  --src-account ACKGPTACAQFG2HT377VHTZM2TXLOOI3R5PBMWZCUBU5G4R63LUM6SHCY

Once the import is made, you can use the kmm CLI to execute the commands.

If you prefer to run the application locally, you can use the serve command which listens on 127.0.0.1:4837:

$ kmm serve --nats.embed

The above commands can take --nats.url=127.0.0.1:4837 to interact with this embedded instance.

Takeaways and next steps

  • Rita is an attempt to codify the event store design and provide a basis for a future desired programming model leveraging event sourcing.
  • Many event stores can be created with Rita and each one maps to a JetStream stream.
  • Although multiple events can be appended, consider design and transactionality trade-offs.
  • Loaded events should be assumed to be stale (if there are multiple actors per subject). But we can ensure an append is consistent with what was loaded by using the ExpectSequence option.
  • Aggregates are consistency boundaries, each of which maps to a single concrete subject, e.g. orders.1.shipping. Big aggregates are discouraged, but perfectly fine if your use case/scale allows for it.

In the next post, I will introduce the current bigger picture of Rita and propose a few more abstractions that will be integrated into the KMM application including command dispatching and a streams-first approach to the queries.