
NATS Weekly #25

Week of May 2 - 8, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates, and other news.

️ 💁 News

Random info/announcements that don’t fit into the other categories.

⚡Releases

Official releases from NATS repos and others in the ecosystem.

📖 Articles

Blog posts, tutorials, or any other text-based content about NATS.

💬 Discussions

GitHub Discussions from various NATS repositories.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

How can I rate limit a client connection’s consumption of messages?

Consumers have a rate limit setting, but it is expressed in bits per second. That can serve as a proxy for messages per second from a bandwidth perspective, but it is not a per-message limit at the API level.
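To make the "proxy" idea concrete, here is a back-of-the-envelope conversion from a bits-per-second limit to an approximate message rate. The function name and the average message size are my own assumptions for illustration, and protocol overhead is ignored, so treat the result as a rough estimate only.

```go
package main

// approxMsgsPerSec estimates how many messages per second fit within a
// rate limit expressed in bits per second, given an assumed average
// message size in bytes. Protocol overhead is ignored.
func approxMsgsPerSec(rateLimitBps, avgMsgBytes uint64) uint64 {
	if avgMsgBytes == 0 {
		return 0 // avoid dividing by zero for an unknown message size
	}
	return rateLimitBps / (avgMsgBytes * 8)
}
```

For example, a limit of 8,000,000 bits per second with messages averaging 1,000 bytes works out to roughly 1,000 messages per second.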

Fundamentally, when a client connects it will either want to interact with a service (request-reply) or consume a stream (core NATS or JetStream subscription).

Service-based rate limiting is fairly straightforward since the service handler implementation can contain this logic. However, since publishers and subscribers are decoupled, by default a service handler (which is a subscriber itself) would not know who/what is making the request. Some kind of identity would need to be propagated as part of the request message (such as in the header). This would then enable the service handler to rate limit on a per client/identity basis (akin to rate limiting in HTTP services).
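As a minimal sketch of the per-identity idea, the limiter below keeps a fixed-window counter for each identity. The type and function names are hypothetical, and a fixed window is just one simple choice; a token bucket would work as well. It uses only the standard library so the logic can be read independently of any NATS client.

```go
package main

import (
	"sync"
	"time"
)

// windowState tracks how many requests an identity has made in the
// current window.
type windowState struct {
	start time.Time
	count int
}

// identityLimiter is a fixed-window request counter keyed by identity.
type identityLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	state  map[string]*windowState
}

// newIdentityLimiter allows up to limit requests per window per identity.
func newIdentityLimiter(limit int, window time.Duration) *identityLimiter {
	return &identityLimiter{
		limit:  limit,
		window: window,
		state:  make(map[string]*windowState),
	}
}

// Allow reports whether a request from id is within its limit.
func (l *identityLimiter) Allow(id string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	s, ok := l.state[id]
	if !ok || now.Sub(s.start) >= l.window {
		// First request from this identity, or the window has elapsed.
		l.state[id] = &windowState{start: now, count: 1}
		return l.limit > 0
	}
	if s.count >= l.limit {
		return false
	}
	s.count++
	return true
}
```

In a service handler, the identity could be read from a request header (for example `msg.Header.Get("Client-Id")`, where the header name is an assumption) and the handler could reply with an error whenever `Allow` returns false.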

Rate limiting consumption of a stream is a bit more interesting. For core NATS, the only way I can see that working is if every (authenticated/identified) client had a dedicated subject it could consume from (or a subject per tier of allowed rate limit). The publishing side would then need to control the flow/rate of messages published from the sources. In other words, an intermediary subscriber would receive messages from the source, buffer them, and then re-publish them on the client-consumable subjects at the allowed rate.

The buffer use case is perfect for JetStream: a stream can accept all source publishes, and a consumer per client or per rate tier can then be created independently to republish messages to a subject for client consumption.
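The intermediary's pacing loop could look something like the sketch below. It is deliberately independent of the NATS client: the input channel stands in for whatever delivers buffered messages (a subscription or a JetStream consumer), and the `publish` callback stands in for a publish to the client-facing subject. Both hookups, and the function name, are assumptions for illustration.

```go
package main

import "time"

// republishPaced forwards messages from in to publish, waiting for a
// ticker slot before each one, so the long-run rate is bounded by one
// message per interval. It returns once in is closed and drained.
// interval must be positive.
func republishPaced(in <-chan []byte, interval time.Duration, publish func([]byte)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for data := range in {
		<-ticker.C // wait for the next slot before republishing
		publish(data)
	}
}
```

With core NATS the channel is the only buffer, so a slow pace can back up in memory. With JetStream the stream itself is the buffer, and the loop would simply fetch the next message when a slot opens.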

What does it mean to seal a stream?

The docs show the term “seal” to be associated with the object store layer. This option landed in September 2021. Per the PR message:

Sealed streams can not accept new messages, allow you to delete or purge messages, or have messages expire due to age. Sealed stream can not be unsealed through an update.

So this is a one-way street once applied.

What was the original use case? Derek Collison (NATS creator) was kind enough to share that it was for:

object store, for tamper-proof assets

The idea being that assets can be pre-loaded and then sealed. However, this could be applied to the KV store or a stream in general.

Another use case may be active streams that need to be sourced and succeeded by a new stream. An old stream can be sealed and then sourced by one or more new streams that are then writable.

How can I encrypt message data?

One approach would be to use age, a modern, open source command line tool and Go library (there is also a Rust port) for doing file/data encryption.

The API is small and straightforward to use. Here is one usage example on the publisher side (error handling omitted for brevity).

// Each line is a public key of recipients that can decrypt

// the message with their private key. Alternatively, a single

// public key can be parsed using "ParseX25519Recipient".

f, _ := os.Open("list-of-recipients.txt")

defer f.Close()

rs, _ := age.ParseRecipients(f)

// Buffer that the encrypted data will be written to.

var b bytes.Buffer

wc, _ := age.Encrypt(&b, rs...)

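// Pass the unencrypted data to the writer to encrypt.

wc.Write([]byte("..."))

// Close the writer to flush the final encrypted chunk. This must

// happen before the buffer is read, so it cannot be deferred.

wc.Close()

// Publish the encrypted bytes.

nc.Publish("foo", b.Bytes())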

On the subscription side:

// Parse the private key passed in via config or read from a file.

ident, _ := age.ParseX25519Identity("AGE-SECRET-KEY-1...")

// Subscribe to the subject containing encrypted messages.

sub, _ := nc.SubscribeSync("foo")

// Get a message.

msg, _ := sub.NextMsg(time.Second)

// Create an io.Reader with the encrypted bytes.

er := bytes.NewBuffer(msg.Data)

// Pass the reader and the identity which returns a reader

// containing the decrypted bytes.

r, _ := age.Decrypt(er, ident)

// Read all of the now-decrypted bytes.

data, _ := io.ReadAll(r)

The same workflow can be applied to headers as well; however, the value would need to be encoded as a string first (e.g. base64).
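A minimal sketch of that encoding step, using only the standard library. The helper names are hypothetical, and standard base64 is just one reasonable choice of string encoding.

```go
package main

import "encoding/base64"

// encodeHeaderValue turns encrypted bytes into a string that is safe to
// place in a message header.
func encodeHeaderValue(ciphertext []byte) string {
	return base64.StdEncoding.EncodeToString(ciphertext)
}

// decodeHeaderValue reverses encodeHeaderValue, returning the encrypted
// bytes that can then be passed to the decryption step.
func decodeHeaderValue(value string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(value)
}
```

On the publisher side, the encoded string would be set on the message header (for example under a name like `X-Encrypted-Token`, which is an assumption), and the subscriber would decode it before wrapping the bytes in a reader for decryption.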