6 min read

NATS Weekly #19

NATS Weekly #19

Week of March 21 - 27, 2022

🗞 Announcements, writings, and projects

A short list of  announcements, blog posts, projects updates and other news.

️ 💁 News

⚡Releases

Official releases from NATS repos and others in the ecosystem.

🎬 Media

📖 Articles

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

What are my options for large payloads in messages?

First we will define what large means, since this is subjective. For NATS the default max payload size is 1MB. This includes the complete serialized representation of the message which includes subject, headers, and other (minimal) overhead.

If this limit is increased, the max recommended is 8MB (if you are curious why or if it can go higher, this all depends on desired throughput, if persistence is involved, and whether your network can handle it).

However, it is useful to step back and ask why do I need a message to contain more than 1MB of data. There are a few assertions and/or questions that typically come up:

  • I am able to send an arbitrarily-sized  payload over HTTP, how can I do this in NATS?
  • I want to enable a user to upload an asset (image, video, document). How do I do this with NATS?

There are a couple options depending on your use case:

  • Store the large asset(s) in separate storage (object store, filesystem, etc.) and include a reference in the NATS message
  • Chunk up the payload across a sequence of NATS messages. There is experimental support for JetStream based object stores.

Regarding the first option, the concrete approach would be to proxy an upload or asset transfer to an object store (S3, Cloud Storage, Minio, etc.) and then include the URL in the NATS message for the subscribe that needs to access the object later.

Another strategy if you want to be optimistic with the upload to the object storage is to generate a signed URL ahead of time, publish the message, and then deal with the object upload concurrently. This could be sending the URL to the client to upload directly or broker the upload from a dedicated endpoint (I would recommend the former with a short expiry time).

If you want to use NATS exclusively, then evaluate whether your language-specific client supports the object store API or use the nats CLI to try out the object store.

What happens when I publish a message to a stream with a replication factor of three?

The context of this question is focused on the overhead of a publish. NATS uses an tailored implementation of the RAFT consensus algorithm for the purposes of JetStream.

The RAFT protocol is leader-based and favors consistency over availability, however there is a spectrum of what this means depending on the implementation. For NATS, a publish requires a qourum of nodes to acknowledge that the message has been received (this is typical for consensus algorithms). So given a replication factor of three, only two of the three nodes need to ack, before an ack is sent back to the client publisher.

What about reading messages? It is not uncommon for systems that use RAFT to require consensus on reads as well, however for JetStream, this is not required. This is because streams are inherently append-only.

As Jean-Noel Moyne pointed out (in a Slack response), any implicit writes that occur as a result of consumption, will require consensus. This includes "interest" and "work-queue" streams since consumption may result in an implicit delete/tombstone the message for future consumers. For example, a subscriber received a message from a work queue and ack'ed it. No other future subscriber should observe this message.

How can you implement the saga pattern with NATS?

The saga pattern is a strategy for implementing a distributed transaction. In general, there are two sets of operations that need to be implemented, a set of event handlers representing the happy path as well as a set of compensating actions which are used to compensate for (not necessarily undo/rollback) what was partially done.

For example, it we assume the overused (but familiar) example of "buying something online," there are a handful of high-level steps that occur so fulfill the order (I am not a retail/ecommerce/warehouse expert, so forgive me since I am likely oversimplifying the domain). This is a list of commands modeling the intention of each step.

  • place-order
  • charge-credit-card
  • package-items
  • ship-package
  • deliver-package

There are few things to point out here:

  • A problem could occur at any one of these steps, credit card issue, low/lack of inventory, package damaged or lost, etc.
  • This end-to-end process is measured in human time, not in milliseconds like normal database transactions.
  • There are likely retry policies built-in, intermediate correcting actions, as well as timeout thresholds which are domain and business specific.
  • There is no undo (erasure) in the real world. Even refunding a credit card shows it as a separate "credit" on your statement. The world is append-only..

Each step in the saga is triggered by a prior event. Each of the above actions has a corresponding (forward-progressing) event.

  • order-placed
  • credit-card-charged
  • items-packaged
  • package-shipped
  • package-delivered

Let's start writing some code. We will use a standard request-reply handler for the place-order command since this is user-facing that should fast and transactional. For simplicity, I am assuming the command name is in the subject to switch on.

nc.Subscribe("commands.*", func(msg *nats.Msg) {
  switch msg.Subject {
  case "commands.place-order":
    // Validate details, generate the order ID, and publish the event.
    js.Publish("events.order-placed", ...)
    
    // Respond with the order ID.
    msg.Respond(...)
    
  default:
    msg.Respond([]byte("command not supported"))
  }
})
Validation logic and error handling elided for brevity.

Now that the event is in a stream, there are two general approaches to model the rest of the saga. These are referred to as orchestration and choreography (see detailed explanations in the above saga link).

Orchestration could be modeled as an event handler that dispatches the next step in the saga. For example:

js.Subscribe("events.*", func(msg *nats.Msg) {
  switch msg.Subject {
  case "events.order-placed":
    // Could be a core NATS request that this orchestrator
    // then publishes an event on behalf of (think external service),
    // or it could be a publish if the handler will be able to
    // publish the event itself.
    nc.Request("commands.charge-credit-card", ...)
    
  case "events.credit-card-charged":
    // handle..
    
  // etc.
  }
})

The state machine is effectively modeled in one place since the event -> command correspondance is all here. However the events are appended to the stream and wherever the command handlers live does not matter.

Choreography, on the other hand, relies on decouple event handlers (separate consumers and subscriptions) that would consume events and then react accordingly.

js.Subscribe("events.order-placed", func(msg *nats.Msg) {
  // Invoke a command or just handle it directly and then
  // publish an event (success or failure) once processed.
})

js.Subscribe("events.credit-card-charged", func(msg *nats.Msg) {
  // handle..
})

Importantly, these two ways of modeling are not mutually exclusion and different parts of the saga could use different strategies depending on the boundaries. For example, these macro-commands and events may be the orchestrated, but for every command, there may be an internal set of events and commands produced locally in the payment vs. warehouse vs. shipping contexts.

Choosing between the two approaches boils down to degree of control desired and/or available.

The other half of implementing a saga are the compensating actions. This means that when something goes wrong (really it is just a different condition to handle), then a set of compensating commands should be invoked. For example:

js.Subscribe("events.*", func(msg *nats.Msg) {
  switch msg.Subject {
  case "events.order-placed":
    rep, err := nc.Request("commands.charge-credit-card", ...)
    if string(rep.Data) == "credit card denied" {
    	js.Publish("events.credit-card-denied", ...)
    }
    
  case "events.credit-card-denied":
    // Invoke compensating commands
    nc.Request("commands.cancel-order", ...)
    
  case "events.product-out-of-stock":
    nc.Request("commands.send-apology-gift-card", ...)
  }
})

That last example case highlights the fact that compensating actions may not merely be to cancel things or perform credits, but it could also be more human to acknowledge something happened that we can't go back in time and fix.

Hopefully this explanation was mildly helpful even with the pseudocode. I do plan to write a series on several event-based patterns (with working code) and doing a deeper analysis of the implementation choices.


If you would like more in-depth information with examples to any of these questions, please reach out on Slack or Twitter!