Skip to main content

Logging and Error Handling for VLA Pipeline

Logging Infrastructure

Log Levels and Categories

The VLA pipeline implements comprehensive logging across different levels and categories:

Log Levels

  • DEBUG: Detailed diagnostic information for development and troubleshooting
  • INFO: General operational information and successful events
  • WARNING: Indications of potential issues that don't prevent operation
  • ERROR: Errors that prevent specific operations but don't crash the system
  • CRITICAL: Severe errors that may cause system failure

Log Categories

  • VOICE: Audio processing and transcription events
  • PLANNING: Cognitive planning and LLM interaction events
  • EXECUTION: ROS2 action execution events
  • PERCEPTION: Sensor data and environment perception events
  • SYSTEM: General system operation and infrastructure events

Log Format

All logs follow a structured JSON format:

{
"timestamp": "2023-01-01T10:00:00.000Z",
"level": "INFO",
"category": "VOICE",
"message": "Audio transcription completed successfully",
"context": {
"requestId": "unique-request-id",
"sessionId": "session-identifier",
"pipelineId": "vla-pipeline-id",
"duration": 1234,
"details": {
"audioLength": 5.2,
"confidence": 0.95,
"transcription": "Move forward 2 meters"
}
}
}

Logging Configuration

Voice Processing Logs

voice:
level: INFO
max_file_size: "10MB"
backup_count: 5
format: "%(asctime)s - VOICE - %(levelname)s - %(message)s"

Planning Logs

planning:
level: INFO
max_file_size: "10MB"
backup_count: 5
format: "%(asctime)s - PLANNING - %(levelname)s - %(message)s"

Execution Logs

execution:
level: INFO
max_file_size: "20MB"
backup_count: 10
format: "%(asctime)s - EXECUTION - %(levelname)s - %(message)s"

Error Handling Framework

Error Classification

Errors in the VLA pipeline are classified into several categories:

Voice Recognition Errors

  • AudioQualityError: Poor audio quality prevents accurate transcription
  • TimeoutError: Audio processing exceeds allowed time limit
  • ServiceError: Whisper API or audio processing service unavailable

Planning Errors

  • AmbiguousCommandError: Command is unclear and requires clarification
  • ContextError: Insufficient environmental context for planning
  • CapabilityError: Requested action exceeds robot capabilities

Execution Errors

  • ActionValidationError: Action parameters are invalid
  • SafetyViolationError: Action would violate safety constraints
  • ExecutionFailureError: Action failed during execution

System Errors

  • ResourceError: Insufficient system resources
  • CommunicationError: Communication failure between components
  • ConfigurationError: Incorrect system configuration

Error Response Format

All errors follow a consistent response format:

{
"error": {
"type": "ErrorType",
"code": "ERROR_CODE",
"message": "Human-readable error message",
"details": {
"timestamp": "2023-01-01T10:00:00.000Z",
"requestId": "request-identifier",
"pipelineId": "vla-pipeline-id",
"severity": "HIGH",
"suggestedAction": "Recommended next step"
}
}
}

Error Recovery Strategies

Voice Recognition Recovery

def handle_voice_error(error):
if isinstance(error, AudioQualityError):
return {
"status": "request_repeat",
"message": "Audio quality too low, please repeat command"
}
elif isinstance(error, TimeoutError):
return {
"status": "fallback_to_text",
"message": "Audio processing timed out, use text input"
}
else:
return {
"status": "service_unavailable",
"message": "Voice service temporarily unavailable"
}

Planning Recovery

def handle_planning_error(error):
if isinstance(error, AmbiguousCommandError):
return {
"status": "request_clarification",
"options": ["option1", "option2"],
"message": "Please clarify your command"
}
elif isinstance(error, ContextError):
return {
"status": "request_updated_context",
"message": "Need updated environmental information"
}
else:
return {
"status": "fallback_to_simple_action",
"message": "Falling back to simpler action"
}

Execution Recovery

def handle_execution_error(error):
if isinstance(error, SafetyViolationError):
return {
"status": "safety_override_required",
"message": "Action violates safety constraints"
}
elif isinstance(error, ActionValidationError):
return {
"status": "retry_with_modified_parameters",
"message": "Action parameters need adjustment"
}
else:
return {
"status": "abort_and_report",
"message": "Action execution failed"
}

Specific Error Handling Implementations

Voice-to-Action Error Handling

class VoiceToActionHandler:
def __init__(self):
self.max_audio_duration = 30 # seconds
self.min_confidence = 0.7

def process_audio(self, audio_data):
try:
# Validate audio input
if self.get_audio_duration(audio_data) > self.max_audio_duration:
raise TimeoutError("Audio exceeds maximum duration")

# Process with Whisper
transcription = self.transcribe_with_whisper(audio_data)

# Validate confidence
if transcription.confidence < self.min_confidence:
raise AudioQualityError("Low transcription confidence")

return {
"success": True,
"transcription": transcription.text,
"confidence": transcription.confidence
}

except TimeoutError as e:
logger.warning(f"Voice processing timeout: {str(e)}")
return {
"success": False,
"error": "timeout",
"message": "Audio processing timed out"
}
except AudioQualityError as e:
logger.warning(f"Audio quality issue: {str(e)}")
return {
"success": False,
"error": "low_quality",
"message": "Audio quality too low, please repeat"
}
except Exception as e:
logger.error(f"Unexpected error in voice processing: {str(e)}")
return {
"success": False,
"error": "service_error",
"message": "Voice service temporarily unavailable"
}

LLM Planning Error Handling

class LLMPlanningHandler:
def __init__(self):
self.max_planning_time = 60 # seconds
self.min_plan_confidence = 0.6

def generate_plan(self, command, context):
try:
# Set timeout for planning
start_time = time.time()

# Generate plan with LLM
plan = self.call_llm_planner(command, context)

# Check elapsed time
elapsed = time.time() - start_time
if elapsed > self.max_planning_time:
raise TimeoutError("Planning exceeded maximum time")

# Validate plan confidence
if plan.confidence < self.min_plan_confidence:
raise LowConfidenceError("Plan confidence below threshold")

# Validate plan feasibility
if not self.validate_plan_feasibility(plan, context):
raise PlanValidationError("Generated plan is not feasible")

return {
"success": True,
"plan": plan,
"confidence": plan.confidence
}

except TimeoutError:
logger.warning("LLM planning timeout")
return {
"success": False,
"error": "timeout",
"message": "Planning timed out"
}
except AmbiguousCommandError as e:
logger.info(f"Ambiguous command: {str(e)}")
return {
"success": False,
"error": "ambiguous_command",
"message": "Command is ambiguous",
"options": e.suggested_options
}
except PlanValidationError as e:
logger.warning(f"Plan validation failed: {str(e)}")
return {
"success": False,
"error": "plan_validation",
"message": "Generated plan is not feasible"
}
except Exception as e:
logger.error(f"Unexpected error in planning: {str(e)}")
return {
"success": False,
"error": "planning_error",
"message": "Planning service error"
}

ROS2 Execution Error Handling

class ROS2ExecutionHandler:
def __init__(self):
self.max_action_time = 300 # seconds (5 minutes)
self.max_retries = 3

def execute_action_sequence(self, actions):
results = []

for i, action in enumerate(actions):
retry_count = 0
success = False

while retry_count < self.max_retries and not success:
try:
# Execute the action with timeout
result = self.execute_single_action(action)

if result.status == "succeeded":
success = True
results.append({
"actionId": action.id,
"status": "succeeded",
"result": result.data
})
else:
retry_count += 1
if retry_count >= self.max_retries:
results.append({
"actionId": action.id,
"status": "failed",
"error": result.error,
"message": f"Action failed after {self.max_retries} retries"
})
else:
logger.info(f"Retrying action {action.id}, attempt {retry_count + 1}")

except SafetyViolationError as e:
logger.critical(f"Safety violation in action {action.id}: {str(e)}")
results.append({
"actionId": action.id,
"status": "failed",
"error": "safety_violation",
"message": "Action violates safety constraints"
})
# Stop execution on safety violations
break
except Exception as e:
logger.error(f"Error executing action {action.id}: {str(e)}")
retry_count += 1
if retry_count >= self.max_retries:
results.append({
"actionId": action.id,
"status": "failed",
"error": "execution_error",
"message": "Action execution failed"
})

return {
"success": all(r["status"] == "succeeded" for r in results),
"results": results
}

Monitoring and Observability

Key Metrics

The system tracks the following metrics for observability:

Performance Metrics

  • Voice transcription latency
  • Planning time
  • Action execution time
  • Overall pipeline throughput

Success Metrics

  • Voice transcription success rate
  • Planning success rate
  • Action execution success rate
  • End-to-end pipeline success rate

Error Metrics

  • Error rates by type and category
  • Error recovery success rate
  • Unhandled error count

Health Checks

The system implements health checks for each component:

def health_check():
checks = {
"voice_service": check_voice_service(),
"llm_service": check_llm_service(),
"ros2_connection": check_ros2_connection(),
"perception_service": check_perception_service(),
"overall_system": check_overall_system()
}

overall_status = all(check["status"] == "healthy" for check in checks.values())

return {
"status": "healthy" if overall_status else "unhealthy",
"timestamp": datetime.utcnow().isoformat(),
"checks": checks
}

This comprehensive logging and error handling framework ensures the VLA pipeline operates reliably and provides detailed information for debugging and monitoring purposes.