Cluster/debug_deployment.py
#!/usr/bin/env python3
"""
Debug script to trace deployment pipeline data flow.
This script helps identify where data flow breaks during deployment.
"""
import sys
import os
import json
from typing import Dict, Any

# Add the project root to the Python path
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, project_root)
sys.path.insert(0, os.path.join(project_root, 'core', 'functions'))
try:
    from core.functions.mflow_converter import MFlowConverter
    from core.functions.workflow_orchestrator import WorkflowOrchestrator
    from core.functions.InferencePipeline import InferencePipeline

    IMPORTS_AVAILABLE = True
except ImportError as e:
    print(f"❌ Import error: {e}")
    IMPORTS_AVAILABLE = False


def create_test_pipeline_data() -> Dict[str, Any]:
    """Create a minimal test pipeline that should work."""
    return {
        'project_name': 'Debug Test Pipeline',
        'description': 'Simple test pipeline for debugging data flow',
        'version': '1.0',
        'nodes': [
            {
                'id': 'input_1',
                'name': 'Camera Input',
                'type': 'ExactInputNode',
                'pos': [100, 100],
                'properties': {
                    'source_type': 'camera',  # lowercase to match WorkflowOrchestrator
                    'device_id': 0,
                    'resolution': '640x480',  # smaller resolution for testing
                    'fps': 10  # lower fps for testing
                }
            },
            {
                'id': 'model_1',
                'name': 'Test Model',
                'type': 'ExactModelNode',
                'pos': [300, 100],
                'properties': {
                    'model_path': '/path/to/test.nef',
                    'scpu_fw_path': 'fw_scpu.bin',
                    'ncpu_fw_path': 'fw_ncpu.bin',
                    'port_ids': [28, 32],
                    'upload_fw': True
                }
            },
            {
                'id': 'output_1',
                'name': 'Debug Output',
                'type': 'ExactOutputNode',
                'pos': [500, 100],
                'properties': {
                    'output_type': 'console',
                    'destination': './debug_output'
                }
            }
        ],
        'connections': [
            {
                'input_node': 'input_1',
                'input_port': 'output',
                'output_node': 'model_1',
                'output_port': 'input'
            },
            {
                'input_node': 'model_1',
                'input_port': 'output',
                'output_node': 'output_1',
                'output_port': 'input'
            }
        ]
    }
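

# For offline inspection it can help to serialize the synthetic pipeline and
# diff it against a real editor export. A minimal sketch using only the
# standard library; the default filename is arbitrary.
def dump_test_pipeline(path: str = 'debug_pipeline.json') -> None:
    """Write the synthetic test pipeline to a JSON file for inspection."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(create_test_pipeline_data(), f, indent=2)
    print(f"📝 Test pipeline written to {path}")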


def trace_pipeline_conversion(pipeline_data: Dict[str, Any]):
    """Trace the conversion process step by step."""
    print("🔍 DEBUGGING PIPELINE CONVERSION")
    print("=" * 60)

    if not IMPORTS_AVAILABLE:
        print("❌ Cannot trace conversion - imports not available")
        return None, None, None

    try:
        print("1️⃣ Creating MFlowConverter...")
        converter = MFlowConverter()

        print("2️⃣ Converting pipeline data to config...")
        config = converter._convert_mflow_to_config(pipeline_data)
        print("✅ Conversion successful!")
        print(f"   Pipeline name: {config.pipeline_name}")
        print(f"   Total stages: {len(config.stage_configs)}")

        print("\n📊 INPUT CONFIG:")
        print(json.dumps(config.input_config, indent=2))
        print("\n📊 OUTPUT CONFIG:")
        print(json.dumps(config.output_config, indent=2))
        print("\n📊 STAGE CONFIGS:")
        for i, stage_config in enumerate(config.stage_configs, 1):
            print(f"   Stage {i}: {stage_config.stage_id}")
            print(f"      Port IDs: {stage_config.port_ids}")
            print(f"      Model: {stage_config.model_path}")

        print("\n3️⃣ Validating configuration...")
        is_valid, errors = converter.validate_config(config)
        if is_valid:
            print("✅ Configuration is valid")
        else:
            print("❌ Configuration validation failed:")
            for error in errors:
                print(f"   - {error}")

        return converter, config, is_valid
    except Exception as e:
        print(f"❌ Conversion failed: {e}")
        import traceback
        traceback.print_exc()
        return None, None, False
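

# The conversion trace can also be run on a real export rather than the
# synthetic pipeline, e.g. from a REPL. This assumes the .mflow export is
# plain JSON (as the dict structure above suggests); the filename is
# hypothetical:
#   with open('my_pipeline.mflow') as f:
#       trace_pipeline_conversion(json.load(f))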


def trace_workflow_creation(converter, config):
    """Trace the workflow orchestrator creation."""
    print("\n🔧 DEBUGGING WORKFLOW ORCHESTRATOR")
    print("=" * 60)

    try:
        print("1️⃣ Creating InferencePipeline...")
        pipeline = converter.create_inference_pipeline(config)
        print("✅ Pipeline created")

        print("2️⃣ Creating WorkflowOrchestrator...")
        orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config)
        print("✅ Orchestrator created")

        print("3️⃣ Checking data source creation...")
        data_source = orchestrator._create_data_source()
        if data_source:
            print(f"✅ Data source created: {type(data_source).__name__}")

            # Check if the data source can initialize
            print("4️⃣ Testing data source initialization...")
            if hasattr(data_source, 'initialize'):
                init_result = data_source.initialize()
                print(f"   Initialization result: {init_result}")
            else:
                print("   Data source has no initialize method")
        else:
            print("❌ Data source creation failed")
            print(f"   Source type: {config.input_config.get('source_type', 'MISSING')}")

        print("5️⃣ Checking result handler creation...")
        result_handler = orchestrator._create_result_handler()
        if result_handler:
            print(f"✅ Result handler created: {type(result_handler).__name__}")
        else:
            print("⚠️ No result handler created (may be expected)")

        return orchestrator, data_source, result_handler
    except Exception as e:
        print(f"❌ Workflow creation failed: {e}")
        import traceback
        traceback.print_exc()
        return None, None, None


def test_data_flow(orchestrator):
    """Test the actual data flow without real dongles."""
    print("\n🌊 TESTING DATA FLOW")
    print("=" * 60)

    # Set up result callback to track data
    results_received = []

    def debug_result_callback(result_dict):
        print(f"🎯 RESULT RECEIVED: {result_dict}")
        results_received.append(result_dict)

    def debug_frame_callback(frame):
        print(f"📸 FRAME RECEIVED: {type(frame)} shape={getattr(frame, 'shape', 'N/A')}")

    try:
        print("1️⃣ Setting up callbacks...")
        orchestrator.set_result_callback(debug_result_callback)
        orchestrator.set_frame_callback(debug_frame_callback)

        print("2️⃣ Starting orchestrator (this will likely fail without dongles, but should show data source activity)...")
        orchestrator.start()

        print("3️⃣ Running for 5 seconds to capture any activity...")
        import time
        time.sleep(5)

        print("4️⃣ Stopping orchestrator...")
        orchestrator.stop()

        print("📊 Results summary:")
        print(f"   Total results received: {len(results_received)}")
        return len(results_received) > 0
    except Exception as e:
        print(f"❌ Data flow test failed: {e}")
        print("   This might be expected if dongles are not available")
        return False
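

# Recommendation 1 in the summary below ("check if a camera is connected at
# index 0") can be verified independently of the orchestrator. A minimal
# sketch, assuming OpenCV (cv2) is installed; the default device index
# matches 'device_id' in the test pipeline above.
def check_camera(device_id: int = 0) -> bool:
    """Return True if the camera at device_id yields at least one frame."""
    import cv2
    cap = cv2.VideoCapture(device_id)
    ok, frame = cap.read()
    cap.release()
    if ok:
        print(f"✅ Camera {device_id} delivered a frame of shape {frame.shape}")
    else:
        print(f"❌ Camera {device_id} could not be opened or read")
    return ok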


def main():
    """Main debugging function."""
    print("🚀 CLUSTER4NPU DEPLOYMENT DEBUG TOOL")
    print("=" * 60)

    # Create test pipeline data
    pipeline_data = create_test_pipeline_data()

    # Trace conversion
    converter, config, is_valid = trace_pipeline_conversion(pipeline_data)
    if not converter or not config or not is_valid:
        print("\n❌ Cannot proceed - conversion failed or invalid")
        return

    # Trace workflow creation
    orchestrator, data_source, result_handler = trace_workflow_creation(converter, config)
    if not orchestrator:
        print("\n❌ Cannot proceed - workflow creation failed")
        return

    # Test data flow (this will likely fail at the dongle-connection step,
    # but it shows the data source behavior)
    print("\n⚠️ Note: The following test will likely fail due to missing dongles,")
    print("   but it will help us see if the data source is working correctly.")
    data_flowing = test_data_flow(orchestrator)

    print("\n📋 DEBUGGING SUMMARY")
    print("=" * 60)
    print(f"✅ Pipeline conversion: {'SUCCESS' if converter else 'FAILED'}")
    print(f"✅ Configuration validation: {'SUCCESS' if is_valid else 'FAILED'}")
    print(f"✅ Workflow orchestrator: {'SUCCESS' if orchestrator else 'FAILED'}")
    print(f"✅ Data source creation: {'SUCCESS' if data_source else 'FAILED'}")
    print(f"✅ Result handler creation: {'SUCCESS' if result_handler else 'N/A'}")
    print(f"✅ Data flow test: {'SUCCESS' if data_flowing else 'FAILED (expected without dongles)'}")

    if data_source and not data_flowing:
        print("\n🔍 DIAGNOSIS:")
        print("The issue appears to be that:")
        print("1. Pipeline configuration is working correctly")
        print("2. Data source can be created")
        print("3. BUT: Either the data source cannot initialize (camera not available)")
        print("   OR: The pipeline cannot start (dongles not available)")
        print("   OR: No data is being sent to the pipeline")

        print("\n💡 RECOMMENDATIONS:")
        print("1. Check if a camera is connected at index 0")
        print("2. Check if dongles are properly connected")
        print("3. Add more detailed logging to WorkflowOrchestrator.start()")
        print("4. Verify the pipeline.put_data() callback is being called")


if __name__ == "__main__":
    main()
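
# Typical invocation (the sys.path setup above treats the script's own
# directory as the project root, so the working directory does not matter):
#   python Cluster/debug_deployment.py
# On a machine without dongles the data-flow step is expected to fail; the
# useful output is the conversion and orchestrator trace printed before it.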