ROS 2 Practical Examples
Simple Publisher-Subscriber Example
Let's implement a complete publisher-subscriber example that demonstrates basic ROS 2 communication.
Publisher Node
Create simple_publisher.py:
#!/usr/bin/env python3
"""
Simple publisher node that sends messages to a topic
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SimplePublisher(Node):
def __init__(self):
super().__init__('simple_publisher')
# Create a publisher for the 'robot_commands' topic
self.publisher_ = self.create_publisher(String, 'robot_commands', 10)
# Create a timer to publish messages every 0.5 seconds
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
# Counter for messages
self.i = 0
self.get_logger().info('Simple publisher node initialized')
def timer_callback(self):
"""Callback function that publishes messages"""
msg = String()
msg.data = f'Command {self.i}: Move forward'
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
rclpy.init(args=args)
simple_publisher = SimplePublisher()
try:
rclpy.spin(simple_publisher)
except KeyboardInterrupt:
pass
finally:
simple_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Subscriber Node
Create simple_subscriber.py:
#!/usr/bin/env python3
"""
Simple subscriber node that receives messages from a topic
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SimpleSubscriber(Node):
def __init__(self):
super().__init__('simple_subscriber')
# Create a subscription to the 'robot_commands' topic
self.subscription = self.create_subscription(
String,
'robot_commands',
self.listener_callback,
10) # QoS depth
# Prevent unused variable warning
self.subscription # type: ignore
self.get_logger().info('Simple subscriber node initialized')
def listener_callback(self, msg):
"""Callback function that processes received messages"""
self.get_logger().info(f'Received command: "{msg.data}"')
# Process the command (in a real robot, this would trigger movement)
self.process_command(msg.data)
def process_command(self, command):
"""Process the received command"""
if 'Move forward' in command:
self.get_logger().info('Executing forward movement...')
elif 'Turn left' in command:
self.get_logger().info('Executing left turn...')
elif 'Turn right' in command:
self.get_logger().info('Executing right turn...')
elif 'Stop' in command:
self.get_logger().info('Stopping robot...')
def main(args=None):
rclpy.init(args=args)
simple_subscriber = SimpleSubscriber()
try:
rclpy.spin(simple_subscriber)
except KeyboardInterrupt:
pass
finally:
simple_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Running the Example
- Make the files executable:
chmod +x simple_publisher.py
chmod +x simple_subscriber.py
- Open two terminal windows and source ROS 2 in both:
source /opt/ros/humble/setup.bash
- In the first terminal, run the publisher:
python3 simple_publisher.py
- In the second terminal, run the subscriber:
python3 simple_subscriber.py
You should see the publisher sending commands and the subscriber receiving them.
Service Example
Now let's create a service example for robot control commands.
Service Definition
First, create the service definition file RobotControl.srv:
# Request
string command
float64 value
---
# Response
bool success
string message
Service Server
Create robot_control_server.py:
#!/usr/bin/env python3
"""
Service server for robot control commands
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import Trigger # Using Trigger as a simple example
import time
class RobotControlServer(Node):
def __init__(self):
super().__init__('robot_control_server')
# Create a service
self.srv = self.create_service(
Trigger, # Using Trigger as example - in practice, use custom service
'robot_control',
self.control_callback)
self.get_logger().info('Robot control server initialized')
def control_callback(self, request, response):
"""Handle control requests"""
self.get_logger().info('Received control request')
# Simulate processing time
time.sleep(0.1)
# In a real implementation, this would control actual robot hardware
response.success = True
response.message = 'Command executed successfully'
return response
def main(args=None):
rclpy.init(args=args)
robot_control_server = RobotControlServer()
try:
rclpy.spin(robot_control_server)
except KeyboardInterrupt:
pass
finally:
robot_control_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Service Client
Create robot_control_client.py:
#!/usr/bin/env python3
"""
Service client for robot control commands
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import Trigger # Using Trigger as example
class RobotControlClient(Node):
def __init__(self):
super().__init__('robot_control_client')
# Create a client for the robot control service
self.cli = self.create_client(Trigger, 'robot_control')
# Wait for the service to be available
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for robot control service...')
self.req = Trigger.Request()
def send_request(self):
"""Send a request to the service"""
self.future = self.cli.call_async(self.req)
return self.future
def main(args=None):
rclpy.init(args=args)
robot_control_client = RobotControlClient()
# Send a request
future = robot_control_client.send_request()
try:
rclpy.spin_until_future_complete(robot_control_client, future)
if future.result() is not None:
response = future.result()
robot_control_client.get_logger().info(
f'Result: success={response.success}, message={response.message}')
else:
robot_control_client.get_logger().info('Service call failed')
except KeyboardInterrupt:
pass
finally:
robot_control_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Robot Movement Example
Let's create a more practical example for controlling a robot's movement using Twist messages.
Movement Publisher
Create robot_movement.py:
#!/usr/bin/env python3
"""
Robot movement controller using Twist messages
"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
class RobotMovement(Node):
def __init__(self):
super().__init__('robot_movement')
# Create publisher for velocity commands
self.vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Create publisher for status messages
self.status_publisher = self.create_publisher(String, '/robot_status', 10)
# Timer for sending movement commands
self.timer = self.create_timer(0.1, self.timer_callback)
# Movement parameters
self.linear_speed = 0.5 # m/s
self.angular_speed = 0.5 # rad/s
self.movement_mode = 'forward' # forward, backward, left, right, stop
self.get_logger().info('Robot movement controller initialized')
def timer_callback(self):
"""Send movement commands based on current mode"""
twist_msg = Twist()
if self.movement_mode == 'forward':
twist_msg.linear.x = self.linear_speed
twist_msg.angular.z = 0.0
elif self.movement_mode == 'backward':
twist_msg.linear.x = -self.linear_speed
twist_msg.angular.z = 0.0
elif self.movement_mode == 'left':
twist_msg.linear.x = 0.0
twist_msg.angular.z = self.angular_speed
elif self.movement_mode == 'right':
twist_msg.linear.x = 0.0
twist_msg.angular.z = -self.angular_speed
elif self.movement_mode == 'stop':
twist_msg.linear.x = 0.0
twist_msg.angular.z = 0.0
# Publish the velocity command
self.vel_publisher.publish(twist_msg)
# Publish status
status_msg = String()
status_msg.data = f'Moving {self.movement_mode}'
self.status_publisher.publish(status_msg)
def main(args=None):
rclpy.init(args=args)
robot_movement = RobotMovement()
try:
rclpy.spin(robot_movement)
except KeyboardInterrupt:
pass
finally:
robot_movement.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
C++ Examples
For completeness, here are equivalent examples in C++:
C++ Publisher
Create simple_publisher.cpp:
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
using namespace std::chrono_literals;
class SimplePublisher : public rclcpp::Node
{
public:
SimplePublisher()
: Node("simple_publisher"), count_(0)
{
publisher_ = this->create_publisher<std_msgs::msg::String>("robot_commands", 10);
timer_ = this->create_wall_timer(
500ms, std::bind(&SimplePublisher::timer_callback, this));
}
private:
void timer_callback()
{
auto message = std_msgs::msg::String();
message.data = "Command " + std::to_string(count_++) + ": Move forward";
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_->publish(message);
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SimplePublisher>());
rclcpp::shutdown();
return 0;
}
C++ Subscriber
Create simple_subscriber.cpp:
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
class SimpleSubscriber : public rclcpp::Node
{
public:
SimpleSubscriber()
: Node("simple_subscriber")
{
subscription_ = this->create_subscription<std_msgs::msg::String>(
"robot_commands", 10,
[this](const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Received command: '%s'", msg->data.c_str());
process_command(msg->data);
});
}
private:
void process_command(const std::string& command)
{
if (command.find("Move forward") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing forward movement...");
} else if (command.find("Turn left") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing left turn...");
} else if (command.find("Turn right") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing right turn...");
} else if (command.find("Stop") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Stopping robot...");
}
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SimpleSubscriber>());
rclcpp::shutdown();
return 0;
}
Running Examples in Simulation
To test these examples with a simulated robot:
- Launch Gazebo with a differential drive robot:
ros2 launch gazebo_ros empty_world.launch.py
- Spawn a robot model with differential drive:
ros2 run gazebo_ros spawn_entity.py -entity my_robot -file /path/to/robot/model.sdf -x 0 -y 0 -z 0
- Run your movement controller:
ros2 run your_package robot_movement.py
The robot should respond to the velocity commands published by your node.
Exercise: Create Your Own Node
Try creating a node that:
- Publishes sensor data (e.g., simulated laser scan)
- Subscribes to movement commands
- Implements a simple behavior (e.g., obstacle avoidance)
Use the examples above as a starting point and modify them to implement your own robot behavior.