Skip to main content

ROS 2 Core Concepts

Nodes

A node is a process that performs computation in ROS 2. Nodes are the fundamental building blocks of a ROS 2 system, and they communicate with other nodes through various mechanisms.

Creating a Node

import rclpy
from rclpy.node import Node

class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0

def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1

Node Lifecycle

ROS 2 nodes follow a specific lifecycle with states:

  • Unconfigured: Initial state after creation
  • Inactive: Configuration applied but not yet activated
  • Active: Node is running and performing its function
  • Finalized: Node has been shut down and resources released

Topics and Messages

Topics enable asynchronous communication between nodes using a publish-subscribe pattern. Multiple publishers can send messages to the same topic, and multiple subscribers can receive messages from the same topic.

Message Types

ROS 2 provides standard message types and allows custom message definitions:

# Standard message types
from std_msgs.msg import String, Int32, Float64
from geometry_msgs.msg import Twist, Pose, Point
from sensor_msgs.msg import Image, LaserScan

Publisher Example

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'chatter', 10)
timer_period = 0.5
self.timer = self.create_timer(timer_period, self.timer_callback)

def timer_callback(self):
msg = String()
msg.data = 'Hello from ROS 2 talker'
self.publisher.publish(msg)

Subscriber Example

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Listener(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String,
'chatter',
self.listener_callback,
10)
self.subscription # prevent unused variable warning

def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)

Services

Services provide synchronous request-response communication between nodes. A service has a specific request message type and a response message type.

Service Definition

Services are defined using .srv files:

# Request
string name
int32 age
---
# Response
bool success
string message

Service Server

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node

class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response

Service Client

from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node

class MinimalClient(Node):
def __init__(self):
super().__init__('minimal_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
self.req = AddTwoInts.Request()

def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()

Actions

Actions are used for long-running tasks that may take significant time to complete. They provide feedback during execution and can be canceled.

Action Definition

Actions are defined using .action files:

# Goal
int32 order
---
# Result
int32[] sequence
---
# Feedback
int32[] partial_sequence

Action Server

from rclpy.action import ActionServer
from rclpy.node import Node
from example_interfaces.action import Fibonacci

class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)

def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')

feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]

for i in range(1, goal_handle.request.order):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
self.get_logger().info('Goal canceled')
return Fibonacci.Result()

feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])

goal_handle.publish_feedback(feedback_msg)
time.sleep(1)

goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result

Parameter System

ROS 2 provides a parameter system for runtime configuration of nodes:

class ParameterNode(Node):
def __init__(self):
super().__init__('parameter_node')

# Declare parameters with default values
self.declare_parameter('param_name', 'default_value')
self.declare_parameter('int_param', 42)

# Get parameter value
param_value = self.get_parameter('param_name').value

Quality of Service (QoS)

QoS settings allow fine-tuning of communication behavior:

from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy

# Create a QoS profile
qos_profile = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.VOLATILE
)

publisher = self.create_publisher(String, 'topic', qos_profile)

Best Practices

  1. Node Design: Keep nodes focused on a single responsibility
  2. Topic Naming: Use descriptive, hierarchical topic names
  3. Message Design: Keep messages lightweight and well-structured
  4. Error Handling: Implement proper error handling and logging
  5. Resource Management: Clean up resources properly in node destruction
  6. Testing: Write unit tests for all components