6 min read

NATS Weekly #4

NATS Weekly #4

Week of December 6 - 12, 2021

ğŸ—ž Announcements, writings, and projects

A short list of  announcements, blog posts, projects updates and other news.

⚡Releases

⚙️ Projects️

  • nats-zero-down - A new repo I am using to explore zero downtime migrations. Active discussion thread here, however feel free to open issues in the repo if you have suggestions!

ğŸŽ¬ Media

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

Are people using NATS with serverless cloud platforms like AWS Lambda or Google Cloud Run?

This is long-form (and hopefully improved) response to a question I responded to on Slack.

This is a great question and something that I have thought about quite a bit. Since the term serverless still can be conflated with functions as a service (FaaS), here are a couple key quotes from the linked Wikipedia page

Serverless computing is a cloud computing execution model in which the cloud provider allocates machine resources on demand.
When an app is not in use, there are no computing resources allocated to the app. Pricing is based on the actual amount of resources consumed by an application.

Independent of any specific platform or deployment model (such as FaaS), serverless computing assumes the compute will not be long-running. Nearly all serverless offerings (be it cloud or open source) follow this general pattern:

  • Developer deploys the application to the serverless platform
  • Some message (event or request) is received by the platform
  • The platform checks if the application is running, if not it starts an instance
  • The platform routes the message to the application to do the computation
  • The platform keeps the application instance idle for a short amount of time in case more messages are received in succession
  • After some timeout, the platform turns off the instance

Serverless offerings can be extremely economical for use cases that have irregular or bursty workloads. You don't need to have a long-running process just in case a message shows up or even pre-provision extra capacity. Likewise these offerings will scale out in case there are a substantial number of messages received.

That said, if the expected message rate is fairly constant, serverless offerings may not be a good choice in cost, resource constraints, and added latency. There is an economic threshold once your application exceeds some number of messages per second or per message execution time.

So then where does NATS fit into all of this?

I could argue that most of these serverless compute offerings are not really necessary and they just make a certain set of trade-offs, some of which adds undesirable constraints. This can lead to a fragmented architecture of different computing services simply to account for the various types resource, latency, or processing time requirements.

In other words, deploying a set of processes as containers (or WebAssembly modules) that all connect to a NATS network would be a far simpler architecture. Every process can use the necessary patterns to support the need of that process.

All that said, if you or your organization have adopted even one managed cloud offering above IaaS, you will likely need to integrate with it. Even if you adopted NATS as your messaging substrate, you will need to be processes in your application that need to send messages to or receive messages from cloud services.

Cloud providers use their own messaging options to broker messages between services. For example, AWS' SNS, SQS, and EventBridge services, Google's PubSub, and Azure's Service Bus.

Two straightforward examples are bridging NATS to one (or multiple) of these messaging services.

If a message in NATS needs to be published to a service, a subscription on NATS and a publisher to the target service can be used for integration. Likewise the reverse is true, a NATS publisher could be written that consumes a message from a cloud messaging service and publishes it to a NATS subject.

How that is implemented exactly depends on the guarantees you require for that message hand-off as well as the transformation required of the message itself.

These messages (be it events or requests) can send to the correct receiver in order for AWS Lambda or Google Cloud Run to invoke the target serverless compute. A result can flow in the reverse direction from cloud message to NATS publisher.

This is all to say that NATS and cloud services, including cloud messaging services, can co-exist with a fairly straightforward integration. For the components of your system that do not exclusively use cloud services, NATS can be that messaging substrate. It gives you far more power in a smaller package than nearly all of the cloud messaging offerings and can be deployed in more places (on-premises, IoT devices, etc).

As an aside: If you are more curious of using NATS to create a serverless platform, check out OpenFaas.

How can I receive and validate a message before storing it into JetStream?

There was a great thread on Slack where a person asked more directly:

If I have a stream my_stream storing messages from subject my.subject.*  should I still be able to use Nats Core Request/Reply functionality to send messages into my.subject.* and receive them using the stream consumer and use nats.Msg.Reply field to send response back to sender?

The responses that followed indicated that the request-reply is not supported against streams (by design). Understandably, there was even an issue arguing it was a backwards incompatible change.

Eventually in the Slack thread, the OP states this:

I thought about just using "written to JS as confirmation", but I do need to do some validation on the receiver side before I accept the message

So the core use case was to do validation prior to writing to the stream! Fortunately there is a straightforward pattern for this.

  • Have a dedicate subject space for handling requests, e.g. requests.my.subject.*
  • Deploy a subscriber (or queue group) that handles these requests.
  • Perform the validation, respond to the client on error.
  • Otherwise derive the stream subject, prepare a message with the same data and header, and publish to the stream subject.
  • Optionally, reply to the client with the pub ack info serialized as the body.
nc.Subscribe('requests.my.subject.*', func(msg *nats.Msg) {
    // Validate the msg via headers, inspect the body or whatever.
    // Reply to the client on failure.
    if err := validateRequest(msg); err != nil {
        msg.Respond([]byte(err.Error()))
        return
    }
    
    // Validation passes, derive subject for stream.
    // (There could be multiple ways to do this)
    subject := strings.TrimPrefix(msg.Subject, 'requests.')
    
    // Prepare a message that is includes the data and header
    // of the original one. Including the header ensures things
    // like Nats-Expected-Last-Sequence or Nats-Expected-Last-Msg-Id
    // headers work as expected.
    jmsg := nats.Msg{
        Subject: subject,
        Data:    msg.Data,
        Header:  msg.Header,
    }
    
    // Publish to the stream with the same data. Optionally include
    // the original message headers.
    ack, err := js.PublishMsg(&jmsg)
    // Publish failed for some reason. A retry might be appropriate
    // depending on the error.
    if err != nil {
        msg.Respond([]byte(err.Error()))
        return
    }
    
    // Reply to the client which whatever info is need. Even the ack
    // info could be serialized and replied as the message to ensure
    // client gets the sequence number.
    ackb, _ := json.Marshal(ack)
    msg.Respond(ackb)
})
This code is just for example purposes. The flow and use of the Go client should be correct, but the error handling could be better! Like serializing a proper message envelope to distinguish the error from the good response.

As a client, I can now do this:

header := make(nats.Header)
header.Add(nats.MsgIdHdr, "...")
// Add other headers..

msg := nats.Msg{
    Subject: 'requests.my.subject.foo',
    Data: []byte("..."),
    Header: header,
}

rep, err := nc.RequestMsg(&msg, time.Second)
// Handle network/connection error

// Otherwise, decode rep.Data as an application error, the JetStream error, or the (successful) pub ack info. Lets the assume the happy path.
var ack nats.PubAck
json.Unmarshal(msg.Data, &ack)

// Now we have access to ack.Sequence and other metadata.
Like I said in the code subtext above, there are straightforward ways to differentiate the various error/response types even with this light indirection between the client and writing to the stream.

If you would like more in-depth information with examples to any of these questions, please reach out on Slack or Twitter!