NATS Weekly #21

Week of April 4 - 10, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates, and other news.

⚡ Releases

Official releases from NATS repos and others in the ecosystem.

📖 Articles

Blog posts, tutorials, or any other text-based content about NATS.

💬 Discussions

GitHub Discussions from various NATS repositories.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

Where can self-signed TLS certs be used?

NATS supports TLS in multiple places:

Cluster, gateway, and leafnode connections all support the same tls configuration options. The basics include the cert_file, the key_file, and the ca_file.

By default, client connections will use TLS if the server supports it. The server can optionally require client verification (mutual TLS), which requires the client to provide a certificate that the server can verify against its configured CA. One step further, the verified client certificate can also be mapped to a specific user.

So where do self-signed TLS certs fit in? For the uninitiated, a self-signed cert here simply means that the root key that signed the certificate does not belong to a known certificate authority (CA). Organizations like Entrust, Let's Encrypt, DigiCert, Google, etc. are trusted certificate authorities and their root certificates are pre-installed in operating systems and various clients (browsers, language trust stores, etc.).

There are more details, but those are out of scope here. The purpose of stating this is to point out that a self-signed certificate is no less secure; it is just not trusted by default since the signing key does not belong to a recognized authority.

All that said, within the context of your infrastructure and application, if you sign your own certificates, or better, use a public key infrastructure (PKI) service, the same level of security can be achieved. The only requirement is that all clients (or configurations) have the root CA certificate so that presented certificates can be verified, rather than relying on pre-installed CA certs as noted above.

This is implemented as nats.RootCAs, a client option for a NATS connection in Go (other languages have similar constructs). For all server/node configurations, the ca_file option can be specified.

Where self-signed certs present friction is when client connections or leafnodes are not controlled by you or your org. In that case, the root CA cert needs to be distributed so clients can download and reference it when establishing a connection, and using a trusted CA to sign the certs required for these connections would be wise.

My final note is just acknowledging that manual PKI management is hard, and a service provider or tooling should be used for provisioning, distributing, and rotating certs and keys.

How to model a "passage of time" stream?

There was a use case brought up to support intervals of fetching messages from a stream (e.g. via a pull consumer subscription). Another common use case for "passage of time" events is business operations, such as "closing the books", "coupon expiration", "reservation timeout", etc.

Some of these are fixed based on calendar time such as "end of business day" (at least in one specific time zone) or "end of the month." Other intervals/timers may be relative to a triggering event, such as a ticket reservation to a concert. Once a ticket is reserved, you may have 10 minutes to finalize the sale before the reservation is given up for other customers.

Fortunately, modeling these use cases in NATS is straightforward. The first code snippet shows a basic process that can publish on a fixed schedule. This uses the github.com/robfig/cron/v3 module for ease of using familiar cron expressions, but the built-in time.Timer or time.Ticker would work fine as well for simpler needs.

package main

import (
  "log"
  "os"
  "os/signal"

  "github.com/nats-io/nats.go"
  "github.com/robfig/cron/v3"
)

func main() {
  // Connect to NATS.
  nc, err := nats.Connect("localhost:4222")
  if err != nil {
    log.Fatal(err)
  }
  defer nc.Drain()

  // Initialize a cron instance to schedule one or more functions
  // to run. Each schedule could correspond to some domain-specific
  // schedule or a general "passage of time" event.
  c := cron.New(cron.WithSeconds())

  // Example of fixed schedule, run at 8pm each day to do end-of-day
  // operations. With cron.WithSeconds() the fields are: second,
  // minute, hour, day of month, month, day of week.
  c.AddFunc("0 0 20 * * *", func() {
    // Optionally add payload such as the actual time..
    nc.Publish("schedule.end-of-day", nil)
  })

  // Start the scheduler.
  c.Start()
  defer c.Stop()

  // Wait for interrupt.
  sigchan := make(chan os.Signal, 1)
  signal.Notify(sigchan, os.Interrupt)
  <-sigchan
}

Running the above code will just sit for the period of time and publish events as they occur. Of course, more schedules can be added to this process as needed.

The next code snippet shows how the end-of-day processor would subscribe to the stream and then trigger fetching messages from the EOD queue.

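package main

import (
  "errors"
  "log"
  "os"
  "os/signal"

  "github.com/nats-io/nats.go"
)

func main() {
  // Connect to NATS.
  nc, err := nats.Connect("...")
  if err != nil {
    log.Fatal(err)
  }
  defer nc.Drain()

  js, _ := nc.JetStream()

  // Create a subscription to the pull consumer.
  sub, err := js.PullSubscribe("end-of-day-queue.>", "end-of-day")
  if err != nil {
    log.Fatal(err)
  }

  // This uses a standard subscription so that multiple instances
  // could be deployed that would each pull separate batches of
  // messages from the queue.
  nc.Subscribe("schedule.end-of-day", func(msg *nats.Msg) {
    // Start fetching from the pull consumer subscription until
    // no more messages are available.
    for {
      msgs, err := sub.Fetch(10)
      if err != nil {
        // A timeout means no messages arrived within the wait
        // period, so the queue is treated as drained.
        if !errors.Is(err, nats.ErrTimeout) {
          log.Println(err)
        }
        return
      }
      for _, m := range msgs {
        // Handle the message, then acknowledge it.
        m.Ack()
      }
    }
  })

  // Wait for interrupt.
  sigchan := make(chan os.Signal, 1)
  signal.Notify(sigchan, os.Interrupt)
  <-sigchan
}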

The next code snippet shows what a dynamic schedule might look like. This code is just for getting across the basic structure and is definitely not production ready (but read the comments!).

package main

import (
  "context"
  "log"
  "os"
  "os/signal"
  "sync"
  "time"

  "github.com/nats-io/nats.go"
)

// getReservationID and getEventType are assumed helpers that extract
// the reservation id and event type from a message. They are not
// defined here.

func main() {
  // Connect to NATS.
  nc, err := nats.Connect("localhost:4222")
  if err != nil {
    log.Fatal(err)
  }
  defer nc.Drain()

  // Example of dynamic schedule, set 10 minute timer when a
  // ticket-reserved event occurs.
  js, _ := nc.JetStream()

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  // Map reservation id to cancel func if the tickets are
  // purchased before the timer expires.
  timers := map[string]context.CancelFunc{}
  var timersMux sync.Mutex

  // Subscribe to the reservation subjects where each concrete subject
  // corresponds to a single reservation, e.g. events.reservation.231.
  _, err = js.Subscribe(
    "events.reservation.*",
    func(msg *nats.Msg) {
      // Assume some way of getting the id and event type from
      // the message..
      id := getReservationID(msg)
      eventType := getEventType(msg)

      switch eventType {
      case "tickets-reserved":
        // Start goroutine, passing the parent context and the
        // reservation id.
        go func(ctx context.Context, id string) {
          // Local cancel for stopping the timer..
          cctx, cancel := context.WithCancel(ctx)
          defer cancel()

          // Add the timer by id.
          timersMux.Lock()
          timers[id] = cancel
          timersMux.Unlock()

          // Remove the timer when this routine returns.
          defer func() {
            timersMux.Lock()
            delete(timers, id)
            timersMux.Unlock()
          }()

          // Start a timer, could be derived from the event or
          // some fixed time. Deriving from the event and then setting
          // an absolute expiration time may make more sense in case
          // this process is catching up and old reservations would
          // be effectively skipped over since they are already expired.
          timer := time.NewTimer(10 * time.Minute)
          defer timer.Stop()

          select {
          case <-cctx.Done():
            // Cancelled, either because the tickets were purchased
            // or because the process is shutting down.
          case <-timer.C:
            // Timeout elapsed, time out the reservation.
            // Note the MsgID which is used for de-duplication by NATS.
            // This is based on the reservation ID so that if there are
            // multiple instances of this process running, redundant
            // timers would not result in duplicate events.
            js.Publish(
              msg.Subject,
              []byte("..."),
              nats.MsgID(id + "-reservation-timeout"),
            )
          }
        }(ctx, id)

      case "tickets-purchased":
        // Get the timer cancel func for the reservation id.
        // Remove it (delete if not exists is a no-op). Call
        // cancel if present.
        timersMux.Lock()
        cancel, ok := timers[id]
        delete(timers, id)
        timersMux.Unlock()

        if ok {
          cancel()
        }
      }
    },
    // Assumes "reservation-timer" is meant as the durable consumer
    // name for this subscription.
    nats.Durable("reservation-timer"),
  )
  if err != nil {
    log.Fatal(err)
  }

  // Wait for interrupt.
  sigchan := make(chan os.Signal, 1)
  signal.Notify(sigchan, os.Interrupt)
  <-sigchan
}

If you would like more in-depth information with examples on any of these questions, please reach out on Slack or Twitter!