Cognitive Planning: Using LLMs to Translate Natural Language into Robot Actions
Introduction to Cognitive Planning
Cognitive planning represents the bridge between high-level natural language commands and low-level robotic actions. It involves using Large Language Models (LLMs) to understand complex human instructions and decompose them into executable action sequences that robots can perform. This section explores how to implement cognitive planning systems that can translate natural language commands like "Clean the room" into detailed sequences of ROS 2 actions.
Understanding the Cognitive Planning Pipeline
The cognitive planning pipeline involves several key components:
- Natural Language Understanding: Interpreting the user's command
- Task Decomposition: Breaking down complex tasks into subtasks
- Action Mapping: Converting subtasks into specific robot actions
- Execution Planning: Sequencing actions and handling dependencies
- Feedback Integration: Adapting plans based on execution results
Implementing Cognitive Planning with LLMs
Setting up the LLM Interface
First, let's create a cognitive planning system that interfaces with an LLM:
import openai
import json
import re
from typing import List, Dict, Any, Optional
import rospy
from geometry_msgs.msg import Pose, Point
from std_msgs.msg import String
from actionlib_msgs.msg import GoalStatus
class CognitivePlanner:
def __init__(self, api_key: str = None):
# Initialize LLM client (using OpenAI as an example)
if api_key:
openai.api_key = api_key
else:
# For demonstration, we'll use a mock implementation
print("No API key provided, using mock implementation")
# Robot capabilities and environment information
self.robot_capabilities = [
"move_to_location",
"pick_up_object",
"place_object",
"detect_object",
"navigate",
"grasp",
"release"
]
self.environment_objects = [
"cup", "bottle", "book", "chair", "table",
"sofa", "lamp", "trash_bin", "dustpan"
]
self.environment_locations = [
"kitchen", "bedroom", "living_room", "bathroom",
"office", "hallway", "dining_room"
]
def plan_from_command(self, command: str, context: Dict = None) -> List[Dict]:
"""
Generate an action plan from a natural language command
"""
if context is None:
context = {
"robot_position": {"x": 0, "y": 0, "z": 0},
"available_objects": self.environment_objects,
"available_locations": self.environment_locations,
"robot_capabilities": self.robot_capabilities
}
# Create a prompt for the LLM
prompt = self.create_planning_prompt(command, context)
# For demonstration, we'll use a mock implementation
# In practice, you would call the LLM API here
plan = self.mock_plan_generation(command)
return plan
def create_planning_prompt(self, command: str, context: Dict) -> str:
"""
Create a prompt for the LLM to generate an action plan
"""
prompt = f"""
You are a cognitive planning assistant for a robot. Your task is to convert a natural language command into a sequence of specific actions that the robot can execute.
Robot capabilities: {', '.join(self.robot_capabilities)}
Available objects: {', '.join(self.environment_objects)}
Available locations: {', '.join(self.environment_locations)}
Current context:
- Robot position: {context['robot_position']}
- Available objects: {context['available_objects']}
- Available locations: {context['available_locations']}
Command: "{command}"
Generate a detailed action plan as a JSON array. Each action should be a dictionary with:
- "action": the specific action to perform
- "parameters": any required parameters for the action
- "description": a brief description of what the action does
Example format:
[
{{
"action": "navigate_to",
"parameters": {{"location": "kitchen"}},
"description": "Move robot to the kitchen"
}},
{{
"action": "detect_object",
"parameters": {{"object_type": "cup"}},
"description": "Look for a cup in the current location"
}}
]
The action plan should be detailed enough for the robot to execute successfully.
"""
return prompt
def mock_plan_generation(self, command: str) -> List[Dict]:
"""
Mock implementation of plan generation for demonstration
"""
command_lower = command.lower()
if "clean the room" in command_lower:
return [
{
"action": "detect_objects",
"parameters": {"location": "current"},
"description": "Scan the current room for objects that need cleaning"
},
{
"action": "identify_debris",
"parameters": {"object_types": ["trash", "dust", "clutter"]},
"description": "Identify objects that are considered debris"
},
{
"action": "navigate_to",
"parameters": {"location": "kitchen"},
"description": "Move to kitchen to get cleaning supplies"
},
{
"action": "pick_up_object",
"parameters": {"object_type": "dustpan"},
"description": "Pick up the dustpan"
},
{
"action": "navigate_to",
"parameters": {"location": "living_room"},
"description": "Return to the living room"
},
{
"action": "clean_area",
"parameters": {"area": "living_room"},
"description": "Clean the living room using the dustpan"
},
{
"action": "navigate_to",
"parameters": {"location": "trash_bin"},
"description": "Go to trash bin to dispose of debris"
},
{
"action": "place_object",
"parameters": {"object_type": "debris"},
"description": "Place collected debris in the trash bin"
}
]
elif "go to kitchen" in command_lower:
return [
{
"action": "navigate_to",
"parameters": {"location": "kitchen"},
"description": "Navigate to the kitchen"
}
]
elif "pick up cup" in command_lower:
return [
{
"action": "detect_object",
"parameters": {"object_type": "cup"},
"description": "Look for a cup in the current location"
},
{
"action": "approach_object",
"parameters": {"object_type": "cup"},
"description": "Move close to the cup"
},
{
"action": "pick_up_object",
"parameters": {"object_type": "cup"},
"description": "Grasp and pick up the cup"
}
]
else:
return [
{
"action": "unknown_command",
"parameters": {"command": command},
"description": f"Command '{command}' is not recognized"
}
]
def execute_plan(self, plan: List[Dict]) -> bool:
"""
Execute the generated action plan
"""
print(f"Executing plan with {len(plan)} actions:")
for i, action in enumerate(plan):
print(f"Step {i+1}: {action['description']}")
success = self.execute_single_action(action)
if not success:
print(f"Action failed: {action}")
return False
print("Plan executed successfully!")
return True
def execute_single_action(self, action: Dict) -> bool:
"""
Execute a single action in the plan
"""
action_type = action["action"]
# This is where you would integrate with ROS 2 services/clients
print(f"Executing: {action_type} with parameters {action['parameters']}")
# Simulate action execution
if action_type == "navigate_to":
return self.navigate_to(action["parameters"]["location"])
elif action_type == "pick_up_object":
return self.pick_up_object(action["parameters"]["object_type"])
elif action_type == "place_object":
return self.place_object(action["parameters"]["object_type"])
elif action_type == "detect_object":
return self.detect_object(action["parameters"]["object_type"])
elif action_type == "detect_objects":
return self.detect_objects(action["parameters"]["location"])
elif action_type == "identify_debris":
return self.identify_debris(action["parameters"]["object_types"])
elif action_type == "clean_area":
return self.clean_area(action["parameters"]["area"])
elif action_type == "approach_object":
return self.approach_object(action["parameters"]["object_type"])
else:
print(f"Unknown action type: {action_type}")
return False
def navigate_to(self, location: str) -> bool:
"""Navigate to a specific location"""
print(f"Navigating to {location}")
# In ROS 2, this would call navigation services
return True
def pick_up_object(self, object_type: str) -> bool:
"""Pick up an object of specified type"""
print(f"Attempting to pick up {object_type}")
# In ROS 2, this would call manipulation services
return True
def place_object(self, object_type: str) -> bool:
"""Place an object"""
print(f"Placing {object_type}")
# In ROS 2, this would call manipulation services
return True
def detect_object(self, object_type: str) -> bool:
"""Detect an object of specified type"""
print(f"Detecting {object_type}")
# In ROS 2, this would call perception services
return True
def detect_objects(self, location: str) -> bool:
"""Detect all objects in a location"""
print(f"Detecting all objects in {location}")
return True
def identify_debris(self, object_types: List[str]) -> bool:
"""Identify debris objects"""
print(f"Identifying debris: {object_types}")
return True
def clean_area(self, area: str) -> bool:
"""Clean a specific area"""
print(f"Cleaning {area}")
return True
def approach_object(self, object_type: str) -> bool:
"""Approach an object"""
print(f"Approaching {object_type}")
return True
Advanced Cognitive Planning with State Tracking
For more sophisticated cognitive planning, we need to maintain and update the robot's state:
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Any
@dataclass
class RobotState:
"""Represents the current state of the robot"""
position: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0})
orientation: Dict[str, float] = field(default_factory=lambda: {"x": 0, "y": 0, "z": 0, "w": 1})
holding_object: Optional[str] = None
battery_level: float = 100.0
current_task: Optional[str] = None
detected_objects: List[Dict] = field(default_factory=list)
visited_locations: List[str] = field(default_factory=list)
class AdvancedCognitivePlanner(CognitivePlanner):
def __init__(self, api_key: str = None):
super().__init__(api_key)
self.robot_state = RobotState()
self.action_history = []
def update_robot_state(self, action: Dict, result: bool):
"""Update the robot's state based on action execution"""
action_type = action["action"]
if action_type == "navigate_to":
# Update position based on navigation
location = action["parameters"]["location"]
self.robot_state.visited_locations.append(location)
# In a real system, this would update based on actual navigation results
elif action_type == "pick_up_object":
obj_type = action["parameters"]["object_type"]
self.robot_state.holding_object = obj_type
elif action_type == "place_object":
self.robot_state.holding_object = None
elif action_type == "detect_object" or action_type == "detect_objects":
# Add detected objects to state
pass
def plan_with_state_awareness(self, command: str) -> List[Dict]:
"""Generate a plan considering the current robot state"""
context = {
"robot_position": self.robot_state.position,
"holding_object": self.robot_state.holding_object,
"battery_level": self.robot_state.battery_level,
"visited_locations": self.robot_state.visited_locations,
"detected_objects": self.robot_state.detected_objects,
"robot_capabilities": self.robot_capabilities,
"available_objects": self.environment_objects,
"available_locations": self.environment_locations
}
return self.plan_from_command(command, context)
def execute_plan_with_monitoring(self, plan: List[Dict]) -> bool:
"""Execute plan with state monitoring and error recovery"""
print(f"Executing plan with {len(plan)} actions (state-aware):")
for i, action in enumerate(plan):
print(f"Step {i+1}/{len(plan)}: {action['description']}")
# Check if action is still valid given current state
if not self.is_action_valid(action):
print(f"Action no longer valid: {action}")
# Replan or adjust
return False
success = self.execute_single_action(action)
# Update state based on result
self.update_robot_state(action, success)
self.action_history.append((action, success))
if not success:
print(f"Action failed: {action}")
# Attempt recovery or return failure
return self.handle_action_failure(action, plan[i+1:])
print("Plan executed successfully!")
return True
def is_action_valid(self, action: Dict) -> bool:
"""Check if an action is still valid given current state"""
action_type = action["action"]
if action_type == "pick_up_object" and self.robot_state.holding_object:
# Can't pick up if already holding something
return False
if action_type == "place_object" and not self.robot_state.holding_object:
# Can't place if not holding anything
return False
return True
def handle_action_failure(self, failed_action: Dict, remaining_plan: List[Dict]) -> bool:
"""Handle when an action fails during execution"""
print(f"Handling failure for action: {failed_action}")
# Implement recovery strategies
# This could involve replanning, skipping the action, or trying alternatives
return False # For now, just return failure
Integration with ROS 2 Actions and Services
For real-world integration, we need to connect our cognitive planner with ROS 2:
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.qos import QoSProfile
# Import ROS 2 action and message types
from geometry_msgs.msg import PoseStamped
from move_base_msgs.action import MoveBase
from manipulation_msgs.srv import PickUp, PlaceObject
from perception_msgs.srv import DetectObjects
class ROS2CognitivePlanner(Node):
def __init__(self):
super().__init__('cognitive_planner')
# Initialize LLM client
self.planner = CognitivePlanner()
# Action clients
self.nav_client = ActionClient(self, MoveBase, 'move_base')
# Service clients
self.pickup_client = self.create_client(PickUp, 'pick_up_object')
self.place_client = self.create_client(PlaceObject, 'place_object')
self.detect_client = self.create_client(DetectObjects, 'detect_objects')
# Publishers
self.status_pub = self.create_publisher(String, 'cognitive_planner_status', 10)
# Subscribers
self.command_sub = self.create_subscription(
String, 'voice_command', self.command_callback, 10)
self.get_logger().info("Cognitive Planner node initialized")
def command_callback(self, msg):
"""Handle incoming voice commands"""
command = msg.data
self.get_logger().info(f"Received command: {command}")
# Generate plan
plan = self.planner.plan_from_command(command)
# Execute plan
success = self.execute_plan_ros2(plan)
# Publish status
status_msg = String()
status_msg.data = f"Plan execution {'succeeded' if success else 'failed'} for command: {command}"
self.status_pub.publish(status_msg)
def execute_plan_ros2(self, plan: List[Dict]) -> bool:
"""Execute plan using ROS 2 actions and services"""
for action in plan:
success = self.execute_action_ros2(action)
if not success:
return False
return True
def execute_action_ros2(self, action: Dict) -> bool:
"""Execute a single action using ROS 2 interfaces"""
action_type = action["action"]
if action_type == "navigate_to":
return self.navigate_to_ros2(action["parameters"]["location"])
elif action_type == "pick_up_object":
return self.pick_up_object_ros2(action["parameters"]["object_type"])
elif action_type == "place_object":
return self.place_object_ros2(action["parameters"]["object_type"])
elif action_type == "detect_object":
return self.detect_object_ros2(action["parameters"]["object_type"])
else:
self.get_logger().error(f"Unknown action type: {action_type}")
return False
def navigate_to_ros2(self, location: str) -> bool:
"""Navigate to location using ROS 2 navigation stack"""
# Wait for action server
self.nav_client.wait_for_server()
# Create goal
goal = MoveBase.Goal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = self.get_clock().now().to_msg()
# Set target pose based on location
# This would typically involve looking up location coordinates
# For now, we'll use a simple mapping
location_poses = {
"kitchen": PoseStamped(pose=Pose(position=Point(x=1.0, y=0.0, z=0.0))),
"bedroom": PoseStamped(pose=Pose(position=Point(x=2.0, y=1.0, z=0.0))),
"living_room": PoseStamped(pose=Pose(position=Point(x=0.0, y=2.0, z=0.0)))
}
if location in location_poses:
goal.target_pose = location_poses[location]
else:
self.get_logger().error(f"Unknown location: {location}")
return False
# Send goal
future = self.nav_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self, future)
return future.result().status == GoalStatus.SUCCEEDED
def pick_up_object_ros2(self, object_type: str) -> bool:
"""Pick up object using ROS 2 manipulation services"""
if not self.pickup_client.wait_for_service(timeout_sec=1.0):
self.get_logger().error("Pick up service not available")
return False
request = PickUp.Request()
request.object_type = object_type
future = self.pickup_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
return future.result().success
def place_object_ros2(self, object_type: str) -> bool:
"""Place object using ROS 2 manipulation services"""
if not self.place_client.wait_for_service(timeout_sec=1.0):
self.get_logger().error("Place service not available")
return False
request = PlaceObject.Request()
request.object_type = object_type
future = self.place_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
return future.result().success
def detect_object_ros2(self, object_type: str) -> bool:
"""Detect object using ROS 2 perception services"""
if not self.detect_client.wait_for_service(timeout_sec=1.0):
self.get_logger().error("Detect service not available")
return False
request = DetectObjects.Request()
request.object_type = object_type
future = self.detect_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
return len(future.result().objects) > 0
def main(args=None):
rclpy.init(args=args)
cognitive_planner = ROS2CognitivePlanner()
try:
rclpy.spin(cognitive_planner)
except KeyboardInterrupt:
pass
finally:
cognitive_planner.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Prompt Engineering for Cognitive Planning
Effective cognitive planning requires well-crafted prompts for the LLM:
class PromptEngineer:
@staticmethod
def create_task_decomposition_prompt(command: str, context: Dict) -> str:
"""Create a prompt for task decomposition"""
return f"""
Decompose the following command into a sequence of executable actions: "{command}"
Context:
- Robot capabilities: {context['capabilities']}
- Current state: {context['state']}
- Environment: {context['environment']}
Provide a detailed breakdown of subtasks that must be completed to fulfill the command.
Each subtask should be achievable with the robot's capabilities.
Return the result as a JSON array of subtasks.
"""
@staticmethod
def create_action_mapping_prompt(subtasks: List[str], robot_capabilities: List[str]) -> str:
"""Create a prompt for mapping subtasks to robot actions"""
return f"""
Map the following subtasks to specific robot actions:
Subtasks: {subtasks}
Robot capabilities: {robot_capabilities}
For each subtask, identify the sequence of robot actions needed to complete it.
Consider action dependencies and required parameters.
Return the result as a JSON array of action objects.
"""
@staticmethod
def create_safety_check_prompt(plan: List[Dict], safety_constraints: List[str]) -> str:
"""Create a prompt for safety validation"""
return f"""
Validate the following action plan against safety constraints:
Plan: {plan}
Safety constraints: {safety_constraints}
Identify any potential safety issues and suggest modifications.
Return a safe version of the plan or indicate if it's already safe.
"""
Handling Complex Natural Language
For handling more complex natural language commands, we can implement semantic parsing:
import spacy
from typing import NamedTuple
class ParsedCommand(NamedTuple):
intent: str
entities: Dict[str, str]
modifiers: List[str]
class SemanticParser:
def __init__(self):
# Load spaCy model for NLP processing
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
print("spaCy English model not found. Install with: python -m spacy download en_core_web_sm")
self.nlp = None
def parse_command(self, command: str) -> ParsedCommand:
"""Parse a natural language command into structured components"""
if not self.nlp:
# Fallback to simple parsing
return self.simple_parse(command)
doc = self.nlp(command)
# Extract intent (usually the main verb)
intent = self.extract_intent(doc)
# Extract entities (objects, locations, etc.)
entities = self.extract_entities(doc)
# Extract modifiers (adverbs, adjectives)
modifiers = self.extract_modifiers(doc)
return ParsedCommand(intent=intent, entities=entities, modifiers=modifiers)
def extract_intent(self, doc) -> str:
"""Extract the main intent from the command"""
# Find main verb (not auxiliary)
for token in doc:
if token.pos_ == "VERB" and token.dep_ != "aux":
return token.lemma_
return "unknown"
def extract_entities(self, doc) -> Dict[str, str]:
"""Extract named entities from the command"""
entities = {}
# Extract noun phrases as potential objects
for chunk in doc.noun_chunks:
if chunk.root.pos_ in ["NOUN", "PROPN"]:
entities[chunk.root.text] = "object"
# Extract named entities
for ent in doc.ents:
if ent.label_ in ["GPE", "LOC", "FAC"]: # Geopolitical, location, facility
entities[ent.text] = "location"
elif ent.label_ == "OBJECT":
entities[ent.text] = "object"
return entities
def extract_modifiers(self, doc) -> List[str]:
"""Extract modifiers from the command"""
modifiers = []
for token in doc:
if token.pos_ in ["ADV", "ADJ"]:
modifiers.append(token.text)
return modifiers
def simple_parse(self, command: str) -> ParsedCommand:
"""Simple fallback parsing"""
command_lower = command.lower()
# Simple intent detection
if any(word in command_lower for word in ["go", "move", "navigate", "walk"]):
intent = "navigate"
elif any(word in command_lower for word in ["pick", "grasp", "take", "get"]):
intent = "pick_up"
elif any(word in command_lower for word in ["place", "put", "drop", "set"]):
intent = "place"
elif any(word in command_lower for word in ["clean", "tidy", "organize"]):
intent = "clean"
else:
intent = "unknown"
# Simple entity extraction
entities = {}
for obj in ["cup", "bottle", "book", "chair", "table"]:
if obj in command_lower:
entities[obj] = "object"
for loc in ["kitchen", "bedroom", "living room", "bathroom"]:
if loc in command_lower:
entities[loc] = "location"
return ParsedCommand(intent=intent, entities=entities, modifiers=[])
Error Handling and Plan Refinement
Robust cognitive planning systems must handle errors and refine plans:
class PlanRefiner:
def __init__(self):
self.known_failures = {}
self.recovery_strategies = {
"navigation_failure": ["try_alternative_path", "request_assistance", "return_to_known_location"],
"object_detection_failure": ["change_viewpoint", "adjust_sensors", "search_area"],
"manipulation_failure": ["reposition_robot", "adjust_grasp", "request_human_help"]
}
def refine_plan(self, original_plan: List[Dict], failure_point: int, failure_reason: str) -> List[Dict]:
"""Refine a plan after a failure occurs"""
# Split plan at failure point
successful_part = original_plan[:failure_point]
failed_action = original_plan[failure_point]
remaining_part = original_plan[failure_point + 1:]
# Determine recovery strategy
recovery_actions = self.get_recovery_actions(failed_action, failure_reason)
# Combine parts with recovery
refined_plan = successful_part + recovery_actions + remaining_part
return refined_plan
def get_recovery_actions(self, failed_action: Dict, failure_reason: str) -> List[Dict]:
"""Get recovery actions based on failure type"""
action_type = failed_action["action"]
if "navigation" in action_type or "move" in action_type:
return self.handle_navigation_failure(failed_action, failure_reason)
elif "detect" in action_type or "find" in action_type:
return self.handle_detection_failure(failed_action, failure_reason)
elif "pick" in action_type or "grasp" in action_type:
return self.handle_manipulation_failure(failed_action, failure_reason)
else:
return self.handle_generic_failure(failed_action, failure_reason)
def handle_navigation_failure(self, failed_action: Dict, failure_reason: str) -> List[Dict]:
"""Handle navigation-related failures"""
return [
{
"action": "report_error",
"parameters": {"error": failure_reason, "location": failed_action["parameters"]},
"description": "Report navigation error"
},
{
"action": "request_assistance",
"parameters": {},
"description": "Request human assistance for navigation"
}
]
def handle_detection_failure(self, failed_action: Dict, failure_reason: str) -> List[Dict]:
"""Handle object detection failures"""
return [
{
"action": "change_viewpoint",
"parameters": {"offset": {"x": 0.5, "y": 0.0, "z": 0.0}},
"description": "Change robot's viewpoint to improve detection"
},
{
"action": "retry_detection",
"parameters": failed_action["parameters"],
"description": "Retry the detection action"
}
]
def handle_manipulation_failure(self, failed_action: Dict, failure_reason: str) -> List[Dict]:
"""Handle manipulation-related failures"""
return [
{
"action": "reposition_robot",
"parameters": {"distance": 0.3},
"description": "Reposition robot for better manipulation access"
},
{
"action": "retry_manipulation",
"parameters": failed_action["parameters"],
"description": "Retry the manipulation action"
}
]
def handle_generic_failure(self, failed_action: Dict, failure_reason: str) -> List[Dict]:
"""Handle generic failures"""
return [
{
"action": "abort_task",
"parameters": {"reason": failure_reason},
"description": "Abort the current task due to failure"
}
]
Testing Cognitive Planning Systems
Create comprehensive tests for your cognitive planning system:
import unittest
from unittest.mock import Mock, patch
class TestCognitivePlanner(unittest.TestCase):
def setUp(self):
self.planner = CognitivePlanner()
def test_clean_room_command(self):
"""Test that 'clean the room' command generates appropriate plan"""
plan = self.planner.plan_from_command("Clean the room")
# Check that plan contains expected actions
action_types = [action["action"] for action in plan]
self.assertIn("detect_objects", action_types)
self.assertIn("navigate_to", action_types)
self.assertIn("pick_up_object", action_types)
self.assertIn("place_object", action_types)
# Check that plan has reasonable length
self.assertGreater(len(plan), 3)
def test_simple_navigation(self):
"""Test simple navigation command"""
plan = self.planner.plan_from_command("Go to kitchen")
self.assertEqual(len(plan), 1)
self.assertEqual(plan[0]["action"], "navigate_to")
self.assertEqual(plan[0]["parameters"]["location"], "kitchen")
def test_object_manipulation(self):
"""Test object manipulation command"""
plan = self.planner.plan_from_command("Pick up the cup")
action_types = [action["action"] for action in plan]
self.assertIn("detect_object", action_types)
self.assertIn("pick_up_object", action_types)
def test_unknown_command(self):
"""Test handling of unknown commands"""
plan = self.planner.plan_from_command("Do something weird")
# Should return a plan indicating unknown command
self.assertEqual(len(plan), 1)
self.assertEqual(plan[0]["action"], "unknown_command")
class TestAdvancedCognitivePlanner(unittest.TestCase):
def setUp(self):
self.planner = AdvancedCognitivePlanner()
def test_state_aware_planning(self):
"""Test that planner considers current state"""
# Set robot to be holding an object
self.planner.robot_state.holding_object = "cup"
# Plan to pick up another object should fail state validation
plan = self.planner.plan_with_state_awareness("Pick up the bottle")
# Execute should detect the state conflict
with patch.object(self.planner, 'execute_single_action', return_value=False):
success = self.planner.execute_plan_with_monitoring(plan)
self.assertFalse(success)
def test_action_validity(self):
"""Test action validity checking"""
# When holding an object, can't pick up another
self.planner.robot_state.holding_object = "cup"
action = {"action": "pick_up_object", "parameters": {"object_type": "bottle"}}
self.assertFalse(self.planner.is_action_valid(action))
# When not holding an object, can pick up
self.planner.robot_state.holding_object = None
self.assertTrue(self.planner.is_action_valid(action))
if __name__ == '__main__':
unittest.main()
Best Practices for Cognitive Planning
-
Hierarchical Planning: Break complex tasks into hierarchical subtasks for better management.
-
Context Awareness: Always consider the robot's current state, capabilities, and environment.
-
Error Recovery: Implement robust error handling and recovery mechanisms.
-
Plan Validation: Validate plans before execution to ensure feasibility.
-
User Feedback: Provide clear feedback about plan execution status to users.
-
Safety First: Always prioritize safety in plan generation and execution.
-
Learning and Adaptation: Implement mechanisms for the system to learn from experience.
Next Steps
Continue to the next section to learn about the capstone project: The Autonomous Humanoid, where we'll integrate all the concepts learned in this module to create a complete system that can receive voice commands, plan actions, navigate, and manipulate objects.