Spring Boot Axon Framework Tracking Token Configuration

A comprehensive guide to reprocessing events with replay context propagation

Key Takeaways

  • Custom Tracking Token Positioning: Set the token arbitrarily to reprocess events from any desired point in time.
  • Replay Context Propagation: Supply a custom context during token reset so that event handlers can evaluate this context during a replay.
  • Seamless Processor Reset: Use TrackingEventProcessor.resetTokens(...), available in Axon Framework 4.6.0, to reset tokens and trigger event reprocessing for recovery and data correction.

Overview

In event-driven architectures built with the Axon Framework on Spring Boot, it is often necessary to reprocess events from a specific point in time, for example to correct data, resolve discrepancies, or debug a projection. This guide explains how to configure a tracking event processor whose tracking token can be reset to an arbitrary point on the event stream.

The configuration includes the ability to attach a replay context – a custom piece of metadata – that becomes available during event replays. This context allows your event processing logic to differentiate between regular event consumption and a replay scenario, thereby enabling conditional processing or specialized logging.


Dependencies and Setup

Add Axon Framework Dependencies

Before configuring the tracking processor, ensure that you are using Axon Framework version 4.6.0 or later. Your Maven configuration should include the Axon Spring Boot starter dependency as shown below:


<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>4.6.0</version>
</dependency>
  

This dependency provides all the necessary components to configure event processors, tracking tokens, and replay context propagation.


Tracking Event Processor Configuration

The core of the configuration is registering a Tracking Event Processor whose position can later be reset. The configuration class defines the processor and sets its initial tracking token to the tail of the event stream (the oldest event), so a fresh processor reads the full history. The processor configuration has no dedicated replay-context setting: the context is supplied when the tokens are reset, through TrackingEventProcessor.resetTokens(...), and Axon hands it to the processing group's @ResetHandler methods.

Event Processor Registration

Configuration Class

Below is a configuration class for Axon Framework. In the configuration:

  • A tracking event processor is registered under a fixed name.
  • The initial tracking token is the event store's tail token.
  • A low-level helper shows how a token for a given timestamp can be written directly to the TokenStore. The TokenStore keeps one token per processor segment and cannot hold a replay context, so a replay that needs a context should go through resetTokens(...) instead.

// Package and import declarations as required
package com.example.axonconfig;

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.StreamableMessageSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.time.Instant;
import java.util.Map;

@Configuration
public class AxonEventProcessorConfig {

    // Name of the event processor; handlers join it via @ProcessingGroup("myEventProcessor").
    private static final String PROCESSOR_NAME = "myEventProcessor";

    /**
     * Registers a tracking event processor with the event store as its message source.
     * The processor runs on a single thread with a single segment and starts from the tail token.
     */
    @Autowired
    public void configureEventProcessor(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
            PROCESSOR_NAME,
            // Specify the message source. In this configuration, the event store is being used.
            conf -> conf.eventStore(),
            // Provides custom processor configuration
            conf -> TrackingEventProcessorConfiguration
                        .forParallelProcessing(1)
                        // Start a new processor at the tail (oldest event) so that all events are processed.
                        .andInitialTrackingToken(StreamableMessageSource::createTailToken)
        );
    }

    /**
     * Low-level helper that writes a token for the given timestamp directly to the token store.
     * Call it only while the processor is shut down. The TokenStore API stores one token per
     * processor segment and cannot carry a replay context, so the context is only logged here.
     * Events read from a token stored this way are not flagged as replayed; to deliver the
     * context to reset handlers, use TrackingEventProcessor.resetTokens(...) instead.
     *
     * @param tokenStore The token store used to persist tracking tokens.
     * @param timestamp  The desired point in time from which events should be reprocessed.
     * @param context    A map representing the custom replay context (e.g., {"replayReason": "data-correction"}).
     * @param eventStore The streamable event source used to generate the tracking token.
     */
    public void resetTrackingToken(TokenStore tokenStore, Instant timestamp,
                                   Map<String, Object> context, StreamableMessageSource<?> eventStore) {
        // Create a tracking token that positions the stream at the specified timestamp.
        TrackingToken newToken = eventStore.createTokenAt(timestamp);
        // Segment 0 is the only segment, assuming the single-segment configuration above.
        tokenStore.storeToken(newToken, PROCESSOR_NAME, 0);
        System.out.println("Tracking token for processor '" + PROCESSOR_NAME + "' set to "
                           + timestamp + " (context not stored: " + context + ")");
    }
}
  

Implementing Replay-Aware Event Handling

With the processor configured to propagate replay contexts, the event handlers need to be aware of whether they are processing events in a regular flow or as part of a replay. This enables distinct handling logic (for instance, skipping operations that trigger external side effects during a replay).

Event Handler Example

Using the Replay Context

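The following event handler shows one way to consume the replay context. A @ResetHandler method receives the context that was passed to resetTokens(...) and keeps it in a field. The @EventHandler method takes a ReplayStatus parameter, which reports whether the current event is being replayed, and reads the stored context when it is. MyEvent and its getId() accessor stand in for your own event class.


package com.example.axonconfig.events;

import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.eventhandling.ReplayStatus;
import org.axonframework.eventhandling.ResetHandler;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Map;

@Component
@ProcessingGroup("myEventProcessor")
public class MyEventHandler {

    // Context captured at the most recent reset. It is held in memory only,
    // on the node that performed the reset, and is lost when the application restarts.
    private volatile Map<String, Object> replayContext = Collections.emptyMap();

    /**
     * Invoked when the processor's tokens are reset with a Map as reset context.
     *
     * @param resetContext The context supplied to resetTokens(...).
     */
    @ResetHandler
    public void onReset(Map<String, Object> resetContext) {
        this.replayContext = resetContext;
    }

    /**
     * Processes events with consideration for a replay scenario.
     *
     * @param event        The event being processed.
     * @param replayStatus Indicates whether the event is delivered as part of a replay.
     */
    @EventHandler
    public void on(MyEvent event, ReplayStatus replayStatus) {
        if (replayStatus.isReplay()) {
            System.out.println("Handling replayed event " + event.getId() + " with context: " + replayContext);
            // Insert context-specific handling logic here (e.g., conditional projection updates).
        } else {
            // Normal event processing logic.
            System.out.println("Handling event " + event.getId() + " in regular mode.");
        }
    }
}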
  

Service for Resetting and Replaying Events

In a production environment, you might trigger token resets and event replays programmatically, for example, via a REST endpoint or an admin interface. The service below demonstrates a typical scenario:

  • The service shuts down the event processor so that it releases its token claims.
  • It resets the tokens to the position for the given timestamp and passes the replay context along.
  • It restarts the processor, which then reprocesses events from that position.

package com.example.axonconfig.services;

import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.Map;

@Service
public class EventReplayService {

    private static final String PROCESSOR_NAME = "myEventProcessor";

    @Autowired
    private EventProcessingConfiguration eventProcessingConfiguration;

    /**
     * Replays events starting from a specific timestamp.
     * A custom context is supplied to influence the event handler processing logic during a replay.
     *
     * @param timestamp The point from which to start replaying events.
     * @param context   The custom context map (e.g., {"replayReason": "data-correction", "initiator": "admin"}).
     */
    public void triggerReplay(Instant timestamp, Map<String, Object> context) {
        eventProcessingConfiguration
                .eventProcessor(PROCESSOR_NAME, TrackingEventProcessor.class)
                .ifPresent(processor -> {
                    // Tokens can only be reset while no segment is claimed, so stop the processor first.
                    // In a multi-node deployment, every node must stop this processor before the reset.
                    processor.shutDown();
                    try {
                        // Position the processor at the given timestamp and hand the context to reset handlers.
                        processor.resetTokens(source -> source.createTokenAt(timestamp), context);
                    } finally {
                        // Restart even if the reset failed, so the processor is not left stopped.
                        processor.start();
                    }
                    System.out.println("Event processor '" + PROCESSOR_NAME + "' restarted for replay.");
                });
    }
}
  

Configuration Summary

The table below summarizes key aspects of this configuration:

Aspect | Description | Implementation
Initial Token | Sets the starting position for a new processor (tail token, i.e. the oldest event). | andInitialTrackingToken(StreamableMessageSource::createTailToken)
Replay Context | Passes custom context with the reset so that handlers can adjust their behavior during the replay. | resetTokens(tokenSupplier, context), @ResetHandler
Token Reset | Moves the processor to the position for an arbitrary timestamp. | source.createTokenAt(timestamp)
Processor Restart | Shuts down, resets tokens, and restarts the event processor to begin reprocessing. | processor.shutDown(), resetTokens(...), start()

Advanced Considerations

Beyond the basic configuration, consider the following advanced topics to further refine your event processing strategy:

Idempotency in Event Handlers

Replaying events can lead to duplicate actions if not managed properly. Ensure that your event handlers are idempotent, meaning that processing the same event more than once does not result in unwanted side effects. Use the replay context to conditionally bypass certain side-effect operations.
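
One common way to make a handler idempotent is to remember which event identifiers it has already applied. The sketch below is plain Java with no Axon dependency. The class name IdempotentBalanceProjection, its methods, and the in-memory set are hypothetical and only illustrate the idea; a real projection would typically persist the processed identifiers in the same transaction as the projection update.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical projection that ignores events it has already applied.
class IdempotentBalanceProjection {

    // Identifiers of events that have already been applied (in memory for illustration only).
    private final Set<String> processedEventIds = new HashSet<>();
    private long balance = 0;

    /** Applies the amount once per event identifier; returns false for a duplicate. */
    boolean apply(String eventId, long amount) {
        // Set.add returns false when the identifier is already present.
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        balance += amount;
        return true;
    }

    long balance() {
        return balance;
    }
}
```

Applying the same event identifier twice changes the balance only once, so a replay that overlaps with already-processed events leaves the projection unchanged.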

Persistent Token Storage

Store tokens in a persistent TokenStore (e.g., JPA, JDBC) so that processor state survives application restarts and can be shared by the nodes in a multi-node setup. A replay interrupted by a restart can then resume from the last stored token instead of starting over.

Replay Lifecycle Management

Properly manage the event processor’s lifecycle during a reset. Shut down the processor before resetting the token and restart it only after the token has been successfully updated. This prevents race conditions and ensures reliable replay execution.
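
The ordering can be sketched in plain Java. ReplayableProcessor is a hypothetical stand-in for the three lifecycle calls used in this guide, not an Axon type, and RecordingProcessor is a test double that records the call order. This sketch makes one design choice beyond the text above: it restarts the processor in a finally block, so a failed reset leaves the processor running from its previous position instead of stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle calls of an event processor.
interface ReplayableProcessor {
    void shutDown();
    void resetTokens();
    void start();
}

class ReplayLifecycle {

    /** Stops the processor, resets it, and restarts it whether or not the reset succeeded. */
    static void resetSafely(ReplayableProcessor processor) {
        processor.shutDown();
        try {
            processor.resetTokens();
        } finally {
            // start() is never called before the reset attempt has finished.
            processor.start();
        }
    }
}

// Test double that records the order of lifecycle calls.
class RecordingProcessor implements ReplayableProcessor {

    final List<String> calls = new ArrayList<>();
    private final boolean failOnReset;

    RecordingProcessor(boolean failOnReset) {
        this.failOnReset = failOnReset;
    }

    @Override
    public void shutDown() {
        calls.add("shutDown");
    }

    @Override
    public void resetTokens() {
        calls.add("resetTokens");
        if (failOnReset) {
            throw new IllegalStateException("reset failed");
        }
    }

    @Override
    public void start() {
        calls.add("start");
    }
}
```

If you would rather keep the processor stopped after a failed reset, drop the finally block and call start() only after resetTokens() returns.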

Utilizing the Replay Context

The custom replay context allows you to embed metadata that affects event handling. For example, use the context to trigger specific transformations or alternate processing routes in the event handlers based on administrative actions or error correction workflows.
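
As an illustration, the decision can be isolated in a small policy class that handlers consult. The sketch below is plain Java. ReplayPolicy, its Action values, and the rule that a "data-correction" replay also writes an audit entry are hypothetical; only the "replayReason" key and the "data-correction" value come from the examples earlier in this guide.

```java
import java.util.Map;

// Hypothetical policy that maps the replay flag and context to a handling decision.
class ReplayPolicy {

    enum Action { FULL_PROCESSING, PROJECTION_ONLY, PROJECTION_WITH_AUDIT }

    static Action decide(boolean replay, Map<String, Object> replayContext) {
        if (!replay) {
            // Live events get projection updates and external side effects.
            return Action.FULL_PROCESSING;
        }
        // "replayReason" is the example context key used earlier in this guide.
        Object reason = replayContext.get("replayReason");
        if ("data-correction".equals(reason)) {
            // Assumed rule: corrections rebuild the projection and record an audit entry.
            return Action.PROJECTION_WITH_AUDIT;
        }
        // Any other replay only rebuilds the projection and skips side effects.
        return Action.PROJECTION_ONLY;
    }
}
```

Keeping the rule in one place means each event handler only needs the replay flag and the stored context to decide what to do.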


Conclusion

This guide described a Spring Boot configuration for Axon Framework that sets up a tracking event processor whose tracking token can be repositioned, together with a replay context. Using the token reset API available in Axon Framework 4.6.0, you can reset the processor to any specified point to reprocess events while supplying context information that helps event handlers adapt their behavior during a replay.

With this approach, your event-driven system gains improved resilience and flexibility, enabling sophisticated recovery scenarios and fine-grained control over event processing.


Last updated February 18, 2025