# InferencePipeline

A high-performance inference pipeline system for Kneron NPU dongles, supporting both single-stage and cascaded multi-stage AI inference workflows.
<!-- ## Features

- **Single-stage inference**: Direct replacement for MultiDongle with enhanced features
- **Multi-stage cascaded pipelines**: Chain multiple AI models for complex workflows
- **Flexible preprocessing/postprocessing**: Custom data transformation between stages
- **Thread-safe design**: Concurrent processing with automatic queue management
- **Real-time performance**: Optimized for live video streams and high-throughput scenarios
- **Comprehensive statistics**: Built-in performance monitoring and metrics -->
## Installation

This project uses [uv](https://github.com/astral-sh/uv) for fast Python package management.
```bash
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create and activate a virtual environment
uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install dependencies
uv pip install -r requirements.txt
```
### Requirements

```txt
numpy>=2.2.6
opencv-python>=4.11.0.86
```
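After installing, a quick sanity check confirms the two pinned packages import correctly (a convenience snippet, not part of the project):

```python
# Minimal environment check: verify the two pinned dependencies import
import numpy
import cv2

print("numpy:", numpy.__version__)   # expect >= 2.2.6
print("opencv:", cv2.__version__)    # expect >= 4.11.0.86
```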
### Hardware Requirements

- Kneron AI dongles (KL520, KL720, etc.)
- USB ports for device connections
- Compatible firmware files (`fw_scpu.bin`, `fw_ncpu.bin`)
- Trained model files (`.nef` format)
## Quick Start

### Single-Stage Pipeline

Replace your existing MultiDongle usage with InferencePipeline for enhanced features:
```python
import cv2

from InferencePipeline import InferencePipeline, StageConfig

# Configure a single stage
stage_config = StageConfig(
    stage_id="fire_detection",
    port_ids=[28, 32],  # USB port IDs for your dongles
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_detection_520.nef",
    upload_fw=True
)

# Create and start the pipeline
pipeline = InferencePipeline([stage_config], pipeline_name="FireDetection")
pipeline.initialize()
pipeline.start()

# Set up a result callback
def handle_result(pipeline_data):
    result = pipeline_data.stage_results.get("fire_detection", {})
    print(f"🔥 Detection: {result.get('result', 'Unknown')} "
          f"(Probability: {result.get('probability', 0.0):.3f})")

pipeline.set_result_callback(handle_result)

# Process frames
cap = cv2.VideoCapture(0)

try:
    while True:
        ret, frame = cap.read()
        if ret:
            pipeline.put_data(frame)
        # Note: cv2.waitKey only registers keypresses while an OpenCV window is open
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    pipeline.stop()
```
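For scripts that create pipelines in several places, a small helper can guarantee `stop()` runs even when the processing loop raises. This is a minimal sketch built only on the `initialize()`/`start()`/`stop()` methods shown above:

```python
from contextlib import contextmanager

from InferencePipeline import InferencePipeline

@contextmanager
def running_pipeline(stage_configs, **kwargs):
    """Yield a started pipeline and guarantee stop() on exit (sketch)."""
    pipeline = InferencePipeline(stage_configs, **kwargs)
    pipeline.initialize()
    pipeline.start()
    try:
        yield pipeline
    finally:
        pipeline.stop()

# Usage:
# with running_pipeline([stage_config], pipeline_name="FireDetection") as p:
#     p.put_data(frame)
```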
### Multi-Stage Cascade Pipeline

Chain multiple models for complex workflows:
```python
import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

# Custom preprocessing for the second stage
def roi_extraction(frame, target_size):
    """Extract a region of interest from detection results"""
    # Extract the center region as an example
    h, w = frame.shape[:2]
    center_crop = frame[h//4:3*h//4, w//4:3*w//4]
    return cv2.resize(center_crop, target_size)

# Custom result fusion
def combine_results(raw_output, **kwargs):
    """Combine detection + classification results"""
    classification_prob = float(raw_output[0]) if raw_output.size > 0 else 0.0
    detection_conf = kwargs.get('detection_conf', 0.5)

    # Weighted combination
    combined_score = (classification_prob * 0.7) + (detection_conf * 0.3)

    return {
        'combined_probability': combined_score,
        'classification_prob': classification_prob,
        'detection_conf': detection_conf,
        'result': 'Fire Detected' if combined_score > 0.6 else 'No Fire',
        'confidence': 'High' if combined_score > 0.8 else 'Low'
    }

# Stage 1: Object detection
detection_stage = StageConfig(
    stage_id="object_detection",
    port_ids=[28, 30],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="object_detection_520.nef",
    upload_fw=True
)

# Stage 2: Fire classification with preprocessing
classification_stage = StageConfig(
    stage_id="fire_classification",
    port_ids=[32, 34],
    scpu_fw_path="fw_scpu.bin",
    ncpu_fw_path="fw_ncpu.bin",
    model_path="fire_classification_520.nef",
    upload_fw=True,
    input_preprocessor=PreProcessor(resize_fn=roi_extraction),
    output_postprocessor=PostProcessor(process_fn=combine_results)
)

# Create a two-stage pipeline
pipeline = InferencePipeline(
    [detection_stage, classification_stage],
    pipeline_name="DetectionClassificationCascade"
)

# Enhanced result handler
def handle_cascade_result(pipeline_data):
    detection = pipeline_data.stage_results.get("object_detection", {})
    classification = pipeline_data.stage_results.get("fire_classification", {})

    print(f"🎯 Detection: {detection.get('result', 'Unknown')} "
          f"(Conf: {detection.get('probability', 0.0):.3f})")
    print(f"🔥 Classification: {classification.get('result', 'Unknown')} "
          f"(Combined: {classification.get('combined_probability', 0.0):.3f})")
    print(f"⏱️ Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s")
    print("-" * 50)

pipeline.set_result_callback(handle_cascade_result)
pipeline.initialize()
pipeline.start()

# Your processing loop here...
```
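Callbacks are the primary interface, but the API reference below also lists `get_result(timeout=...)` for pulling results synchronously. A minimal polling sketch, under the assumption that `get_result` returns `None` when nothing completes within the timeout:

```python
# Bounded polling loop (sketch; assumes get_result returns None on timeout)
for _ in range(1000):
    pipeline_data = pipeline.get_result(timeout=0.1)
    if pipeline_data is None:
        continue
    classification = pipeline_data.stage_results.get("fire_classification", {})
    print(classification.get('result', 'Unknown'))
```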
## Usage Examples

### Example 1: Real-time Webcam Processing
```python
import time

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import WebcamSource

def run_realtime_detection():
    # Configure the pipeline
    config = StageConfig(
        stage_id="realtime_detection",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="your_model.nef",
        upload_fw=True,
        max_queue_size=30  # Prevent memory buildup
    )

    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()

    # Use the webcam source
    source = WebcamSource(camera_id=0)
    source.start()

    def display_results(pipeline_data):
        result = pipeline_data.stage_results["realtime_detection"]
        probability = result.get('probability', 0.0)
        detection = result.get('result', 'Unknown')

        # Your visualization logic here
        print(f"Detection: {detection} ({probability:.3f})")

    pipeline.set_result_callback(display_results)

    try:
        while True:
            frame = source.get_frame()
            if frame is not None:
                pipeline.put_data(frame)
            time.sleep(0.033)  # ~30 FPS
    except KeyboardInterrupt:
        print("Stopping...")
    finally:
        source.stop()
        pipeline.stop()

if __name__ == "__main__":
    run_realtime_detection()
```
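The pipeline is multi-threaded (see Performance Considerations below), so if a callback shares state with the main display loop, guard it with a lock. A sketch of that pattern, as a variant of the `display_results` callback above:

```python
import threading

latest = {"result": "Unknown", "probability": 0.0}
latest_lock = threading.Lock()

def display_results(pipeline_data):
    # Runs on a pipeline thread: update the shared snapshot under the lock
    result = pipeline_data.stage_results["realtime_detection"]
    with latest_lock:
        latest.update(result)

# In the main loop, read a consistent copy:
# with latest_lock:
#     snapshot = dict(latest)
```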
### Example 2: Complex Multi-Modal Pipeline
```python
import cv2

from InferencePipeline import InferencePipeline, StageConfig
from Multidongle import PreProcessor, PostProcessor

def run_multimodal_pipeline():
    """Multi-modal fire detection with RGB, edge, and thermal-like analysis"""

    def edge_preprocessing(frame, target_size):
        """Extract edge features"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
        return cv2.resize(edges_3ch, target_size)

    def thermal_preprocessing(frame, target_size):
        """Simulate thermal processing"""
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        thermal_like = hsv[:, :, 2]  # Value channel
        thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR)
        return cv2.resize(thermal_3ch, target_size)

    def fusion_postprocessing(raw_output, **kwargs):
        """Fuse results from multiple modalities"""
        if raw_output.size > 0:
            current_prob = float(raw_output[0])
            rgb_conf = kwargs.get('rgb_conf', 0.5)
            edge_conf = kwargs.get('edge_conf', 0.5)

            # Weighted fusion
            fused_prob = (current_prob * 0.5) + (rgb_conf * 0.3) + (edge_conf * 0.2)

            return {
                'fused_probability': fused_prob,
                'modality_scores': {
                    'thermal': current_prob,
                    'rgb': rgb_conf,
                    'edge': edge_conf
                },
                'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire',
                'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium'
            }
        return {'fused_probability': 0.0, 'result': 'No Fire'}

    # Define the stages
    stages = [
        StageConfig("rgb_analysis", [28, 30], "fw_scpu.bin", "fw_ncpu.bin", "rgb_model.nef", True),
        StageConfig("edge_analysis", [32, 34], "fw_scpu.bin", "fw_ncpu.bin", "edge_model.nef", True,
                    input_preprocessor=PreProcessor(resize_fn=edge_preprocessing)),
        StageConfig("thermal_analysis", [36, 38], "fw_scpu.bin", "fw_ncpu.bin", "thermal_model.nef", True,
                    input_preprocessor=PreProcessor(resize_fn=thermal_preprocessing)),
        StageConfig("fusion", [40, 42], "fw_scpu.bin", "fw_ncpu.bin", "fusion_model.nef", True,
                    output_postprocessor=PostProcessor(process_fn=fusion_postprocessing))
    ]

    pipeline = InferencePipeline(stages, pipeline_name="MultiModalFireDetection")

    def handle_multimodal_result(pipeline_data):
        print("\n🔥 Multi-Modal Fire Detection Results:")
        for stage_id, result in pipeline_data.stage_results.items():
            if 'probability' in result:
                print(f"   {stage_id}: {result['result']} ({result['probability']:.3f})")

        if 'fusion' in pipeline_data.stage_results:
            fusion = pipeline_data.stage_results['fusion']
            print(f"   🎯 FINAL: {fusion['result']} (Fused: {fusion['fused_probability']:.3f})")
            print(f"   Confidence: {fusion.get('confidence', 'Unknown')}")

    pipeline.set_result_callback(handle_multimodal_result)

    # Start the pipeline
    pipeline.initialize()
    pipeline.start()

    # Your processing logic here...
```
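To make the fusion weights concrete: a thermal score of 0.8 with RGB and edge confidences of 0.6 and 0.4 fuses to 0.8 × 0.5 + 0.6 × 0.3 + 0.4 × 0.2 = 0.66, which clears the 0.6 'Fire Detected' threshold but falls in the 'Medium' confidence band (at or below 0.7).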
### Example 3: Batch Processing
```python
import time

import cv2

from InferencePipeline import InferencePipeline, StageConfig

def process_image_batch(image_paths):
    """Process a batch of images through the pipeline"""

    config = StageConfig(
        stage_id="batch_processing",
        port_ids=[28, 32],
        scpu_fw_path="fw_scpu.bin",
        ncpu_fw_path="fw_ncpu.bin",
        model_path="batch_model.nef",
        upload_fw=True
    )

    pipeline = InferencePipeline([config])
    pipeline.initialize()
    pipeline.start()

    results = []

    def collect_result(pipeline_data):
        result = pipeline_data.stage_results["batch_processing"]
        results.append({
            'pipeline_id': pipeline_data.pipeline_id,
            'result': result,
            'processing_time': pipeline_data.metadata.get('total_processing_time', 0.0)
        })

    pipeline.set_result_callback(collect_result)

    # Submit all readable images, counting what was actually submitted
    submitted = 0
    for img_path in image_paths:
        image = cv2.imread(img_path)
        if image is not None:
            pipeline.put_data(image)
            submitted += 1

    # Wait for all results (unreadable images were skipped above,
    # so compare against the submitted count, not len(image_paths))
    while len(results) < submitted:
        time.sleep(0.1)

    pipeline.stop()
    return results
```
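A possible invocation (the glob pattern and directory are illustrative):

```python
import glob

batch_results = process_image_batch(sorted(glob.glob("images/*.jpg")))
print(f"Processed {len(batch_results)} images")
for entry in batch_results:
    print(entry['pipeline_id'], entry['result'].get('result', 'Unknown'),
          f"{entry['processing_time']:.3f}s")
```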
## Configuration

### StageConfig Parameters
```python
StageConfig(
    stage_id="unique_stage_name",   # Required: Unique identifier
    port_ids=[28, 32],              # Required: USB port IDs for dongles
    scpu_fw_path="fw_scpu.bin",     # Required: SCPU firmware path
    ncpu_fw_path="fw_ncpu.bin",     # Required: NCPU firmware path
    model_path="model.nef",         # Required: Model file path
    upload_fw=True,                 # Upload firmware on init
    max_queue_size=50,              # Queue size limit
    input_preprocessor=None,        # Optional: Inter-stage preprocessing
    output_postprocessor=None,      # Optional: Inter-stage postprocessing
    stage_preprocessor=None,        # Optional: MultiDongle preprocessing
    stage_postprocessor=None        # Optional: MultiDongle postprocessing
)
```
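Path mistakes usually surface only at `initialize()`. A preflight check fails faster with a clearer message; this sketch assumes the constructor arguments above are available as attributes of the same names:

```python
from pathlib import Path

def check_stage_files(config):
    """Raise early if a stage's firmware or model file is missing (sketch)."""
    for path in (config.scpu_fw_path, config.ncpu_fw_path, config.model_path):
        if not Path(path).is_file():
            raise FileNotFoundError(f"Stage '{config.stage_id}' is missing: {path}")
```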
### Performance Tuning
```python
# For high-throughput scenarios
config = StageConfig(
    stage_id="high_performance",
    port_ids=[28, 30, 32, 34],  # Use more dongles
    max_queue_size=100,         # Larger queues
    # ... other params
)

# For low-latency scenarios
config = StageConfig(
    stage_id="low_latency",
    port_ids=[28, 32],
    max_queue_size=10,  # Smaller queues
    # ... other params
)
```
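The queue size trades latency against throughput: an item at the back of a full queue waits roughly `max_queue_size / throughput` seconds before inference. A back-of-envelope helper for choosing a value:

```python
def queue_size_for_latency(latency_budget_s, throughput_fps):
    """Largest queue whose worst-case queueing delay fits the latency budget."""
    return max(1, int(latency_budget_s * throughput_fps))

# e.g. a 0.5 s latency budget at ~30 FPS suggests max_queue_size around 15
print(queue_size_for_latency(0.5, 30))  # -> 15
```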
## Statistics and Monitoring
```python
# Enable statistics reporting
def print_stats(stats):
    print("\n📊 Pipeline Statistics:")
    print(f"   Input: {stats['pipeline_input_submitted']}")
    print(f"   Completed: {stats['pipeline_completed']}")
    print(f"   Success Rate: {stats['pipeline_completed']/max(stats['pipeline_input_submitted'], 1)*100:.1f}%")

    for stage_stat in stats['stage_statistics']:
        print(f"   Stage {stage_stat['stage_id']}: "
              f"Processed={stage_stat['processed_count']}, "
              f"AvgTime={stage_stat['avg_processing_time']:.3f}s")

pipeline.set_stats_callback(print_stats)
pipeline.start_stats_reporting(interval=5.0)  # Report every 5 seconds
```
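The same callback hook can persist metrics for offline analysis instead of printing them; a sketch that appends one CSV row per report, using only the statistics keys shown above:

```python
import csv
import time

def log_stats_to_csv(stats, path="pipeline_stats.csv"):
    """Append a timestamped row per stats report (sketch; path is arbitrary)."""
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow([
            time.time(),
            stats['pipeline_input_submitted'],
            stats['pipeline_completed'],
        ])

pipeline.set_stats_callback(log_stats_to_csv)
```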
## Running Examples

The project includes comprehensive examples in `test.py`:
```bash
# Single-stage pipeline
uv run python test.py --example single

# Two-stage cascade pipeline
uv run python test.py --example cascade

# Complex multi-stage pipeline
uv run python test.py --example complex
```
## API Reference

### InferencePipeline

Main pipeline orchestrator class.

**Methods:**

- `initialize()`: Initialize all pipeline stages
- `start()`: Start the pipeline processing threads
- `stop()`: Gracefully stop the pipeline
- `put_data(data, timeout=1.0)`: Submit data for processing
- `get_result(timeout=0.1)`: Retrieve processed results
- `set_result_callback(callback)`: Set the success callback
- `set_error_callback(callback)`: Set the error callback
- `get_pipeline_statistics()`: Get performance metrics
### StageConfig

Configuration for individual pipeline stages.

### PipelineData

Data structure flowing through pipeline stages.

**Attributes:**

- `data`: Main data payload
- `metadata`: Processing metadata
- `stage_results`: Results from each stage
- `pipeline_id`: Unique identifier
- `timestamp`: Creation timestamp

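If `timestamp` holds a `time.time()`-style creation time (an assumption; the field is only documented as a creation timestamp), end-to-end latency falls out directly in a callback:

```python
import time

def handle_result(pipeline_data):
    # Assumes pipeline_data.timestamp is epoch seconds at creation time
    latency = time.time() - pipeline_data.timestamp
    print(f"{pipeline_data.pipeline_id}: end-to-end {latency:.3f}s")
```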
## Performance Considerations

1. **Queue Sizing**: Balance memory usage against throughput with `max_queue_size`
2. **Dongle Distribution**: Distribute dongles across stages for optimal performance
3. **Preprocessing**: Minimize expensive operations in preprocessors
4. **Memory Management**: Monitor queue sizes and processing times
5. **Threading**: The pipeline uses multiple threads; keep callbacks and custom processors thread-safe
## Troubleshooting

### Common Issues
**Pipeline hangs or stops processing:**

- Check dongle connections and firmware compatibility
- Monitor queue sizes for bottlenecks
- Verify model file paths and formats

**High memory usage:**

- Reduce `max_queue_size` parameters
- Ensure proper cleanup in custom processors
- Monitor statistics for processing times

**Poor performance:**

- Distribute dongles optimally across stages
- Profile preprocessing/postprocessing functions
- Consider batch processing for high throughput
### Debug Mode

Enable detailed logging for troubleshooting:
```python
import logging

logging.basicConfig(level=logging.DEBUG)

# The pipeline will output detailed processing information
```
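Because the pipeline runs several threads, adding the thread name to the log format makes it easier to attribute messages to a stage; a standard-library variant of the setup above:

```python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(threadName)s] %(name)s %(levelname)s: %(message)s",
)
```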