"""
Integrated pipeline dashboard for the Cluster4NPU UI application.

This module provides the main dashboard window that combines pipeline editing,
stage configuration, performance estimation, and dongle management in a unified
interface with a 3-panel layout.

Main Components:
    - IntegratedPipelineDashboard: Main dashboard window
    - Node template palette for pipeline design
    - Dynamic property editing panels
    - Performance estimation and hardware management
    - Pipeline save/load functionality

Usage:
    from cluster4npu_ui.ui.windows.dashboard import IntegratedPipelineDashboard

    dashboard = IntegratedPipelineDashboard()
    dashboard.show()
"""

import sys
import json
import os
from typing import Optional, Dict, Any, List

from PyQt5.QtWidgets import (
    QMainWindow, QVBoxLayout, QHBoxLayout, QWidget, QLineEdit, QPushButton,
    QLabel, QSpinBox, QDoubleSpinBox, QComboBox, QListWidget, QCheckBox,
    QSplitter, QAction, QScrollArea, QTabWidget, QTableWidget, QTableWidgetItem,
    QHeaderView, QProgressBar, QGroupBox, QGridLayout, QFrame, QTextBrowser,
    QSizePolicy, QMessageBox, QFileDialog, QFormLayout, QToolBar, QStatusBar
)
from PyQt5.QtCore import Qt, pyqtSignal, QTimer
from PyQt5.QtGui import QFont

try:
    from NodeGraphQt import NodeGraph
    NODEGRAPH_AVAILABLE = True
except ImportError:
    NODEGRAPH_AVAILABLE = False
    print("Warning: NodeGraphQt not available. Pipeline editor will be disabled.")

from cluster4npu_ui.config.theme import HARMONIOUS_THEME_STYLESHEET
from cluster4npu_ui.config.settings import get_settings

try:
    from cluster4npu_ui.core.nodes import (
        InputNode, ModelNode, PreprocessNode, PostprocessNode, OutputNode,
        NODE_TYPES, create_node_property_widget
    )
    ADVANCED_NODES_AVAILABLE = True
except ImportError:
    ADVANCED_NODES_AVAILABLE = False

# Use exact nodes that match original properties.
# This import is unconditional: the Exact* classes are referenced throughout
# the dashboard regardless of ADVANCED_NODES_AVAILABLE.
from cluster4npu_ui.core.nodes.exact_nodes import (
    ExactInputNode, ExactModelNode, ExactPreprocessNode,
    ExactPostprocessNode, ExactOutputNode, EXACT_NODE_TYPES
)

# Import pipeline analysis functions
try:
    from cluster4npu_ui.core.pipeline import get_stage_count, analyze_pipeline_stages, get_pipeline_summary
except ImportError:
    # Fallback functions if not available
    def get_stage_count(graph):
        """Fallback: report zero stages when the analysis module is missing."""
        return 0

    def analyze_pipeline_stages(graph):
        """Fallback: report no stage information."""
        return {}

    def get_pipeline_summary(graph):
        """Fallback: return an empty, valid summary with every expected key present."""
        return {
            'stage_count': 0,
            'valid': True,
            'error': '',
            'total_nodes': 0,
            'model_nodes': 0,
            'input_nodes': 0,
            'output_nodes': 0,
            'preprocess_nodes': 0,
            'postprocess_nodes': 0,
            'stages': []
        }


class StageCountWidget(QWidget):
    """Widget to display stage count information in the pipeline editor."""

    # Label colours for the display states handled by update_stage_count().
    _COLOR_DEFAULT = "#cdd6f4"
    _COLOR_INVALID = "#f38ba8"
    _COLOR_EMPTY = "#f9e2af"
    _COLOR_OK = "#a6e3a1"

    def __init__(self, parent=None):
        super().__init__(parent)
        self.stage_count = 0
        self.pipeline_valid = True
        self.pipeline_error = ""
        self.setup_ui()
        self.setFixedSize(120, 22)

    def setup_ui(self):
        """Setup the stage count widget UI."""
        layout = QHBoxLayout()
        layout.setContentsMargins(5, 2, 5, 2)

        # Stage count label only (compact version)
        self.stage_label = QLabel("Stages: 0")
        self.stage_label.setFont(QFont("Arial", 10, QFont.Bold))
        self.stage_label.setStyleSheet(f"color: {self._COLOR_DEFAULT}; font-weight: bold;")
        layout.addWidget(self.stage_label)
        self.setLayout(layout)

        # Style the widget for status bar
        self.setStyleSheet("""
            StageCountWidget { background-color: transparent; border: none; }
        """)

        # NOTE: the widget itself is deliberately not shown here. Calling
        # setVisible(True) on a widget that has no parent yet would display it
        # as a separate top-level window until a layout adopts it.
        self.stage_label.setVisible(True)

    def update_stage_count(self, count: int, valid: bool = True, error: str = ""):
        """Update the stage count display.

        Args:
            count: Number of stages to show.
            valid: Whether the pipeline is valid; an invalid pipeline is shown
                in the error colour.
            error: Error text; stored on the widget and exposed as the label's
                tooltip while the pipeline is invalid.
        """
        self.stage_count = count
        self.pipeline_valid = valid
        self.pipeline_error = error

        # Pick the colour for the current state: invalid, empty, or populated.
        if not valid:
            color = self._COLOR_INVALID
        elif count == 0:
            color = self._COLOR_EMPTY
        else:
            color = self._COLOR_OK

        self.stage_label.setText(f"Stages: {count}")
        self.stage_label.setStyleSheet(f"color: {color}; font-weight: bold;")
        # Surface the reason for an invalid pipeline instead of dropping it.
        self.stage_label.setToolTip(error if not valid else "")
class IntegratedPipelineDashboard(QMainWindow):
    """
    Integrated dashboard combining pipeline editor, stage configuration, and performance estimation.

    This is the main application window that provides a comprehensive interface
    for designing, configuring, and managing ML inference pipelines.
    """

    # Signals
    pipeline_modified = pyqtSignal()
    node_selected = pyqtSignal(object)
    pipeline_changed = pyqtSignal()
    stage_count_changed = pyqtSignal(int)

    def __init__(self, project_name: str = "", description: str = "", filename: Optional[str] = None):
        """Build the dashboard window.

        Args:
            project_name: Display name of the project; falls back to
                "Untitled Pipeline" when empty.
            description: Free-text project description.
            filename: Path of the pipeline file backing this window, if any.
        """
        super().__init__()

        # Project information
        self.project_name = project_name or "Untitled Pipeline"
        self.description = description
        self.current_file = filename
        self.is_modified = False

        # Settings
        self.settings = get_settings()

        # Initialize UI components that will be created later
        self.props_instructions = None
        self.node_props_container = None
        self.node_props_layout = None
        self.fps_label = None
        self.latency_label = None
        self.memory_label = None
        self.suggestions_text = None
        self.dongles_list = None
        self.detected_devices = []  # Store detected device information
        self.stage_count_widget = None
        self.analysis_timer = None
        self.previous_stage_count = 0
        self.stats_label = None
        # Also assigned by the panel builders; declared here so the attributes
        # exist on every instance from the start.
        self.deploy_button = None
        self.deployment_status = None
        self.global_status_bar = None

        # Initialize node graph if available
        if NODEGRAPH_AVAILABLE:
            self.setup_node_graph()
        else:
            self.graph = None

        # Setup UI
        self.setup_integrated_ui()
        self.setup_menu()
        self.setup_shortcuts()
        self.setup_analysis_timer()

        # Apply styling and configure window
        self.apply_styling()
        self.update_window_title()
        self.setGeometry(50, 50, 1400, 900)

        # Connect signals
        self.pipeline_changed.connect(self.analyze_pipeline)

        # Initial analysis
        print("🚀 Pipeline Dashboard initialized")
        self.analyze_pipeline()

        # Set up a timer to hide UI elements after initialization.
        # Parented to the window so Qt destroys it together with the window.
        self.ui_cleanup_timer = QTimer(self)
        self.ui_cleanup_timer.setSingleShot(True)
        self.ui_cleanup_timer.timeout.connect(self.cleanup_node_graph_ui)
        self.ui_cleanup_timer.start(1000)  # 1 second delay
self.setup_integrated_ui() self.setup_menu() self.setup_shortcuts() self.setup_analysis_timer() # Apply styling and configure window self.apply_styling() self.update_window_title() self.setGeometry(50, 50, 1400, 900) # Connect signals self.pipeline_changed.connect(self.analyze_pipeline) # Initial analysis print("๐Ÿš€ Pipeline Dashboard initialized") self.analyze_pipeline() # Set up a timer to hide UI elements after initialization self.ui_cleanup_timer = QTimer() self.ui_cleanup_timer.setSingleShot(True) self.ui_cleanup_timer.timeout.connect(self.cleanup_node_graph_ui) self.ui_cleanup_timer.start(1000) # 1 second delay def setup_node_graph(self): """Initialize the node graph system.""" try: self.graph = NodeGraph() # Configure NodeGraphQt to hide unwanted UI elements viewer = self.graph.viewer() if viewer: # Hide the logo/icon in bottom left corner if hasattr(viewer, 'set_logo_visible'): viewer.set_logo_visible(False) elif hasattr(viewer, 'show_logo'): viewer.show_logo(False) # Try to hide grid if hasattr(viewer, 'set_grid_mode'): viewer.set_grid_mode(0) # 0 = no grid elif hasattr(viewer, 'grid_mode'): viewer.grid_mode = 0 # Try to hide navigation widget/toolbar if hasattr(viewer, 'set_nav_widget_visible'): viewer.set_nav_widget_visible(False) elif hasattr(viewer, 'navigation_widget'): nav_widget = viewer.navigation_widget() if nav_widget: nav_widget.setVisible(False) # Try to hide any other UI elements if hasattr(viewer, 'set_minimap_visible'): viewer.set_minimap_visible(False) # Hide menu bar if exists if hasattr(viewer, 'set_menu_bar_visible'): viewer.set_menu_bar_visible(False) # Try to hide any toolbar elements widget = viewer.widget if hasattr(viewer, 'widget') else None if widget: # Find and hide toolbar-like children from PyQt5.QtWidgets import QToolBar, QFrame, QWidget for child in widget.findChildren(QToolBar): child.setVisible(False) # Look for other UI widgets that might be the horizontal bar for child in widget.findChildren(QFrame): # Check if this 
might be the navigation bar if hasattr(child, 'objectName') and 'nav' in child.objectName().lower(): child.setVisible(False) # Check size and position to identify the horizontal bar elif hasattr(child, 'geometry'): geom = child.geometry() # If it's a horizontal bar at the bottom left if geom.height() < 50 and geom.width() > 100: child.setVisible(False) # Additional attempt to hide navigation elements for child in widget.findChildren(QWidget): if hasattr(child, 'objectName'): obj_name = child.objectName().lower() if any(keyword in obj_name for keyword in ['nav', 'toolbar', 'control', 'zoom']): child.setVisible(False) # Use exact nodes that match original properties nodes_to_register = [ ExactInputNode, ExactModelNode, ExactPreprocessNode, ExactPostprocessNode, ExactOutputNode ] print("Registering nodes with NodeGraphQt...") for node_class in nodes_to_register: try: self.graph.register_node(node_class) print(f"โœ“ Registered {node_class.__name__} with identifier {node_class.__identifier__}") except Exception as e: print(f"โœ— Failed to register {node_class.__name__}: {e}") # Connect signals self.graph.node_created.connect(self.mark_modified) self.graph.nodes_deleted.connect(self.mark_modified) self.graph.node_selection_changed.connect(self.on_node_selection_changed) # Connect pipeline analysis signals self.graph.node_created.connect(self.schedule_analysis) self.graph.nodes_deleted.connect(self.schedule_analysis) if hasattr(self.graph, 'connection_changed'): self.graph.connection_changed.connect(self.schedule_analysis) if hasattr(self.graph, 'property_changed'): self.graph.property_changed.connect(self.mark_modified) print("Node graph setup completed successfully") except Exception as e: print(f"Error setting up node graph: {e}") import traceback traceback.print_exc() self.graph = None def cleanup_node_graph_ui(self): """Clean up NodeGraphQt UI elements after initialization.""" if not self.graph: return try: viewer = self.graph.viewer() if viewer: widget = 
viewer.widget if hasattr(viewer, 'widget') else None if widget: print("๐Ÿงน Cleaning up NodeGraphQt UI elements...") # More aggressive cleanup - hide all small widgets at bottom from PyQt5.QtWidgets import QWidget, QFrame, QLabel, QPushButton from PyQt5.QtCore import QRect for child in widget.findChildren(QWidget): if hasattr(child, 'geometry'): geom = child.geometry() parent_geom = widget.geometry() # Check if it's a small widget at the bottom left if (geom.height() < 100 and geom.width() < 200 and geom.y() > parent_geom.height() - 100 and geom.x() < 200): print(f"๐Ÿ—‘๏ธ Hiding bottom-left widget: {child.__class__.__name__}") child.setVisible(False) # Also try to hide by CSS styling try: widget.setStyleSheet(widget.styleSheet() + """ QWidget[objectName*="nav"] { display: none; } QWidget[objectName*="toolbar"] { display: none; } QWidget[objectName*="control"] { display: none; } QFrame[objectName*="zoom"] { display: none; } """) except: pass except Exception as e: print(f"Error cleaning up NodeGraphQt UI: {e}") def setup_integrated_ui(self): """Setup the integrated UI with node templates, pipeline editor and configuration panels.""" central_widget = QWidget() self.setCentralWidget(central_widget) # Main layout with status bar at bottom main_layout = QVBoxLayout(central_widget) main_layout.setContentsMargins(0, 0, 0, 0) main_layout.setSpacing(0) # Main horizontal splitter with 3 panels main_splitter = QSplitter(Qt.Horizontal) # Left side: Node Template Panel (25% width) left_panel = self.create_node_template_panel() left_panel.setMinimumWidth(250) left_panel.setMaximumWidth(350) # Middle: Pipeline Editor (50% width) - without its own status bar middle_panel = self.create_pipeline_editor_panel() # Right side: Configuration panels (25% width) right_panel = self.create_configuration_panel() right_panel.setMinimumWidth(300) right_panel.setMaximumWidth(400) # Add widgets to splitter main_splitter.addWidget(left_panel) main_splitter.addWidget(middle_panel) 
main_splitter.addWidget(right_panel) main_splitter.setSizes([300, 700, 400]) # 25-50-25 split # Add splitter to main layout main_layout.addWidget(main_splitter) # Add global status bar at the bottom self.global_status_bar = self.create_status_bar_widget() main_layout.addWidget(self.global_status_bar) def create_node_template_panel(self) -> QWidget: """Create left panel with node templates.""" panel = QWidget() layout = QVBoxLayout(panel) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(10) # Header header = QLabel("Node Templates") header.setStyleSheet("color: #f9e2af; font-size: 16px; font-weight: bold; padding: 10px;") layout.addWidget(header) # Node template buttons - use exact nodes matching original nodes_info = [ ("Input Node", "Data input source", ExactInputNode), ("Model Node", "AI inference model", ExactModelNode), ("Preprocess Node", "Data preprocessing", ExactPreprocessNode), ("Postprocess Node", "Output processing", ExactPostprocessNode), ("Output Node", "Final output", ExactOutputNode) ] for name, description, node_class in nodes_info: # Create container for each node type node_container = QFrame() node_container.setStyleSheet(""" QFrame { background-color: #313244; border: 2px solid #45475a; border-radius: 8px; padding: 5px; } QFrame:hover { border-color: #89b4fa; background-color: #383a59; } """) container_layout = QVBoxLayout(node_container) container_layout.setContentsMargins(8, 8, 8, 8) container_layout.setSpacing(4) # Node name name_label = QLabel(name) name_label.setStyleSheet("color: #cdd6f4; font-weight: bold; font-size: 12px;") container_layout.addWidget(name_label) # Description desc_label = QLabel(description) desc_label.setStyleSheet("color: #a6adc8; font-size: 10px;") desc_label.setWordWrap(True) container_layout.addWidget(desc_label) # Add button add_btn = QPushButton("+ Add") add_btn.setStyleSheet(""" QPushButton { background-color: #89b4fa; color: #1e1e2e; border: none; padding: 4px 8px; border-radius: 4px; font-size: 10px; 
font-weight: bold; } QPushButton:hover { background-color: #a6c8ff; } QPushButton:pressed { background-color: #7287fd; } """) add_btn.clicked.connect(lambda checked, nc=node_class: self.add_node_to_graph(nc)) container_layout.addWidget(add_btn) layout.addWidget(node_container) # Pipeline Operations Section operations_label = QLabel("Pipeline Operations") operations_label.setStyleSheet("color: #f9e2af; font-size: 14px; font-weight: bold; padding: 10px;") layout.addWidget(operations_label) # Create operation buttons operations = [ ("Validate Pipeline", self.validate_pipeline), ("Clear Pipeline", self.clear_pipeline), ] for name, handler in operations: btn = QPushButton(name) btn.setStyleSheet(""" QPushButton { background-color: #45475a; color: #cdd6f4; border: 1px solid #585b70; border-radius: 6px; padding: 8px 12px; font-size: 11px; font-weight: bold; margin: 2px; } QPushButton:hover { background-color: #585b70; border-color: #89b4fa; } QPushButton:pressed { background-color: #313244; } """) btn.clicked.connect(handler) layout.addWidget(btn) # Add stretch to push everything to top layout.addStretch() # Instructions instructions = QLabel("Click 'Add' to insert nodes into the pipeline editor") instructions.setStyleSheet(""" color: #f9e2af; font-size: 10px; padding: 10px; background-color: #313244; border-radius: 6px; border-left: 3px solid #89b4fa; """) instructions.setWordWrap(True) layout.addWidget(instructions) return panel def create_pipeline_editor_panel(self) -> QWidget: """Create the middle panel with pipeline editor.""" panel = QWidget() layout = QVBoxLayout(panel) layout.setContentsMargins(5, 5, 5, 5) # Header header = QLabel("Pipeline Editor") header.setStyleSheet("color: #f9e2af; font-size: 16px; font-weight: bold; padding: 10px;") layout.addWidget(header) if self.graph and NODEGRAPH_AVAILABLE: # Add the node graph widget directly graph_widget = self.graph.widget graph_widget.setMinimumHeight(400) layout.addWidget(graph_widget) else: # Fallback: show 
placeholder placeholder = QLabel("Pipeline Editor\n(NodeGraphQt not available)") placeholder.setStyleSheet(""" color: #6c7086; font-size: 14px; padding: 40px; background-color: #313244; border-radius: 8px; border: 2px dashed #45475a; """) placeholder.setAlignment(Qt.AlignCenter) layout.addWidget(placeholder) return panel def create_pipeline_toolbar(self) -> QToolBar: """Create toolbar for pipeline operations.""" toolbar = QToolBar("Pipeline Operations") toolbar.setStyleSheet(""" QToolBar { background-color: #313244; border: 1px solid #45475a; spacing: 5px; padding: 5px; } QToolBar QAction { padding: 5px 10px; margin: 2px; border: 1px solid #45475a; border-radius: 3px; background-color: #45475a; color: #cdd6f4; } QToolBar QAction:hover { background-color: #585b70; } """) # Add nodes actions add_input_action = QAction("Add Input", self) add_input_action.triggered.connect(lambda: self.add_node_to_graph(ExactInputNode)) toolbar.addAction(add_input_action) add_model_action = QAction("Add Model", self) add_model_action.triggered.connect(lambda: self.add_node_to_graph(ExactModelNode)) toolbar.addAction(add_model_action) add_preprocess_action = QAction("Add Preprocess", self) add_preprocess_action.triggered.connect(lambda: self.add_node_to_graph(ExactPreprocessNode)) toolbar.addAction(add_preprocess_action) add_postprocess_action = QAction("Add Postprocess", self) add_postprocess_action.triggered.connect(lambda: self.add_node_to_graph(ExactPostprocessNode)) toolbar.addAction(add_postprocess_action) add_output_action = QAction("Add Output", self) add_output_action.triggered.connect(lambda: self.add_node_to_graph(ExactOutputNode)) toolbar.addAction(add_output_action) toolbar.addSeparator() # Pipeline actions validate_action = QAction("Validate Pipeline", self) validate_action.triggered.connect(self.validate_pipeline) toolbar.addAction(validate_action) clear_action = QAction("Clear Pipeline", self) clear_action.triggered.connect(self.clear_pipeline) 
toolbar.addAction(clear_action) toolbar.addSeparator() # Deploy action deploy_action = QAction("Deploy Pipeline", self) deploy_action.setToolTip("Convert pipeline to executable format and deploy to dongles") deploy_action.triggered.connect(self.deploy_pipeline) deploy_action.setStyleSheet(""" QAction { background-color: #a6e3a1; color: #1e1e2e; font-weight: bold; } QAction:hover { background-color: #94d2a3; } """) toolbar.addAction(deploy_action) return toolbar def setup_analysis_timer(self): """Setup timer for pipeline analysis.""" self.analysis_timer = QTimer() self.analysis_timer.setSingleShot(True) self.analysis_timer.timeout.connect(self.analyze_pipeline) self.analysis_timer.setInterval(500) # 500ms delay def schedule_analysis(self): """Schedule pipeline analysis after a delay.""" if self.analysis_timer: self.analysis_timer.start() def analyze_pipeline(self): """Analyze the current pipeline and update stage count.""" if not self.graph: return try: # Get pipeline summary summary = get_pipeline_summary(self.graph) current_stage_count = summary['stage_count'] # Print detailed pipeline analysis self.print_pipeline_analysis(summary, current_stage_count) # Update stage count widget if self.stage_count_widget: print(f"๐Ÿ”„ Updating stage count widget: {current_stage_count} stages") self.stage_count_widget.update_stage_count( current_stage_count, summary['valid'], summary.get('error', '') ) # Update statistics label if hasattr(self, 'stats_label') and self.stats_label: total_nodes = summary['total_nodes'] # Count connections more accurately connection_count = 0 if self.graph: for node in self.graph.all_nodes(): try: if hasattr(node, 'output_ports'): for output_port in node.output_ports(): if hasattr(output_port, 'connected_ports'): connection_count += len(output_port.connected_ports()) elif hasattr(node, 'outputs'): for output in node.outputs(): if hasattr(output, 'connected_ports'): connection_count += len(output.connected_ports()) elif hasattr(output, 
'connected_inputs'): connection_count += len(output.connected_inputs()) except Exception: # If there's any error accessing connections, skip this node continue self.stats_label.setText(f"Nodes: {total_nodes} | Connections: {connection_count}") # Update info panel (if it exists) if hasattr(self, 'info_text') and self.info_text: self.update_info_panel(summary) # Update previous count for next comparison self.previous_stage_count = current_stage_count # Emit signal self.stage_count_changed.emit(current_stage_count) except Exception as e: print(f"Pipeline analysis error: {str(e)}") if self.stage_count_widget: self.stage_count_widget.update_stage_count(0, False, f"Analysis error: {str(e)}") def print_pipeline_analysis(self, summary, current_stage_count): """Print detailed pipeline analysis to terminal.""" # Check if stage count changed if current_stage_count != self.previous_stage_count: if self.previous_stage_count == 0 and current_stage_count > 0: print(f"Initial stage count: {current_stage_count}") elif current_stage_count != self.previous_stage_count: change = current_stage_count - self.previous_stage_count if change > 0: print(f"Stage count increased: {self.previous_stage_count} โ†’ {current_stage_count} (+{change})") else: print(f"Stage count decreased: {self.previous_stage_count} โ†’ {current_stage_count} ({change})") # Always print current pipeline status for clarity print(f"Current Pipeline Status:") print(f" โ€ข Stages: {current_stage_count}") print(f" โ€ข Total Nodes: {summary['total_nodes']}") print(f" โ€ข Model Nodes: {summary['model_nodes']}") print(f" โ€ข Input Nodes: {summary['input_nodes']}") print(f" โ€ข Output Nodes: {summary['output_nodes']}") print(f" โ€ข Preprocess Nodes: {summary['preprocess_nodes']}") print(f" โ€ข Postprocess Nodes: {summary['postprocess_nodes']}") print(f" โ€ข Valid: {'V' if summary['valid'] else 'X'}") if not summary['valid'] and summary.get('error'): print(f" โ€ข Error: {summary['error']}") # Print stage details if available 
if summary.get('stages') and len(summary['stages']) > 0: print(f"Stage Details:") for i, stage in enumerate(summary['stages'], 1): model_name = stage['model_config'].get('node_name', 'Unknown Model') preprocess_count = len(stage['preprocess_configs']) postprocess_count = len(stage['postprocess_configs']) stage_info = f" Stage {i}: {model_name}" if preprocess_count > 0: stage_info += f" (with {preprocess_count} preprocess)" if postprocess_count > 0: stage_info += f" (with {postprocess_count} postprocess)" print(stage_info) elif current_stage_count > 0: print(f"{current_stage_count} stage(s) detected but details not available") print("โ”€" * 50) # Separator line def update_info_panel(self, summary): """Update the pipeline info panel with analysis results.""" # This method is kept for compatibility but no longer used # since we removed the separate info panel pass def clear_pipeline(self): """Clear the entire pipeline.""" if self.graph: print("Clearing entire pipeline...") self.graph.clear_session() self.schedule_analysis() def create_configuration_panel(self) -> QWidget: """Create the right panel with configuration tabs.""" panel = QWidget() layout = QVBoxLayout(panel) layout.setContentsMargins(5, 5, 5, 5) layout.setSpacing(10) # Create tabs for different configuration sections config_tabs = QTabWidget() config_tabs.setStyleSheet(""" QTabWidget::pane { border: 2px solid #45475a; border-radius: 8px; background-color: #313244; } QTabWidget::tab-bar { alignment: center; } QTabBar::tab { background-color: #45475a; color: #cdd6f4; padding: 6px 12px; margin: 1px; border-radius: 4px; font-size: 11px; } QTabBar::tab:selected { background-color: #89b4fa; color: #1e1e2e; font-weight: bold; } QTabBar::tab:hover { background-color: #585b70; } """) # Add tabs config_tabs.addTab(self.create_node_properties_panel(), "Properties") config_tabs.addTab(self.create_performance_panel(), "Performance") config_tabs.addTab(self.create_dongle_panel(), "Dongles") layout.addWidget(config_tabs) 
return panel def create_node_properties_panel(self) -> QWidget: """Create node properties editing panel.""" widget = QScrollArea() content = QWidget() layout = QVBoxLayout(content) # Header header = QLabel("Node Properties") header.setStyleSheet("color: #f9e2af; font-size: 14px; font-weight: bold; padding: 5px;") layout.addWidget(header) # Instructions when no node selected self.props_instructions = QLabel("Select a node in the pipeline editor to view and edit its properties") self.props_instructions.setStyleSheet(""" color: #a6adc8; font-size: 12px; padding: 20px; background-color: #313244; border-radius: 8px; border: 2px dashed #45475a; """) self.props_instructions.setWordWrap(True) self.props_instructions.setAlignment(Qt.AlignCenter) layout.addWidget(self.props_instructions) # Container for dynamic properties self.node_props_container = QWidget() self.node_props_layout = QVBoxLayout(self.node_props_container) layout.addWidget(self.node_props_container) # Initially hide the container self.node_props_container.setVisible(False) layout.addStretch() widget.setWidget(content) widget.setWidgetResizable(True) return widget def create_status_bar_widget(self) -> QWidget: """Create a global status bar widget for pipeline information.""" status_widget = QWidget() status_widget.setFixedHeight(28) status_widget.setStyleSheet(""" QWidget { background-color: #1e1e2e; border-top: 1px solid #45475a; margin: 0px; padding: 0px; } """) layout = QHBoxLayout(status_widget) layout.setContentsMargins(15, 3, 15, 3) layout.setSpacing(20) # Left side: Stage count display self.stage_count_widget = StageCountWidget() self.stage_count_widget.setFixedSize(120, 22) layout.addWidget(self.stage_count_widget) # Center spacer layout.addStretch() # Right side: Pipeline statistics self.stats_label = QLabel("Nodes: 0 | Connections: 0") self.stats_label.setStyleSheet("color: #a6adc8; font-size: 10px;") layout.addWidget(self.stats_label) return status_widget def create_performance_panel(self) -> 
QWidget: """Create performance estimation panel.""" widget = QScrollArea() content = QWidget() layout = QVBoxLayout(content) # Header header = QLabel("Performance Estimation") header.setStyleSheet("color: #f9e2af; font-size: 14px; font-weight: bold; padding: 5px;") layout.addWidget(header) # Performance metrics metrics_group = QGroupBox("Estimated Metrics") metrics_layout = QFormLayout(metrics_group) self.fps_label = QLabel("-- FPS") self.latency_label = QLabel("-- ms") self.memory_label = QLabel("-- MB") metrics_layout.addRow("Throughput:", self.fps_label) metrics_layout.addRow("Latency:", self.latency_label) metrics_layout.addRow("Memory Usage:", self.memory_label) layout.addWidget(metrics_group) # Suggestions suggestions_group = QGroupBox("Optimization Suggestions") suggestions_layout = QVBoxLayout(suggestions_group) self.suggestions_text = QTextBrowser() self.suggestions_text.setMaximumHeight(150) self.suggestions_text.setPlainText("Connect nodes to see performance analysis and optimization suggestions.") suggestions_layout.addWidget(self.suggestions_text) layout.addWidget(suggestions_group) # Deploy section deploy_group = QGroupBox("Pipeline Deployment") deploy_layout = QVBoxLayout(deploy_group) # Deploy button self.deploy_button = QPushButton("Deploy Pipeline") self.deploy_button.setToolTip("Convert pipeline to executable format and deploy to dongles") self.deploy_button.clicked.connect(self.deploy_pipeline) self.deploy_button.setStyleSheet(""" QPushButton { background-color: #a6e3a1; color: #1e1e2e; border: 2px solid #a6e3a1; border-radius: 8px; padding: 12px 24px; font-weight: bold; font-size: 14px; min-height: 20px; } QPushButton:hover { background-color: #94d2a3; border-color: #94d2a3; } QPushButton:pressed { background-color: #7dc4b0; border-color: #7dc4b0; } QPushButton:disabled { background-color: #6c7086; color: #45475a; border-color: #6c7086; } """) deploy_layout.addWidget(self.deploy_button) # Deployment status self.deployment_status = QLabel("Ready 
to deploy") self.deployment_status.setStyleSheet("color: #a6adc8; font-size: 11px; margin-top: 5px;") self.deployment_status.setAlignment(Qt.AlignCenter) deploy_layout.addWidget(self.deployment_status) layout.addWidget(deploy_group) layout.addStretch() widget.setWidget(content) widget.setWidgetResizable(True) return widget def create_dongle_panel(self) -> QWidget: """Create dongle management panel.""" widget = QScrollArea() content = QWidget() layout = QVBoxLayout(content) # Header header = QLabel("Dongle Management") header.setStyleSheet("color: #f9e2af; font-size: 14px; font-weight: bold; padding: 5px;") layout.addWidget(header) # Detect dongles button detect_btn = QPushButton("Detect Dongles") detect_btn.clicked.connect(self.detect_dongles) layout.addWidget(detect_btn) # Dongles list self.dongles_list = QListWidget() self.dongles_list.addItem("No dongles detected. Click 'Detect Dongles' to scan.") layout.addWidget(self.dongles_list) layout.addStretch() widget.setWidget(content) widget.setWidgetResizable(True) return widget def setup_menu(self): """Setup the menu bar.""" menubar = self.menuBar() # File menu file_menu = menubar.addMenu('&File') # New pipeline new_action = QAction('&New Pipeline', self) new_action.setShortcut('Ctrl+N') new_action.triggered.connect(self.new_pipeline) file_menu.addAction(new_action) # Open pipeline open_action = QAction('&Open Pipeline...', self) open_action.setShortcut('Ctrl+O') open_action.triggered.connect(self.open_pipeline) file_menu.addAction(open_action) file_menu.addSeparator() # Save pipeline save_action = QAction('&Save Pipeline', self) save_action.setShortcut('Ctrl+S') save_action.triggered.connect(self.save_pipeline) file_menu.addAction(save_action) # Save As save_as_action = QAction('Save &As...', self) save_as_action.setShortcut('Ctrl+Shift+S') save_as_action.triggered.connect(self.save_pipeline_as) file_menu.addAction(save_as_action) file_menu.addSeparator() # Export export_action = QAction('&Export Configuration...', 
self) export_action.triggered.connect(self.export_configuration) file_menu.addAction(export_action) # Pipeline menu pipeline_menu = menubar.addMenu('&Pipeline') # Validate pipeline validate_action = QAction('&Validate Pipeline', self) validate_action.triggered.connect(self.validate_pipeline) pipeline_menu.addAction(validate_action) # Performance estimation perf_action = QAction('&Performance Analysis', self) perf_action.triggered.connect(self.update_performance_estimation) pipeline_menu.addAction(perf_action) def setup_shortcuts(self): """Setup keyboard shortcuts.""" # Delete shortcut self.delete_shortcut = QAction("Delete", self) self.delete_shortcut.setShortcut('Delete') self.delete_shortcut.triggered.connect(self.delete_selected_nodes) self.addAction(self.delete_shortcut) def apply_styling(self): """Apply the application stylesheet.""" self.setStyleSheet(HARMONIOUS_THEME_STYLESHEET) # Event handlers and utility methods def add_node_to_graph(self, node_class): """Add a new node to the graph.""" if not self.graph: QMessageBox.warning(self, "Node Graph Not Available", "NodeGraphQt is not available. 
Cannot add nodes.") return try: print(f"Attempting to create node with identifier: {node_class.__identifier__}") # Try different identifier formats that NodeGraphQt might use identifiers_to_try = [ node_class.__identifier__, # Original identifier f"{node_class.__identifier__}.{node_class.__name__}", # Full format node_class.__name__, # Just class name ] node = None for identifier in identifiers_to_try: try: print(f"Trying identifier: {identifier}") node = self.graph.create_node(identifier) print(f"Success with identifier: {identifier}") break except Exception as e: print(f"Failed with {identifier}: {e}") continue if not node: raise Exception("Could not create node with any identifier format") # Position the node with some randomization to avoid overlap import random x_pos = random.randint(50, 300) y_pos = random.randint(50, 300) node.set_pos(x_pos, y_pos) print(f"โœ“ Successfully created node: {node.name()}") self.mark_modified() except Exception as e: error_msg = f"Failed to create node: {e}" print(f"โœ— {error_msg}") import traceback traceback.print_exc() # Show user-friendly error QMessageBox.critical(self, "Node Creation Error", f"Could not create {node_class.NODE_NAME}.\n\n" f"Error: {e}\n\n" f"This might be due to:\n" f"โ€ข Node not properly registered\n" f"โ€ข NodeGraphQt compatibility issue\n" f"โ€ข Missing dependencies") def on_node_selection_changed(self): """Handle node selection changes.""" if not self.graph: return selected_nodes = self.graph.selected_nodes() if selected_nodes: self.update_node_properties_panel(selected_nodes[0]) self.node_selected.emit(selected_nodes[0]) else: self.clear_node_properties_panel() def update_node_properties_panel(self, node): """Update the properties panel for the selected node.""" if not self.node_props_container: return # Clear existing properties self.clear_node_properties_panel() # Show the container and hide instructions self.node_props_container.setVisible(True) self.props_instructions.setVisible(False) # Create 
property form form_widget = QWidget() form_layout = QFormLayout(form_widget) # Node info info_label = QLabel(f"Editing: {node.name()}") info_label.setStyleSheet("color: #89b4fa; font-weight: bold; margin-bottom: 10px;") form_layout.addRow(info_label) # Get node properties - try different methods try: properties = {} # Method 1: Try custom properties (for enhanced nodes) if hasattr(node, 'get_business_properties'): properties = node.get_business_properties() # Method 1.5: Try ExactNode properties (with _property_options) elif hasattr(node, '_property_options') and node._property_options: properties = {} for prop_name in node._property_options.keys(): if hasattr(node, 'get_property'): try: properties[prop_name] = node.get_property(prop_name) except: # If property doesn't exist, use a default value properties[prop_name] = None # Method 2: Try standard NodeGraphQt properties elif hasattr(node, 'properties'): all_props = node.properties() # Filter out system properties, keep user properties for key, value in all_props.items(): if not key.startswith('_') and key not in ['name', 'selected', 'disabled', 'custom']: properties[key] = value # Method 3: Use exact original properties based on node type else: node_type = node.__class__.__name__ if 'Input' in node_type: # Exact InputNode properties from original properties = { 'source_type': node.get_property('source_type') if hasattr(node, 'get_property') else 'Camera', 'device_id': node.get_property('device_id') if hasattr(node, 'get_property') else 0, 'source_path': node.get_property('source_path') if hasattr(node, 'get_property') else '', 'resolution': node.get_property('resolution') if hasattr(node, 'get_property') else '1920x1080', 'fps': node.get_property('fps') if hasattr(node, 'get_property') else 30 } elif 'Model' in node_type: # Exact ModelNode properties from original properties = { 'model_path': node.get_property('model_path') if hasattr(node, 'get_property') else '', 'dongle_series': 
node.get_property('dongle_series') if hasattr(node, 'get_property') else '520', 'num_dongles': node.get_property('num_dongles') if hasattr(node, 'get_property') else 1, 'port_id': node.get_property('port_id') if hasattr(node, 'get_property') else '' } elif 'Preprocess' in node_type: # Exact PreprocessNode properties from original properties = { 'resize_width': node.get_property('resize_width') if hasattr(node, 'get_property') else 640, 'resize_height': node.get_property('resize_height') if hasattr(node, 'get_property') else 480, 'normalize': node.get_property('normalize') if hasattr(node, 'get_property') else True, 'crop_enabled': node.get_property('crop_enabled') if hasattr(node, 'get_property') else False, 'operations': node.get_property('operations') if hasattr(node, 'get_property') else 'resize,normalize' } elif 'Postprocess' in node_type: # Exact PostprocessNode properties from original properties = { 'output_format': node.get_property('output_format') if hasattr(node, 'get_property') else 'JSON', 'confidence_threshold': node.get_property('confidence_threshold') if hasattr(node, 'get_property') else 0.5, 'nms_threshold': node.get_property('nms_threshold') if hasattr(node, 'get_property') else 0.4, 'max_detections': node.get_property('max_detections') if hasattr(node, 'get_property') else 100 } elif 'Output' in node_type: # Exact OutputNode properties from original properties = { 'output_type': node.get_property('output_type') if hasattr(node, 'get_property') else 'File', 'destination': node.get_property('destination') if hasattr(node, 'get_property') else '', 'format': node.get_property('format') if hasattr(node, 'get_property') else 'JSON', 'save_interval': node.get_property('save_interval') if hasattr(node, 'get_property') else 1.0 } if properties: for prop_name, prop_value in properties.items(): # Create widget based on property type and name widget = self.create_property_widget_enhanced(node, prop_name, prop_value) # Add to form label = 
prop_name.replace('_', ' ').title() form_layout.addRow(f"{label}:", widget) else: # Show available properties for debugging info_text = f"Node type: {node.__class__.__name__}\n" if hasattr(node, 'properties'): props = node.properties() info_text += f"Available properties: {list(props.keys())}" else: info_text += "No properties method found" info_label = QLabel(info_text) info_label.setStyleSheet("color: #f9e2af; font-size: 10px;") form_layout.addRow(info_label) except Exception as e: error_label = QLabel(f"Error loading properties: {e}") error_label.setStyleSheet("color: #f38ba8;") form_layout.addRow(error_label) import traceback traceback.print_exc() self.node_props_layout.addWidget(form_widget) def create_property_widget(self, node, prop_name: str, prop_value, options: Dict): """Create appropriate widget for a property.""" # Simple implementation - can be enhanced if isinstance(prop_value, bool): widget = QCheckBox() widget.setChecked(prop_value) elif isinstance(prop_value, int): widget = QSpinBox() widget.setValue(prop_value) if 'min' in options: widget.setMinimum(options['min']) if 'max' in options: widget.setMaximum(options['max']) elif isinstance(prop_value, float): widget = QDoubleSpinBox() widget.setValue(prop_value) if 'min' in options: widget.setMinimum(options['min']) if 'max' in options: widget.setMaximum(options['max']) elif isinstance(options, list): widget = QComboBox() widget.addItems(options) if prop_value in options: widget.setCurrentText(str(prop_value)) else: widget = QLineEdit() widget.setText(str(prop_value)) return widget def create_property_widget_enhanced(self, node, prop_name: str, prop_value): """Create enhanced property widget with better type detection.""" # Create widget based on property name and value widget = None # Get property options from the node if available prop_options = None if hasattr(node, '_property_options') and prop_name in node._property_options: prop_options = node._property_options[prop_name] # Check for file path 
properties first (from prop_options or name pattern) if (prop_options and isinstance(prop_options, dict) and prop_options.get('type') == 'file_path') or \ prop_name in ['model_path', 'source_path', 'destination']: # File path property with filters from prop_options or defaults widget = QPushButton(str(prop_value) if prop_value else 'Select File...') widget.setStyleSheet("text-align: left; padding: 5px;") def browse_file(): # Use filter from prop_options if available, otherwise use defaults if prop_options and 'filter' in prop_options: file_filter = prop_options['filter'] else: # Fallback to original filters filters = { 'model_path': 'Model files (*.onnx *.tflite *.pb)', 'source_path': 'Media files (*.mp4 *.avi *.mov *.mkv *.wav *.mp3)', 'destination': 'Output files (*.json *.xml *.csv *.txt)' } file_filter = filters.get(prop_name, 'All files (*)') file_path, _ = QFileDialog.getOpenFileName(self, f'Select {prop_name}', '', file_filter) if file_path: widget.setText(file_path) if hasattr(node, 'set_property'): node.set_property(prop_name, file_path) widget.clicked.connect(browse_file) # Check for dropdown properties (list options from prop_options or predefined) elif (prop_options and isinstance(prop_options, list)) or \ prop_name in ['source_type', 'dongle_series', 'output_format', 'format', 'output_type', 'resolution']: # Dropdown property widget = QComboBox() # Use options from prop_options if available, otherwise use defaults if prop_options and isinstance(prop_options, list): items = prop_options else: # Fallback to original options options = { 'source_type': ['Camera', 'Microphone', 'File', 'RTSP Stream', 'HTTP Stream'], 'dongle_series': ['520', '720', '1080', 'Custom'], 'output_format': ['JSON', 'XML', 'CSV', 'Binary'], 'format': ['JSON', 'XML', 'CSV', 'Binary'], 'output_type': ['File', 'API Endpoint', 'Database', 'Display', 'MQTT'], 'resolution': ['640x480', '1280x720', '1920x1080', '3840x2160', 'Custom'] } items = options.get(prop_name, [str(prop_value)]) 
widget.addItems(items) if str(prop_value) in items: widget.setCurrentText(str(prop_value)) def on_change(text): if hasattr(node, 'set_property'): node.set_property(prop_name, text) widget.currentTextChanged.connect(on_change) elif isinstance(prop_value, bool): # Boolean property widget = QCheckBox() widget.setChecked(prop_value) def on_change(state): if hasattr(node, 'set_property'): node.set_property(prop_name, state == 2) widget.stateChanged.connect(on_change) elif isinstance(prop_value, int): # Integer property widget = QSpinBox() widget.setValue(prop_value) # Set range from prop_options if available, otherwise use defaults if prop_options and isinstance(prop_options, dict) and 'min' in prop_options and 'max' in prop_options: widget.setRange(prop_options['min'], prop_options['max']) else: # Fallback to original ranges for specific properties widget.setRange(0, 99999) # Default range if prop_name in ['device_id']: widget.setRange(0, 10) elif prop_name in ['fps']: widget.setRange(1, 120) elif prop_name in ['resize_width', 'resize_height']: widget.setRange(64, 4096) elif prop_name in ['num_dongles']: widget.setRange(1, 16) elif prop_name in ['max_detections']: widget.setRange(1, 1000) def on_change(value): if hasattr(node, 'set_property'): node.set_property(prop_name, value) widget.valueChanged.connect(on_change) elif isinstance(prop_value, float): # Float property widget = QDoubleSpinBox() widget.setValue(prop_value) widget.setDecimals(2) # Set range and step from prop_options if available, otherwise use defaults if prop_options and isinstance(prop_options, dict): if 'min' in prop_options and 'max' in prop_options: widget.setRange(prop_options['min'], prop_options['max']) else: widget.setRange(0.0, 999.0) # Default range if 'step' in prop_options: widget.setSingleStep(prop_options['step']) else: widget.setSingleStep(0.01) # Default step else: # Fallback to original ranges for specific properties widget.setRange(0.0, 999.0) # Default range if prop_name in 
['confidence_threshold', 'nms_threshold']: widget.setRange(0.0, 1.0) widget.setSingleStep(0.1) elif prop_name in ['save_interval']: widget.setRange(0.1, 60.0) widget.setSingleStep(0.1) def on_change(value): if hasattr(node, 'set_property'): node.set_property(prop_name, value) widget.valueChanged.connect(on_change) else: # String property (default) widget = QLineEdit() widget.setText(str(prop_value)) # Set placeholders for specific properties placeholders = { 'model_path': 'Path to model file (.nef, .onnx, etc.)', 'destination': 'Output file path', 'resolution': 'e.g., 1920x1080' } if prop_name in placeholders: widget.setPlaceholderText(placeholders[prop_name]) def on_change(text): if hasattr(node, 'set_property'): node.set_property(prop_name, text) widget.textChanged.connect(on_change) return widget def clear_node_properties_panel(self): """Clear the node properties panel.""" if not self.node_props_layout: return # Remove all widgets for i in reversed(range(self.node_props_layout.count())): child = self.node_props_layout.itemAt(i).widget() if child: child.deleteLater() # Show instructions and hide container self.node_props_container.setVisible(False) self.props_instructions.setVisible(True) def detect_dongles(self): """Detect available dongles using actual device scanning.""" if not self.dongles_list: return self.dongles_list.clear() try: # Import MultiDongle for device scanning from cluster4npu_ui.core.functions.Multidongle import MultiDongle # Scan for available devices devices = MultiDongle.scan_devices() if devices: # Add detected devices to the list for device in devices: port_id = device['port_id'] series = device['series'] self.dongles_list.addItem(f"{series} Dongle - Port {port_id}") # Add summary item self.dongles_list.addItem(f"Total: {len(devices)} device(s) detected") # Store device info for later use self.detected_devices = devices else: self.dongles_list.addItem("No Kneron devices detected") self.detected_devices = [] except Exception as e: # Fallback 
to simulation if scanning fails self.dongles_list.addItem("Device scanning failed - using simulation") self.dongles_list.addItem("Simulated KL520 Dongle - Port 28") self.dongles_list.addItem("Simulated KL720 Dongle - Port 32") self.detected_devices = [] # Print error for debugging print(f"Dongle detection error: {str(e)}") def get_detected_devices(self): """ Get the list of detected devices with their port IDs and series. Returns: List[Dict]: List of device information with port_id and series """ return getattr(self, 'detected_devices', []) def refresh_dongle_detection(self): """ Refresh the dongle detection and update the UI. This can be called when dongles are plugged/unplugged. """ self.detect_dongles() # Update any other UI components that depend on dongle detection self.update_performance_estimation() def get_available_ports(self): """ Get list of available port IDs from detected devices. Returns: List[int]: List of available port IDs """ return [device['port_id'] for device in self.get_detected_devices()] def get_device_by_port(self, port_id): """ Get device information by port ID. 
Args: port_id (int): Port ID to search for Returns: Dict or None: Device information if found, None otherwise """ for device in self.get_detected_devices(): if device['port_id'] == port_id: return device return None def update_performance_estimation(self): """Update performance metrics based on pipeline and detected devices.""" if not all([self.fps_label, self.latency_label, self.memory_label]): return # Enhanced performance estimation with device information if self.graph: num_nodes = len(self.graph.all_nodes()) num_devices = len(self.get_detected_devices()) # Base performance calculation base_fps = max(1, 60 - (num_nodes * 5)) base_latency = num_nodes * 10 base_memory = num_nodes * 50 # Adjust for device availability if num_devices > 0: # More devices can potentially improve performance device_multiplier = min(1.5, 1 + (num_devices - 1) * 0.1) estimated_fps = int(base_fps * device_multiplier) estimated_latency = max(5, int(base_latency / device_multiplier)) estimated_memory = base_memory # Memory usage doesn't change much else: # No devices detected - show warning performance estimated_fps = 1 estimated_latency = 999 estimated_memory = base_memory self.fps_label.setText(f"{estimated_fps} FPS") self.latency_label.setText(f"{estimated_latency} ms") self.memory_label.setText(f"{estimated_memory} MB") if self.suggestions_text: suggestions = [] # Device-specific suggestions if num_devices == 0: suggestions.append("No Kneron devices detected. 
Connect dongles to enable inference.") elif num_devices < num_nodes: suggestions.append(f"Consider connecting more devices ({num_devices} available, {num_nodes} pipeline stages).") # Performance suggestions if num_nodes > 5: suggestions.append("Consider reducing the number of pipeline stages for better performance.") if estimated_fps < 30 and num_devices > 0: suggestions.append("Current configuration may not achieve real-time performance.") # Hardware-specific suggestions detected_devices = self.get_detected_devices() if detected_devices: device_series = set(device['series'] for device in detected_devices) if len(device_series) > 1: suggestions.append(f"Mixed device types detected: {', '.join(device_series)}. Performance may vary.") if not suggestions: suggestions.append("Pipeline configuration looks good for optimal performance.") self.suggestions_text.setPlainText("\n".join(suggestions)) def delete_selected_nodes(self): """Delete selected nodes from the graph.""" if not self.graph: return selected_nodes = self.graph.selected_nodes() if selected_nodes: for node in selected_nodes: self.graph.delete_node(node) self.mark_modified() def validate_pipeline(self): """Validate the current pipeline.""" if not self.graph: QMessageBox.information(self, "Validation", "No pipeline to validate.") return print("๐Ÿ” Validating pipeline...") summary = get_pipeline_summary(self.graph) if summary['valid']: print(f"Pipeline validation passed - {summary['stage_count']} stages, {summary['total_nodes']} nodes") QMessageBox.information(self, "Pipeline Validation", f"Pipeline is valid!\n\n" f"Stages: {summary['stage_count']}\n" f"Total nodes: {summary['total_nodes']}") else: print(f"Pipeline validation failed: {summary['error']}") QMessageBox.warning(self, "Pipeline Validation", f"Pipeline validation failed:\n\n{summary['error']}") # File operations def new_pipeline(self): """Create a new pipeline.""" if self.is_modified: reply = QMessageBox.question(self, "Save Changes", "Save changes 
to current pipeline?", QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel) if reply == QMessageBox.Yes: self.save_pipeline() elif reply == QMessageBox.Cancel: return # Clear the graph if self.graph: self.graph.clear_session() self.project_name = "Untitled Pipeline" self.current_file = None self.is_modified = False self.update_window_title() def open_pipeline(self): """Open a pipeline file.""" file_path, _ = QFileDialog.getOpenFileName( self, "Open Pipeline", self.settings.get_default_project_location(), "Pipeline files (*.mflow);;All files (*)" ) if file_path: self.load_pipeline_file(file_path) def save_pipeline(self): """Save the current pipeline.""" if self.current_file: self.save_to_file(self.current_file) else: self.save_pipeline_as() def save_pipeline_as(self): """Save pipeline with a new name.""" file_path, _ = QFileDialog.getSaveFileName( self, "Save Pipeline", os.path.join(self.settings.get_default_project_location(), f"{self.project_name}.mflow"), "Pipeline files (*.mflow)" ) if file_path: self.save_to_file(file_path) def save_to_file(self, file_path: str): """Save pipeline to specified file.""" try: pipeline_data = { 'project_name': self.project_name, 'description': self.description, 'nodes': [], 'connections': [], 'version': '1.0' } # Save node data if graph is available if self.graph: for node in self.graph.all_nodes(): node_data = { 'id': node.id, 'name': node.name(), 'type': node.__class__.__name__, 'pos': node.pos() } if hasattr(node, 'get_business_properties'): node_data['properties'] = node.get_business_properties() pipeline_data['nodes'].append(node_data) # Save connections for node in self.graph.all_nodes(): for output_port in node.output_ports(): for input_port in output_port.connected_ports(): connection_data = { 'input_node': input_port.node().id, 'input_port': input_port.name(), 'output_node': node.id, 'output_port': output_port.name() } pipeline_data['connections'].append(connection_data) with open(file_path, 'w') as f: 
json.dump(pipeline_data, f, indent=2) self.current_file = file_path self.settings.add_recent_file(file_path) self.mark_saved() QMessageBox.information(self, "Saved", f"Pipeline saved to {file_path}") except Exception as e: QMessageBox.critical(self, "Save Error", f"Failed to save pipeline: {e}") def load_pipeline_file(self, file_path: str): """Load pipeline from file.""" try: with open(file_path, 'r') as f: pipeline_data = json.load(f) self.project_name = pipeline_data.get('project_name', 'Loaded Pipeline') self.description = pipeline_data.get('description', '') self.current_file = file_path # Clear existing pipeline if self.graph: self.graph.clear_session() # Load nodes and connections self._load_nodes_from_data(pipeline_data.get('nodes', [])) self._load_connections_from_data(pipeline_data.get('connections', [])) self.settings.add_recent_file(file_path) self.mark_saved() self.update_window_title() except Exception as e: QMessageBox.critical(self, "Load Error", f"Failed to load pipeline: {e}") def export_configuration(self): """Export pipeline configuration.""" QMessageBox.information(self, "Export", "Export functionality will be implemented in a future version.") def _load_nodes_from_data(self, nodes_data): """Load nodes from saved data.""" if not self.graph: return # Import node types from core.nodes.exact_nodes import EXACT_NODE_TYPES # Create a mapping from class names to node classes class_to_node_type = {} for node_name, node_class in EXACT_NODE_TYPES.items(): class_to_node_type[node_class.__name__] = node_class # Create a mapping from old IDs to new nodes self._node_id_mapping = {} for node_data in nodes_data: try: node_type = node_data.get('type') old_node_id = node_data.get('id') if node_type and node_type in class_to_node_type: node_class = class_to_node_type[node_type] # Try different identifier formats identifiers_to_try = [ node_class.__identifier__, f"{node_class.__identifier__}.{node_class.__name__}", node_class.__name__ ] node = None for identifier 
in identifiers_to_try: try: node = self.graph.create_node(identifier) break except Exception: continue if node: # Map old ID to new node if old_node_id: self._node_id_mapping[old_node_id] = node print(f"Mapped old ID {old_node_id} to new node {node.id}") # Set node properties if 'name' in node_data: node.set_name(node_data['name']) if 'pos' in node_data: node.set_pos(*node_data['pos']) # Restore business properties if 'properties' in node_data: for prop_name, prop_value in node_data['properties'].items(): try: node.set_property(prop_name, prop_value) except Exception as e: print(f"Warning: Could not set property {prop_name}: {e}") except Exception as e: print(f"Error loading node {node_data}: {e}") def _load_connections_from_data(self, connections_data): """Load connections from saved data.""" if not self.graph: return print(f"Loading {len(connections_data)} connections...") # Check if we have the node ID mapping if not hasattr(self, '_node_id_mapping'): print(" Warning: No node ID mapping available") return # Create connections between nodes for i, connection_data in enumerate(connections_data): try: input_node_id = connection_data.get('input_node') input_port_name = connection_data.get('input_port') output_node_id = connection_data.get('output_node') output_port_name = connection_data.get('output_port') print(f"Connection {i+1}: {output_node_id}:{output_port_name} -> {input_node_id}:{input_port_name}") # Find the nodes using the ID mapping input_node = self._node_id_mapping.get(input_node_id) output_node = self._node_id_mapping.get(output_node_id) if not input_node: print(f" Warning: Input node {input_node_id} not found in mapping") continue if not output_node: print(f" Warning: Output node {output_node_id} not found in mapping") continue # Get the ports input_port = input_node.get_input(input_port_name) output_port = output_node.get_output(output_port_name) if not input_port: print(f" Warning: Input port '{input_port_name}' not found on node 
{input_node.name()}") continue if not output_port: print(f" Warning: Output port '{output_port_name}' not found on node {output_node.name()}") continue # Create the connection - output connects to input output_port.connect_to(input_port) print(f" โœ“ Connection created successfully") except Exception as e: print(f"Error loading connection {connection_data}: {e}") # State management def mark_modified(self): """Mark the pipeline as modified.""" self.is_modified = True self.update_window_title() self.pipeline_modified.emit() # Schedule pipeline analysis self.schedule_analysis() # Update performance estimation when pipeline changes self.update_performance_estimation() def mark_saved(self): """Mark the pipeline as saved.""" self.is_modified = False self.update_window_title() def update_window_title(self): """Update the window title.""" title = f"Cluster4NPU - {self.project_name}" if self.is_modified: title += " *" if self.current_file: title += f" - {os.path.basename(self.current_file)}" self.setWindowTitle(title) def closeEvent(self, event): """Handle window close event.""" if self.is_modified: reply = QMessageBox.question(self, "Save Changes", "Save changes before closing?", QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel) if reply == QMessageBox.Yes: self.save_pipeline() event.accept() elif reply == QMessageBox.No: event.accept() else: event.ignore() else: event.accept() # Pipeline Deployment def deploy_pipeline(self): """Deploy the current pipeline to dongles.""" try: # First validate the pipeline if not self.validate_pipeline_for_deployment(): return # Convert current pipeline to .mflow format pipeline_data = self.export_pipeline_data() # Show deployment dialog self.show_deployment_dialog(pipeline_data) except Exception as e: QMessageBox.critical(self, "Deployment Error", f"Failed to prepare pipeline for deployment: {str(e)}") def validate_pipeline_for_deployment(self) -> bool: """Validate pipeline is ready for deployment.""" if not self.graph: 
QMessageBox.warning(self, "Deployment Error", "No pipeline to deploy. Please create a pipeline first.") return False # Check if pipeline has required nodes all_nodes = self.graph.all_nodes() if not all_nodes: QMessageBox.warning(self, "Deployment Error", "Pipeline is empty. Please add nodes to your pipeline.") return False # Check for required node types has_input = any(self.is_input_node(node) for node in all_nodes) has_model = any(self.is_model_node(node) for node in all_nodes) has_output = any(self.is_output_node(node) for node in all_nodes) if not has_input: QMessageBox.warning(self, "Deployment Error", "Pipeline must have at least one Input node.") return False if not has_model: QMessageBox.warning(self, "Deployment Error", "Pipeline must have at least one Model node.") return False if not has_output: QMessageBox.warning(self, "Deployment Error", "Pipeline must have at least one Output node.") return False # Validate model node configurations validation_errors = [] for node in all_nodes: if self.is_model_node(node): errors = self.validate_model_node_for_deployment(node) validation_errors.extend(errors) if validation_errors: error_msg = "Please fix the following issues before deployment:\n\n" error_msg += "\n".join(f"โ€ข {error}" for error in validation_errors) QMessageBox.warning(self, "Deployment Validation", error_msg) return False return True def validate_model_node_for_deployment(self, node) -> List[str]: """Validate a model node for deployment requirements.""" errors = [] try: # Get node properties if hasattr(node, 'get_property'): model_path = node.get_property('model_path') scpu_fw_path = node.get_property('scpu_fw_path') ncpu_fw_path = node.get_property('ncpu_fw_path') port_id = node.get_property('port_id') else: errors.append(f"Model node '{node.name()}' cannot read properties") return errors # Check model path if not model_path or not model_path.strip(): errors.append(f"Model node '{node.name()}' missing model path") elif not 
os.path.exists(model_path): errors.append(f"Model file not found: {model_path}") elif not model_path.endswith('.nef'): errors.append(f"Model file must be .nef format: {model_path}") # Check firmware paths if not scpu_fw_path or not scpu_fw_path.strip(): errors.append(f"Model node '{node.name()}' missing SCPU firmware path") elif not os.path.exists(scpu_fw_path): errors.append(f"SCPU firmware not found: {scpu_fw_path}") if not ncpu_fw_path or not ncpu_fw_path.strip(): errors.append(f"Model node '{node.name()}' missing NCPU firmware path") elif not os.path.exists(ncpu_fw_path): errors.append(f"NCPU firmware not found: {ncpu_fw_path}") # Check port ID if not port_id or not port_id.strip(): errors.append(f"Model node '{node.name()}' missing port ID") else: # Validate port ID format try: port_ids = [int(p.strip()) for p in port_id.split(',') if p.strip()] if not port_ids: errors.append(f"Model node '{node.name()}' has invalid port ID format") except ValueError: errors.append(f"Model node '{node.name()}' has invalid port ID: {port_id}") except Exception as e: errors.append(f"Error validating model node '{node.name()}': {str(e)}") return errors def export_pipeline_data(self) -> Dict[str, Any]: """Export current pipeline to dictionary format for deployment.""" pipeline_data = { 'project_name': self.project_name, 'description': self.description, 'nodes': [], 'connections': [], 'version': '1.0' } if not self.graph: return pipeline_data # Export nodes for node in self.graph.all_nodes(): node_data = { 'id': node.id, 'name': node.name(), 'type': node.__class__.__name__, 'pos': node.pos(), 'properties': {} } # Get node properties if hasattr(node, 'get_business_properties'): node_data['properties'] = node.get_business_properties() elif hasattr(node, '_property_options') and node._property_options: for prop_name in node._property_options.keys(): if hasattr(node, 'get_property'): try: node_data['properties'][prop_name] = node.get_property(prop_name) except: pass 
pipeline_data['nodes'].append(node_data) # Export connections for node in self.graph.all_nodes(): if hasattr(node, 'output_ports'): for output_port in node.output_ports(): if hasattr(output_port, 'connected_ports'): for input_port in output_port.connected_ports(): connection_data = { 'input_node': input_port.node().id, 'input_port': input_port.name(), 'output_node': node.id, 'output_port': output_port.name() } pipeline_data['connections'].append(connection_data) return pipeline_data def show_deployment_dialog(self, pipeline_data: Dict[str, Any]): """Show deployment dialog and handle deployment process.""" from ..dialogs.deployment import DeploymentDialog dialog = DeploymentDialog(pipeline_data, parent=self) if dialog.exec_() == dialog.Accepted: # Deployment was successful or initiated self.statusBar().showMessage("Pipeline deployment initiated...", 3000) def is_input_node(self, node) -> bool: """Check if node is an input node.""" return ('input' in str(type(node)).lower() or hasattr(node, 'NODE_NAME') and 'input' in str(node.NODE_NAME).lower()) def is_model_node(self, node) -> bool: """Check if node is a model node.""" return ('model' in str(type(node)).lower() or hasattr(node, 'NODE_NAME') and 'model' in str(node.NODE_NAME).lower()) def is_output_node(self, node) -> bool: """Check if node is an output node.""" return ('output' in str(type(node)).lower() or hasattr(node, 'NODE_NAME') and 'output' in str(node.NODE_NAME).lower())