In modern event-driven architectures, particularly those configured with the Axon Framework on Spring Boot, it is often necessary to reprocess events from a specific point in time when discrepancies, data corrections, or debugging requirements arise. This guide explains how to configure a tracking event processor that is empowered to have its tracking token set to any arbitrary point on the event stream.
The configuration includes the ability to attach a replay context – a custom piece of metadata – that becomes available during event replays. This context allows your event processing logic to differentiate between regular event consumption and a replay scenario, thereby enabling conditional processing or specialized logging.
Before configuring the tracking processor, ensure that you are using Axon Framework version 4.6.0 or later. Your Maven configuration should include the Axon Spring Boot starter dependency as shown below:
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.6.0</version>
</dependency>
This dependency provides all the necessary components to configure event processors, tracking tokens, and replay context propagation.
The core of our configuration consists of registering a Tracking Event Processor that is capable of handling replay context propagation. In the configuration class, we define the tracking processor, set an initial tracking token (using a token from the tail of the event stream), and enable replay context propagation through a custom lambda expression. This lambda is called during event replay to provide additional processing logic based on the supplied context.
Below is a detailed configuration class for Axon Framework. In the configuration:
// Package and import declarations as required
package com.example.axonconfig;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.StreamableMessageSource;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.TrackingToken;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Instant;
import java.util.Map;
@Configuration
public class AxonEventProcessorConfig {
// Name of the event processor
private static final String PROCESSOR_NAME = "myEventProcessor";
/<b>
* Registers a tracking event processor with the event store as its message source.
* The processor uses a parallel processing configuration (with a single thread in this example),
* sets the initial tracking token from the tail, and enables replay context propagation.
*/
@Bean
public void configureEventProcessor(EventProcessingConfigurer configurer,
StreamableMessageSource<?> eventStore) {
configurer.registerTrackingEventProcessor(
PROCESSOR_NAME,
// Specify the message source. In this configuration, the event store is being used.
conf -> eventStore,
// Provides custom processor configuration
config -> TrackingEventProcessorConfiguration
.forParallelProcessing(1)
// Use the tail of the stream for initial processing, ensuring all events are processed.
.andInitialTrackingToken(StreamableMessageSource::createTailToken)
// Enable replay context propagation. This lambda is invoked for each event during a replay.
.andReplayContextPropagator((replayContext, eventMessage, processingResult) -> {
System.out.println("Replaying event: " + eventMessage.getIdentifier()
+ " with context: " + replayContext);
// Here, the context is simply passed along; modify if needed.
return replayContext;
})
);
}
/</b>
* Resets the tracking token at a specified point in time and stores an accompanying context.
* This method simulates setting the token to a new position based on the provided timestamp.
*
* @param tokenStore The token store used to persist tracking tokens.
* @param timestamp The desired point in time from which events should be reprocessed.
* @param context A map representing the custom replay context (e.g., {"replayReason": "data-correction"}).
* @param eventStore The streamable event source used to generate the tracking token.
*/
public void resetTrackingToken(TokenStore tokenStore, Instant timestamp,
Map<String, Object> context, StreamableMessageSource<?> eventStore) {
// Create a tracking token that positions the stream at the specified timestamp.
TrackingToken newToken = eventStore.createTokenAt(timestamp);
// Store the new token in the token store along with the custom replay context.
tokenStore.storeToken(newToken, PROCESSOR_NAME, context);
System.out.println("Tracking token for processor '" + PROCESSOR_NAME + "' reset to "
+ timestamp + " with context: " + context);
}
}
With the processor configured to propagate replay contexts, the event handlers need to be aware of whether they are processing events in a regular flow or as part of a replay. This enables distinct handling logic (for instance, skipping operations that trigger external side effects during a replay).
The following event handler demonstrates how to consume the replay context. It injects a ResetContext which indicates if the handler is processing a replay. In a replay scenario, the custom context can be extracted and used to decide on conditional logic.
package com.example.axonconfig.events;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.eventhandling.ResetContext;
import java.util.Map;
public class MyEventHandler {
/**
* Processes events with consideration for a replay scenario.
*
* @param event The event message being processed.
* @param resetContext The reset context which carries reinforcement information during a replay.
*/
@EventHandler
public void on(MyEvent event, ResetContext resetContext) {
if (resetContext.isReplay()) {
// Retrieve the supplied replay context from the reset context.
@SuppressWarnings("unchecked")
Map<String, Object> replayCtx = (Map<String, Object>) resetContext.getResetContext();
System.out.println("Handling replayed event " + event.getId() + " with context: " + replayCtx);
// Insert context-specific handling logic here (e.g., conditional projection updates).
} else {
// Normal event processing logic.
System.out.println("Handling event " + event.getId() + " in regular mode.");
}
}
}
In a production environment, you might trigger token resets and event replays programmatically, for example, via a REST endpoint or an admin interface. The service below demonstrates a typical scenario:
package com.example.axonconfig.services;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.StreamableMessageSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import com.example.axonconfig.AxonEventProcessorConfig;
@Service
public class EventReplayService {
private static final String PROCESSOR_NAME = "myEventProcessor";
@Autowired
private EventProcessingConfiguration eventProcessingConfiguration;
@Autowired
private TokenStore tokenStore;
@Autowired
private StreamableMessageSource<?> eventStore;
@Autowired
private AxonEventProcessorConfig axonConfig;
/**
* Replays events starting from a specific timestamp.
* A custom context is supplied to influence the event handler processing logic during a replay.
*
* @param timestamp The point from which to start replaying events.
* @param context The custom context map (e.g., {"replayReason": "data-correction", "initiator": "admin"}).
*/
public void triggerReplay(Instant timestamp, Map<String, Object> context) {
// Reset the tracking token to the desired position with the provided context.
axonConfig.resetTrackingToken(tokenStore, timestamp, context, eventStore);
// Retrieve the event processor and orchestrate a processor restart.
Optional<TrackingEventProcessor> processorOptional =
eventProcessingConfiguration.eventProcessor(PROCESSOR_NAME, TrackingEventProcessor.class);
processorOptional.ifPresent(processor -> {
processor.shutDown();
// Here we invoke the resetTokens method which, combined with our token store update, replays the events.
processor.resetTokens();
processor.start();
System.out.println("Event processor '" + PROCESSOR_NAME + "' restarted for replay.");
});
}
}
The table below summarizes key aspects of this configuration:
Aspect | Description | Implementation |
---|---|---|
Initial Token | Sets the starting position for the processor (default: tail token). | StreamableMessageSource::createTailToken() |
Replay Context Propagation | Allows passing custom context during event replay to differentiate processing behavior. | andReplayContextPropagator(lambda) |
Token Reset | Resets the tracking token at an arbitrary timestamp and stores a custom context. | resetTrackingToken() |
Processor Restart | Shuts down, resets tokens, and restarts the event processor to begin reprocessing. | processor.shutDown(), resetTokens(), start() |
Beyond the basic configuration, consider the following advanced topics to further refine your event processing strategy:
Replaying events can lead to duplicate actions if not managed properly. Ensure that your event handlers are idempotent, meaning that processing the same event more than once does not result in unwanted side effects. Use the replay context to conditionally bypass certain side-effect operations.
Store tokens in a persistent TokenStore (e.g., JPA, JDBC) to maintain processor state across application restarts and distribute processing in a multi-node setup. This guarantees that replay operations resume consistently.
Properly manage the event processor’s lifecycle during a reset. Shut down the processor before resetting the token and restart it only after the token has been successfully updated. This prevents race conditions and ensures reliable replay execution.
The custom replay context allows you to embed metadata that affects event handling. For example, use the context to trigger specific transformations or alternate processing routes in the event handlers based on administrative actions or error correction workflows.
This guide detailed a comprehensive Spring Boot configuration for Axon Framework that sets up a tracking event processor with a customizable tracking token and replay context propagation. Leveraging the features introduced in Axon Framework v4.6.0, you can easily reset the processor at any specified point to reprocess events, all while supplying context information that helps event handlers adapt their behavior during a replay.
With this approach, your event-driven system gains improved resilience and flexibility, enabling sophisticated recovery scenarios and fine-grained control over event processing.