ROS2 Action Sequence Generator
Overview
The ROS2 Action Sequence Generator is responsible for converting the high-level action plans generated by the LLM cognitive planner into executable ROS2 action calls. This component bridges the gap between the abstract plan and the specific ROS2 interfaces used by the robot.
Architecture
[Action Plan] → [Sequence Generator] → [ROS2 Action Clients] → [Robot Execution]
↓
[Action Validation] → [Safety Checks] → [Execution Orchestration]
Implementation
Action Sequence Generator Core
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from geometry_msgs.msg import Twist, Pose, Point
from std_msgs.msg import String
from typing import Dict, List, Any, Optional, Callable
import logging
import time
from dataclasses import dataclass
from enum import Enum
class ActionStatus(Enum):
PENDING = "pending"
ACTIVE = "active"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
ABORTED = "aborted"
REJECTED = "rejected"
LOST = "lost"
@dataclass
class ActionResult:
"""Result of executing a single action"""
action_id: str
status: ActionStatus
result_data: Any = None
error_message: Optional[str] = None
execution_time: float = 0.0
class ROS2ActionSequenceGenerator:
def __init__(self, node: Node):
"""
Initialize the ROS2 action sequence generator
Args:
node: ROS2 node for creating action clients and publishers
"""
self.node = node
self.logger = logging.getLogger(__name__)
# Initialize action clients for common robot actions
self._initialize_action_clients()
# Set default timeouts
self.default_timeout = 60 # seconds
self.navigation_timeout = 300 # 5 minutes for navigation
self.manipulation_timeout = 120 # 2 minutes for manipulation
def _initialize_action_clients(self):
"""Initialize common action clients used by the robot"""
# Navigation action client (for MoveBase or NavigateToPose)
try:
from nav2_msgs.action import NavigateToPose
self.nav_client = ActionClient(
self.node,
NavigateToPose,
'navigate_to_pose'
)
except ImportError:
self.logger.warning("Navigation action client not available")
self.nav_client = None
# Manipulation action client (example interface)
try:
from control_msgs.action import FollowJointTrajectory
self.manip_client = ActionClient(
self.node,
FollowJointTrajectory,
'joint_trajectory_controller/follow_joint_trajectory'
)
except ImportError:
self.logger.warning("Manipulation action client not available")
self.manip_client = None
# Initialize other common action clients as needed
self.action_clients = {
'navigation': self.nav_client,
'manipulation': self.manip_client
}
# Publisher for basic movement commands
self.cmd_vel_pub = self.node.create_publisher(Twist, '/cmd_vel', 10)
def generate_action_sequence(self,
action_plan: List[Dict[str, Any]],
execution_options: Optional[Dict[str, Any]] = None) -> List[ActionResult]:
"""
Generate and execute a sequence of ROS2 actions
Args:
action_plan: List of action dictionaries from the planner
execution_options: Options for execution (continue_on_error, max_retries, etc.)
Returns:
List of results for each action in the sequence
"""
if execution_options is None:
execution_options = {
'continue_on_error': False,
'max_retries': 3,
'safety_check': True
}
results = []
continue_execution = True
for i, action in enumerate(action_plan):
if not continue_execution:
# Mark remaining actions as cancelled
for remaining_action in action_plan[i:]:
results.append(ActionResult(
action_id=remaining_action.get('id', f'unknown_{i}'),
status=ActionStatus.CANCELLED,
error_message="Execution cancelled due to previous error"
))
break
# Execute the action with retry logic
result = self._execute_action_with_retry(
action,
execution_options['max_retries'],
execution_options['safety_check']
)
results.append(result)
# Check if we should continue based on execution options
if not execution_options['continue_on_error'] and result.status != ActionStatus.SUCCEEDED:
self.logger.error(f"Action {action.get('id')} failed, stopping execution")
continue_execution = False
return results
def _execute_action_with_retry(self,
action: Dict[str, Any],
max_retries: int,
safety_check: bool) -> ActionResult:
"""Execute an action with retry logic"""
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
# Perform safety check if enabled
if safety_check and not self._is_action_safe(action):
error_msg = f"Action {action.get('id', 'unknown')} failed safety check"
self.logger.warning(error_msg)
return ActionResult(
action_id=action.get('id', 'unknown'),
status=ActionStatus.REJECTED,
error_message=error_msg
)
# Execute the action
start_time = time.time()
result = self._execute_single_action(action)
execution_time = time.time() - start_time
# Check result status
if result.status == ActionStatus.SUCCEEDED:
self.logger.info(f"Action {action.get('id', 'unknown')} succeeded")
return ActionResult(
action_id=action.get('id', 'unknown'),
status=result.status,
result_data=result.result_data,
execution_time=execution_time
)
else:
# Action failed, prepare for retry
last_error = result.error_message
retry_count += 1
self.logger.warning(
f"Action {action.get('id', 'unknown')} failed (attempt {retry_count}), retrying..."
)
time.sleep(1) # Brief pause before retry
except Exception as e:
last_error = str(e)
retry_count += 1
self.logger.warning(
f"Exception in action {action.get('id', 'unknown')} (attempt {retry_count}), retrying: {str(e)}"
)
time.sleep(1)
# If we get here, all retries have been exhausted
error_msg = f"Action {action.get('id', 'unknown')} failed after {max_retries} retries: {last_error}"
self.logger.error(error_msg)
return ActionResult(
action_id=action.get('id', 'unknown'),
status=ActionStatus.ABORTED,
error_message=error_msg
)
def _execute_single_action(self, action: Dict[str, Any]) -> ActionResult:
"""Execute a single action based on its type and parameters"""
action_type = action.get('type', 'unknown')
action_name = action.get('action', 'unknown')
parameters = action.get('parameters', {})
self.logger.info(f"Executing {action_type} action: {action_name}")
try:
if action_type == 'navigation':
return self._execute_navigation_action(action_name, parameters)
elif action_type == 'manipulation':
return self._execute_manipulation_action(action_name, parameters)
elif action_type == 'interaction':
return self._execute_interaction_action(action_name, parameters)
elif action_type == 'perception':
return self._execute_perception_action(action_name, parameters)
elif action_type == 'movement':
return self._execute_movement_action(action_name, parameters)
else:
return ActionResult(
action_id=action.get('id', 'unknown'),
status=ActionStatus.REJECTED,
error_message=f"Unknown action type: {action_type}"
)
except Exception as e:
return ActionResult(
action_id=action.get('id', 'unknown'),
status=ActionStatus.ABORTED,
error_message=str(e)
)
def _execute_navigation_action(self, action_name: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute navigation-related actions"""
if action_name == 'navigate_to' or action_name == 'go_to':
destination = parameters.get('location') or parameters.get('destination')
if not destination:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message="No destination specified for navigation"
)
# Convert destination to appropriate format
pose = self._convert_to_pose(destination)
if not pose:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Invalid destination format: {destination}"
)
# Send navigation goal
return self._send_navigation_goal(pose)
elif action_name == 'move_forward':
distance = parameters.get('distance', 1.0)
return self._execute_move_forward(distance)
elif action_name == 'turn':
angle = parameters.get('angle', 90.0) # degrees
return self._execute_turn(angle)
else:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Unknown navigation action: {action_name}"
)
def _execute_manipulation_action(self, action_name: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute manipulation-related actions"""
if action_name == 'pick_up' or action_name == 'grasp':
object_name = parameters.get('object')
if not object_name:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message="No object specified for pick-up action"
)
# In a real implementation, this would interface with a manipulation system
# For simulation, we'll just log the action
self.logger.info(f"Simulated pick-up action for object: {object_name}")
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"object": object_name, "status": "picked_up"}
)
elif action_name == 'put_down' or action_name == 'release':
object_name = parameters.get('object')
location = parameters.get('location')
self.logger.info(f"Simulated put-down action for object: {object_name} at {location}")
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"object": object_name, "location": location, "status": "put_down"}
)
else:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Unknown manipulation action: {action_name}"
)
def _execute_interaction_action(self, action_name: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute interaction-related actions"""
if action_name == 'turn_on' or action_name == 'activate':
device = parameters.get('device')
return self._execute_device_interaction(device, True)
elif action_name == 'turn_off' or action_name == 'deactivate':
device = parameters.get('device')
return self._execute_device_interaction(device, False)
else:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Unknown interaction action: {action_name}"
)
def _execute_perception_action(self, action_name: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute perception-related actions"""
if action_name == 'detect_object' or action_name == 'find_object':
target_object = parameters.get('object')
search_area = parameters.get('search_area', 'current_view')
# Simulate object detection
# In a real system, this would interface with perception modules
detected = self._simulate_object_detection(target_object, search_area)
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED if detected else ActionStatus.ABORTED,
result_data={
"object": target_object,
"detected": detected,
"location": [1.0, 1.0, 0.0] if detected else None
}
)
else:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Unknown perception action: {action_name}"
)
def _execute_movement_action(self, action_name: str, parameters: Dict[str, Any]) -> ActionResult:
"""Execute basic movement actions"""
if action_name == 'move_forward':
distance = parameters.get('distance', 1.0)
return self._execute_move_forward(distance)
elif action_name == 'move_backward':
distance = parameters.get('distance', 1.0)
return self._execute_move_forward(-distance) # Negative distance for backward
elif action_name == 'turn_left':
angle = parameters.get('angle', 90.0)
return self._execute_turn(angle)
elif action_name == 'turn_right':
angle = parameters.get('angle', 90.0)
return self._execute_turn(-angle) # Negative angle for right turn
elif action_name == 'stop':
return self._execute_stop()
else:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message=f"Unknown movement action: {action_name}"
)
def _convert_to_pose(self, destination: Any) -> Optional[Pose]:
"""Convert various destination formats to a Pose object"""
if isinstance(destination, list) and len(destination) >= 2:
# Format: [x, y] or [x, y, z]
x, y = destination[0], destination[1]
z = destination[2] if len(destination) > 2 else 0.0
pose = Pose()
pose.position = Point(x=float(x), y=float(y), z=float(z))
# Simple orientation (facing forward)
pose.orientation.w = 1.0
return pose
elif isinstance(destination, dict):
# Format: {"x": x, "y": y, "z": z}
x = destination.get('x', 0.0)
y = destination.get('y', 0.0)
z = destination.get('z', 0.0)
pose = Pose()
pose.position = Point(x=float(x), y=float(y), z=float(z))
pose.orientation.w = 1.0
return pose
else:
# Unknown format
return None
def _send_navigation_goal(self, pose: Pose) -> ActionResult:
"""Send navigation goal to the navigation system"""
if not self.nav_client:
return ActionResult(
action_id='unknown',
status=ActionStatus.REJECTED,
error_message="Navigation client not available"
)
# Wait for the action server to be available
if not self.nav_client.wait_for_server(timeout_sec=5.0):
return ActionResult(
action_id='unknown',
status=ActionStatus.ABORTED,
error_message="Navigation server not available"
)
# Create the goal message
from nav2_msgs.action import NavigateToPose
goal_msg = NavigateToPose.Goal()
goal_msg.pose = pose
# Send the goal
future = self.nav_client.send_goal_async(goal_msg)
# Wait for the result (with timeout)
# In a real implementation, this would be handled asynchronously
# For simulation, we'll just return success
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"destination": [pose.position.x, pose.position.y]}
)
def _execute_move_forward(self, distance: float) -> ActionResult:
"""Execute a forward movement command"""
# Calculate the required velocity and duration
speed = 0.5 # m/s
duration = abs(distance) / speed
direction = 1.0 if distance >= 0 else -1.0
# Create velocity message
msg = Twist()
msg.linear.x = speed * direction
msg.angular.z = 0.0
# Publish the command for the specified duration
start_time = time.time()
while time.time() - start_time < duration:
self.cmd_vel_pub.publish(msg)
time.sleep(0.1)
# Stop the robot
msg.linear.x = 0.0
self.cmd_vel_pub.publish(msg)
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"distance": distance, "duration": duration}
)
def _execute_turn(self, angle_degrees: float) -> ActionResult:
"""Execute a turning command"""
# Convert degrees to radians
angle_rad = angle_degrees * 3.14159 / 180.0
# Calculate the required angular velocity and duration
# For a differential drive robot: angle = angular_velocity * time
angular_speed = 0.5 # rad/s
duration = abs(angle_rad) / angular_speed
direction = 1.0 if angle_rad >= 0 else -1.0
# Create velocity message
msg = Twist()
msg.linear.x = 0.0
msg.angular.z = angular_speed * direction
# Publish the command for the specified duration
start_time = time.time()
while time.time() - start_time < duration:
self.cmd_vel_pub.publish(msg)
time.sleep(0.1)
# Stop the robot
msg.angular.z = 0.0
self.cmd_vel_pub.publish(msg)
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"angle": angle_degrees, "duration": duration}
)
def _execute_stop(self) -> ActionResult:
"""Stop all robot movement"""
msg = Twist()
msg.linear.x = 0.0
msg.angular.z = 0.0
self.cmd_vel_pub.publish(msg)
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"status": "stopped"}
)
def _execute_device_interaction(self, device: str, activate: bool) -> ActionResult:
"""Execute a device interaction (turn on/off)"""
# In a real system, this would call a service to control the device
# For simulation, we'll just log the action
action = "activate" if activate else "deactivate"
self.logger.info(f"Simulated {action} action for device: {device}")
return ActionResult(
action_id='unknown',
status=ActionStatus.SUCCEEDED,
result_data={"device": device, "action": action, "status": "completed"}
)
def _simulate_object_detection(self, target_object: str, search_area: str) -> bool:
"""Simulate object detection for demonstration purposes"""
# In a real system, this would interface with perception modules
# For simulation, we'll return True for known objects
known_objects = ["red_cup", "blue_ball", "book", "phone", "keys"]
return target_object.lower() in [obj.lower() for obj in known_objects]
def _is_action_safe(self, action: Dict[str, Any]) -> bool:
"""Perform safety checks on an action before execution"""
action_type = action.get('type', 'unknown')
parameters = action.get('parameters', {})
# Safety checks for navigation
if action_type == 'navigation':
destination = parameters.get('location') or parameters.get('destination')
if destination:
# Check if destination is too far or in forbidden area
# This is a simplified check - real systems would have more sophisticated safety logic
if isinstance(destination, (list, tuple)) and len(destination) >= 2:
x, y = destination[0], destination[1]
# Basic bounds check (within 10m radius for safety)
if abs(x) > 10.0 or abs(y) > 10.0:
self.logger.warning(f"Navigation destination {destination} is outside safe bounds")
return False
# Safety checks for manipulation
elif action_type == 'manipulation':
# Check if object is within manipulable range
# For simulation, we'll assume all objects are safe to manipulate
pass
# Add more safety checks as needed for other action types
return True
Action Validation and Safety Module
from typing import Dict, List, Any, Tuple
import logging
class ActionValidator:
def __init__(self):
self.logger = logging.getLogger(__name__)
def validate_action_sequence(self,
action_sequence: List[Dict[str, Any]],
robot_capabilities: List[str]) -> Tuple[bool, List[str]]:
"""
Validate an action sequence against robot capabilities
Args:
action_sequence: List of action dictionaries
robot_capabilities: List of capabilities the robot has
Returns:
Tuple of (is_valid, list_of_validation_errors)
"""
errors = []
for i, action in enumerate(action_sequence):
action_type = action.get('type', 'unknown')
action_name = action.get('action', 'unknown')
# Check if robot has capability for this action type
if action_type not in robot_capabilities:
errors.append(
f"Action {i+1} ({action_name}): Robot does not have '{action_type}' capability"
)
# Validate action-specific parameters
action_errors = self._validate_action_parameters(action)
errors.extend([f"Action {i+1} ({action_name}): {error}" for error in action_errors])
return len(errors) == 0, errors
def _validate_action_parameters(self, action: Dict[str, Any]) -> List[str]:
"""Validate parameters for a specific action"""
errors = []
action_type = action.get('type', 'unknown')
parameters = action.get('parameters', {})
if action_type == 'navigation':
# Validate navigation parameters
destination = parameters.get('location') or parameters.get('destination')
if not destination:
errors.append("No destination specified for navigation action")
distance = parameters.get('distance')
if distance is not None and (not isinstance(distance, (int, float)) or distance <= 0):
errors.append("Invalid distance parameter for movement")
elif action_type == 'manipulation':
# Validate manipulation parameters
obj = parameters.get('object')
if not obj:
errors.append("No object specified for manipulation action")
elif action_type == 'interaction':
# Validate interaction parameters
device = parameters.get('device')
if not device:
errors.append("No device specified for interaction action")
return errors
def validate_action_dependencies(self, action_sequence: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
"""
Validate action dependencies in the sequence
Args:
action_sequence: List of action dictionaries
Returns:
Tuple of (is_valid, list_of_dependency_errors)
"""
errors = []
action_ids = {action.get('id') for action in action_sequence if action.get('id')}
for i, action in enumerate(action_sequence):
dependencies = action.get('dependencies', [])
for dep_id in dependencies:
if dep_id not in action_ids:
errors.append(
f"Action {i+1} ({action.get('action', 'unknown')}) "
f"depends on non-existent action ID: {dep_id}"
)
return len(errors) == 0, errors
def validate_action_feasibility(self,
action: Dict[str, Any],
environment_context: Dict[str, Any]) -> Tuple[bool, str]:
"""
Validate if an action is feasible in the current environment
Args:
action: Action dictionary
environment_context: Current environmental context
Returns:
Tuple of (is_feasible, reason_if_not_feasible)
"""
action_type = action.get('type', 'unknown')
parameters = action.get('parameters', {})
if action_type == 'navigation':
destination = parameters.get('location') or parameters.get('destination')
if destination:
# Check if destination is accessible
if not self._is_destination_accessible(destination, environment_context):
return False, f"Destination {destination} is not accessible"
elif action_type == 'manipulation':
obj = parameters.get('object')
if obj:
# Check if object exists and is reachable
if not self._is_object_reachable(obj, environment_context):
return False, f"Object {obj} is not reachable"
return True, "Action is feasible"
def _is_destination_accessible(self, destination: Any, context: Dict[str, Any]) -> bool:
"""Check if a destination is accessible in the environment"""
# In a real implementation, this would check against the environment map
# For simulation, we'll do a basic check
if isinstance(destination, (list, tuple)) and len(destination) >= 2:
x, y = destination[0], destination[1]
# Basic check: destination should be within environment bounds
return -50 <= x <= 50 and -50 <= y <= 50
return False
def _is_object_reachable(self, obj_name: str, context: Dict[str, Any]) -> bool:
"""Check if an object is reachable in the environment"""
object_locations = context.get('object_locations', {})
return obj_name in object_locations
Main Integration Module
class ROS2ActionExecutor:
def __init__(self, node: Node):
"""
Main module for executing ROS2 action sequences
Args:
node: ROS2 node for creating clients and publishers
"""
self.generator = ROS2ActionSequenceGenerator(node)
self.validator = ActionValidator()
self.logger = logging.getLogger(__name__)
def execute_action_sequence(self,
action_sequence: List[Dict[str, Any]],
robot_capabilities: List[str],
environment_context: Dict[str, Any],
execution_options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Execute a complete action sequence with validation and error handling
Args:
action_sequence: List of action dictionaries to execute
robot_capabilities: List of robot capabilities
environment_context: Environmental context for feasibility checks
execution_options: Options for execution (continue_on_error, max_retries, etc.)
Returns:
Dictionary with execution results
"""
if execution_options is None:
execution_options = {
'continue_on_error': False,
'max_retries': 3,
'safety_check': True
}
# Step 1: Validate the action sequence
self.logger.info(f"Validating action sequence with {len(action_sequence)} actions")
is_valid, validation_errors = self.validator.validate_action_sequence(
action_sequence, robot_capabilities
)
if not is_valid:
return {
"success": False,
"error": "Action sequence validation failed",
"validation_errors": validation_errors,
"results": []
}
# Step 2: Validate action dependencies
deps_valid, dependency_errors = self.validator.validate_action_dependencies(
action_sequence
)
if not deps_valid:
return {
"success": False,
"error": "Action dependency validation failed",
"dependency_errors": dependency_errors,
"results": []
}
# Step 3: Validate feasibility of each action
for i, action in enumerate(action_sequence):
is_feasible, reason = self.validator.validate_action_feasibility(
action, environment_context
)
if not is_feasible:
return {
"success": False,
"error": f"Action {i+1} is not feasible: {reason}",
"results": []
}
# Step 4: Execute the action sequence
self.logger.info("Starting action sequence execution")
results = self.generator.generate_action_sequence(
action_sequence, execution_options
)
# Step 5: Compile results
successful_actions = sum(1 for r in results if r.status == ActionStatus.SUCCEEDED)
total_actions = len(results)
execution_summary = {
"total_actions": total_actions,
"successful_actions": successful_actions,
"failed_actions": total_actions - successful_actions,
"success_rate": successful_actions / total_actions if total_actions > 0 else 0,
"results": [
{
"action_id": r.action_id,
"status": r.status.value,
"error": r.error_message,
"execution_time": r.execution_time
} for r in results
]
}
overall_success = (execution_summary["success_rate"] == 1.0 or
(execution_options.get("continue_on_error", False) and successful_actions > 0))
return {
"success": overall_success,
"summary": execution_summary,
"results": results
}
def execute_single_action(self, action: Dict[str, Any]) -> ActionResult:
"""Execute a single action directly"""
return self.generator._execute_single_action(action)
def cancel_execution(self):
"""Cancel any ongoing execution"""
# In a real implementation, this would cancel ongoing actions
# For simulation, we'll just log the cancellation
self.logger.info("Execution cancellation requested")
return True
Usage Example
import rclpy
from rclpy.node import Node
class VLAActionExecutorNode(Node):
def __init__(self):
super().__init__('vla_action_executor')
# Initialize the action executor
self.action_executor = ROS2ActionExecutor(self)
# Define robot capabilities
self.robot_capabilities = [
'navigation', 'manipulation', 'perception', 'interaction'
]
# Define environment context
self.environment_context = {
'object_locations': {
'red_cup': [2.0, 1.5, 0.8],
'blue_ball': [3.2, -1.0, 0.8],
'kitchen': [5.0, 0.0, 0.0]
},
'robot_position': [0.0, 0.0, 0.0],
'map_bounds': [-10, -10, 10, 10] # min_x, min_y, max_x, max_y
}
def execute_plan(self, action_sequence: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a plan with the action sequence"""
execution_options = {
'continue_on_error': True,
'max_retries': 3,
'safety_check': True
}
result = self.action_executor.execute_action_sequence(
action_sequence,
self.robot_capabilities,
self.environment_context,
execution_options
)
return result
def main():
rclpy.init()
node = VLAActionExecutorNode()
# Example action sequence from the LLM planner
action_sequence = [
{
'id': 'nav_001',
'type': 'navigation',
'action': 'navigate_to',
'parameters': {'location': [2.0, 1.0, 0.0]},
'dependencies': []
},
{
'id': 'manip_001',
'type': 'manipulation',
'action': 'pick_up',
'parameters': {'object': 'red_cup'},
'dependencies': ['nav_001']
},
{
'id': 'nav_002',
'type': 'navigation',
'action': 'navigate_to',
'parameters': {'location': [0.0, 0.0, 0.0]},
'dependencies': ['manip_001']
}
]
# Execute the plan
result = node.execute_plan(action_sequence)
print(f"Execution success: {result['success']}")
print(f"Success rate: {result['summary']['success_rate']:.2f}")
print("Action results:")
for action_result in result['summary']['results']:
print(f" {action_result['action_id']}: {action_result['status']}")
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Safety and Error Handling
The ROS2 Action Sequence Generator implements comprehensive safety and error handling:
- Action Validation: Validates each action against robot capabilities before execution
- Dependency Checking: Ensures action dependencies are properly ordered
- Feasibility Verification: Checks if actions are possible in the current environment
- Safety Constraints: Enforces safety rules during execution
- Retry Logic: Implements configurable retry mechanisms for failed actions
- Execution Monitoring: Tracks and reports the status of each action
This module provides a robust bridge between the high-level plans generated by the LLM cognitive planner and the low-level ROS2 interfaces used by the robot, ensuring safe and reliable execution of complex tasks.