In robotic applications, it's common to handle data from multiple sensors or sources. Ensuring that messages from different topics are processed in a synchronized manner is crucial for tasks like sensor fusion, where data consistency and timing are paramount. ROS2 provides tools to facilitate this synchronization, primarily through the message_filters
package.
The message_filters
package in ROS2 is designed to synchronize incoming messages from multiple topics based on their timestamps. This ensures that related messages are processed together, maintaining data integrity and temporal coherence.
There are two primary types of synchronizers provided by the message_filters
package:
Synchronizer Type | Description | Use Case |
---|---|---|
TimeSynchronizer | Requires messages to have exact timestamp matches. | Ideal when messages are published at precise intervals and synchronization is strict. |
ApproximateTimeSynchronizer | Allows for a configurable time difference (slop) between messages. | Suitable when messages may have slight timing discrepancies. |
The choice between TimeSynchronizer
and ApproximateTimeSynchronizer
depends on the precision required for synchronization:
Ensure that the message_filters
package is installed in your ROS2 environment. It typically comes pre-installed with standard ROS2 distributions.
Begin by importing the necessary modules in your Python script:
import rclpy
from rclpy.node import Node
from message_filters import Subscriber, TimeSynchronizer, ApproximateTimeSynchronizer
from your_custom_msgs.msg import TypeX, TypeY # Replace with actual message types
Define a new class that inherits from Node
. This class will handle the synchronization logic.
class SynchronizedSubscriber(Node):
def __init__(self):
super().__init__('synchronized_subscriber')
# Create message_filters subscribers
self.sub_x = Subscriber(self, TypeX, 'topic_x')
self.sub_y = Subscriber(self, TypeY, 'topic_y')
Depending on your synchronization needs, initialize either TimeSynchronizer
or ApproximateTimeSynchronizer
.
# For exact time synchronization
ts = TimeSynchronizer([self.sub_x, self.sub_y], queue_size=10)
ts.registerCallback(self.sync_callback)
# For approximate time synchronization
ats = ApproximateTimeSynchronizer([self.sub_x, self.sub_y], queue_size=10, slop=0.1)
ats.registerCallback(self.sync_callback)
The queue_size
determines how many incoming messages to store for synchronization, and slop
specifies the allowable time difference for approximate synchronization.
The callback function is invoked when synchronized messages are received. Here, you can process the paired messages.
def sync_callback(self, msg_x, msg_y):
self.get_logger().info(f'Received synchronized messages: X={msg_x.data}, Y={msg_y.data}')
# Add your processing logic here
Finalize your node by initializing ROS2 and starting the event loop.
def main(args=None):
rclpy.init(args=args)
node = SynchronizedSubscriber()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
Below is a complete example demonstrating how to synchronize two subscribers using ApproximateTimeSynchronizer
.
import rclpy
from rclpy.node import Node
from message_filters import Subscriber, ApproximateTimeSynchronizer
from std_msgs.msg import String, Int32 # Replace with your actual message types
class SyncNode(Node):
def __init__(self):
super().__init__('sync_node')
# Initialize subscribers
self.sub_x = Subscriber(self, String, 'topic_x')
self.sub_y = Subscriber(self, Int32, 'topic_y')
# Initialize ApproximateTimeSynchronizer
self.ats = ApproximateTimeSynchronizer([self.sub_x, self.sub_y], queue_size=10, slop=0.1)
self.ats.registerCallback(self.synced_callback)
def synced_callback(self, msg_x, msg_y):
self.get_logger().info(f'Synchronized callback: Received msg_x: "{msg_x.data}" and msg_y: {msg_y.data}')
# Process the synchronized messages here
def main(args=None):
rclpy.init(args=args)
node = SyncNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("Keyboard Interrupt (SIGINT)")
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
String
and Int32
respectively.synced_callback
function processes the synchronized messages.Proper configuration of the queue_size
and slop
parameters is essential for effective synchronization:
Synchronization relies on message timestamps. If your messages do not include a header with a timestamp, you may need to add one or implement alternative synchronization strategies.
std_msgs/Header
.The message_filters
package supports synchronization across more than two topics. To synchronize multiple topics:
# Example for synchronizing three topics
self.sub_z = Subscriber(self, TypeZ, 'topic_z')
ats = ApproximateTimeSynchronizer([self.sub_x, self.sub_y, self.sub_z], queue_size=10, slop=0.1)
ats.registerCallback(self.synced_callback)
def synced_callback(self, msg_x, msg_y, msg_z):
# Process synchronized messages from three topics
pass
In sensor fusion applications, synchronized messages from different sensors (e.g., camera and lidar) can be combined to enhance data accuracy and reliability. Proper synchronization ensures that spatial and temporal data align correctly, enabling more effective processing and decision-making.
If messages are being dropped or not synchronized as expected, consider the following:
High latency can disrupt synchronization. Optimize your ROS2 node to minimize processing delays:
In multi-device setups, ensure that all devices have synchronized clocks. Use network time protocols (e.g., NTP) to maintain consistent time across the system.
Synchronizing multiple subscribers in a ROS2 Python node is essential for applications that rely on data from diverse sources. By leveraging the message_filters
package, developers can efficiently manage synchronized callbacks, ensuring data integrity and enhancing the performance of robotic systems. Careful configuration of synchronization parameters and adherence to best practices will lead to robust and reliable synchronization, laying the foundation for advanced robotic applications.