Voice-to-Action Systems: Using OpenAI Whisper for Voice Commands
Introduction to Voice-to-Action Systems
Voice-to-action systems represent a critical component of Vision-Language-Action (VLA) models, enabling robots to understand and respond to natural spoken commands. These systems bridge the gap between human language and robotic action execution, making robots more accessible and intuitive to interact with. In this section, we'll explore how to implement voice-to-action systems using OpenAI Whisper for speech recognition and integration with robotic platforms.
Overview of OpenAI Whisper
OpenAI Whisper is a state-of-the-art speech recognition model that can transcribe speech to text with high accuracy across multiple languages. It's particularly well-suited for robotics applications due to its robustness to background noise and ability to handle various accents and speaking styles.
Key Features of Whisper for Robotics:
- Multilingual support for global applications
- Robust performance in noisy environments
- Real-time and batch processing capabilities
- Open-source model variants for different performance needs
Implementing Voice Recognition with Whisper
Installing Whisper
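First, install the OpenAI Whisper library and its dependencies. Whisper decodes audio files with the `ffmpeg` command-line tool, so `ffmpeg` must also be installed on the system, and the examples below additionally use the `pyaudio` and `numpy` packages: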
pip install openai-whisper
Basic Whisper Implementation
Here's a basic implementation of Whisper for voice command recognition:
import whisper
import torch
import pyaudio
import wave
import numpy as np
import rospy
from std_msgs.msg import String
from geometry_msgs.msg import Twist
class VoiceToActionSystem:
    """Record a spoken command, transcribe it with Whisper and act on it.

    The constructor loads the Whisper model, opens a PyAudio session and
    registers a ROS 1 node with two publishers: ``/cmd_vel`` (``Twist``)
    for velocity commands and ``/voice_command`` (``String``) for the raw
    transcription.
    """

    def __init__(self):
        """Load the speech model and set up audio and ROS resources."""
        # Load Whisper model (use 'base' for faster processing or 'large' for accuracy)
        self.model = whisper.load_model("base")

        # Audio recording parameters
        self.chunk = 1024  # frames requested from the stream per read
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 44100
        self.record_seconds = 5

        # Initialize PyAudio
        self.p = pyaudio.PyAudio()

        # ROS initialization
        rospy.init_node('voice_to_action_node', anonymous=True)
        self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        self.voice_command_pub = rospy.Publisher('/voice_command', String, queue_size=10)

        # Robot state (bookkeeping only; no method of this class reads it yet)
        self.robot_state = {
            'position': {'x': 0, 'y': 0, 'theta': 0},
            'battery_level': 100,
            'current_task': None
        }

    def record_audio(self, filename="command.wav"):
        """Record ``self.record_seconds`` of microphone audio into a WAV file.

        Args:
            filename: Path of the WAV file to (over)write.

        Returns:
            The ``filename`` that was written.

        Raises:
            Whatever the audio stream or the ``wave`` module raises; the
            stream and the file are closed before the error propagates.
        """
        print("Recording...")
        stream = self.p.open(format=self.format,
                             channels=self.channels,
                             rate=self.rate,
                             input=True,
                             frames_per_buffer=self.chunk)
        try:
            chunk_count = int(self.rate / self.chunk * self.record_seconds)
            frames = [stream.read(self.chunk) for _ in range(chunk_count)]
        finally:
            # Release the input device even when a read fails; otherwise every
            # failed attempt in the voice loop would leak an open stream.
            try:
                stream.stop_stream()
            finally:
                stream.close()
        print("Recording finished.")

        # Save the recorded data as a WAV file; the context manager closes
        # the file even if writing fails part-way.
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.p.get_sample_size(self.format))
            wf.setframerate(self.rate)
            wf.writeframes(b''.join(frames))
        return filename

    def transcribe_audio(self, audio_file):
        """Transcribe an audio file with Whisper and return the stripped text."""
        result = self.model.transcribe(audio_file)
        return result["text"].strip()

    def process_command(self, command_text):
        """Map transcribed text to a robot action.

        Matching is case-insensitive and substring based.

        Args:
            command_text: The transcription to interpret.

        Returns:
            A ``Twist`` for the motion commands, or ``None`` when the command
            is unknown or is handled by one of the placeholder behaviours
            (navigation, grasping, cleaning), which return nothing.
        """
        command_text = command_text.lower()

        # "stop" is checked first so that a phrase that also contains a motion
        # keyword (e.g. "stop moving forward") halts the robot instead of
        # driving it.
        if "stop" in command_text:
            return self.stop_robot()
        if "move forward" in command_text or "go forward" in command_text:
            return self.move_forward()
        if "move backward" in command_text or "go backward" in command_text:
            return self.move_backward()
        if "turn left" in command_text:
            return self.turn_left()
        if "turn right" in command_text:
            return self.turn_right()
        if "go to kitchen" in command_text:
            return self.navigate_to_location("kitchen")
        if "pick up" in command_text or "grasp" in command_text:
            return self.grasp_object(command_text)
        if "clean" in command_text or "tidy" in command_text:
            return self.clean_room()

        print(f"Unknown command: {command_text}")
        return None

    def move_forward(self):
        """Generate forward movement command"""
        cmd = Twist()
        cmd.linear.x = 0.5  # Forward velocity
        cmd.angular.z = 0.0  # No rotation
        return cmd

    def move_backward(self):
        """Generate backward movement command"""
        cmd = Twist()
        cmd.linear.x = -0.5  # Backward velocity
        cmd.angular.z = 0.0  # No rotation
        return cmd

    def turn_left(self):
        """Generate left turn command"""
        cmd = Twist()
        cmd.linear.x = 0.0  # No forward movement
        cmd.angular.z = 0.5  # Left rotation
        return cmd

    def turn_right(self):
        """Generate right turn command"""
        cmd = Twist()
        cmd.linear.x = 0.0  # No forward movement
        cmd.angular.z = -0.5  # Right rotation
        return cmd

    def stop_robot(self):
        """Generate stop command (all velocities zero)"""
        cmd = Twist()
        cmd.linear.x = 0.0
        cmd.angular.z = 0.0
        return cmd

    def navigate_to_location(self, location):
        """Placeholder for navigation to a named location; returns ``None``."""
        # This would typically integrate with ROS navigation stack
        # (move_base or Nav2).
        print(f"Planning path to {location}...")
        return None

    def grasp_object(self, command):
        """Placeholder for object grasping; returns ``None``."""
        # A full implementation would extract the object name from the
        # command and use the perception and manipulation stack.
        print(f"Attempting to grasp object based on command: {command}")
        return None

    def clean_room(self):
        """Placeholder for the room cleaning behaviour; returns ``None``."""
        # A full implementation would involve path planning and task execution.
        print("Starting room cleaning sequence...")
        return None

    def run_voice_loop(self):
        """Record, transcribe, publish and execute commands until ROS shuts down."""
        print("Voice-to-action system ready. Say a command...")
        while not rospy.is_shutdown():
            try:
                # Record audio
                audio_file = self.record_audio("temp_command.wav")

                # Transcribe audio
                command_text = self.transcribe_audio(audio_file)
                print(f"Recognized: {command_text}")

                # Publish recognized command
                self.voice_command_pub.publish(String(data=command_text))

                # Process command and execute action
                action_cmd = self.process_command(command_text)
                if action_cmd is not None:
                    # Publish movement command
                    self.cmd_vel_pub.publish(action_cmd)
                    print(f"Executed action for command: {command_text}")
                else:
                    print(f"Could not process command: {command_text}")

                # Small delay before next command
                rospy.sleep(1.0)
            except Exception as e:
                # Keep the loop alive on per-command failures (audio device
                # errors, transcription errors) and report them.
                print(f"Error processing voice command: {e}")

    def cleanup(self):
        """Clean up resources"""
        self.p.terminate()
if __name__ == "__main__":
    # Script entry point: build the system, run the blocking voice loop and
    # always release the audio resources afterwards.
    system = VoiceToActionSystem()
    try:
        system.run_voice_loop()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the loop.
        print("Voice-to-action system stopped by user")
    finally:
        system.cleanup()
Real-time Voice Processing
For real-time voice processing, we can implement a more sophisticated approach that continuously listens for voice commands:
import threading
import queue
import time
class RealTimeVoiceToAction(VoiceToActionSystem):
    """Continuously listen on the microphone and act on wake-word commands.

    Audio arrives through a PyAudio callback and is queued; a background
    thread groups the chunks into utterances, transcribes each utterance and
    executes the command that follows the wake word.
    """

    def __init__(self):
        """Set up the audio queue, the listening flag and the wake word."""
        super().__init__()
        # Audio stream parameters
        self.audio_queue = queue.Queue()
        self.is_listening = False
        self.wake_word = "robot"  # Wake word to activate the system

    def audio_callback(self, in_data, frame_count, time_info, status):
        """Stream callback: queue the raw chunk and keep the stream running."""
        self.audio_queue.put(in_data)
        return (None, pyaudio.paContinue)

    def start_listening(self):
        """Open the callback-driven input stream and start the worker thread."""
        if self.is_listening:
            # Already running; opening a second stream would leak the first.
            return
        self.is_listening = True

        # Open audio stream
        self.stream = self.p.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=self.audio_callback
        )
        self.stream.start_stream()

        # Start processing thread
        self.processing_thread = threading.Thread(target=self.process_audio_stream)
        self.processing_thread.daemon = True
        self.processing_thread.start()
        print("Started real-time listening for voice commands...")

    def process_audio_stream(self):
        """Group queued chunks into utterances and hand each one to Whisper.

        A chunk counts as speech when its mean absolute amplitude exceeds
        ``silence_threshold``. Buffering starts at the first speech chunk and
        short pauses are kept, so the words of one command stay together. The
        utterance is processed when a sustained pause follows it or when it
        reaches five seconds; bursts with too little speech are dropped.
        """
        chunks_per_second = self.rate / self.chunk
        silence_threshold = 1000  # Threshold for detecting speech
        max_frames = int(chunks_per_second * 5)  # 5 seconds of audio
        # A pause of roughly 0.8 s marks the end of an utterance.
        end_pause_frames = max(1, int(chunks_per_second * 0.8))
        # Require roughly 0.25 s of speech so clicks are not transcribed.
        min_voiced_frames = max(1, int(chunks_per_second * 0.25))

        buffered = []
        voiced = 0
        quiet_run = 0

        while self.is_listening:
            try:
                # Get audio data from queue
                data = self.audio_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                # Widen to int32 before abs(): abs(-32768) overflows in int16.
                samples = np.frombuffer(data, dtype=np.int16).astype(np.int32)
                amplitude = np.abs(samples).mean() if samples.size else 0.0

                if amplitude > silence_threshold:
                    buffered.append(data)
                    voiced += 1
                    quiet_run = 0
                elif buffered:
                    # Keep pauses inside an utterance.
                    buffered.append(data)
                    quiet_run += 1
                else:
                    continue

                finished = quiet_run >= end_pause_frames or len(buffered) > max_frames
                if not finished:
                    continue

                utterance, had_speech = buffered, voiced >= min_voiced_frames
                # Clear the buffer before the (slow) transcription step.
                buffered, voiced, quiet_run = [], 0, 0
                if had_speech:
                    self._handle_utterance(utterance)
            except Exception as e:
                print(f"Error in audio processing: {e}")
                buffered, voiced, quiet_run = [], 0, 0

    def _handle_utterance(self, audio_data):
        """Transcribe one utterance and execute the command after the wake word."""
        self.save_audio_chunk(audio_data, "temp_chunk.wav")
        command_text = self.transcribe_audio("temp_chunk.wav")
        actual_command = self._extract_command(command_text)
        if not actual_command:
            return

        print(f"Command received: {actual_command}")
        action_cmd = self.process_command(actual_command)
        if action_cmd is not None:
            self.cmd_vel_pub.publish(action_cmd)
            print(f"Executed action for command: {actual_command}")

    def _extract_command(self, text):
        """Return the lower-cased text after the wake word, or '' if absent."""
        # Search and slice the same lowered string so the index always matches.
        lowered = text.lower()
        wake_word = self.wake_word.lower()
        position = lowered.find(wake_word)
        if position < 0:
            return ""
        remainder = lowered[position + len(wake_word):]
        # Drop punctuation that a transcript puts around the wake word
        # ("Robot, move forward.").
        return remainder.strip(" \t\r\n,.!?:;-")

    def save_audio_chunk(self, audio_data, filename):
        """Save accumulated audio data to a WAV file."""
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.p.get_sample_size(self.format))
            wf.setframerate(self.rate)
            wf.writeframes(b''.join(audio_data))

    def stop_listening(self):
        """Stop continuous listening; safe to call more than once."""
        self.is_listening = False

        stream = getattr(self, 'stream', None)
        if stream is not None:
            stream.stop_stream()
            stream.close()
            self.stream = None

        worker = getattr(self, 'processing_thread', None)
        if worker is not None and worker is not threading.current_thread():
            # The worker re-checks is_listening at least once per second;
            # the timeout covers a transcription that is still in progress.
            worker.join(timeout=2.0)
Integration with ROS 2
For ROS 2 integration, here's how to adapt the voice-to-action system:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
from sensor_msgs.msg import AudioData
import whisper
import pyaudio
import wave
import numpy as np
class ROS2VoiceToAction(Node):
    """ROS 2 node that records speech, transcribes it and publishes actions.

    Publishes the transcription on ``/voice_command`` (``String``) and the
    resulting velocity command on ``/cmd_vel`` (``Twist``). A 1-second timer
    triggers ``process_voice_command``.
    """

    def __init__(self):
        """Load Whisper, create publishers, open PyAudio and start the timer."""
        super().__init__('voice_to_action_node')

        # Load Whisper model
        self.model = whisper.load_model("base")

        # Publishers
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.voice_command_pub = self.create_publisher(String, '/voice_command', 10)

        # Audio recording parameters
        self.chunk = 1024
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 44100
        self.record_seconds = 5

        # Initialize PyAudio
        self.p = pyaudio.PyAudio()

        # Timer for periodic command processing
        self.timer = self.create_timer(1.0, self.process_voice_command)
        self.get_logger().info("ROS2 Voice-to-Action node initialized")

    def record_audio(self, filename="command.wav"):
        """Record ``self.record_seconds`` of microphone audio into a WAV file.

        Args:
            filename: Path of the WAV file to (over)write.

        Returns:
            The ``filename`` that was written.
        """
        self.get_logger().info("Recording audio...")
        stream = self.p.open(format=self.format,
                             channels=self.channels,
                             rate=self.rate,
                             input=True,
                             frames_per_buffer=self.chunk)
        try:
            chunk_count = int(self.rate / self.chunk * self.record_seconds)
            frames = [stream.read(self.chunk) for _ in range(chunk_count)]
        finally:
            # Release the input device even when a read fails; the timer
            # would otherwise leak one stream per failed attempt.
            try:
                stream.stop_stream()
            finally:
                stream.close()
        self.get_logger().info("Recording finished.")

        # Save the recorded data as a WAV file; closed even if writing fails.
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.p.get_sample_size(self.format))
            wf.setframerate(self.rate)
            wf.writeframes(b''.join(frames))
        return filename

    def transcribe_audio(self, audio_file):
        """Transcribe an audio file with Whisper and return the stripped text."""
        result = self.model.transcribe(audio_file)
        return result["text"].strip()

    def process_command(self, command_text):
        """Map transcribed text to a ``Twist``.

        Matching is case-insensitive and substring based.

        Returns:
            A ``Twist`` for a recognised command, or ``None`` otherwise.
        """
        command_text = command_text.lower()
        cmd = Twist()

        # "stop" is checked first so that a phrase that also contains a motion
        # keyword (e.g. "stop moving forward") halts the robot.
        if "stop" in command_text:
            cmd.linear.x = 0.0
            cmd.angular.z = 0.0
        elif "move forward" in command_text or "go forward" in command_text:
            cmd.linear.x = 0.5
        elif "move backward" in command_text or "go backward" in command_text:
            cmd.linear.x = -0.5
        elif "turn left" in command_text:
            cmd.angular.z = 0.5
        elif "turn right" in command_text:
            cmd.angular.z = -0.5
        else:
            self.get_logger().info(f"Unknown command: {command_text}")
            return None
        return cmd

    def process_voice_command(self):
        """Timer callback: record, transcribe, publish and execute one command.

        NOTE: recording is synchronous, so this callback runs for at least
        ``self.record_seconds`` before returning.
        """
        try:
            # Record audio
            audio_file = self.record_audio("temp_command.wav")

            # Transcribe audio
            command_text = self.transcribe_audio(audio_file)
            self.get_logger().info(f"Recognized: {command_text}")

            # Publish recognized command
            cmd_msg = String()
            cmd_msg.data = command_text
            self.voice_command_pub.publish(cmd_msg)

            # Process command and execute action
            action_cmd = self.process_command(command_text)
            if action_cmd is not None:
                self.cmd_vel_pub.publish(action_cmd)
                self.get_logger().info(f"Executed action for command: {command_text}")
            else:
                self.get_logger().info(f"Could not process command: {command_text}")
        except Exception as e:
            # Report the failure and let the timer try again on its next tick.
            self.get_logger().error(f"Error processing voice command: {e}")
def main(args=None):
    """Run the ROS 2 voice-to-action node until it is interrupted.

    Args:
        args: Command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    voice_to_action = None
    try:
        # Created inside the try block so rclpy is still shut down if the
        # constructor fails (e.g. the model or audio device cannot be opened).
        voice_to_action = ROS2VoiceToAction()
        rclpy.spin(voice_to_action)
    except KeyboardInterrupt:
        pass
    finally:
        if voice_to_action is not None:
            voice_to_action.p.terminate()
            voice_to_action.destroy_node()
        # The context may already have been shut down while handling Ctrl-C;
        # shutting it down a second time can raise.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
Advanced Voice Command Processing
For more sophisticated voice command processing, we can implement intent recognition and entity extraction:
import re
from typing import Dict, List, Tuple
class AdvancedVoiceCommandProcessor:
    """Rule-based intent recognition and entity extraction for voice commands.

    ``command_patterns`` maps an intent name to an ordered list of
    ``(regex, handler)`` pairs. The first pattern that matches a command
    decides both the intent and the handler that builds the action string.
    """

    def __init__(self):
        """Build the pattern table and the known location/object vocabularies."""
        # One or two words, so that "living room" can be captured whole.
        entity = r'(\w+(?: \w+)?)'
        # Optional trailing entity for commands that are valid without one.
        optional_entity = r'(?: ' + entity + r')?'

        # Define command patterns and their corresponding actions
        self.command_patterns = {
            'navigation': [
                (r'\b(?:move|go) (forward|backward|left|right)\b', self.handle_navigation),
                (r'\bgo to (?:the )?' + entity, self.handle_goto),
                (r'\bgo (?:to )?(?:the )?' + entity, self.handle_goto),
            ],
            'manipulation': [
                (r'\bpick up (?:the )?' + entity, self.handle_pickup),
                (r'\bgrasp (?:the )?' + entity, self.handle_pickup),
                (r'\bput down\b', self.handle_putdown),
            ],
            'cleaning': [
                (r'\bclean\b(?: up\b)?(?: the\b)?' + optional_entity, self.handle_clean),
                (r'\btidy\b(?: up\b)?(?: the\b)?' + optional_entity, self.handle_clean),
                (r'\borganize\b(?: the\b)?' + optional_entity, self.handle_clean),
            ]
        }

        # Define location keywords
        self.locations = {
            'kitchen', 'bedroom', 'living room', 'bathroom', 'office',
            'hallway', 'dining room', 'garage', 'garden'
        }

        # Define object keywords
        self.objects = {
            'cup', 'bottle', 'book', 'phone', 'keys', 'ball',
            'toy', 'plate', 'fork', 'spoon', 'glass', 'box'
        }

    def _classify_entity(self, raw: str) -> Dict:
        """Label a captured phrase as a location, an object or a generic target."""
        words = raw.split()
        # Prefer the full phrase ("living room"); fall back to its first word
        # so trailing filler ("kitchen please") does not hide a known entity.
        candidates = [' '.join(words)]
        if len(words) > 1:
            candidates.append(words[0])
        for candidate in candidates:
            if candidate in self.locations:
                return {'location': candidate}
            if candidate in self.objects:
                return {'object': candidate}
        # Could be either - need context, so report it as an untyped target
        return {'target': words[0]}

    def _match_command(self, command: str):
        """Return ``(intent, handler, entities)`` for the first matching pattern.

        ``handler`` is ``None`` and ``intent`` is ``'unknown'`` when nothing
        matches or the command is empty.
        """
        if not command:
            return 'unknown', None, {}
        command_lower = command.lower()
        for intent_type, patterns in self.command_patterns.items():
            for pattern, handler in patterns:
                match = re.search(pattern, command_lower)
                if match is None:
                    continue
                # lastindex is None when the pattern has no (matched) group.
                captured = match.group(1) if match.lastindex else None
                entities = self._classify_entity(captured) if captured else {}
                return intent_type, handler, entities
        return 'unknown', None, {}

    def extract_intent_and_entities(self, command: str) -> Tuple[str, Dict]:
        """Extract intent and entities from voice command.

        Returns:
            ``(intent, entities)`` where ``entities`` holds at most one of the
            keys ``'location'``, ``'object'`` or ``'target'``; the intent is
            ``'unknown'`` with empty entities when no pattern matches.
        """
        intent_type, _handler, entities = self._match_command(command)
        return intent_type, entities

    @staticmethod
    def _pick(entities: Dict, preferred: str, default: str) -> str:
        """Return the preferred entity, else the untyped target, else ``default``."""
        return entities.get(preferred) or entities.get('target') or default

    def handle_navigation(self, entities: Dict) -> str:
        """Handle navigation commands"""
        direction = entities.get('target', 'forward')
        return f"NAVIGATE_{direction.upper()}"

    def handle_goto(self, entities: Dict) -> str:
        """Handle go to location commands"""
        location = self._pick(entities, 'location', 'unknown')
        return f"GOTO_{location.upper().replace(' ', '_')}"

    def handle_pickup(self, entities: Dict) -> str:
        """Handle pickup object commands"""
        obj = self._pick(entities, 'object', 'unknown')
        return f"PICKUP_{obj.upper().replace(' ', '_')}"

    def handle_putdown(self, entities: Dict) -> str:
        """Handle put down object commands"""
        return "PUTDOWN_OBJECT"

    def handle_clean(self, entities: Dict) -> str:
        """Handle cleaning commands"""
        area = self._pick(entities, 'location', 'room')
        return f"CLEAN_{area.upper().replace(' ', '_')}"

    def process_advanced_command(self, command_text: str) -> str:
        """Process command with intent recognition.

        Returns:
            The action string built by the handler of the pattern that
            matched, or ``"UNKNOWN_COMMAND"`` when no pattern matches.
        """
        _intent, handler, entities = self._match_command(command_text)
        if handler is None:
            return "UNKNOWN_COMMAND"
        # Use the handler paired with the matching pattern, not the first
        # handler registered for the intent category.
        return handler(entities)
Performance Optimization
To optimize Whisper performance for robotics applications:
import torch
from transformers import pipeline
class OptimizedVoiceToAction:
    """Speech recognition through a Transformers ASR pipeline.

    Uses the small ``openai/whisper-tiny`` checkpoint and runs on CUDA when
    it is available and requested, otherwise on the CPU.
    """

    def __init__(self, use_gpu=True):
        """Create the transcription pipeline.

        Args:
            use_gpu: Use CUDA when ``torch`` reports it as available.
        """
        # Check if CUDA is available
        self.device = "cuda" if torch.cuda.is_available() and use_gpu else "cpu"

        # Load a smaller, faster model for real-time applications
        self.model_name = "openai/whisper-tiny"  # or "openai/whisper-base"

        # Create pipeline for faster inference
        self.transcriber = pipeline(
            "automatic-speech-recognition",
            model=self.model_name,
            device=self.device
        )
        print(f"Voice recognition system initialized on {self.device}")

    def transcribe_audio_optimized(self, audio_file):
        """Transcribe ``audio_file`` with the pipeline and return the stripped text."""
        result = self.transcriber(audio_file)
        return result["text"].strip()

    def transcribe_audio_chunked(self, audio_file, chunk_duration=30):
        """Transcribe a long audio file in chunks.

        Args:
            audio_file: Input accepted by the pipeline (e.g. a file path).
            chunk_duration: Length of each chunk in seconds; must be positive.

        Returns:
            The stripped transcription of the whole file.

        Raises:
            ValueError: If ``chunk_duration`` is not positive.
        """
        if chunk_duration <= 0:
            raise ValueError("chunk_duration must be positive")
        # The pipeline does the splitting and merging itself when it is given
        # a chunk length, so no separate audio-processing library is needed.
        result = self.transcriber(audio_file, chunk_length_s=chunk_duration)
        return result["text"].strip()
Testing and Evaluation
Create a test suite to validate your voice-to-action system:
import unittest
from unittest.mock import Mock, patch
class TestVoiceToAction(unittest.TestCase):
    """Unit tests for the command-to-action mapping of VoiceToActionSystem."""

    def setUp(self):
        # Replace the expensive or hardware-bound constructor dependencies so
        # the tests need no Whisper download, microphone or ROS master.
        for target in ('whisper.load_model', 'pyaudio.PyAudio',
                       'rospy.init_node', 'rospy.Publisher'):
            patcher = patch(target)
            patcher.start()
            self.addCleanup(patcher.stop)
        self.vta = VoiceToActionSystem()

    def test_command_recognition(self):
        """Test that commands are correctly recognized and processed"""
        # Test forward movement
        cmd = self.vta.process_command("move forward")
        self.assertIsNotNone(cmd)
        self.assertEqual(cmd.linear.x, 0.5)
        self.assertEqual(cmd.angular.z, 0.0)

        # Test turn command
        cmd = self.vta.process_command("turn left")
        self.assertIsNotNone(cmd)
        self.assertEqual(cmd.linear.x, 0.0)
        self.assertEqual(cmd.angular.z, 0.5)

    def test_stop_command(self):
        """Test that the stop command zeroes both velocities"""
        cmd = self.vta.process_command("stop")
        self.assertIsNotNone(cmd)
        self.assertEqual(cmd.linear.x, 0.0)
        self.assertEqual(cmd.angular.z, 0.0)

    def test_unknown_command(self):
        """Test handling of unknown commands"""
        cmd = self.vta.process_command("invalid command")
        self.assertIsNone(cmd)

    def test_case_insensitive(self):
        """Test that commands work regardless of case"""
        cmd1 = self.vta.process_command("MOVE FORWARD")
        cmd2 = self.vta.process_command("move forward")
        self.assertEqual(cmd1.linear.x, cmd2.linear.x)
        self.assertEqual(cmd1.angular.z, cmd2.angular.z)


if __name__ == '__main__':
    unittest.main()
Best Practices for Voice-to-Action Systems
- Audio Quality: Ensure good microphone placement and audio preprocessing to improve recognition accuracy.
- Wake Word Activation: Implement a wake word system to prevent the robot from responding to unrelated speech.
- Error Handling: Include robust error handling for cases where commands are unclear or cannot be processed.
- Context Awareness: Consider the robot's current state and environment when interpreting commands.
- Privacy Considerations: Implement proper data handling for voice data, especially in sensitive environments.
- Fallback Mechanisms: Provide alternative control methods when voice recognition fails.
Next Steps
Continue to the next section to learn about cognitive planning with LLMs, where we'll explore how to use large language models to translate natural language commands into sequences of robotic actions.