Skip to main content

ROS 2 Practical Examples

Simple Publisher-Subscriber Example

Let's implement a complete publisher-subscriber example that demonstrates basic ROS 2 communication.

Publisher Node

Create simple_publisher.py:

#!/usr/bin/env python3

"""
Simple publisher node that sends messages to a topic
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class SimplePublisher(Node):
def __init__(self):
super().__init__('simple_publisher')

# Create a publisher for the 'robot_commands' topic
self.publisher_ = self.create_publisher(String, 'robot_commands', 10)

# Create a timer to publish messages every 0.5 seconds
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)

# Counter for messages
self.i = 0

self.get_logger().info('Simple publisher node initialized')

def timer_callback(self):
"""Callback function that publishes messages"""
msg = String()
msg.data = f'Command {self.i}: Move forward'

self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')

self.i += 1


def main(args=None):
rclpy.init(args=args)

simple_publisher = SimplePublisher()

try:
rclpy.spin(simple_publisher)
except KeyboardInterrupt:
pass
finally:
simple_publisher.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Subscriber Node

Create simple_subscriber.py:

#!/usr/bin/env python3

"""
Simple subscriber node that receives messages from a topic
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class SimpleSubscriber(Node):
def __init__(self):
super().__init__('simple_subscriber')

# Create a subscription to the 'robot_commands' topic
self.subscription = self.create_subscription(
String,
'robot_commands',
self.listener_callback,
10) # QoS depth

# Prevent unused variable warning
self.subscription # type: ignore

self.get_logger().info('Simple subscriber node initialized')

def listener_callback(self, msg):
"""Callback function that processes received messages"""
self.get_logger().info(f'Received command: "{msg.data}"')

# Process the command (in a real robot, this would trigger movement)
self.process_command(msg.data)

def process_command(self, command):
"""Process the received command"""
if 'Move forward' in command:
self.get_logger().info('Executing forward movement...')
elif 'Turn left' in command:
self.get_logger().info('Executing left turn...')
elif 'Turn right' in command:
self.get_logger().info('Executing right turn...')
elif 'Stop' in command:
self.get_logger().info('Stopping robot...')


def main(args=None):
rclpy.init(args=args)

simple_subscriber = SimpleSubscriber()

try:
rclpy.spin(simple_subscriber)
except KeyboardInterrupt:
pass
finally:
simple_subscriber.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Running the Example

  1. Make the files executable:
chmod +x simple_publisher.py
chmod +x simple_subscriber.py
  1. Open two terminal windows and source ROS 2 in both:
source /opt/ros/humble/setup.bash
  1. In the first terminal, run the publisher:
python3 simple_publisher.py
  1. In the second terminal, run the subscriber:
python3 simple_subscriber.py

You should see the publisher sending commands and the subscriber receiving them.

Service Example

Now let's create a service example for robot control commands.

Service Definition

First, create the service definition file RobotControl.srv:

# Request
string command
float64 value
---
# Response
bool success
string message

Service Server

Create robot_control_server.py:

#!/usr/bin/env python3

"""
Service server for robot control commands
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import Trigger # Using Trigger as a simple example
import time


class RobotControlServer(Node):
def __init__(self):
super().__init__('robot_control_server')

# Create a service
self.srv = self.create_service(
Trigger, # Using Trigger as example - in practice, use custom service
'robot_control',
self.control_callback)

self.get_logger().info('Robot control server initialized')

def control_callback(self, request, response):
"""Handle control requests"""
self.get_logger().info('Received control request')

# Simulate processing time
time.sleep(0.1)

# In a real implementation, this would control actual robot hardware
response.success = True
response.message = 'Command executed successfully'

return response


def main(args=None):
rclpy.init(args=args)

robot_control_server = RobotControlServer()

try:
rclpy.spin(robot_control_server)
except KeyboardInterrupt:
pass
finally:
robot_control_server.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Service Client

Create robot_control_client.py:

#!/usr/bin/env python3

"""
Service client for robot control commands
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import Trigger # Using Trigger as example


class RobotControlClient(Node):
def __init__(self):
super().__init__('robot_control_client')

# Create a client for the robot control service
self.cli = self.create_client(Trigger, 'robot_control')

# Wait for the service to be available
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for robot control service...')

self.req = Trigger.Request()

def send_request(self):
"""Send a request to the service"""
self.future = self.cli.call_async(self.req)
return self.future


def main(args=None):
rclpy.init(args=args)

robot_control_client = RobotControlClient()

# Send a request
future = robot_control_client.send_request()

try:
rclpy.spin_until_future_complete(robot_control_client, future)

if future.result() is not None:
response = future.result()
robot_control_client.get_logger().info(
f'Result: success={response.success}, message={response.message}')
else:
robot_control_client.get_logger().info('Service call failed')

except KeyboardInterrupt:
pass
finally:
robot_control_client.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Robot Movement Example

Let's create a more practical example for controlling a robot's movement using Twist messages.

Movement Publisher

Create robot_movement.py:

#!/usr/bin/env python3

"""
Robot movement controller using Twist messages
"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String


class RobotMovement(Node):
def __init__(self):
super().__init__('robot_movement')

# Create publisher for velocity commands
self.vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)

# Create publisher for status messages
self.status_publisher = self.create_publisher(String, '/robot_status', 10)

# Timer for sending movement commands
self.timer = self.create_timer(0.1, self.timer_callback)

# Movement parameters
self.linear_speed = 0.5 # m/s
self.angular_speed = 0.5 # rad/s
self.movement_mode = 'forward' # forward, backward, left, right, stop

self.get_logger().info('Robot movement controller initialized')

def timer_callback(self):
"""Send movement commands based on current mode"""
twist_msg = Twist()

if self.movement_mode == 'forward':
twist_msg.linear.x = self.linear_speed
twist_msg.angular.z = 0.0
elif self.movement_mode == 'backward':
twist_msg.linear.x = -self.linear_speed
twist_msg.angular.z = 0.0
elif self.movement_mode == 'left':
twist_msg.linear.x = 0.0
twist_msg.angular.z = self.angular_speed
elif self.movement_mode == 'right':
twist_msg.linear.x = 0.0
twist_msg.angular.z = -self.angular_speed
elif self.movement_mode == 'stop':
twist_msg.linear.x = 0.0
twist_msg.angular.z = 0.0

# Publish the velocity command
self.vel_publisher.publish(twist_msg)

# Publish status
status_msg = String()
status_msg.data = f'Moving {self.movement_mode}'
self.status_publisher.publish(status_msg)


def main(args=None):
rclpy.init(args=args)

robot_movement = RobotMovement()

try:
rclpy.spin(robot_movement)
except KeyboardInterrupt:
pass
finally:
robot_movement.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

C++ Examples

For completeness, here are equivalent examples in C++:

C++ Publisher

Create simple_publisher.cpp:

#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

class SimplePublisher : public rclcpp::Node
{
public:
SimplePublisher()
: Node("simple_publisher"), count_(0)
{
publisher_ = this->create_publisher<std_msgs::msg::String>("robot_commands", 10);
timer_ = this->create_wall_timer(
500ms, std::bind(&SimplePublisher::timer_callback, this));
}

private:
void timer_callback()
{
auto message = std_msgs::msg::String();
message.data = "Command " + std::to_string(count_++) + ": Move forward";

RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_->publish(message);
}

rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SimplePublisher>());
rclcpp::shutdown();
return 0;
}

C++ Subscriber

Create simple_subscriber.cpp:

#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

class SimpleSubscriber : public rclcpp::Node
{
public:
SimpleSubscriber()
: Node("simple_subscriber")
{
subscription_ = this->create_subscription<std_msgs::msg::String>(
"robot_commands", 10,
[this](const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(this->get_logger(), "Received command: '%s'", msg->data.c_str());
process_command(msg->data);
});
}

private:
void process_command(const std::string& command)
{
if (command.find("Move forward") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing forward movement...");
} else if (command.find("Turn left") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing left turn...");
} else if (command.find("Turn right") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Executing right turn...");
} else if (command.find("Stop") != std::string::npos) {
RCLCPP_INFO(this->get_logger(), "Stopping robot...");
}
}

rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<SimpleSubscriber>());
rclcpp::shutdown();
return 0;
}

Running Examples in Simulation

To test these examples with a simulated robot:

  1. Launch Gazebo with a differential drive robot:
ros2 launch gazebo_ros empty_world.launch.py
  1. Spawn a robot model with differential drive:
ros2 run gazebo_ros spawn_entity.py -entity my_robot -file /path/to/robot/model.sdf -x 0 -y 0 -z 0
  1. Run your movement controller:
ros2 run your_package robot_movement.py

The robot should respond to the velocity commands published by your node.

Exercise: Create Your Own Node

Try creating a node that:

  1. Publishes sensor data (e.g., simulated laser scan)
  2. Subscribes to movement commands
  3. Implements a simple behavior (e.g., obstacle avoidance)

Use the examples above as a starting point and modify them to implement your own robot behavior.