Skip to main content

ROS 2 Basic Publisher-Subscriber Exercise

Objective

Create a simple ROS 2 publisher-subscriber system that demonstrates the fundamental communication pattern in ROS 2. The publisher will send messages containing sensor data, and the subscriber will process and display this data.

Prerequisites

  • ROS 2 Humble Hawksbill installed
  • Basic Python programming knowledge
  • Understanding of ROS 2 concepts (nodes, topics, messages)

Learning Goals

By completing this exercise, you will:

  • Create ROS 2 publisher and subscriber nodes
  • Understand the publish-subscribe communication pattern
  • Work with ROS 2 message types
  • Use ROS 2 launch files to run multiple nodes
  • Implement proper node lifecycle management

Exercise Steps

Step 1: Create a ROS 2 Package

First, create a new ROS 2 package for your exercise:

mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python ros2_exercises
cd ros2_exercises

Step 2: Create the Publisher Node

Create a publisher node that simulates sensor data:

  1. Create the directory structure:
mkdir -p ros2_exercises/sensor_publisher
  1. Create the publisher script ros2_exercises/sensor_publisher/publisher_member_function.py:
#!/usr/bin/env python3

"""
Sensor publisher node that simulates sensor readings
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
from random import uniform


class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')

# Create publishers for different sensor types
self.temperature_publisher = self.create_publisher(Float32, 'temperature', 10)
self.humidity_publisher = self.create_publisher(Float32, 'humidity', 10)
self.status_publisher = self.create_publisher(String, 'sensor_status', 10)

# Create a timer to publish sensor data every 1 second
timer_period = 1.0 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)

self.i = 0
self.get_logger().info('Sensor publisher node initialized')

def timer_callback(self):
"""Publish simulated sensor data"""
# Simulate temperature reading (20-30 degrees Celsius)
temp_msg = Float32()
temp_msg.data = uniform(20.0, 30.0)
self.temperature_publisher.publish(temp_msg)
self.get_logger().info(f'Published temperature: {temp_msg.data:.2f}°C')

# Simulate humidity reading (30-70%)
humidity_msg = Float32()
humidity_msg.data = uniform(30.0, 70.0)
self.humidity_publisher.publish(humidity_msg)
self.get_logger().info(f'Published humidity: {humidity_msg.data:.2f}%')

# Publish status message
status_msg = String()
status_msg.data = f'Sensor reading {self.i} published'
self.status_publisher.publish(status_msg)

self.i += 1


def main(args=None):
rclpy.init(args=args)

sensor_publisher = SensorPublisher()

try:
rclpy.spin(sensor_publisher)
except KeyboardInterrupt:
sensor_publisher.get_logger().info('Shutting down sensor publisher...')
finally:
sensor_publisher.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Step 3: Create the Subscriber Node

Create a subscriber node that processes the sensor data:

  1. Create the directory structure:
mkdir -p ros2_exercises/sensor_subscriber
  1. Create the subscriber script ros2_exercises/sensor_subscriber/subscriber_member_function.py:
#!/usr/bin/env python3

"""
Sensor subscriber node that processes sensor readings
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String


class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')

# Create subscribers for different sensor topics
self.temperature_subscriber = self.create_subscription(
Float32,
'temperature',
self.temperature_callback,
10)

self.humidity_subscriber = self.create_subscription(
Float32,
'humidity',
self.humidity_callback,
10)

self.status_subscriber = self.create_subscription(
String,
'sensor_status',
self.status_callback,
10)

# Store the latest sensor readings
self.latest_temp = None
self.latest_humidity = None

self.get_logger().info('Sensor subscriber node initialized')

def temperature_callback(self, msg):
"""Process temperature readings"""
self.latest_temp = msg.data
self.get_logger().info(f'Received temperature: {msg.data:.2f}°C')

# Check for temperature thresholds
if msg.data > 28.0:
self.get_logger().warn(f'High temperature warning: {msg.data:.2f}°C')
elif msg.data < 22.0:
self.get_logger().warn(f'Low temperature warning: {msg.data:.2f}°C')

def humidity_callback(self, msg):
"""Process humidity readings"""
self.latest_humidity = msg.data
self.get_logger().info(f'Received humidity: {msg.data:.2f}%')

# Check for humidity thresholds
if msg.data > 60.0:
self.get_logger().info(f'High humidity: {msg.data:.2f}%')
elif msg.data < 40.0:
self.get_logger().info(f'Low humidity: {msg.data:.2f}%')

def status_callback(self, msg):
"""Process status messages"""
self.get_logger().info(f'Status: {msg.data}')

# If we have both temperature and humidity readings, calculate heat index
if self.latest_temp is not None and self.latest_humidity is not None:
heat_index = self.calculate_heat_index(self.latest_temp, self.latest_humidity)
self.get_logger().info(f'Calculated heat index: {heat_index:.2f}°C')


def calculate_heat_index(self, temp_celsius, humidity):
"""
Simplified heat index calculation
Note: This is a simplified version for demonstration
"""
# Convert to the formula's required format
temp_f = (temp_celsius * 9/5) + 32

# Simplified heat index formula
heat_index_f = (
-42.379 + 2.04901523 * temp_f +
10.14333127 * humidity -
0.22475541 * temp_f * humidity -
0.00683783 * temp_f * temp_f -
0.05481717 * humidity * humidity +
0.00122874 * temp_f * temp_f * humidity +
0.00085282 * temp_f * humidity * humidity -
0.00000199 * temp_f * temp_f * humidity * humidity
)

# Convert back to Celsius
heat_index_c = (heat_index_f - 32) * 5/9
return heat_index_c


def main(args=None):
rclpy.init(args=args)

sensor_subscriber = SensorSubscriber()

try:
rclpy.spin(sensor_subscriber)
except KeyboardInterrupt:
sensor_subscriber.get_logger().info('Shutting down sensor subscriber...')
finally:
sensor_subscriber.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

Step 4: Create Launch File

Create a launch file to run both nodes simultaneously:

  1. Create the launch directory:
mkdir -p ros2_exercises/launch
  1. Create the launch file ros2_exercises/launch/sensor_launch.py:
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
return LaunchDescription([
Node(
package='ros2_exercises',
executable='sensor_publisher',
name='sensor_publisher',
output='screen'
),
Node(
package='ros2_exercises',
executable='sensor_subscriber',
name='sensor_subscriber',
output='screen'
)
])

Step 5: Update Package Configuration

Update the setup.py file to make your nodes executable:

from setuptools import find_packages, setup

package_name = 'ros2_exercises'

setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', ['launch/sensor_launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='your_name',
maintainer_email='your_email@example.com',
description='ROS 2 exercises package',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'sensor_publisher = ros2_exercises.sensor_publisher.publisher_member_function:main',
'sensor_subscriber = ros2_exercises.sensor_subscriber.subscriber_member_function:main',
],
},
)

Step 6: Build and Run

  1. Build your package:
cd ~/ros2_ws
colcon build --packages-select ros2_exercises
source install/setup.bash
  1. Run the launch file:
ros2 launch ros2_exercises sensor_launch.py

Validation Criteria

Your implementation is successful if:

  • The publisher node successfully publishes sensor data to the specified topics
  • The subscriber node receives and processes the sensor data
  • Temperature and humidity values are displayed in the console
  • Status messages are processed correctly
  • Heat index is calculated when both temperature and humidity readings are available
  • The system runs without errors for at least 10 seconds

Advanced Challenge

Extend the exercise by:

  1. Adding a third node that acts as a data logger, saving sensor readings to a file
  2. Implementing a service that allows external nodes to request the latest sensor readings
  3. Adding a parameter to the publisher node to control the publishing frequency
  4. Creating a custom message type for the sensor data instead of using standard Float32 messages

Solution Verification

To verify your solution:

  1. Use ros2 topic list to confirm the topics exist
  2. Use ros2 topic echo /temperature to see the published data
  3. Check that both nodes are running with ros2 run
  4. Verify that the subscriber processes the data correctly by examining the console output

Troubleshooting

  • If nodes don't connect, ensure both are on the same network namespace
  • Check that the package is built correctly with colcon build
  • Verify that the setup.bash file is sourced in each terminal
  • Use ros2 doctor to diagnose common ROS 2 issues