""" MFlow to API Converter This module converts .mflow pipeline files from the UI app into the API format required by MultiDongle and InferencePipeline components. Key Features: - Parse .mflow JSON files - Convert UI node properties to API configurations - Generate StageConfig objects for InferencePipeline - Handle pipeline topology and stage ordering - Validate configurations and provide helpful error messages Usage: from mflow_converter import MFlowConverter converter = MFlowConverter() pipeline_config = converter.load_and_convert("pipeline.mflow") # Use with InferencePipeline inference_pipeline = InferencePipeline(pipeline_config.stage_configs) """ import json import os from typing import List, Dict, Any, Tuple from dataclasses import dataclass from InferencePipeline import StageConfig, InferencePipeline class DefaultProcessors: """Default preprocessing and postprocessing functions""" @staticmethod def resize_and_normalize(frame, target_size=(640, 480), normalize=True): """Default resize and normalize function""" import cv2 import numpy as np # Resize resized = cv2.resize(frame, target_size) # Normalize if requested if normalize: resized = resized.astype(np.float32) / 255.0 return resized @staticmethod def bgr_to_rgb(frame): """Convert BGR to RGB""" import cv2 return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) @staticmethod def format_detection_output(results, confidence_threshold=0.5): """Format detection results""" formatted = [] for result in results: if result.get('confidence', 0) >= confidence_threshold: formatted.append({ 'class': result.get('class', 'unknown'), 'confidence': result.get('confidence', 0), 'bbox': result.get('bbox', [0, 0, 0, 0]) }) return formatted @dataclass class PipelineConfig: """Complete pipeline configuration ready for API use""" stage_configs: List[StageConfig] pipeline_name: str description: str input_config: Dict[str, Any] output_config: Dict[str, Any] preprocessing_configs: List[Dict[str, Any]] postprocessing_configs: List[Dict[str, Any]] class MFlowConverter: """Convert .mflow files to API configurations""" def __init__(self, default_fw_path: str = "./firmware"): """ Initialize converter Args: default_fw_path: Default path for firmware files if not specified """ self.default_fw_path = default_fw_path self.node_id_map = {} # Map node IDs to node objects self.stage_order = [] # Ordered list of model nodes (stages) def load_and_convert(self, mflow_file_path: str) -> PipelineConfig: """ Load .mflow file and convert to API configuration Args: mflow_file_path: Path to .mflow file Returns: PipelineConfig object ready for API use Raises: FileNotFoundError: If .mflow file doesn't exist ValueError: If .mflow format is invalid RuntimeError: If conversion fails """ if not os.path.exists(mflow_file_path): raise FileNotFoundError(f"MFlow file not found: {mflow_file_path}") with open(mflow_file_path, 'r', encoding='utf-8') as f: mflow_data = json.load(f) return self._convert_mflow_to_config(mflow_data) def _convert_mflow_to_config(self, mflow_data: Dict[str, Any]) -> PipelineConfig: """Convert loaded .mflow data to PipelineConfig""" # Extract basic metadata pipeline_name = mflow_data.get('project_name', 'Converted Pipeline') description = mflow_data.get('description', '') nodes = mflow_data.get('nodes', []) connections = mflow_data.get('connections', []) # Build node lookup and categorize nodes self._build_node_map(nodes) model_nodes, input_nodes, output_nodes, preprocess_nodes, postprocess_nodes = self._categorize_nodes() # Determine stage order based on connections 

    def _convert_mflow_to_config(self, mflow_data: Dict[str, Any]) -> PipelineConfig:
        """Convert loaded .mflow data to PipelineConfig"""
        # Extract basic metadata
        pipeline_name = mflow_data.get('project_name', 'Converted Pipeline')
        description = mflow_data.get('description', '')
        nodes = mflow_data.get('nodes', [])
        connections = mflow_data.get('connections', [])

        # Build node lookup and categorize nodes
        self._build_node_map(nodes)
        model_nodes, input_nodes, output_nodes, preprocess_nodes, postprocess_nodes = self._categorize_nodes()

        # Determine stage order based on connections
        self._determine_stage_order(model_nodes, connections)

        # Convert to StageConfig objects
        stage_configs = self._create_stage_configs(model_nodes, preprocess_nodes, postprocess_nodes, connections)

        # Extract input/output configurations
        input_config = self._extract_input_config(input_nodes)
        output_config = self._extract_output_config(output_nodes)

        # Extract preprocessing/postprocessing configurations
        preprocessing_configs = self._extract_preprocessing_configs(preprocess_nodes)
        postprocessing_configs = self._extract_postprocessing_configs(postprocess_nodes)

        return PipelineConfig(
            stage_configs=stage_configs,
            pipeline_name=pipeline_name,
            description=description,
            input_config=input_config,
            output_config=output_config,
            preprocessing_configs=preprocessing_configs,
            postprocessing_configs=postprocessing_configs
        )

    def _build_node_map(self, nodes: List[Dict[str, Any]]):
        """Build lookup map for nodes by ID"""
        self.node_id_map = {node['id']: node for node in nodes}

    def _categorize_nodes(self) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict]]:
        """Categorize nodes by type"""
        model_nodes = []
        input_nodes = []
        output_nodes = []
        preprocess_nodes = []
        postprocess_nodes = []

        for node in self.node_id_map.values():
            node_type = node.get('type', '').lower()

            if 'model' in node_type:
                model_nodes.append(node)
            elif 'input' in node_type:
                input_nodes.append(node)
            elif 'output' in node_type:
                output_nodes.append(node)
            elif 'preprocess' in node_type:
                preprocess_nodes.append(node)
            elif 'postprocess' in node_type:
                postprocess_nodes.append(node)

        return model_nodes, input_nodes, output_nodes, preprocess_nodes, postprocess_nodes
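
    # Illustrative categorization (the type strings below are hypothetical; the
    # real values come from the UI app). Matching is by substring on the
    # lower-cased 'type', and the checks run in the order above, so a type
    # containing both "model" and "input" (e.g. "model_input") would be treated
    # as a model node:
    #
    #   "video_input"    -> input_nodes
    #   "preprocessing"  -> preprocess_nodes
    #   "model"          -> model_nodes
    #   "postprocessing" -> postprocess_nodes
    #   "file_output"    -> output_nodes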

    def _determine_stage_order(self, model_nodes: List[Dict], connections: List[Dict]):
        """
        Advanced topological sorting of the model stages.

        Analyzes connection dependencies to determine the pipeline execution order.

        Features:
        - Cycle detection and resolution
        - Parallel stage identification
        - Dependency depth analysis
        - Heuristic ordering within each depth level
        """
        print("Starting intelligent pipeline topology analysis...")

        # Build dependency graph
        dependency_graph = self._build_dependency_graph(model_nodes, connections)

        # Detect and handle cycles
        cycles = self._detect_cycles(dependency_graph)
        if cycles:
            print(f"Warning: Detected {len(cycles)} dependency cycles!")
            dependency_graph = self._resolve_cycles(dependency_graph, cycles)

        # Perform topological sort with parallel optimization
        sorted_stages = self._topological_sort_with_optimization(dependency_graph, model_nodes)

        # Calculate and display pipeline metrics
        metrics = self._calculate_pipeline_metrics(sorted_stages, dependency_graph)
        self._display_pipeline_analysis(sorted_stages, metrics)

        self.stage_order = sorted_stages

    def _build_dependency_graph(self, model_nodes: List[Dict], connections: List[Dict]) -> Dict[str, Dict]:
        """Build dependency graph from connections"""
        print("  Building dependency graph...")

        # Initialize graph with all model nodes
        graph = {}
        node_id_to_model = {node['id']: node for node in model_nodes}

        for node in model_nodes:
            graph[node['id']] = {
                'node': node,
                'dependencies': set(),  # What this node depends on
                'dependents': set(),    # What depends on this node
                'depth': 0,             # Distance from input
                'parallel_group': 0     # For parallel execution grouping
            }

        # Analyze connections to build dependencies
        for conn in connections:
            output_node_id = conn.get('output_node')
            input_node_id = conn.get('input_node')

            # Only consider connections between model nodes
            if output_node_id in graph and input_node_id in graph:
                graph[input_node_id]['dependencies'].add(output_node_id)
                graph[output_node_id]['dependents'].add(input_node_id)

        print(f"  Graph built: {len(graph)} model nodes, "
              f"{len([c for c in connections if c.get('output_node') in graph and c.get('input_node') in graph])} dependencies")

        return graph

    def _detect_cycles(self, graph: Dict[str, Dict]) -> List[List[str]]:
        """Detect dependency cycles using DFS"""
        print("  Checking for dependency cycles...")

        cycles = []
        visited = set()
        rec_stack = set()

        def dfs_cycle_detect(node_id, path):
            if node_id in rec_stack:
                # Found cycle - extract the cycle from path
                cycle_start = path.index(node_id)
                cycle = path[cycle_start:] + [node_id]
                cycles.append(cycle)
                return True

            if node_id in visited:
                return False

            visited.add(node_id)
            rec_stack.add(node_id)
            path.append(node_id)

            for dependent in graph[node_id]['dependents']:
                if dfs_cycle_detect(dependent, path):
                    # Unwind bookkeeping before propagating the result so a
                    # stale recursion stack cannot produce false cycles later
                    path.pop()
                    rec_stack.remove(node_id)
                    return True

            path.pop()
            rec_stack.remove(node_id)
            return False

        for node_id in graph:
            if node_id not in visited:
                dfs_cycle_detect(node_id, [])

        if cycles:
            print(f"  Warning: Found {len(cycles)} cycles")
        else:
            print("  No cycles detected")

        return cycles
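
    # Shape of each dependency-graph entry used by the methods above and below,
    # shown for a hypothetical two-stage chain detector -> classifier:
    #
    #   graph["classifier_id"] = {
    #       'node': {...},                    # the original model node dict
    #       'dependencies': {"detector_id"},  # stages whose output this stage consumes
    #       'dependents': set(),              # stages that consume this stage's output
    #       'depth': 1,                       # execution level, filled in by _calculate_depth_levels
    #       'parallel_group': 0,
    #   }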

    def _resolve_cycles(self, graph: Dict[str, Dict], cycles: List[List[str]]) -> Dict[str, Dict]:
        """Resolve dependency cycles by breaking weakest links"""
        print("  Resolving dependency cycles...")

        for cycle in cycles:
            print(f"    Breaking cycle: {' → '.join([graph[nid]['node']['name'] for nid in cycle])}")

            # Find the "weakest" dependency to break (arbitrary for now)
            # In a real implementation, this could be based on model complexity, processing time, etc.
            if len(cycle) >= 2:
                node_to_break = cycle[-2]  # Break the last dependency
                dependent_to_break = cycle[-1]

                graph[dependent_to_break]['dependencies'].discard(node_to_break)
                graph[node_to_break]['dependents'].discard(dependent_to_break)

                print(f"      Broke dependency: {graph[node_to_break]['node']['name']} → "
                      f"{graph[dependent_to_break]['node']['name']}")

        return graph

    def _topological_sort_with_optimization(self, graph: Dict[str, Dict], model_nodes: List[Dict]) -> List[Dict]:
        """Advanced topological sort with parallel optimization"""
        print("  Performing optimized topological sort...")

        # Calculate depth levels for each node
        self._calculate_depth_levels(graph)

        # Group nodes by depth for parallel execution
        depth_groups = self._group_by_depth(graph)

        # Sort within each depth group by optimization criteria
        sorted_nodes = []
        for depth in sorted(depth_groups.keys()):
            group_nodes = depth_groups[depth]

            # Sort by complexity/priority within the same depth
            group_nodes.sort(key=lambda nid: (
                len(graph[nid]['dependencies']),   # Fewer dependencies first
                -len(graph[nid]['dependents']),    # More dependents first (critical path)
                graph[nid]['node']['name']         # Stable sort by name
            ))

            for node_id in group_nodes:
                sorted_nodes.append(graph[node_id]['node'])

        print(f"  Sorted {len(sorted_nodes)} stages into {len(depth_groups)} execution levels")
        return sorted_nodes

    def _calculate_depth_levels(self, graph: Dict[str, Dict]):
        """Calculate depth levels using dynamic programming"""
        print("  Calculating execution depth levels...")

        # Find nodes with no dependencies (starting points)
        no_deps = [nid for nid, data in graph.items() if not data['dependencies']]

        # BFS to calculate depths
        from collections import deque
        queue = deque([(nid, 0) for nid in no_deps])

        while queue:
            node_id, depth = queue.popleft()

            if graph[node_id]['depth'] < depth:
                graph[node_id]['depth'] = depth

            # Always propagate to dependents so every reachable stage ends up
            # with its longest-path distance from the starting nodes
            for dependent in graph[node_id]['dependents']:
                queue.append((dependent, depth + 1))

    def _group_by_depth(self, graph: Dict[str, Dict]) -> Dict[int, List[str]]:
        """Group nodes by execution depth for parallel processing"""
        depth_groups = {}

        for node_id, data in graph.items():
            depth = data['depth']
            if depth not in depth_groups:
                depth_groups[depth] = []
            depth_groups[depth].append(node_id)

        return depth_groups

    def _calculate_pipeline_metrics(self, sorted_stages: List[Dict], graph: Dict[str, Dict]) -> Dict[str, Any]:
        """Calculate pipeline performance metrics"""
        print("  Calculating pipeline metrics...")

        total_stages = len(sorted_stages)
        max_depth = max([data['depth'] for data in graph.values()]) + 1 if graph else 1

        # Calculate parallelization potential
        depth_distribution = {}
        for data in graph.values():
            depth = data['depth']
            depth_distribution[depth] = depth_distribution.get(depth, 0) + 1

        max_parallel = max(depth_distribution.values()) if depth_distribution else 1
        avg_parallel = sum(depth_distribution.values()) / len(depth_distribution) if depth_distribution else 1

        # Calculate critical path
        critical_path = self._find_critical_path(graph)

        metrics = {
            'total_stages': total_stages,
            'pipeline_depth': max_depth,
            'max_parallel_stages': max_parallel,
            'avg_parallel_stages': avg_parallel,
            'parallelization_efficiency': (total_stages / max_depth) if max_depth > 0 else 1.0,
            'critical_path_length': len(critical_path),
            'critical_path': critical_path
        }

        return metrics
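
    # Worked example of the metrics above for a hypothetical diamond-shaped
    # pipeline A -> (B, C) -> D, where B and C both depend on A and D depends
    # on both:
    #
    #   depths:                      A=0, B=1, C=1, D=2
    #   total_stages:                4
    #   pipeline_depth:              3 (max depth + 1)
    #   max_parallel_stages:         2 (B and C share depth 1)
    #   avg_parallel_stages:         4 / 3 ≈ 1.33
    #   parallelization_efficiency:  total_stages / pipeline_depth = 4 / 3 ≈ 1.33
    #   critical_path:               [A, B, D] (or [A, C, D]), length 3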

    def _find_critical_path(self, graph: Dict[str, Dict]) -> List[str]:
        """Find the critical path (longest dependency chain)"""
        longest_path = []

        def dfs_longest_path(node_id, current_path):
            nonlocal longest_path

            current_path.append(node_id)

            if not graph[node_id]['dependents']:
                # Leaf node - check if this is the longest path
                if len(current_path) > len(longest_path):
                    longest_path = current_path.copy()
            else:
                for dependent in graph[node_id]['dependents']:
                    dfs_longest_path(dependent, current_path)

            current_path.pop()

        # Start from nodes with no dependencies
        for node_id, data in graph.items():
            if not data['dependencies']:
                dfs_longest_path(node_id, [])

        return longest_path

    def _display_pipeline_analysis(self, sorted_stages: List[Dict], metrics: Dict[str, Any]):
        """Display pipeline analysis results"""
        print("\n" + "=" * 60)
        print("INTELLIGENT PIPELINE TOPOLOGY ANALYSIS COMPLETE")
        print("=" * 60)

        print("Pipeline Metrics:")
        print(f"  Total Stages: {metrics['total_stages']}")
        print(f"  Pipeline Depth: {metrics['pipeline_depth']} levels")
        print(f"  Max Parallel Stages: {metrics['max_parallel_stages']}")
        print(f"  Parallelization Efficiency: {metrics['parallelization_efficiency']:.1%}")

        print("\nOptimized Execution Order:")
        for i, stage in enumerate(sorted_stages, 1):
            print(f"  {i:2d}. {stage['name']} (ID: {stage['id'][:8]}...)")

        if metrics['critical_path']:
            print(f"\nCritical Path ({metrics['critical_path_length']} stages):")
            critical_names = []
            for node_id in metrics['critical_path']:
                node_name = next((stage['name'] for stage in sorted_stages if stage['id'] == node_id), 'Unknown')
                critical_names.append(node_name)
            print(f"  {' → '.join(critical_names)}")

        print("\nPerformance Insights:")
        if metrics['parallelization_efficiency'] > 0.8:
            print("  Excellent parallelization potential!")
        elif metrics['parallelization_efficiency'] > 0.6:
            print("  Good parallelization opportunities available")
        else:
            print("  Limited parallelization - consider pipeline redesign")

        if metrics['pipeline_depth'] <= 3:
            print("  Low latency pipeline - great for real-time applications")
        elif metrics['pipeline_depth'] <= 6:
            print("  Balanced pipeline depth - good throughput/latency trade-off")
        else:
            print("  Deep pipeline - optimized for maximum throughput")

        print("=" * 60 + "\n")
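
    # Example of the property-to-StageConfig mapping performed below. The values
    # are illustrative; only the property keys are taken from the UI node:
    #
    #   properties = {"port_id": "28, 29", "model_path": "models/detector.nef",
    #                 "upload_fw": True, "max_queue_size": 50}
    #
    # for a node named "Detector" (first stage) becomes roughly:
    #
    #   StageConfig(stage_id="stage_1_Detector", port_ids=[28, 29],
    #               scpu_fw_path="./firmware/fw_scpu.bin",
    #               ncpu_fw_path="./firmware/fw_ncpu.bin",
    #               model_path="models/detector.nef", upload_fw=True,
    #               max_queue_size=50)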

    def _create_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict],
                              postprocess_nodes: List[Dict], connections: List[Dict]) -> List[StageConfig]:
        """Create StageConfig objects for each model node"""
        # Note: preprocess_nodes, postprocess_nodes, connections reserved for future enhanced processing
        stage_configs = []

        for i, model_node in enumerate(self.stage_order):
            properties = model_node.get('properties', {})

            # Extract configuration from UI properties
            stage_id = f"stage_{i+1}_{model_node.get('name', 'unknown').replace(' ', '_')}"

            # Convert port_id to list format
            port_id_str = properties.get('port_id', '').strip()
            if port_id_str:
                try:
                    # Handle comma-separated port IDs
                    port_ids = [int(p.strip()) for p in port_id_str.split(',') if p.strip()]
                except ValueError:
                    print(f"Warning: Invalid port_id format '{port_id_str}', using default [28]")
                    port_ids = [28]  # Default port
            else:
                port_ids = [28]  # Default port

            # Model path
            model_path = properties.get('model_path', '')
            if not model_path:
                print(f"Warning: No model_path specified for {model_node.get('name')}")

            # Firmware paths from UI properties
            scpu_fw_path = properties.get('scpu_fw_path', os.path.join(self.default_fw_path, 'fw_scpu.bin'))
            ncpu_fw_path = properties.get('ncpu_fw_path', os.path.join(self.default_fw_path, 'fw_ncpu.bin'))

            # Upload firmware flag
            upload_fw = properties.get('upload_fw', False)

            # Queue size
            max_queue_size = properties.get('max_queue_size', 50)

            # Create StageConfig
            stage_config = StageConfig(
                stage_id=stage_id,
                port_ids=port_ids,
                scpu_fw_path=scpu_fw_path,
                ncpu_fw_path=ncpu_fw_path,
                model_path=model_path,
                upload_fw=upload_fw,
                max_queue_size=max_queue_size
            )

            stage_configs.append(stage_config)

        return stage_configs

    def _extract_input_config(self, input_nodes: List[Dict]) -> Dict[str, Any]:
        """Extract input configuration from input nodes"""
        if not input_nodes:
            return {}

        # Use the first input node
        input_node = input_nodes[0]
        properties = input_node.get('properties', {})

        return {
            'source_type': properties.get('source_type', 'Camera'),
            'device_id': properties.get('device_id', 0),
            'source_path': properties.get('source_path', ''),
            'resolution': properties.get('resolution', '1920x1080'),
            'fps': properties.get('fps', 30)
        }

    def _extract_output_config(self, output_nodes: List[Dict]) -> Dict[str, Any]:
        """Extract output configuration from output nodes"""
        if not output_nodes:
            return {}

        # Use the first output node
        output_node = output_nodes[0]
        properties = output_node.get('properties', {})

        return {
            'output_type': properties.get('output_type', 'File'),
            'format': properties.get('format', 'JSON'),
            'destination': properties.get('destination', ''),
            'save_interval': properties.get('save_interval', 1.0)
        }

    def _extract_preprocessing_configs(self, preprocess_nodes: List[Dict]) -> List[Dict[str, Any]]:
        """Extract preprocessing configurations"""
        configs = []

        for node in preprocess_nodes:
            properties = node.get('properties', {})
            config = {
                'resize_width': properties.get('resize_width', 640),
                'resize_height': properties.get('resize_height', 480),
                'normalize': properties.get('normalize', True),
                'crop_enabled': properties.get('crop_enabled', False),
                'operations': properties.get('operations', 'resize,normalize')
            }
            configs.append(config)

        return configs

    def _extract_postprocessing_configs(self, postprocess_nodes: List[Dict]) -> List[Dict[str, Any]]:
        """Extract postprocessing configurations"""
        configs = []

        for node in postprocess_nodes:
            properties = node.get('properties', {})
            config = {
                'output_format': properties.get('output_format', 'JSON'),
                'confidence_threshold': properties.get('confidence_threshold', 0.5),
                'nms_threshold': properties.get('nms_threshold', 0.4),
                'max_detections': properties.get('max_detections', 100)
            }
            configs.append(config)

        return configs

    def create_inference_pipeline(self, config: PipelineConfig) -> InferencePipeline:
        """
        Create InferencePipeline instance from PipelineConfig

        Args:
            config: PipelineConfig object

        Returns:
            Configured InferencePipeline instance
        """
        return InferencePipeline(
            stage_configs=config.stage_configs,
            pipeline_name=config.pipeline_name
        )

    def validate_config(self, config: PipelineConfig) -> Tuple[bool, List[str]]:
        """
        Validate pipeline configuration

        Args:
            config: PipelineConfig to validate

        Returns:
            (is_valid, error_messages)
        """
        errors = []

        # Check if we have at least one stage
        if not config.stage_configs:
            errors.append("Pipeline must have at least one stage (model node)")

        # Validate each stage config
        for i, stage_config in enumerate(config.stage_configs):
            stage_errors = self._validate_stage_config(stage_config, i + 1)
            errors.extend(stage_errors)

        return len(errors) == 0, errors

    def _validate_stage_config(self, stage_config: StageConfig, stage_num: int) -> List[str]:
        """Validate individual stage configuration"""
        errors = []

        # Check model path
        if not stage_config.model_path:
            errors.append(f"Stage {stage_num}: Model path is required")
        elif not os.path.exists(stage_config.model_path):
            errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}")

        # Check firmware paths if upload_fw is True
        if stage_config.upload_fw:
            if not os.path.exists(stage_config.scpu_fw_path):
                errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}")
            if not os.path.exists(stage_config.ncpu_fw_path):
                errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}")

        # Check port IDs
        if not stage_config.port_ids:
            errors.append(f"Stage {stage_num}: At least one port ID is required")

        return errors


def convert_mflow_file(mflow_path: str, firmware_path: str = "./firmware") -> PipelineConfig:
    """
    Convenience function to convert a .mflow file

    Args:
        mflow_path: Path to .mflow file
        firmware_path: Path to firmware directory

    Returns:
        PipelineConfig ready for API use
    """
    converter = MFlowConverter(default_fw_path=firmware_path)
    return converter.load_and_convert(mflow_path)


if __name__ == "__main__":
    # Example usage
    import sys

    if len(sys.argv) < 2:
        print("Usage: python mflow_converter.py <mflow_file> [firmware_path]")
        sys.exit(1)

    mflow_file = sys.argv[1]
    firmware_path = sys.argv[2] if len(sys.argv) > 2 else "./firmware"

    try:
        converter = MFlowConverter(default_fw_path=firmware_path)
        config = converter.load_and_convert(mflow_file)

        print(f"Converted pipeline: {config.pipeline_name}")
        print(f"Stages: {len(config.stage_configs)}")

        # Validate configuration
        is_valid, errors = converter.validate_config(config)
        if is_valid:
            print("✓ Configuration is valid")

            # Create pipeline instance
            pipeline = converter.create_inference_pipeline(config)
            print(f"✓ InferencePipeline created: {pipeline.pipeline_name}")
        else:
            print("✗ Configuration has errors:")
            for error in errors:
                print(f"  - {error}")

    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)