407 lines
16 KiB
Python
407 lines
16 KiB
Python
"""
|
|
InferencePipeline Usage Examples
|
|
================================
|
|
|
|
This file demonstrates how to use the InferencePipeline for various scenarios:
|
|
1. Single stage (equivalent to MultiDongle)
|
|
2. Two-stage cascade (detection -> classification)
|
|
3. Multi-stage complex pipeline
|
|
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import time
|
|
from InferencePipeline import (
|
|
InferencePipeline, StageConfig,
|
|
create_feature_extractor_preprocessor,
|
|
create_result_aggregator_postprocessor
|
|
)
|
|
from Multidongle import PreProcessor, PostProcessor, WebcamSource, RTSPSource
|
|
|
|
# =============================================================================
# Example 1: Single Stage Pipeline (Basic Usage)
# =============================================================================
|
|
|
|
def example_single_stage():
    """Run a single-stage fire-detection pipeline fed from the webcam.

    Builds one ``StageConfig`` (stage id ``"fire_detection"``), wraps it in an
    ``InferencePipeline``, registers result/error callbacks and then makes 100
    attempts to read a webcam frame and submit it, pausing 0.1 s between
    attempts. Per the module docstring this is meant to be equivalent to
    using MultiDongle directly.

    The frame source (if it was created) and the pipeline are always stopped
    on exit, including on Ctrl-C and when the webcam fails to open or start.
    """
    print("=== Single Stage Pipeline Example ===")

    # Create stage configuration
    stage_config = StageConfig(
        stage_id="fire_detection",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="fire_detection_520.nef",
        upload_fw=True,
        max_queue_size=30
        # Note: No inter-stage processors needed for single stage
        # MultiDongle will handle internal preprocessing/postprocessing
    )

    # Create pipeline with single stage
    pipeline = InferencePipeline(
        stage_configs=[stage_config],
        pipeline_name="SingleStageFireDetection"
    )

    def handle_result(pipeline_data):
        """Print the fire-detection verdict and probability for one item."""
        # Missing stage output falls back to 'Unknown' / 0.0 instead of raising
        result = pipeline_data.stage_results.get("fire_detection", {})
        print(f"Fire Detection: {result.get('result', 'Unknown')} "
              f"(Prob: {result.get('probability', 0.0):.3f})")

    def handle_error(pipeline_data):
        """Print the raw stage results of an item that failed."""
        print(f"❌ Error: {pipeline_data.stage_results}")

    # Initialize and start
    pipeline.initialize()
    pipeline.start()

    # From here on the pipeline is running, so everything that can fail lives
    # inside the try block and the finally clause is guaranteed to stop it.
    data_source = None
    try:
        pipeline.set_result_callback(handle_result)
        pipeline.set_error_callback(handle_error)

        # Process some data
        data_source = WebcamSource(camera_id=0)
        data_source.start()

        print("🚀 Starting single stage pipeline...")
        for _ in range(100):  # 100 submit attempts (empty reads are skipped)
            frame = data_source.get_frame()
            if frame is not None:
                success = pipeline.put_data(frame, timeout=1.0)
                if not success:
                    print("Pipeline input queue full, dropping frame")
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        # data_source stays None when WebcamSource() itself raised
        if data_source is not None:
            data_source.stop()
        pipeline.stop()
        print("Single stage pipeline test completed")
|
|
|
|
# =============================================================================
# Example 2: Two-Stage Cascade Pipeline
# =============================================================================
|
|
|
|
def example_two_stage_cascade():
    """Run a two-stage cascade: object detection -> fire classification.

    Stage 1 (``"object_detection"``) uses no inter-stage processors. Stage 2
    (``"fire_classification"``) is configured with a center-crop
    preprocessor and a postprocessor that blends the classification score
    with a detection confidence. Webcam frames are submitted until 200 have
    been accepted by the pipeline, with a 0.05 s pause between attempts, and
    pipeline statistics are requested every 10 seconds.

    The frame source (if it was created) and the pipeline are always stopped
    on exit, including on Ctrl-C and when the webcam or the stats reporter
    fails to start.
    """
    print("=== Two-Stage Cascade Pipeline Example ===")

    # Custom preprocessor for second stage
    def roi_extraction_preprocess(frame, target_size):
        """Center-crop ``frame`` and resize the crop to ``target_size``.

        This would normally extract the bounding box from the first stage's
        results; for the demo a centered square whose side is half the
        shorter image dimension is used instead.
        """
        # The first two axes are (height, width) for both 2-D and 3-D frames
        h, w = frame.shape[:2]
        center_x, center_y = w // 2, h // 2
        crop_size = min(w, h) // 2

        x1 = max(0, center_x - crop_size // 2)
        y1 = max(0, center_y - crop_size // 2)
        x2 = min(w, center_x + crop_size // 2)
        y2 = min(h, center_y + crop_size // 2)

        # Slicing the first two axes works unchanged for grayscale and color
        cropped = frame[y1:y2, x1:x2]

        return cv2.resize(cropped, target_size)

    # Custom postprocessor for combining results
    def combine_detection_classification(raw_output, **kwargs):
        """Blend the classification score with a detection confidence.

        Returns a dict with ``combined_probability``, ``result`` and
        ``confidence``; when ``raw_output`` is non-empty it also carries
        ``classification_prob`` and ``detection_conf``.
        """
        # NOTE(review): assumes raw_output is array-like with `.size` and a
        # scalar first element -- confirm against what the stage emits.
        if raw_output.size > 0:
            classification_prob = float(raw_output[0])

            # Get detection result from metadata (would be passed from first
            # stage); defaults to a neutral 0.5 when it is not supplied
            detection_confidence = kwargs.get('detection_conf', 0.5)

            # Combined confidence: 70% classification, 30% detection
            combined_prob = (classification_prob * 0.7) + (detection_confidence * 0.3)

            return {
                'combined_probability': combined_prob,
                'classification_prob': classification_prob,
                'detection_conf': detection_confidence,
                'result': 'Fire Detected' if combined_prob > 0.6 else 'No Fire',
                'confidence': 'High' if combined_prob > 0.8 else 'Medium' if combined_prob > 0.5 else 'Low'
            }
        return {'combined_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'}

    # Set up callbacks
    def handle_cascade_result(pipeline_data):
        """Handle results from cascade pipeline"""
        detection_result = pipeline_data.stage_results.get("object_detection", {})
        classification_result = pipeline_data.stage_results.get("fire_classification", {})

        print(f"Detection: {detection_result.get('result', 'Unknown')} "
              f"(Prob: {detection_result.get('probability', 0.0):.3f})")
        print(f"Classification: {classification_result.get('result', 'Unknown')} "
              f"(Combined: {classification_result.get('combined_probability', 0.0):.3f})")
        print(f"Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
        print("-" * 50)

    def handle_pipeline_stats(stats):
        """Handle pipeline statistics"""
        print("\n📊 Pipeline Stats:")
        print(f" Submitted: {stats['pipeline_input_submitted']}")
        print(f" Completed: {stats['pipeline_completed']}")
        print(f" Errors: {stats['pipeline_errors']}")

        for stage_stat in stats['stage_statistics']:
            print(f" Stage {stage_stat['stage_id']}: "
                  f"Processed={stage_stat['processed_count']}, "
                  f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

    # Stage 1: Object Detection
    stage1_config = StageConfig(
        stage_id="object_detection",
        port_ids=[28, 30],  # First set of dongles
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="object_detection_520.nef",
        upload_fw=True,
        max_queue_size=30
    )

    # Stage 2: Fire Classification
    stage2_config = StageConfig(
        stage_id="fire_classification",
        port_ids=[32, 34],  # Second set of dongles
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="fire_classification_520.nef",
        upload_fw=True,
        max_queue_size=30,
        # Inter-stage processing
        input_preprocessor=PreProcessor(resize_fn=roi_extraction_preprocess),
        output_postprocessor=PostProcessor(process_fn=combine_detection_classification)
    )

    # Create two-stage pipeline
    pipeline = InferencePipeline(
        stage_configs=[stage1_config, stage2_config],
        pipeline_name="TwoStageCascade"
    )

    pipeline.set_result_callback(handle_cascade_result)
    pipeline.set_stats_callback(handle_pipeline_stats)

    # Initialize and start
    pipeline.initialize()
    pipeline.start()

    # From here on the pipeline is running, so everything that can fail lives
    # inside the try block and the finally clause is guaranteed to stop it.
    data_source = None
    try:
        pipeline.start_stats_reporting(interval=10.0)  # Stats every 10 seconds

        # Process data
        # data_source = RTSPSource("rtsp://your-camera-url")
        data_source = WebcamSource(0)
        data_source.start()

        # Only frames the pipeline accepted count toward the 200-frame target
        frame_count = 0
        while frame_count < 200:
            frame = data_source.get_frame()
            if frame is not None:
                if pipeline.put_data(frame, timeout=1.0):
                    frame_count += 1
                else:
                    print("Pipeline input queue full, dropping frame")
            time.sleep(0.05)
    except KeyboardInterrupt:
        print("\nStopping cascade pipeline...")
    finally:
        # data_source stays None when WebcamSource() itself raised
        if data_source is not None:
            data_source.stop()
        pipeline.stop()
|
|
|
|
# =============================================================================
# Example 3: Complex Multi-Stage Pipeline
# =============================================================================
|
|
|
|
def example_complex_pipeline():
    """Run a four-stage pipeline: RGB, edge, thermal-like and fusion stages.

    The edge and thermal stages get custom input preprocessors, the fusion
    stage gets a custom output postprocessor. Webcam frames are submitted
    until 50 have been accepted, with a 0.1 s pause between attempts and a
    throughput line printed every 10 accepted frames. Any exception raised
    during setup or processing is reported with a printed message instead of
    propagating; the frame source (if it was created) and the pipeline are
    stopped afterwards, and the final pipeline statistics are printed.
    """
    print("=== Complex Multi-Stage Pipeline Example ===")

    # Custom processors for different stages
    def edge_detection_preprocess(frame, target_size):
        """Extract edge features"""
        # NOTE(review): cvtColor with COLOR_BGR2GRAY expects a 3-channel BGR
        # frame -- confirm that is what this stage receives.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        # Back to 3 channels before resizing
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)

    def thermal_simulation_preprocess(frame, target_size):
        """Simulate thermal-like processing"""
        # Convert to HSV and extract V channel as pseudo-thermal
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)

    def fusion_postprocess(raw_output, **kwargs):
        """Fuse results from multiple modalities.

        Returns a dict with ``fused_probability``, ``result`` and
        ``confidence``; when ``raw_output`` is non-empty it also carries the
        per-modality values under ``individual_probs``.
        """
        # NOTE(review): assumes raw_output is array-like with `.size` and a
        # scalar first element -- confirm against what the stage emits.
        if raw_output.size > 0:
            current_prob = float(raw_output[0])

            # This would get previous stage results from pipeline metadata
            # For demo, we'll simulate (neutral 0.5 when not supplied)
            rgb_confidence = kwargs.get('rgb_conf', 0.5)
            edge_confidence = kwargs.get('edge_conf', 0.5)

            # Weighted fusion: 50% current stage, 30% RGB, 20% edge
            fused_prob = (current_prob * 0.5) + (rgb_confidence * 0.3) + (edge_confidence * 0.2)

            return {
                'fused_probability': fused_prob,
                'individual_probs': {
                    'thermal': current_prob,
                    'rgb': rgb_confidence,
                    'edge': edge_confidence
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium' if fused_prob > 0.5 else 'Low'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'}

    # Stage 1: RGB Analysis
    rgb_stage = StageConfig(
        stage_id="rgb_analysis",
        port_ids=[28, 30],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="rgb_fire_detection_520.nef",
        upload_fw=True
    )

    # Stage 2: Edge Feature Analysis
    edge_stage = StageConfig(
        stage_id="edge_analysis",
        port_ids=[32, 34],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="edge_fire_detection_520.nef",
        upload_fw=True,
        input_preprocessor=PreProcessor(resize_fn=edge_detection_preprocess)
    )

    # Stage 3: Thermal-like Analysis
    thermal_stage = StageConfig(
        stage_id="thermal_analysis",
        port_ids=[36, 38],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="thermal_fire_detection_520.nef",
        upload_fw=True,
        input_preprocessor=PreProcessor(resize_fn=thermal_simulation_preprocess)
    )

    # Stage 4: Fusion
    fusion_stage = StageConfig(
        stage_id="result_fusion",
        port_ids=[40, 42],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="fusion_520.nef",
        upload_fw=True,
        output_postprocessor=PostProcessor(process_fn=fusion_postprocess)
    )

    # Create complex pipeline
    pipeline = InferencePipeline(
        stage_configs=[rgb_stage, edge_stage, thermal_stage, fusion_stage],
        pipeline_name="ComplexMultiModalPipeline"
    )

    # Advanced result handling
    def handle_complex_result(pipeline_data):
        """Handle complex pipeline results"""
        print("\n🔥 Multi-Modal Fire Detection Results:")
        print(f" Pipeline ID: {pipeline_data.pipeline_id}")

        # Only stages whose result carries a 'probability' key are listed
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f" {stage_id}: {result.get('result', 'Unknown')} "
                      f"(Prob: {result.get('probability', 0.0):.3f})")

        # Final fused result
        if 'result_fusion' in pipeline_data.stage_results:
            fusion_result = pipeline_data.stage_results['result_fusion']
            print(f" 🎯 FINAL: {fusion_result.get('result', 'Unknown')} "
                  f"(Fused: {fusion_result.get('fused_probability', 0.0):.3f})")
            print(f" Confidence: {fusion_result.get('confidence', 'Unknown')}")

        print(f" Total Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
        print("=" * 60)

    def handle_error(pipeline_data):
        """Handle pipeline errors"""
        print(f"❌ Pipeline Error for {pipeline_data.pipeline_id}")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'error' in result:
                print(f" Stage {stage_id} error: {result['error']}")

    pipeline.set_result_callback(handle_complex_result)
    pipeline.set_error_callback(handle_error)

    # Bound before the try block so the finally clause can tell whether the
    # source was ever created; otherwise a failure in initialize()/start()
    # would raise UnboundLocalError there and skip pipeline.stop().
    data_source = None

    # Initialize and start
    try:
        pipeline.initialize()
        pipeline.start()

        # Simulate data input
        data_source = WebcamSource(camera_id=0)
        data_source.start()

        print("🚀 Complex pipeline started. Processing frames...")

        frame_count = 0
        start_time = time.time()

        while frame_count < 50:  # Process 50 frames for demo
            frame = data_source.get_frame()
            if frame is not None:
                if pipeline.put_data(frame):
                    frame_count += 1

                    # Report submit throughput every 10 accepted frames
                    if frame_count % 10 == 0:
                        elapsed = time.time() - start_time
                        fps = frame_count / elapsed
                        print(f"📈 Processed {frame_count} frames, Pipeline FPS: {fps:.2f}")
            time.sleep(0.1)

    except Exception as e:
        # Demo-level boundary: report the failure and fall through to cleanup
        print(f"Error in complex pipeline: {e}")
    finally:
        if data_source is not None:
            data_source.stop()
        pipeline.stop()

    # Final statistics
    final_stats = pipeline.get_pipeline_statistics()
    print("\n📊 Final Pipeline Statistics:")
    print(f" Total Input: {final_stats['pipeline_input_submitted']}")
    print(f" Completed: {final_stats['pipeline_completed']}")
    # max(..., 1) avoids a division by zero when nothing was submitted
    print(f" Success Rate: {final_stats['pipeline_completed']/max(final_stats['pipeline_input_submitted'], 1)*100:.1f}%")
|
|
|
|
# =============================================================================
# Main Function - Run Examples
# =============================================================================
|
|
|
|
def main():
    """Parse ``--example`` from the command line and run the chosen demo.

    ``--example`` accepts ``single``, ``cascade`` or ``complex`` and defaults
    to ``single``. argparse rejects any other value with a usage error, so
    no fallback branch is needed after parsing.
    """
    import argparse

    # Dispatch table; its keys double as the accepted argparse choices
    examples = {
        "single": example_single_stage,
        "cascade": example_two_stage_cascade,
        "complex": example_complex_pipeline,
    }

    parser = argparse.ArgumentParser(description="InferencePipeline Examples")
    parser.add_argument("--example", choices=list(examples),
                        default="single", help="Which example to run")
    args = parser.parse_args()

    examples[args.example]()


if __name__ == "__main__":
    main()