Cluster/README.md
2025-07-17 17:04:56 +08:00

15 KiB

InferencePipeline

A high-performance multi-stage inference pipeline system designed for Kneron NPU dongles, enabling flexible single-stage and cascaded multi-stage AI inference workflows.

Installation

This project uses uv for fast Python package management.

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create and activate virtual environment
uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
uv pip install -r requirements.txt

Requirements

"numpy>=2.2.6",
"opencv-python>=4.11.0.86",

Hardware Requirements

  • Kneron AI dongles (KL520, KL720, etc.)
  • USB ports for device connections
  • Compatible firmware files (fw_scpu.bin, fw_ncpu.bin)
  • Trained model files (.nef format)

Quick Start

Single-Stage Pipeline

Replace your existing MultiDongle usage with InferencePipeline for enhanced features:

from InferencePipeline import InferencePipeline, StageConfig

# Configure single stage
stage_config = StageConfig(
    stage_id="fire_detection",
    port_ids=[28, 32],  # USB port IDs for your dongles
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin", 
    model_path="fire_detection_520.nef",
    upload_fw=True
)

# Create and start pipeline
pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection")
pipeline.initialize()
pipeline.start()

# Set up result callback
def handle_result(pipeline_data):
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"🔥 Detection: {result.get('result', 'Unknown')} "
          f"(Probability: {result.get('probability', 0.0):.3f})")

pipeline.set_result_callback(handle_result)

# Process frames
import cv2
cap = cv2.VideoCapture(0)

try:
    while True:
        ret, frame = cap.read()
        if ret:
            pipeline.put_data(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    pipeline.stop()

Multi-Stage Cascade Pipeline

Chain multiple models for complex workflows:

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

# Custom preprocessing for second stage
def roi_extraction(frame, target_size):
    """Extract region of interest from detection results"""
    # Extract center region as example
    h, w = frame.shape[:2]
    center_crop = frame[h//4:3*h//4, w//4:3*w//4]
    return cv2.resize(center_crop, target_size)

# Custom result fusion
def combine_results(raw_output, **kwargs):
    """Combine detection + classification results"""
    classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0
    detection_conf = kwargs.get('detection_conf', 0.5)
    
    # Weighted combination
    combined_score = (classification_prob * 0.7) + (detection_conf * 0.3)
    
    return {
        'combined_probability': combined_score,
        'classification_prob': classification_prob,
        'detection_conf': detection_conf,
        'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire',
        'confidence': 'High' if combined_score > 0.8 else 'Low'
    }

# Stage 1: Object Detection
detection_stage = StageConfig(
    stage_id="object_detection",
    port_ids=[28, 30],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="object_detection_520.nef",
    upload_fw=True
)

# Stage 2: Fire Classification with preprocessing
classification_stage = StageConfig(
    stage_id="fire_classification",
    port_ids=[32, 34],
    scpu_fw_path="fw_scpu.bin", 
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_classification_520.nef",
    upload_fw=True,
    input_preprocessor=PreProcessor(resize_fn=roi_extraction),
    output_postprocessor=PostProcessor(process_fn=combine_results)
)

# Create two-stage pipeline
pipeline = InferencePipeline(
    [detection_stage, classification_stage],
    pipeline_name="DetectionClassificationCascade"
)

# Enhanced result handler
def handle_cascade_result(pipeline_data):
    detection = pipeline_data.stage_results.get("object_detection", {})
    classification = pipeline_data.stage_results.get("fire_classification", {})
    
    print(f"🎯 Detection: {detection.get('result', 'Unknown')} "
          f"(Conf: {detection.get('probability', 0.0):.3f})")
    print(f"🔥 Classification: {classification.get('result', 'Unknown')} "
          f"(Combined: {classification.get('combined_probability', 0.0):.3f})")
    print(f"⏱️  Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
    print("-" * 50)

pipeline.set_result_callback(handle_cascade_result)
pipeline.initialize()
pipeline.start()

# Your processing loop here...

Usage Examples

Example 1: Real-time Webcam Processing

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import WebcamSource

def run_realtime_detection():
    # Configure pipeline
    config = StageConfig(
        stage_id="realtime_detection",
        port_ids=[28, 32], 
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="your_model.nef",
        upload_fw=True,
        max_queue_size=30  # Prevent memory buildup
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    # Use webcam source
    source = WebcamSource(camera_id=0)
    source.start()
    
    def display_results(pipeline_data):
        result = pipeline_data.stage_results["realtime_detection"]
        probability = result.get('probability', 0.0)
        detection = result.get('result', 'Unknown')
        
        # Your visualization logic here
        print(f"Detection: {detection} ({probability:.3f})")
    
    pipeline.set_result_callback(display_results)
    
    try:
        while True:
            frame = source.get_frame()
            if frame is not None:
                pipeline.put_data(frame)
            time.sleep(0.033)  # ~30 FPS
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        source.stop()
        pipeline.stop()

if __name__ == "__main__":
    run_realtime_detection()

Example 2: Complex Multi-Modal Pipeline

def run_multimodal_pipeline():
    """Multi-modal fire detection with RGB, edge, and thermal-like analysis"""
    
    def edge_preprocessing(frame, target_size):
        """Extract edge features"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)
    
    def thermal_preprocessing(frame, target_size):
        """Simulate thermal processing"""
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)
    
    def fusion_postprocessing(raw_output, **kwargs):
        """Fuse results from multiple modalities"""
        if raw_output.size > 0:
            current_prob = float(raw_output[0])
            rgb_conf = kwargs.get('rgb_conf', 0.5)
            edge_conf = kwargs.get('edge_conf', 0.5)
            
            # Weighted fusion
            fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2)
            
            return {
                'fused_probability': fused_prob,
                'modality_scores': {
                    'thermal': current_prob,
                    'rgb': rgb_conf,
                    'edge': edge_conf
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire'}
    
    # Define stages
    stages = [
        StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True),
        StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)),
        StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True,
                   input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)),
        StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True,
                   output_postprocessor=PostProcessor(process_fn=fusion_postprocessing))
    ]
    
    pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection")
    
    def handle_multimodal_result(pipeline_data):
        print(f"\n🔥 Multi-Modal Fire Detection Results:")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f"   {stage_id}: {result['result']} ({result['probability']:.3f})")
        
        if 'fusion' in pipeline_data.stage_results:
            fusion = pipeline_data.stage_results['fusion']
            print(f"   🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})")
            print(f"   Confidence: {fusion.get('confidence', 'Unknown')}")
    
    pipeline.set_result_callback(handle_multimodal_result)
    
    # Start pipeline
    pipeline.initialize()
    pipeline.start()
    
    # Your processing logic here...

Example 3: Batch Processing

def process_image_batch(image_paths):
    """Process a batch of images through pipeline"""
    
    config = StageConfig(
        stage_id="batch_processing",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin", 
        ncpu_fw_path="fw_ncpu.bin",
        model_path="batch_model.nef",
        upload_fw=True
    )
    
    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()
    
    results = []
    
    def collect_result(pipeline_data):
        result = pipeline_data.stage_results["batch_processing"]
        results.append({
            'pipeline_id': pipeline_data.pipeline_id,
            'result': result,
            'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0)
        })
    
    pipeline.set_result_callback(collect_result)
    
    # Submit all images
    for img_path in image_paths:
        image = cv2.imread(img_path)
        if image is not None:
            pipeline.put_data(image)
    
    # Wait for all results
    import time
    while len(results) < len(image_paths):
        time.sleep(0.1)
    
    pipeline.stop()
    return results

Configuration

StageConfig Parameters

StageConfig(
    stage_id="unique_stage_name",           # Required: Unique identifier
    port_ids=[28, 32],                      # Required: USB port IDs for dongles
    scpu_fw_path="fw_scpu.bin",            # Required: SCPU firmware path
    ncpu_fw_path="fw_ncpu.bin",            # Required: NCPU firmware path  
    model_path="model.nef",                 # Required: Model file path
    upload_fw=True,                         # Upload firmware on init
    max_queue_size=50,                      # Queue size limit
    input_preprocessor=None,                # Optional: Inter-stage preprocessing
    output_postprocessor=None,              # Optional: Inter-stage postprocessing
    stage_preprocessor=None,                # Optional: MultiDongle preprocessing
    stage_postprocessor=None                # Optional: MultiDongle postprocessing
)

Performance Tuning

# For high-throughput scenarios
config = StageConfig(
    stage_id="high_performance",
    port_ids=[28, 30, 32, 34],  # Use more dongles
    max_queue_size=100,          # Larger queues
    # ... other params
)

# For low-latency scenarios  
config = StageConfig(
    stage_id="low_latency",
    port_ids=[28, 32],
    max_queue_size=10,           # Smaller queues
    # ... other params
)

Statistics and Monitoring

# Enable statistics reporting
def print_stats(stats):
    print(f"\n📊 Pipeline Statistics:")
    print(f"   Input: {stats['pipeline_input_submitted']}")
    print(f"   Completed: {stats['pipeline_completed']}")
    print(f"   Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%")
    
    for stage_stat in stats['stage_statistics']:
        print(f"   Stage {stage_stat['stage_id']}: "
              f"Processed={stage_stat['processed_count']}, "
              f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

pipeline.set_stats_callback(print_stats)
pipeline.start_stats_reporting(interval=5.0)  # Report every 5 seconds

Running Examples

The project includes comprehensive examples in test.py:

# Single-stage pipeline
uv run python test.py --example single

# Two-stage cascade pipeline
uv run python test.py --example cascade

# Complex multi-stage pipeline
uv run python test.py --example complex

API Reference

InferencePipeline

Main pipeline orchestrator class.

Methods:

  • initialize(): Initialize all pipeline stages
  • start(): Start pipeline processing threads
  • stop(): Gracefully stop pipeline
  • put_data(data, timeout=1.0): Submit data for processing
  • get_result(timeout=0.1): Get processed results
  • set_result_callback(callback): Set success callback
  • set_error_callback(callback): Set error callback
  • get_pipeline_statistics(): Get performance metrics

StageConfig

Configuration for individual pipeline stages.

PipelineData

Data structure flowing through pipeline stages.

Attributes:

  • data: Main data payload
  • metadata: Processing metadata
  • stage_results: Results from each stage
  • pipeline_id: Unique identifier
  • timestamp: Creation timestamp

Performance Considerations

  1. Queue Sizing: Balance memory usage vs. throughput with max_queue_size
  2. Dongle Distribution: Distribute dongles across stages for optimal performance
  3. Preprocessing: Minimize expensive operations in preprocessors
  4. Memory Management: Monitor queue sizes and processing times
  5. Threading: Pipeline uses multiple threads - ensure thread-safe operations

Troubleshooting

Common Issues

Pipeline hangs or stops processing:

  • Check dongle connections and firmware compatibility
  • Monitor queue sizes for bottlenecks
  • Verify model file paths and formats

High memory usage:

  • Reduce max_queue_size parameters
  • Ensure proper cleanup in custom processors
  • Monitor statistics for processing times

Poor performance:

  • Distribute dongles optimally across stages
  • Profile preprocessing/postprocessing functions
  • Consider batch processing for high throughput

Debug Mode

Enable detailed logging for troubleshooting:

import logging
logging.basicConfig(level=logging.DEBUG)

# Pipeline will output detailed processing information