"""
Model node implementation for ML inference operations.
This module provides the ModelNode class which represents AI model inference
nodes in the pipeline. It handles model loading, hardware allocation, and
inference configuration for various NPU dongles.
Main Components:
- ModelNode: Core model inference node implementation
- Model configuration and validation
- Hardware dongle management
Usage:
from cluster4npu_ui.core.nodes.model_node import ModelNode
node = ModelNode()
node.set_property('model_path', '/path/to/model.onnx')
node.set_property('dongle_series', '720')
"""

from .base_node import BaseNodeWithProperties


class ModelNode(BaseNodeWithProperties):
"""
Model node for ML inference operations.
This node represents an AI model inference stage in the pipeline, handling
model loading, hardware allocation, and inference configuration.
"""

    __identifier__ = 'com.cluster.model_node'
    NODE_NAME = 'Model Node'

    def __init__(self):
        super().__init__()

        # Setup node connections
        self.add_input('input', multi_input=False, color=(255, 140, 0))
        self.add_output('output', color=(0, 255, 0))
        self.set_color(65, 84, 102)

        # Initialize properties
        self.setup_properties()

    def setup_properties(self):
        """Initialize model-specific properties."""
        # Model configuration
        self.create_business_property('model_path', '', {
            'type': 'file_path',
            'filter': 'Model files (*.onnx *.tflite *.pb *.nef)',
            'description': 'Path to the model file'
        })

        # Hardware configuration
        self.create_business_property('dongle_series', '520', [
            '520', '720', '1080', 'Custom'
        ])
        self.create_business_property('num_dongles', 1, {
            'min': 1,
            'max': 16,
            'description': 'Number of dongles to use for this model'
        })
        self.create_business_property('port_id', '', {
            'placeholder': 'e.g., 8080 or auto',
            'description': 'Port ID for dongle communication'
        })

        # Performance configuration
        self.create_business_property('batch_size', 1, {
            'min': 1,
            'max': 32,
            'description': 'Inference batch size'
        })
        self.create_business_property('max_queue_size', 10, {
            'min': 1,
            'max': 100,
            'description': 'Maximum input queue size'
        })

        # Advanced options
        self.create_business_property('enable_preprocessing', True, {
            'description': 'Enable built-in preprocessing'
        })
        self.create_business_property('enable_postprocessing', True, {
            'description': 'Enable built-in postprocessing'
        })

    def validate_configuration(self) -> tuple[bool, str]:
        """
        Validate the current node configuration.

        Returns:
            Tuple of (is_valid, error_message)
        """
        # Check model path
        model_path = self.get_property('model_path')
        if not model_path:
            return False, "Model path is required"

        # Check dongle series
        dongle_series = self.get_property('dongle_series')
        if dongle_series not in ['520', '720', '1080', 'Custom']:
            return False, f"Invalid dongle series: {dongle_series}"

        # Check number of dongles
        num_dongles = self.get_property('num_dongles')
        if not isinstance(num_dongles, int) or num_dongles < 1:
            return False, "Number of dongles must be at least 1"

        return True, ""

    def get_inference_config(self) -> dict:
        """
        Get inference configuration for pipeline execution.

        Returns:
            Dictionary containing inference configuration
        """
        return {
            'node_id': self.id,
            'node_name': self.name(),
            'model_path': self.get_property('model_path'),
            'dongle_series': self.get_property('dongle_series'),
            'num_dongles': self.get_property('num_dongles'),
            'port_id': self.get_property('port_id'),
            'batch_size': self.get_property('batch_size'),
            'max_queue_size': self.get_property('max_queue_size'),
            'enable_preprocessing': self.get_property('enable_preprocessing'),
            'enable_postprocessing': self.get_property('enable_postprocessing')
        }

    def get_hardware_requirements(self) -> dict:
        """
        Get hardware requirements for this model node.

        Returns:
            Dictionary containing hardware requirements
        """
        return {
            'dongle_series': self.get_property('dongle_series'),
            'num_dongles': self.get_property('num_dongles'),
            'port_id': self.get_property('port_id'),
            'estimated_memory': self._estimate_memory_usage(),
            'estimated_power': self._estimate_power_usage()
        }

    def _estimate_memory_usage(self) -> float:
        """Estimate memory usage in MB."""
        # Simple estimation based on batch size and number of dongles
        base_memory = 512  # Base memory in MB
        batch_factor = self.get_property('batch_size') * 50
        dongle_factor = self.get_property('num_dongles') * 100
        return base_memory + batch_factor + dongle_factor
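
    # Worked example (illustrative only): with batch_size=4 and num_dongles=2,
    # the estimate above is 512 + 4*50 + 2*100 = 912 MB.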

    def _estimate_power_usage(self) -> float:
        """Estimate power usage in Watts."""
        # Simple estimation based on dongle series and count
        dongle_series = self.get_property('dongle_series')
        num_dongles = self.get_property('num_dongles')
        power_per_dongle = {
            '520': 2.5,
            '720': 3.5,
            '1080': 5.0,
            'Custom': 4.0
        }
        return power_per_dongle.get(dongle_series, 4.0) * num_dongles
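

# Minimal usage sketch mirroring the module docstring. It assumes that
# BaseNodeWithProperties can be instantiated outside a live node graph, which
# may not hold in every NodeGraphQt-style setup. Because of the relative import
# above, run it as a module, e.g.
# `python -m cluster4npu_ui.core.nodes.model_node` (package path taken from the
# docstring).
if __name__ == '__main__':
    node = ModelNode()
    node.set_property('model_path', '/path/to/model.onnx')
    node.set_property('dongle_series', '720')
    node.set_property('num_dongles', 2)

    is_valid, error = node.validate_configuration()
    print(f'configuration valid: {is_valid} {error}')
    print('inference config:', node.get_inference_config())
    # With the defaults above (batch_size=1, num_dongles=2) the estimates are
    # 512 + 1*50 + 2*100 = 762 MB and 3.5 W * 2 = 7.0 W for the '720' series.
    print('hardware requirements:', node.get_hardware_requirements())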