This guide provides a detailed walkthrough for building an event-sourced application using NATS JetStream, addressing the specific requirements of event playback, filtering, subscription, and domain organization. NATS JetStream is a powerful streaming and messaging system that provides persistence, event replay, and flexible subscription mechanisms, making it an excellent choice for event sourcing.
Before diving into the implementation, ensure you have the following setup:
nats-server
as described here: https://docs.nats.io/nats-concepts/jetstream/configuring.Events are the core of an event-sourced system. Each event should include:
item-123
, order-456
).inventory
, orders
).ItemAdded
, ItemRemoved
, OrderCreated
, OrderShipped
).Example JSON structure for an event:
{
"id": "event-12345",
"aggregate_id": "item-67890",
"domain": "inventory",
"event_type": "ItemUpdated",
"timestamp": "2024-12-18T12:34:56Z",
"payload": {
"name": "Updated Item Name",
"price": 99.99
}
}
JetStream organizes data into streams. A stream is a durable log of messages, and in this case, it will store events. Streams are configured with subjects that define the message space they capture.
ecommerce.orders
, ecommerce.products
, or simply events
.events.inventory.item.*
events.orders.order.*
events.sales.customer.*
Use the NATS CLI or API to create a stream. Example using the NATS CLI:
nats stream add EVENTS \
--subjects "events.>" \
--storage file \
--retention limits \
--max-msgs -1 \
--max-bytes -1 \
--discard new \
--dupe-window 2m
This command creates a stream named EVENTS
that stores events for all domains and aggregates. The events.>
subject captures all events under the events.
prefix. The --storage file
option stores events on disk for persistence. The --retention limits
option retains events indefinitely, but this can be adjusted based on your needs. The --max-msgs -1
and --max-bytes -1
options mean that there is no limit on the number of messages or bytes stored in the stream. The --discard new
option means that when the stream reaches its limits, new messages will be discarded. The --dupe-window 2m
option means that duplicate messages will be discarded within a 2 minute window.
For detailed stream configuration options, see: https://docs.nats.io/nats-concepts/jetstream/streams
Events are published to subjects within a stream. For example, to publish an event for an order with ID order-123
in the ecommerce.orders
domain, you would use the subject events.ecommerce.orders.order-123
.
package main
import (
"encoding/json"
"log"
"time"
"github.com/nats-io/nats.go"
)
type Event struct {
ID string `json:"id"`
AggregateID string `json:"aggregate_id"`
Domain string `json:"domain"`
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
Payload interface{} `json:"payload"`
}
func main() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Enable JetStream
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Create an event
event := Event{
ID: "event-12345",
AggregateID: "order-123",
Domain: "ecommerce.orders",
EventType: "OrderCreated",
Timestamp: time.Now(),
Payload: map[string]interface{}{
"customer_id": "cust-456",
"total": 250.00,
},
}
// Serialize the event to JSON
data, err := json.Marshal(event)
if err != nil {
log.Fatal(err)
}
// Publish the event
subject := "events.ecommerce.orders.order-123"
_, err = js.Publish(subject, data)
if err != nil {
log.Fatal(err)
}
log.Println("Event published successfully!")
}
For more details on publishing messages, see: https://docs.nats.io/nats-concepts/jetstream/js_walkthrough
To replay all events for a specific aggregate, use a JetStream Consumer. Consumers allow you to retrieve messages based on filters. There are two primary types of consumers: pull-based and push-based.
Create a pull consumer for a specific aggregate, such as order-123
:
nats consumer add EVENTS order-123-consumer \
--filter "events.ecommerce.orders.order-123" \
--deliver all \
--ack explicit
This command creates a consumer named order-123-consumer
that filters for events related to order-123
. The --deliver all
option ensures that all messages are delivered, and the --ack explicit
option requires explicit acknowledgement of messages.
Use the NATS client to fetch and process events:
sub, err := js.PullSubscribe("events.ecommerce.orders.order-123", "order-123-consumer")
if err != nil {
log.Fatal(err)
}
msgs, err := sub.Fetch(10) // Fetch up to 10 messages
if err != nil {
log.Fatal(err)
}
for _, msg := range msgs {
log.Printf("Received event: %s", string(msg.Data))
msg.Ack() // Acknowledge the message
}
To filter events by type, you can include the event type in the subject. For example:
events.ecommerce.orders.order-123.created
events.ecommerce.orders.order-123.updated
When creating a consumer, use the --filter
option to specify the event type:
nats consumer add EVENTS order-created-consumer \
--filter "events.ecommerce.orders.*.created" \
--deliver all \
--ack explicit
Alternatively, you can filter events by type within your application logic after fetching events by inspecting the event_type
field in the event payload.
for _, msg := range msgs {
var event Event
json.Unmarshal(msg.Data, &event)
if event.EventType == "OrderCreated" {
log.Printf("Filtered Event: %s\n", string(msg.Data))
}
msg.Ack()
}
To receive real-time updates for a specific aggregate, use a Push Consumer. Push consumers deliver messages to a callback function.
sub, err := js.Subscribe("events.ecommerce.orders.order-123", func(msg *nats.Msg) {
log.Printf("Received event: %s", string(msg.Data))
msg.Ack()
}, nats.Durable("order-123-consumer"))
if err != nil {
log.Fatal(err)
}
This code creates a durable subscription to the subject events.ecommerce.orders.order-123
. The nats.Durable("order-123-consumer")
option ensures that the consumer is durable and will receive messages even if it is temporarily disconnected.
To organize aggregates by domain, use hierarchical subjects. For example:
events.ecommerce.orders.*
events.ecommerce.products.*
events.inventory.items.*
This structure allows you to group aggregates logically and create domain-specific streams and consumers. You can replay all events in a domain by using a wildcard subject filter.
To replay all events in the ecommerce.orders
domain:
nats consumer add EVENTS all-orders-consumer \
--filter "events.ecommerce.orders.*" \
--deliver all \
--ack explicit
This command creates a consumer named all-orders-consumer
that filters for all events in the ecommerce.orders
domain.
nats-top
or Prometheus. For scaling, use clustering and super-cluster configurations. See: https://docs.nats.io/nats-server/configuration/clusteringThis guide provides a detailed roadmap for developing an event-sourced application using NATS JetStream. By leveraging JetStream’s powerful features like streams, consumers, and hierarchical subjects, you can build a scalable and efficient event-sourced system. For further reading, explore the official NATS documentation: https://docs.nats.io