
Comprehensive Guide to Developing an Event-Sourced Application Using NATS JetStream (December 18, 2024)

This guide provides a detailed walkthrough for building an event-sourced application using NATS JetStream, addressing the specific requirements of event playback, filtering, subscription, and domain organization. NATS JetStream is a powerful streaming and messaging system that provides persistence, event replay, and flexible subscription mechanisms, making it an excellent choice for event sourcing.

Prerequisites

Before diving into the implementation, ensure you have the following setup:

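  1. NATS Server with JetStream Enabled:
    • Install nats-server and start it with JetStream enabled, for example nats-server -js. The command-line examples in this guide also use the nats CLI.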
  2. Programming Language and SDK:
    • Use a language with robust NATS client libraries. This guide will provide examples in Go, but other languages like Python and JavaScript are also suitable. Install the NATS Go client: https://github.com/nats-io/nats.go.
  3. Basic Knowledge of Event Sourcing:
    • Understand the principles of event sourcing, including aggregates, event streams, and projections. Refer to resources like the Beginner’s Guide to Event Sourcing: https://www.eventstore.com/event-sourcing.

Step 1: Designing the Event Model

Events are the core of an event-sourced system. Each event should include:

  • ID: A unique identifier for the event.
  • Aggregate ID: A unique identifier for the aggregate (e.g., item-123, order-456).
  • Domain: The domain to which the aggregate belongs (e.g., inventory, orders).
  • Event Type: The type of the event (e.g., ItemAdded, ItemRemoved, OrderCreated, OrderShipped).
  • Timestamp: When the event occurred.
  • Payload: Data associated with the event (e.g., item details, quantity, order details).

Example JSON structure for an event:


{
  "id": "event-12345",
  "aggregate_id": "item-67890",
  "domain": "inventory",
  "event_type": "ItemUpdated",
  "timestamp": "2024-12-18T12:34:56Z",
  "payload": {
    "name": "Updated Item Name",
    "price": 99.99
  }
}
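
As a rough sketch of how such an event could be decoded in Go, the snippet below keeps the payload as raw JSON until the event type has been checked. The ItemUpdated struct and the DecodeItemUpdated helper are hypothetical names introduced for illustration; they are not part of NATS or of any library.

```go
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// Event mirrors the JSON structure above. Payload stays raw until the
// event type is known.
type Event struct {
    ID          string          `json:"id"`
    AggregateID string          `json:"aggregate_id"`
    Domain      string          `json:"domain"`
    EventType   string          `json:"event_type"`
    Timestamp   time.Time       `json:"timestamp"`
    Payload     json.RawMessage `json:"payload"`
}

// ItemUpdated is a hypothetical payload type for the "ItemUpdated" event.
type ItemUpdated struct {
    Name  string  `json:"name"`
    Price float64 `json:"price"`
}

// DecodeItemUpdated parses an event and its payload, rejecting other types.
func DecodeItemUpdated(data []byte) (Event, ItemUpdated, error) {
    var ev Event
    var p ItemUpdated
    if err := json.Unmarshal(data, &ev); err != nil {
        return ev, p, fmt.Errorf("decode event: %w", err)
    }
    if ev.EventType != "ItemUpdated" {
        return ev, p, fmt.Errorf("unexpected event type %q", ev.EventType)
    }
    if err := json.Unmarshal(ev.Payload, &p); err != nil {
        return ev, p, fmt.Errorf("decode payload: %w", err)
    }
    return ev, p, nil
}

// sample is the example event from the text above.
const sample = `{
  "id": "event-12345",
  "aggregate_id": "item-67890",
  "domain": "inventory",
  "event_type": "ItemUpdated",
  "timestamp": "2024-12-18T12:34:56Z",
  "payload": {"name": "Updated Item Name", "price": 99.99}
}`

func main() {
    ev, p, err := DecodeItemUpdated([]byte(sample))
    if err != nil {
        panic(err)
    }
    fmt.Println(ev.ID, ev.AggregateID, p.Name, p.Price, ev.Timestamp.Year())
}
```

Deferring the payload decode this way lets one Event type carry every kind of payload, at the cost of a second Unmarshal call per event.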

Step 2: Setting Up JetStream Streams

JetStream organizes data into streams. A stream is a durable log of messages, and in this case, it will store events. Streams are configured with subjects that define the message space they capture.

Stream Configuration

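  • Stream Name: Use a descriptive name, such as ECOMMERCE_ORDERS or ECOMMERCE_PRODUCTS for per-domain streams, or simply EVENTS for a single stream. Stream names cannot contain dots, wildcards (* or >), or whitespace, so a name like ecommerce.orders is not valid.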
  • Subjects: Define subjects for each aggregate type, using a hierarchical structure to support domain-based organization. For example:
    • events.inventory.item.*
    • events.orders.order.*
    • events.sales.customer.*
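
One way to keep subjects consistent is to build them in a single place. The sketch below assumes the layout events.<domain>.<aggregate type>.<aggregate id> suggested by the list above; BuildSubject and validToken are hypothetical helpers, not NATS client functions.

```go
package main

import (
    "fmt"
    "strings"
)

// validToken reports whether s can be used as one subject token:
// non-empty and free of dots, wildcards, and whitespace.
func validToken(s string) bool {
    return s != "" && !strings.ContainsAny(s, ".*> \t\r\n")
}

// BuildSubject returns "events.<domain>.<aggregateType>.<aggregateID>",
// or an error if any part would break the token structure.
func BuildSubject(domain, aggregateType, aggregateID string) (string, error) {
    tokens := []string{"events", domain, aggregateType, aggregateID}
    for _, t := range tokens {
        if !validToken(t) {
            return "", fmt.Errorf("invalid subject token %q", t)
        }
    }
    return strings.Join(tokens, "."), nil
}

func main() {
    subject, err := BuildSubject("inventory", "item", "item-123")
    if err != nil {
        panic(err)
    }
    fmt.Println(subject)

    // An ID containing a dot would silently add a subject level, so it is rejected.
    _, err = BuildSubject("inventory", "item", "item.123")
    fmt.Println(err)
}
```

Rejecting dots inside an ID matters because a stray dot changes the number of tokens, and wildcard filters such as events.inventory.item.* would then stop matching.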

Create a Stream

Use the NATS CLI or API to create a stream. Example using the NATS CLI:


nats stream add EVENTS \
  --subjects "events.>" \
  --storage file \
  --retention limits \
  --max-msgs -1 \
  --max-bytes -1 \
  --discard new \
  --dupe-window 2m

This command creates a stream named EVENTS that stores events for all domains and aggregates. The events.> subject captures every subject under the events. prefix. The --storage file option persists events on disk. The --retention limits option keeps messages until a configured limit (message count, size, or age) is reached; with --max-msgs -1 and --max-bytes -1 no count or size limit applies, so events are retained indefinitely as long as no age limit is set. The --discard new option rejects new messages once a limit is reached rather than deleting the oldest ones, which only matters if you later set limits. The --dupe-window 2m option makes the server ignore a message whose Nats-Msg-Id header repeats one seen within the previous 2 minutes; messages published without that header are not deduplicated.

For detailed stream configuration options, see: https://docs.nats.io/nats-concepts/jetstream/streams

Step 3: Publishing Events to JetStream

Events are published to subjects within a stream. For example, to publish an event for an order with ID order-123 in the ecommerce.orders domain, you would use the subject events.ecommerce.orders.order-123.

Example in Go:


package main

import (
    "encoding/json"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

type Event struct {
    ID          string      `json:"id"`
    AggregateID string      `json:"aggregate_id"`
    Domain      string      `json:"domain"`
    EventType   string      `json:"event_type"`
    Timestamp   time.Time   `json:"timestamp"`
    Payload     interface{} `json:"payload"`
}

func main() {
    // Connect to NATS
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Enable JetStream
    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Create an event
    event := Event{
        ID:          "event-12345",
        AggregateID: "order-123",
        Domain:      "ecommerce.orders",
        EventType:   "OrderCreated",
        Timestamp:   time.Now(),
        Payload: map[string]interface{}{
            "customer_id": "cust-456",
            "total":       250.00,
        },
    }

    // Serialize the event to JSON
    data, err := json.Marshal(event)
    if err != nil {
        log.Fatal(err)
    }

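    // Publish the event. Setting the Nats-Msg-Id header via nats.MsgId lets
    // JetStream drop retries of this event within the stream's duplicate window.
    subject := "events.ecommerce.orders.order-123"
    _, err = js.Publish(subject, data, nats.MsgId(event.ID))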
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Event published successfully!")
}

For more details on publishing messages, see: https://docs.nats.io/nats-concepts/jetstream/js_walkthrough

Step 4: Playback of Events for a Specific Aggregate

To replay all events for a specific aggregate, use a JetStream Consumer. Consumers allow you to retrieve messages based on filters. There are two primary types of consumers: pull-based and push-based.

Create a Pull Consumer

Create a pull consumer for a specific aggregate, such as order-123:


nats consumer add EVENTS order-123-consumer \
  --pull \
  --filter "events.ecommerce.orders.order-123" \
  --deliver all \
  --ack explicit

This command creates a consumer named order-123-consumer that filters for events related to order-123. The --deliver all option ensures that all messages are delivered, and the --ack explicit option requires explicit acknowledgement of messages.

Fetch Events (Pull-Based)

Use the NATS client to fetch and process events:


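// Requires the "errors" import in addition to those used in Step 3.
// Bind to the durable pull consumer created above.
sub, err := js.PullSubscribe("events.ecommerce.orders.order-123", "order-123-consumer")
if err != nil {
    log.Fatal(err)
}
for {
    // Fetch up to 10 messages per batch, waiting at most 2 seconds
    msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
    if errors.Is(err, nats.ErrTimeout) {
        break // No more stored events for now: the replay is complete
    }
    if err != nil {
        log.Fatal(err)
    }
    for _, msg := range msgs {
        log.Printf("Received event: %s", string(msg.Data))
        if err := msg.Ack(); err != nil { // Acknowledge the message
            log.Printf("Ack failed: %v", err)
        }
    }
}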
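
Fetching the events is only half of a replay; the other half is folding them into the aggregate's current state. The sketch below shows that fold on raw message bodies such as msg.Data. The Order state, its fields, and the Apply and Replay functions are hypothetical and chosen to match the OrderCreated and OrderShipped event types mentioned earlier.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Event holds the fields needed to rebuild state.
type Event struct {
    AggregateID string          `json:"aggregate_id"`
    EventType   string          `json:"event_type"`
    Payload     json.RawMessage `json:"payload"`
}

// Order is a hypothetical aggregate state.
type Order struct {
    ID      string
    Total   float64
    Shipped bool
    Version int // number of events seen so far
}

// Apply returns the state that results from applying one event.
func Apply(o Order, e Event) (Order, error) {
    switch e.EventType {
    case "OrderCreated":
        var p struct {
            Total float64 `json:"total"`
        }
        if err := json.Unmarshal(e.Payload, &p); err != nil {
            return o, fmt.Errorf("decode OrderCreated payload: %w", err)
        }
        o.ID = e.AggregateID
        o.Total = p.Total
    case "OrderShipped":
        o.Shipped = true
    default:
        // Unknown event types leave the state unchanged.
    }
    o.Version++
    return o, nil
}

// Replay folds raw event bodies, in stream order, into an Order.
func Replay(raw [][]byte) (Order, error) {
    var o Order
    for _, data := range raw {
        var e Event
        if err := json.Unmarshal(data, &e); err != nil {
            return o, fmt.Errorf("decode event: %w", err)
        }
        var err error
        if o, err = Apply(o, e); err != nil {
            return o, err
        }
    }
    return o, nil
}

func main() {
    raw := [][]byte{
        []byte(`{"aggregate_id":"order-123","event_type":"OrderCreated","payload":{"total":250}}`),
        []byte(`{"aggregate_id":"order-123","event_type":"OrderShipped","payload":{}}`),
    }
    order, err := Replay(raw)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", order)
}
```

Keeping Apply free of I/O means the same function can serve both a full replay and live updates from a subscription.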

Step 5: Filtering Events by Type

To filter events by type, you can include the event type in the subject. For example:

  • events.ecommerce.orders.order-123.created
  • events.ecommerce.orders.order-123.updated

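When creating a consumer, use the --filter option with a * wildcard in the aggregate position to select one event type across all orders. With this subject layout, a per-aggregate filter must also cover the trailing event-type token, for example events.ecommerce.orders.order-123.> instead of events.ecommerce.orders.order-123:


nats consumer add EVENTS order-created-consumer \
  --pull \
  --filter "events.ecommerce.orders.*.created" \
  --deliver all \
  --ack explicit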

Alternatively, you can filter events by type within your application logic after fetching events by inspecting the event_type field in the event payload.


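for _, msg := range msgs {
    var event Event
    if err := json.Unmarshal(msg.Data, &event); err != nil {
        log.Printf("Skipping malformed event: %v", err)
        msg.Term() // Tell the server not to redeliver this message
        continue
    }

    if event.EventType == "OrderCreated" {
        log.Printf("Filtered Event: %s\n", string(msg.Data))
    }
    msg.Ack() // Acknowledge every message, including those filtered out
}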
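
When application-side filtering grows beyond a single if statement, a small dispatch table keyed by event type keeps it manageable. The following is a minimal sketch under that assumption; Router, Handler, On, and Dispatch are hypothetical names, and Dispatch takes the raw message body (for example msg.Data).

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Event holds the fields needed for routing.
type Event struct {
    AggregateID string `json:"aggregate_id"`
    EventType   string `json:"event_type"`
}

// Handler processes one decoded event.
type Handler func(Event) error

// Router maps event types to handlers.
type Router struct {
    handlers map[string]Handler
}

// NewRouter returns an empty router.
func NewRouter() *Router {
    return &Router{handlers: make(map[string]Handler)}
}

// On registers the handler for one event type, replacing any earlier one.
func (r *Router) On(eventType string, h Handler) {
    r.handlers[eventType] = h
}

// Dispatch decodes data and calls the matching handler. It reports whether
// a handler was found; events without a handler are filtered out.
func (r *Router) Dispatch(data []byte) (bool, error) {
    var e Event
    if err := json.Unmarshal(data, &e); err != nil {
        return false, fmt.Errorf("decode event: %w", err)
    }
    h, ok := r.handlers[e.EventType]
    if !ok {
        return false, nil
    }
    return true, h(e)
}

func main() {
    r := NewRouter()
    r.On("OrderCreated", func(e Event) error {
        fmt.Println("order created:", e.AggregateID)
        return nil
    })

    created := []byte(`{"aggregate_id":"order-123","event_type":"OrderCreated"}`)
    shipped := []byte(`{"aggregate_id":"order-123","event_type":"OrderShipped"}`)

    handled, err := r.Dispatch(created)
    fmt.Println(handled, err)
    handled, err = r.Dispatch(shipped)
    fmt.Println(handled, err)
}
```

Compared with subject-based filtering, this approach still transfers every event to the client, so it suits cases where the subject layout cannot be changed or the volume is small.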

Step 6: Subscriptions for Real-Time Updates

To receive real-time updates for a specific aggregate, use a Push Consumer. Push consumers deliver messages to a callback function.

Example in Go:


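sub, err := js.Subscribe("events.ecommerce.orders.order-123", func(msg *nats.Msg) {
    log.Printf("Received event: %s", string(msg.Data))
    if err := msg.Ack(); err != nil { // Acknowledge after successful processing
        log.Printf("Ack failed: %v", err)
    }
}, nats.Durable("order-123-live"), nats.ManualAck())
if err != nil {
    log.Fatal(err)
}

This code creates a durable push subscription to the subject events.ecommerce.orders.order-123. The nats.Durable("order-123-live") option names the consumer so that the server remembers its position; after a disconnect, delivery resumes from the last acknowledged message. A different durable name is used here because order-123-consumer from Step 4 is a pull consumer, and js.Subscribe cannot bind to a pull consumer. The nats.ManualAck() option turns off the client's automatic acknowledgement so that the explicit msg.Ack() call is the only one. By default the consumer first delivers the events already stored and then continues with new ones; add nats.DeliverNew() to receive only events published from now on.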

Step 7: Organizing Aggregates by Domain

To organize aggregates by domain, use hierarchical subjects. For example:

  • events.ecommerce.orders.*
  • events.ecommerce.products.*
  • events.inventory.items.*

This structure allows you to group aggregates logically and create domain-specific streams and consumers. You can replay all events in a domain by using a wildcard subject filter.

Example: Replay All Events in a Domain

To replay all events in the ecommerce.orders domain:


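nats consumer add EVENTS all-orders-consumer \
  --pull \
  --filter "events.ecommerce.orders.*" \
  --deliver all \
  --ack explicit

This command creates a pull consumer named all-orders-consumer that filters for all events in the ecommerce.orders domain. The * wildcard matches exactly one token, so this filter covers subjects of the form events.ecommerce.orders.<aggregate-id>; if subjects carry an extra event-type token as in Step 5, use events.ecommerce.orders.> instead.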
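
Because the choice between * and > decides which events a consumer sees, it can help to check filters against sample subjects before creating consumers. The sketch below reimplements the NATS wildcard rules (* matches exactly one token, > matches one or more trailing tokens) in plain Go for that purpose. Matches is a hypothetical helper; the server performs the real matching.

```go
package main

import (
    "fmt"
    "strings"
)

// Matches reports whether subject is selected by filter, following the NATS
// wildcard rules: "*" matches exactly one token and ">" matches one or more
// remaining tokens and is only valid as the last token.
func Matches(filter, subject string) bool {
    f := strings.Split(filter, ".")
    s := strings.Split(subject, ".")
    for i, tok := range f {
        if tok == ">" {
            // ">" must be last and needs at least one subject token to consume.
            return i == len(f)-1 && len(s) > i
        }
        if i >= len(s) {
            return false // subject has fewer tokens than the filter
        }
        if tok != "*" && tok != s[i] {
            return false
        }
    }
    // Without ">", filter and subject must have the same number of tokens.
    return len(f) == len(s)
}

func main() {
    subjects := []string{
        "events.ecommerce.orders.order-123",
        "events.ecommerce.orders.order-123.created",
        "events.inventory.items.item-1",
    }
    filters := []string{
        "events.ecommerce.orders.*",
        "events.ecommerce.orders.>",
        "events.ecommerce.orders.*.created",
        "events.>",
    }
    for _, f := range filters {
        for _, s := range subjects {
            fmt.Printf("%-36s %-44s %v\n", f, s, Matches(f, s))
        }
    }
}
```

Running it shows, for instance, that events.ecommerce.orders.* does not select events.ecommerce.orders.order-123.created, while events.ecommerce.orders.> selects both order subjects.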

Additional Considerations

  1. Snapshots: Use snapshots to store the current state of an aggregate and reduce replay time. Store snapshots in a separate JetStream stream or an external database.
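  2. Cold Storage: Archive older events to cold storage (e.g., AWS S3). JetStream does not upload to object stores itself; one option is to take a stream backup with the NATS CLI (nats stream backup) and copy the result, another is a dedicated consumer that writes events out. See: https://docs.nats.io/nats-concepts/jetstream/archiving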
  3. Monitoring and Scaling: Monitor JetStream performance using tools like nats-top or Prometheus. For scaling, use clustering and super-cluster configurations. See: https://docs.nats.io/nats-server/configuration/clustering
  4. Event Schema Versioning: Implement event versioning using a version field in event metadata and provide upcasting mechanisms for older event versions.
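  5. Error Handling: Handle connection and publish errors explicitly, retrying a failed publish with the same message ID so that the retry can be deduplicated. JetStream consumers provide at-least-once delivery: a message can be redelivered if its acknowledgement is lost, so event handlers should be idempotent.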
  6. Performance Optimization: Use appropriate batch sizes for event replay, implement caching for frequently accessed aggregates, and consider implementing snapshots for large event streams.
  7. Monitoring: Track event store metrics (latency, throughput), monitor stream sizes and cleanup policies, and implement health checks for the NATS connection.
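
The versioning item above can be sketched as a small upcasting step that runs before events reach the aggregate. Everything specific in this example is an assumption made for illustration: the version field name, the idea that version 1 of OrderCreated stored total as a decimal amount, and that version 2 stores total_cents plus a currency defaulting to USD.

```go
package main

import (
    "fmt"
    "math"
)

// Event carries a schema version next to its type and payload.
type Event struct {
    EventType string                 `json:"event_type"`
    Version   int                    `json:"version"`
    Payload   map[string]interface{} `json:"payload"`
}

// Upcast converts older event versions to the current shape.
// Hypothetical schema change: OrderCreated v1 has "total" as a decimal
// amount; v2 has "total_cents" and "currency".
func Upcast(e Event) Event {
    if e.Version == 0 {
        e.Version = 1 // events written before the version field existed
    }
    if e.EventType == "OrderCreated" && e.Version == 1 {
        // Copy the payload so the caller's map is not modified.
        p := make(map[string]interface{}, len(e.Payload)+1)
        for k, v := range e.Payload {
            p[k] = v
        }
        if total, ok := p["total"].(float64); ok {
            delete(p, "total")
            p["total_cents"] = int64(math.Round(total * 100))
            p["currency"] = "USD" // assumed default for old events
        }
        e.Payload = p
        e.Version = 2
    }
    return e
}

func main() {
    old := Event{
        EventType: "OrderCreated",
        Payload:   map[string]interface{}{"customer_id": "cust-456", "total": 250.00},
    }
    fmt.Printf("%+v\n", Upcast(old))
}
```

Stored events are never rewritten in this approach; the conversion happens on read, so the aggregate code only has to understand the newest version.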

Conclusion

This guide provides a detailed roadmap for developing an event-sourced application using NATS JetStream. By leveraging JetStream’s powerful features like streams, consumers, and hierarchical subjects, you can build a scalable and efficient event-sourced system. For further reading, explore the official NATS documentation: https://docs.nats.io

