ROS 2 Basic Publisher-Subscriber Exercise
Objective
Create a simple ROS 2 publisher-subscriber system that demonstrates the fundamental communication pattern in ROS 2. The publisher will send messages containing sensor data, and the subscriber will process and display this data.
Prerequisites
- ROS 2 Humble Hawksbill installed
- Basic Python programming knowledge
- Understanding of ROS 2 concepts (nodes, topics, messages)
Learning Goals
By completing this exercise, you will:
- Create ROS 2 publisher and subscriber nodes
- Understand the publish-subscribe communication pattern
- Work with ROS 2 message types
- Use ROS 2 launch files to run multiple nodes
- Implement proper node lifecycle management
Exercise Steps
Step 1: Create a ROS 2 Package
First, create a new ROS 2 package for your exercise:
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python ros2_exercises
cd ros2_exercises
Step 2: Create the Publisher Node
Create a publisher node that simulates sensor data:
- Create the directory structure:
mkdir -p ros2_exercises/sensor_publisher
- Create the publisher script
ros2_exercises/sensor_publisher/publisher_member_function.py:
#!/usr/bin/env python3
"""
Sensor publisher node that simulates sensor readings
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
from random import uniform
class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
# Create publishers for different sensor types
self.temperature_publisher = self.create_publisher(Float32, 'temperature', 10)
self.humidity_publisher = self.create_publisher(Float32, 'humidity', 10)
self.status_publisher = self.create_publisher(String, 'sensor_status', 10)
# Create a timer to publish sensor data every 1 second
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.get_logger().info('Sensor publisher node initialized')
def timer_callback(self):
"""Publish simulated sensor data"""
# Simulate temperature reading (20-30 degrees Celsius)
temp_msg = Float32()
temp_msg.data = uniform(20.0, 30.0)
self.temperature_publisher.publish(temp_msg)
self.get_logger().info(f'Published temperature: {temp_msg.data:.2f}°C')
# Simulate humidity reading (30-70%)
humidity_msg = Float32()
humidity_msg.data = uniform(30.0, 70.0)
self.humidity_publisher.publish(humidity_msg)
self.get_logger().info(f'Published humidity: {humidity_msg.data:.2f}%')
# Publish status message
status_msg = String()
status_msg.data = f'Sensor reading {self.i} published'
self.status_publisher.publish(status_msg)
self.i += 1
def main(args=None):
rclpy.init(args=args)
sensor_publisher = SensorPublisher()
try:
rclpy.spin(sensor_publisher)
except KeyboardInterrupt:
sensor_publisher.get_logger().info('Shutting down sensor publisher...')
finally:
sensor_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 3: Create the Subscriber Node
Create a subscriber node that processes the sensor data:
- Create the directory structure:
mkdir -p ros2_exercises/sensor_subscriber
- Create the subscriber script
ros2_exercises/sensor_subscriber/subscriber_member_function.py:
#!/usr/bin/env python3
"""
Sensor subscriber node that processes sensor readings
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
# Create subscribers for different sensor topics
self.temperature_subscriber = self.create_subscription(
Float32,
'temperature',
self.temperature_callback,
10)
self.humidity_subscriber = self.create_subscription(
Float32,
'humidity',
self.humidity_callback,
10)
self.status_subscriber = self.create_subscription(
String,
'sensor_status',
self.status_callback,
10)
# Store the latest sensor readings
self.latest_temp = None
self.latest_humidity = None
self.get_logger().info('Sensor subscriber node initialized')
def temperature_callback(self, msg):
"""Process temperature readings"""
self.latest_temp = msg.data
self.get_logger().info(f'Received temperature: {msg.data:.2f}°C')
# Check for temperature thresholds
if msg.data > 28.0:
self.get_logger().warn(f'High temperature warning: {msg.data:.2f}°C')
elif msg.data < 22.0:
self.get_logger().warn(f'Low temperature warning: {msg.data:.2f}°C')
def humidity_callback(self, msg):
"""Process humidity readings"""
self.latest_humidity = msg.data
self.get_logger().info(f'Received humidity: {msg.data:.2f}%')
# Check for humidity thresholds
if msg.data > 60.0:
self.get_logger().info(f'High humidity: {msg.data:.2f}%')
elif msg.data < 40.0:
self.get_logger().info(f'Low humidity: {msg.data:.2f}%')
def status_callback(self, msg):
"""Process status messages"""
self.get_logger().info(f'Status: {msg.data}')
# If we have both temperature and humidity readings, calculate heat index
if self.latest_temp is not None and self.latest_humidity is not None:
heat_index = self.calculate_heat_index(self.latest_temp, self.latest_humidity)
self.get_logger().info(f'Calculated heat index: {heat_index:.2f}°C')
def calculate_heat_index(self, temp_celsius, humidity):
"""
Simplified heat index calculation
Note: This is a simplified version for demonstration
"""
# Convert to the formula's required format
temp_f = (temp_celsius * 9/5) + 32
# Simplified heat index formula
heat_index_f = (
-42.379 + 2.04901523 * temp_f +
10.14333127 * humidity -
0.22475541 * temp_f * humidity -
0.00683783 * temp_f * temp_f -
0.05481717 * humidity * humidity +
0.00122874 * temp_f * temp_f * humidity +
0.00085282 * temp_f * humidity * humidity -
0.00000199 * temp_f * temp_f * humidity * humidity
)
# Convert back to Celsius
heat_index_c = (heat_index_f - 32) * 5/9
return heat_index_c
def main(args=None):
rclpy.init(args=args)
sensor_subscriber = SensorSubscriber()
try:
rclpy.spin(sensor_subscriber)
except KeyboardInterrupt:
sensor_subscriber.get_logger().info('Shutting down sensor subscriber...')
finally:
sensor_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 4: Create Launch File
Create a launch file to run both nodes simultaneously:
- Create the launch directory:
mkdir -p ros2_exercises/launch
- Create the launch file
ros2_exercises/launch/sensor_launch.py:
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='ros2_exercises',
executable='sensor_publisher',
name='sensor_publisher',
output='screen'
),
Node(
package='ros2_exercises',
executable='sensor_subscriber',
name='sensor_subscriber',
output='screen'
)
])
Step 5: Update Package Configuration
Update the setup.py file to make your nodes executable:
from setuptools import find_packages, setup
package_name = 'ros2_exercises'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', ['launch/sensor_launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='your_name',
maintainer_email='your_email@example.com',
description='ROS 2 exercises package',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'sensor_publisher = ros2_exercises.sensor_publisher.publisher_member_function:main',
'sensor_subscriber = ros2_exercises.sensor_subscriber.subscriber_member_function:main',
],
},
)
Step 6: Build and Run
- Build your package:
cd ~/ros2_ws
colcon build --packages-select ros2_exercises
source install/setup.bash
- Run the launch file:
ros2 launch ros2_exercises sensor_launch.py
Validation Criteria
Your implementation is successful if:
- The publisher node successfully publishes sensor data to the specified topics
- The subscriber node receives and processes the sensor data
- Temperature and humidity values are displayed in the console
- Status messages are processed correctly
- Heat index is calculated when both temperature and humidity readings are available
- The system runs without errors for at least 10 seconds
Advanced Challenge
Extend the exercise by:
- Adding a third node that acts as a data logger, saving sensor readings to a file
- Implementing a service that allows external nodes to request the latest sensor readings
- Adding a parameter to the publisher node to control the publishing frequency
- Creating a custom message type for the sensor data instead of using standard Float32 messages
Solution Verification
To verify your solution:
- Use
ros2 topic listto confirm the topics exist - Use
ros2 topic echo /temperatureto see the published data - Check that both nodes are running with
ros2 run - Verify that the subscriber processes the data correctly by examining the console output
Troubleshooting
- If nodes don't connect, ensure both are on the same network namespace
- Check that the package is built correctly with
colcon build - Verify that the setup.bash file is sourced in each terminal
- Use
ros2 doctorto diagnose common ROS 2 issues