Python Agents Bridging to ROS 2
Introduction to rclpy
rclpy is the Python client library for ROS 2, providing Python APIs that allow developers to create ROS 2 nodes, publish and subscribe to topics, provide and use services, and interact with actions. It serves as the bridge between Python-based AI/ML agents and the ROS 2 robotic middleware.
Why Python for AI Integration
Python has become the de facto language for AI and machine learning development due to its:
- Rich ecosystem of scientific computing libraries (NumPy, SciPy)
- Leading AI frameworks (TensorFlow, PyTorch, scikit-learn)
- Ease of prototyping and experimentation
- Strong community support in the AI/ML domain
This makes rclpy essential for integrating AI capabilities with robotic systems.
Basic Node Structure
A minimal ROS 2 Python node follows this structure:
import rclpy
from rclpy.node import Node
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
msg = String()
msg.data = 'Hello World'
self.publisher.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Creating Publishers and Subscribers
Publisher Example
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class AIOutputPublisher(Node):
def __init__(self):
super().__init__('ai_output_publisher')
self.publisher = self.create_publisher(String, 'ai_decisions', 10)
def publish_decision(self, decision):
msg = String()
msg.data = decision
self.publisher.publish(msg)
Subscriber Example
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
self.subscription = self.create_subscription(
Image,
'camera/image_raw',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
# Process the image with AI algorithm
processed_result = self.ai_process_image(msg)
# Do something with the result
Service Clients and Servers
Service Server Example
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
Service Client Example
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class MinimalClient(Node):
def __init__(self):
super().__init__('minimal_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
Action Clients and Servers
Action Client Example
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
class NavigateToPoseClient(Node):
def __init__(self):
super().__init__('navigate_to_pose_client')
self._action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
def send_goal(self, pose):
goal_msg = NavigateToPose.Goal()
goal_msg.pose = pose
self._action_client.wait_for_server()
send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback)
send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback}')
Integration with AI/ML Frameworks
Example: Integrating TensorFlow with ROS 2
import rclpy
from rclpy.node import Node
import tensorflow as tf
import numpy as np
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class TensorFlowNode(Node):
def __init__(self):
super().__init__('tensorflow_node')
self.subscription = self.create_subscription(
Image,
'camera/image_raw',
self.image_callback,
10)
self.publisher = self.create_publisher(String, 'ai_output', 10)
# Load pre-trained model
self.model = tf.keras.models.load_model('path/to/model')
self.bridge = CvBridge()
def image_callback(self, msg):
# Convert ROS image to OpenCV format
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Preprocess image for model
input_tensor = tf.convert_to_tensor(cv_image)
input_tensor = tf.expand_dims(input_tensor, 0) # Add batch dimension
# Run inference
predictions = self.model(input_tensor)
# Process results and publish
result = self.process_predictions(predictions)
self.publish_result(result)
Best Practices for Python Agents
- Use appropriate QoS settings for your application's requirements
- Handle exceptions gracefully to maintain system stability
- Use threading appropriately to avoid blocking the main ROS loop
- Manage memory efficiently when processing large data (images, point clouds)
- Log appropriately for debugging and monitoring
- Follow ROS 2 naming conventions for nodes, topics, and services
Error Handling and Robustness
import rclpy
from rclpy.node import Node
import traceback
class RobustNode(Node):
def __init__(self):
super().__init__('robust_node')
# Setup with error handling
try:
self.setup_components()
except Exception as e:
self.get_logger().error(f'Failed to setup node: {e}')
self.get_logger().error(traceback.format_exc())
def setup_components(self):
# Initialize publishers, subscribers, etc.
pass
def safe_callback(self, msg):
try:
# Process message
result = self.process_message(msg)
self.publish_result(result)
except Exception as e:
self.get_logger().error(f'Error in callback: {e}')
self.get_logger().error(traceback.format_exc())
Performance Considerations
- Batch processing: Process multiple messages together when possible
- Threading: Use separate threads for CPU-intensive AI processing
- Memory management: Be mindful of memory usage with large data like images
- Timing: Consider the real-time requirements of your robotic application
The next section will cover URDF (Unified Robot Description Format) and how it's used for humanoid robot description in ROS 2 systems.