Cluster/ui/dialogs/deployment.py
2025-07-17 17:04:56 +08:00

877 lines
36 KiB
Python

"""
Pipeline Deployment Dialog
This dialog handles the conversion of .mflow pipeline data to executable format
and deployment to Kneron dongles using the InferencePipeline system.
Main Components:
- Pipeline conversion using MFlowConverter
- Topology analysis and optimization
- Dongle status monitoring
- Real-time deployment progress
- Error handling and troubleshooting
Usage:
from ui.dialogs.deployment import DeploymentDialog
dialog = DeploymentDialog(pipeline_data, parent=self)
dialog.exec_()
"""
import os
import sys
import json
import threading
import traceback
from typing import Dict, Any, List, Optional
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QTextEdit, QPushButton,
QProgressBar, QTabWidget, QWidget, QFormLayout, QLineEdit, QSpinBox,
QCheckBox, QGroupBox, QScrollArea, QTableWidget, QTableWidgetItem,
QHeaderView, QMessageBox, QSplitter, QFrame
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QFont, QColor, QPalette, QImage, QPixmap
# Import our converter and pipeline system
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'core', 'functions'))
try:
from ...core.functions.mflow_converter import MFlowConverter, PipelineConfig
CONVERTER_AVAILABLE = True
except ImportError as e:
print(f"Warning: MFlow converter not available: {e}")
CONVERTER_AVAILABLE = False
try:
from ...core.functions.Multidongle import MultiDongle
from ...core.functions.InferencePipeline import InferencePipeline
from ...core.functions.workflow_orchestrator import WorkflowOrchestrator
# from workflow_orchestrator import WorkflowOrchestrator
PIPELINE_AVAILABLE = True
except ImportError as e:
print(f"Warning: Pipeline system not available: {e}")
PIPELINE_AVAILABLE = False
class DeploymentWorker(QThread):
    """Worker thread for pipeline deployment to avoid blocking UI.

    ``run`` converts the .mflow pipeline data into a pipeline config,
    validates it and, when the pipeline system was importable, starts a
    ``WorkflowOrchestrator`` and idles until :meth:`stop` is called.
    Progress, results and failures are reported through the signals below.
    """

    # Signals
    progress_updated = pyqtSignal(int, str)  # progress, message
    topology_analyzed = pyqtSignal(dict)  # topology analysis results
    conversion_completed = pyqtSignal(object)  # PipelineConfig object
    deployment_started = pyqtSignal()
    deployment_completed = pyqtSignal(bool, str)  # success, message
    error_occurred = pyqtSignal(str)
    frame_updated = pyqtSignal('PyQt_PyObject')  # For live view
    result_updated = pyqtSignal(dict)  # For inference results

    def __init__(self, pipeline_data: Dict[str, Any]):
        """Store the raw pipeline data; nothing is converted until ``run``."""
        super().__init__()
        self.pipeline_data = pipeline_data
        self.should_stop = False
        self.orchestrator = None
        # ``stop`` is called from another thread than ``run``; the lock makes
        # sure the orchestrator is handed to exactly one of them for shutdown.
        self._orchestrator_lock = threading.Lock()

    def run(self):
        """Main deployment workflow: convert, validate, then deploy."""
        try:
            # Step 1: Convert .mflow to pipeline config
            self.progress_updated.emit(10, "Converting pipeline configuration...")
            if not CONVERTER_AVAILABLE:
                self.error_occurred.emit("MFlow converter not available. Please check installation.")
                return

            converter = MFlowConverter()
            config = converter._convert_mflow_to_config(self.pipeline_data)

            # Emit topology analysis results
            self.topology_analyzed.emit({
                'total_stages': len(config.stage_configs),
                'pipeline_name': config.pipeline_name,
                'input_config': config.input_config,
                'output_config': config.output_config
            })
            self.progress_updated.emit(30, "Pipeline conversion completed")
            self.conversion_completed.emit(config)
            if self.should_stop:
                return

            # Step 2: Validate configuration
            self.progress_updated.emit(40, "Validating pipeline configuration...")
            is_valid, errors = converter.validate_config(config)
            if not is_valid:
                error_msg = "Configuration validation failed:\n" + "\n".join(errors)
                self.error_occurred.emit(error_msg)
                return
            self.progress_updated.emit(60, "Configuration validation passed")
            if self.should_stop:
                return

            # Step 3: Initialize pipeline (if dongle system available)
            self.progress_updated.emit(70, "Initializing inference pipeline...")
            if not PIPELINE_AVAILABLE:
                self.progress_updated.emit(100, "Pipeline configuration ready (dongle system not available)")
                self.deployment_completed.emit(True, "Pipeline configuration prepared successfully. Dongle system not available for actual deployment.")
                return

            self._deploy(converter, config)
        except Exception as e:
            # Keep the stack trace on the terminal; the signal only carries text.
            traceback.print_exc()
            self.error_occurred.emit(f"Deployment error: {str(e)}")

    def _deploy(self, converter, config):
        """Create the pipeline, run the orchestrator and block until stopped.

        The orchestrator is always shut down on exit, including when start-up
        fails or when ``stop`` was requested before the orchestrator existed.
        """
        try:
            pipeline = converter.create_inference_pipeline(config)
            self.progress_updated.emit(80, "Initializing workflow orchestrator...")
            self.deployment_started.emit()

            # Create and start the orchestrator
            orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config)
            self.orchestrator = orchestrator
            orchestrator.set_frame_callback(self.frame_updated.emit)

            # Set up both GUI and terminal result callbacks
            def combined_result_callback(result_dict):
                # Print to terminal
                self._print_terminal_results(result_dict)
                # Emit for GUI
                self.result_updated.emit(result_dict)

            orchestrator.set_result_callback(combined_result_callback)
            orchestrator.start()

            self.progress_updated.emit(100, "Pipeline deployed successfully!")
            self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages")

            # Keep running until stop is requested
            while not self.should_stop:
                self.msleep(100)  # Sleep for 100ms and check again
        except Exception as e:
            traceback.print_exc()
            self.error_occurred.emit(f"Pipeline deployment failed: {str(e)}")
        finally:
            try:
                self._shutdown_orchestrator()
            except Exception:
                # Already on the way out; report but do not mask the outcome.
                traceback.print_exc()

    def _shutdown_orchestrator(self):
        """Stop the orchestrator at most once, whichever thread gets here first."""
        with self._orchestrator_lock:
            orchestrator, self.orchestrator = self.orchestrator, None
        if orchestrator is not None:
            orchestrator.stop()

    def stop(self):
        """Stop the deployment process."""
        self.should_stop = True
        self._shutdown_orchestrator()

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for real numbers (bool excluded)."""
        return isinstance(value, numbers.Real) and not isinstance(value, bool)

    @classmethod
    def _format_number(cls, value, digits: int = 3) -> str:
        """Format ``value`` with fixed decimals, or ``str(value)`` if not numeric."""
        return f"{value:.{digits}f}" if cls._is_number(value) else str(value)

    @classmethod
    def _split_result_tuple(cls, result):
        """Return ``(result_string, probability)`` from a 2-tuple in either order.

        The element order is only swapped when the first item is numeric and
        the second is not; otherwise the tuple is taken as given.
        """
        first, second = result
        if cls._is_number(first) and not cls._is_number(second):
            return second, first
        return first, second

    @staticmethod
    def _confidence_label(probability) -> str:
        """Map a probability to the confidence label shown on the terminal."""
        if probability > 0.8:
            return "🟢 Very High"
        if probability > 0.6:
            return "🟡 High"
        if probability > 0.4:
            return "🟠 Medium"
        return "🔴 Low"

    def _print_terminal_results(self, result_dict):
        """Print inference results to terminal with detailed formatting.

        Any failure while formatting is reported on the terminal and
        swallowed so that a malformed result never interrupts inference.
        """
        try:
            # Header with timestamp
            timestamp = datetime.fromtimestamp(result_dict.get('timestamp', 0)).strftime("%H:%M:%S.%f")[:-3]
            pipeline_id = result_dict.get('pipeline_id', 'Unknown')
            print(f"\n🔥 INFERENCE RESULT [{timestamp}]")
            print(f" Pipeline ID: {pipeline_id}")
            print(" " + "="*50)

            # Stage results
            stage_results = result_dict.get('stage_results', {})
            if stage_results:
                for stage_id, result in stage_results.items():
                    print(f" 📊 Stage: {stage_id}")
                    if isinstance(result, tuple) and len(result) == 2:
                        # Tuple results carry a label and a probability.
                        result_string, probability = self._split_result_tuple(result)
                        print(f" ✅ Result: {result_string}")
                        print(f" 📈 Probability: {self._format_number(probability)}")
                        # Add confidence level (needs a numeric probability)
                        if self._is_number(probability):
                            print(f" 🎯 Confidence: {self._confidence_label(probability)}")
                    elif isinstance(result, dict):
                        # Handle dict results
                        for key, value in result.items():
                            if key == 'probability':
                                print(f" 📈 {key.title()}: {self._format_number(value)}")
                            elif key == 'result':
                                print(f"{key.title()}: {value}")
                            elif key == 'confidence':
                                print(f" 🎯 {key.title()}: {value}")
                            elif key == 'fused_probability':
                                print(f" 🔀 Fused Probability: {self._format_number(value)}")
                            elif key == 'individual_probs' and isinstance(value, dict):
                                print(f" 📋 Individual Probabilities:")
                                for prob_key, prob_value in value.items():
                                    print(f" {prob_key}: {self._format_number(prob_value)}")
                            else:
                                print(f" 📝 {key}: {value}")
                    else:
                        # Handle other result types
                        print(f" 📝 Raw Result: {result}")
                    print()  # Blank line between stages
            else:
                print(" ⚠️ No stage results available")

            # Processing time if available
            metadata = result_dict.get('metadata', {})
            if 'total_processing_time' in metadata:
                processing_time = metadata['total_processing_time']
                print(f" ⏱️ Processing Time: {self._format_number(processing_time)}s")
                # Add FPS calculation
                if self._is_number(processing_time) and processing_time > 0:
                    fps = 1.0 / processing_time
                    print(f" 🚄 Theoretical FPS: {fps:.2f}")

            # Additional metadata
            if metadata:
                interesting_keys = ['dongle_count', 'stage_count', 'queue_sizes', 'error_count']
                for key in interesting_keys:
                    if key in metadata:
                        print(f" 📋 {key.replace('_', ' ').title()}: {metadata[key]}")
            print(" " + "="*50)
        except Exception as e:
            print(f"❌ Error printing terminal results: {e}")
class DeploymentDialog(QDialog):
    """Main deployment dialog with comprehensive deployment management.

    The dialog shows the pipeline overview, lets the user analyze the
    pipeline, and runs a ``DeploymentWorker`` whose signals drive the
    progress bar, log, live view and result panes.
    """

    def __init__(self, pipeline_data: Dict[str, Any], parent=None):
        """Build the UI for ``pipeline_data`` (the loaded .mflow dictionary)."""
        super().__init__(parent)
        self.pipeline_data = pipeline_data
        self.deployment_worker = None
        self.pipeline_config = None
        # True between the user confirming "Stop Inference" and the worker
        # thread actually finishing (or the stop timing out).
        self._stop_requested = False
        self.setWindowTitle("Deploy Pipeline to Dongles")
        self.setMinimumSize(800, 600)
        self.setup_ui()
        self.apply_theme()

    def setup_ui(self):
        """Setup the dialog UI."""
        layout = QVBoxLayout(self)

        # Header
        header_label = QLabel("Pipeline Deployment")
        header_label.setFont(QFont("Arial", 16, QFont.Bold))
        header_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(header_label)

        # Main content with tabs
        self.tab_widget = QTabWidget()

        # Overview tab
        self.overview_tab = self.create_overview_tab()
        self.tab_widget.addTab(self.overview_tab, "Overview")

        # Topology tab
        self.topology_tab = self.create_topology_tab()
        self.tab_widget.addTab(self.topology_tab, "Topology Analysis")

        # Configuration tab
        self.config_tab = self.create_configuration_tab()
        self.tab_widget.addTab(self.config_tab, "Configuration")

        # Deployment tab
        self.deployment_tab = self.create_deployment_tab()
        self.tab_widget.addTab(self.deployment_tab, "Deployment")

        # Live View tab
        self.live_view_tab = self.create_live_view_tab()
        self.tab_widget.addTab(self.live_view_tab, "Live View")

        layout.addWidget(self.tab_widget)

        # Progress bar
        self.progress_bar = QProgressBar()
        self.progress_bar.setVisible(False)
        layout.addWidget(self.progress_bar)

        # Status label
        self.status_label = QLabel("Ready to deploy")
        self.status_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(self.status_label)

        # Buttons
        button_layout = QHBoxLayout()

        self.analyze_button = QPushButton("Analyze Pipeline")
        self.analyze_button.clicked.connect(self.analyze_pipeline)
        button_layout.addWidget(self.analyze_button)

        self.deploy_button = QPushButton("Deploy to Dongles")
        self.deploy_button.clicked.connect(self.start_deployment)
        self.deploy_button.setEnabled(False)
        button_layout.addWidget(self.deploy_button)

        self.stop_button = QPushButton("Stop Inference")
        self.stop_button.clicked.connect(self.stop_deployment)
        self.stop_button.setEnabled(False)
        self.stop_button.setVisible(False)
        button_layout.addWidget(self.stop_button)

        button_layout.addStretch()

        # accept() goes through done(), which asks before abandoning a
        # running deployment.
        self.close_button = QPushButton("Close")
        self.close_button.clicked.connect(self.accept)
        button_layout.addWidget(self.close_button)

        layout.addLayout(button_layout)

        # Populate initial data
        self.populate_overview()

    def create_overview_tab(self) -> QWidget:
        """Create pipeline overview tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Pipeline info
        info_group = QGroupBox("Pipeline Information")
        info_layout = QFormLayout(info_group)
        self.name_label = QLabel()
        self.description_label = QLabel()
        self.nodes_label = QLabel()
        self.connections_label = QLabel()
        info_layout.addRow("Name:", self.name_label)
        info_layout.addRow("Description:", self.description_label)
        info_layout.addRow("Nodes:", self.nodes_label)
        info_layout.addRow("Connections:", self.connections_label)
        layout.addWidget(info_group)

        # Nodes table
        nodes_group = QGroupBox("Pipeline Nodes")
        nodes_layout = QVBoxLayout(nodes_group)
        self.nodes_table = QTableWidget()
        self.nodes_table.setColumnCount(3)
        self.nodes_table.setHorizontalHeaderLabels(["Name", "Type", "Status"])
        self.nodes_table.horizontalHeader().setStretchLastSection(True)
        nodes_layout.addWidget(self.nodes_table)
        layout.addWidget(nodes_group)
        return widget

    def create_topology_tab(self) -> QWidget:
        """Create topology analysis tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Analysis results
        self.topology_text = QTextEdit()
        self.topology_text.setReadOnly(True)
        self.topology_text.setFont(QFont("Consolas", 10))
        self.topology_text.setText("Click 'Analyze Pipeline' to see topology analysis...")
        layout.addWidget(self.topology_text)
        return widget

    def create_configuration_tab(self) -> QWidget:
        """Create configuration tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)
        scroll_area = QScrollArea()
        scroll_content = QWidget()
        scroll_layout = QVBoxLayout(scroll_content)

        # Stage configurations will be populated after analysis
        self.config_content = QLabel("Run pipeline analysis to see stage configurations...")
        self.config_content.setAlignment(Qt.AlignCenter)
        scroll_layout.addWidget(self.config_content)

        scroll_area.setWidget(scroll_content)
        scroll_area.setWidgetResizable(True)
        layout.addWidget(scroll_area)
        return widget

    def create_deployment_tab(self) -> QWidget:
        """Create deployment monitoring tab."""
        widget = QWidget()
        layout = QVBoxLayout(widget)

        # Deployment log
        log_group = QGroupBox("Deployment Log")
        log_layout = QVBoxLayout(log_group)
        self.deployment_log = QTextEdit()
        self.deployment_log.setReadOnly(True)
        self.deployment_log.setFont(QFont("Consolas", 9))
        log_layout.addWidget(self.deployment_log)
        layout.addWidget(log_group)

        # Dongle status (placeholder)
        status_group = QGroupBox("Dongle Status")
        status_layout = QVBoxLayout(status_group)
        self.dongle_status = QLabel("No dongles detected")
        self.dongle_status.setAlignment(Qt.AlignCenter)
        status_layout.addWidget(self.dongle_status)
        layout.addWidget(status_group)
        return widget

    def create_live_view_tab(self) -> QWidget:
        """Create the live view tab for real-time output."""
        widget = QWidget()
        layout = QHBoxLayout(widget)

        # Video display
        video_group = QGroupBox("Live Video Feed")
        video_layout = QVBoxLayout(video_group)
        self.live_view_label = QLabel("Live view will appear here after deployment.")
        self.live_view_label.setAlignment(Qt.AlignCenter)
        self.live_view_label.setMinimumSize(640, 480)
        video_layout.addWidget(self.live_view_label)
        layout.addWidget(video_group, 2)

        # Inference results
        results_group = QGroupBox("Inference Results")
        results_layout = QVBoxLayout(results_group)
        self.results_text = QTextEdit()
        self.results_text.setReadOnly(True)
        results_layout.addWidget(self.results_text)
        layout.addWidget(results_group, 1)
        return widget

    def populate_overview(self):
        """Populate overview tab with pipeline data."""
        # str() keeps Qt's text setters happy if the file holds non-string values.
        self.name_label.setText(str(self.pipeline_data.get('project_name', 'Untitled')))
        self.description_label.setText(str(self.pipeline_data.get('description', 'No description')))
        nodes = self.pipeline_data.get('nodes', [])
        connections = self.pipeline_data.get('connections', [])
        self.nodes_label.setText(str(len(nodes)))
        self.connections_label.setText(str(len(connections)))

        # Populate nodes table
        self.nodes_table.setRowCount(len(nodes))
        for row, node in enumerate(nodes):
            self.nodes_table.setItem(row, 0, QTableWidgetItem(str(node.get('name', 'Unknown'))))
            self.nodes_table.setItem(row, 1, QTableWidgetItem(str(node.get('type', 'Unknown'))))
            self.nodes_table.setItem(row, 2, QTableWidgetItem("Ready"))

    def analyze_pipeline(self):
        """Analyze pipeline topology and configuration.

        Converts the pipeline data, fills the topology and configuration
        tabs, and enables the deploy button only when validation passes.
        """
        if not CONVERTER_AVAILABLE:
            QMessageBox.warning(self, "Analysis Error",
                                "Pipeline analyzer not available. Please check installation.")
            return
        try:
            self.status_label.setText("Analyzing pipeline...")
            self.analyze_button.setEnabled(False)

            # Create converter and analyze
            converter = MFlowConverter()
            config = converter._convert_mflow_to_config(self.pipeline_data)
            self.pipeline_config = config

            # Update topology tab
            report = [
                "Pipeline Analysis Results:\n",
                f"Name: {config.pipeline_name}\n",
                f"Description: {config.description}\n",
                f"Total Stages: {len(config.stage_configs)}\n",
                "Input Configuration:\n",
                f"{json.dumps(config.input_config, indent=2)}\n",
                "Output Configuration:\n",
                f"{json.dumps(config.output_config, indent=2)}\n",
                "Stage Configurations:\n",
            ]
            for i, stage_config in enumerate(config.stage_configs, 1):
                report.append(f"\nStage {i}: {stage_config.stage_id}\n")
                report.append(f" Port IDs: {stage_config.port_ids}\n")
                report.append(f" Model Path: {stage_config.model_path}\n")
                report.append(f" SCPU Firmware: {stage_config.scpu_fw_path}\n")
                report.append(f" NCPU Firmware: {stage_config.ncpu_fw_path}\n")
                report.append(f" Upload Firmware: {stage_config.upload_fw}\n")
                report.append(f" Max Queue Size: {stage_config.max_queue_size}\n")
            self.topology_text.setText("".join(report))

            # Update configuration tab
            self.update_configuration_tab(config)

            # Validate configuration
            is_valid, errors = converter.validate_config(config)
            if is_valid:
                self.status_label.setText("Pipeline analysis completed successfully")
                self.deploy_button.setEnabled(True)
                self.tab_widget.setCurrentWidget(self.topology_tab)  # Switch to topology tab
            else:
                # Never leave deployment enabled for a config that failed validation.
                self.deploy_button.setEnabled(False)
                error_msg = "Configuration validation failed:\n" + "\n".join(errors)
                QMessageBox.warning(self, "Validation Error", error_msg)
                self.status_label.setText("Pipeline analysis failed validation")
        except Exception as e:
            QMessageBox.critical(self, "Analysis Error",
                                 f"Failed to analyze pipeline: {str(e)}")
            self.status_label.setText("Pipeline analysis failed")
        finally:
            self.analyze_button.setEnabled(True)

    def update_configuration_tab(self, config: 'PipelineConfig'):
        """Update configuration tab with detailed stage information."""
        # Build the replacement content first, then swap the scroll area.
        scroll_content = QWidget()
        scroll_layout = QVBoxLayout(scroll_content)
        for i, stage_config in enumerate(config.stage_configs, 1):
            stage_group = QGroupBox(f"Stage {i}: {stage_config.stage_id}")
            stage_layout = QFormLayout(stage_group)

            # Create read-only fields for stage configuration
            model_path_edit = QLineEdit(stage_config.model_path)
            model_path_edit.setReadOnly(True)
            stage_layout.addRow("Model Path:", model_path_edit)

            scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path)
            scpu_fw_edit.setReadOnly(True)
            stage_layout.addRow("SCPU Firmware:", scpu_fw_edit)

            ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path)
            ncpu_fw_edit.setReadOnly(True)
            stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit)

            port_ids_edit = QLineEdit(str(stage_config.port_ids))
            port_ids_edit.setReadOnly(True)
            stage_layout.addRow("Port IDs:", port_ids_edit)

            queue_size_spin = QSpinBox()
            # QSpinBox defaults to 0..99 and silently clamps; widen the range
            # before setting so larger queue sizes are shown correctly.
            queue_size_spin.setMaximum(2**31 - 1)
            queue_size_spin.setValue(stage_config.max_queue_size)
            queue_size_spin.setReadOnly(True)
            stage_layout.addRow("Queue Size:", queue_size_spin)

            upload_fw_check = QCheckBox()
            upload_fw_check.setChecked(stage_config.upload_fw)
            upload_fw_check.setEnabled(False)
            stage_layout.addRow("Upload Firmware:", upload_fw_check)

            scroll_layout.addWidget(stage_group)

        # Update the configuration tab
        config_tab_layout = self.config_tab.layout()
        old_scroll_area = config_tab_layout.itemAt(0).widget()
        config_tab_layout.removeWidget(old_scroll_area)
        old_scroll_area.deleteLater()

        new_scroll_area = QScrollArea()
        new_scroll_area.setWidget(scroll_content)
        new_scroll_area.setWidgetResizable(True)
        config_tab_layout.addWidget(new_scroll_area)

    def start_deployment(self):
        """Start the deployment process."""
        if not self.pipeline_config:
            QMessageBox.warning(self, "Deployment Error",
                                "Please analyze the pipeline first.")
            return
        if self.deployment_worker and self.deployment_worker.isRunning():
            # Can happen after a stop request that timed out.
            QMessageBox.warning(self, "Deployment Error",
                                "A deployment is still running. Stop it before deploying again.")
            return

        # Switch to deployment tab
        self.tab_widget.setCurrentWidget(self.deployment_tab)

        # Setup UI for deployment
        self._stop_requested = False
        self.progress_bar.setVisible(True)
        self.progress_bar.setValue(0)
        self.deploy_button.setEnabled(False)
        self.close_button.setText("Cancel")

        # Clear deployment log
        self.deployment_log.clear()
        self.deployment_log.append("Starting pipeline deployment...")

        # Create and start deployment worker
        worker = DeploymentWorker(self.pipeline_data)
        self.deployment_worker = worker
        worker.progress_updated.connect(self.update_progress)
        worker.topology_analyzed.connect(self.update_topology_results)
        worker.conversion_completed.connect(self.on_conversion_completed)
        worker.deployment_started.connect(self.on_deployment_started)
        worker.deployment_completed.connect(self.on_deployment_completed)
        worker.error_occurred.connect(self.on_deployment_error)
        worker.frame_updated.connect(self.update_live_view)
        worker.result_updated.connect(self.update_inference_results)
        # Delivered in the GUI thread once run() has returned.
        worker.finished.connect(self._on_worker_finished)
        worker.start()

    def stop_deployment(self):
        """Stop the current deployment/inference.

        The UI is updated from ``_on_worker_finished`` when the worker thread
        ends, or from a 5 second timeout if it does not end in time; widgets
        are never touched from a non-GUI thread.
        """
        worker = self.deployment_worker
        if not (worker and worker.isRunning()):
            return
        reply = QMessageBox.question(self, "Stop Inference",
                                     "Are you sure you want to stop the inference?",
                                     QMessageBox.Yes | QMessageBox.No)
        if reply != QMessageBox.Yes:
            return

        self.deployment_log.append("Stopping inference...")
        self.status_label.setText("Stopping inference...")
        # Disable stop button immediately to prevent multiple clicks
        self.stop_button.setEnabled(False)
        self._stop_requested = True
        worker.stop()
        # Give the worker up to 5 seconds before reporting an unclean stop.
        QTimer.singleShot(5000, lambda: self._on_stop_timeout(worker))

    def _reset_deployment_controls(self):
        """Return buttons and progress bar to the idle (not deploying) state."""
        self.stop_button.setEnabled(False)
        self.stop_button.setVisible(False)
        self.deploy_button.setEnabled(True)
        self.close_button.setText("Close")
        self.progress_bar.setVisible(False)

    def _on_worker_finished(self):
        """Handle the worker thread ending, for whatever reason."""
        worker = self.deployment_worker
        if worker is not None and worker.isRunning():
            # Late notification from a previous worker; a new one is active.
            return
        self._reset_deployment_controls()
        if self._stop_requested:
            self._stop_requested = False
            self.deployment_log.append("Inference stopped successfully.")
            self.status_label.setText("Inference stopped")
            self.dongle_status.setText("Pipeline stopped")
        else:
            # The thread ended on its own (error, or no dongle system), so
            # nothing is running regardless of what was shown before.
            self.dongle_status.setText("No pipeline running")

    def _on_stop_timeout(self, worker):
        """Warn if ``worker`` is still running 5 seconds after a stop request."""
        if worker is not self.deployment_worker or not self._stop_requested:
            return
        if not worker.isRunning():
            return
        self._stop_requested = False
        self.deployment_log.append("Warning: Inference may not have stopped cleanly.")
        self._reset_deployment_controls()
        self.status_label.setText("Inference stopped")
        self.dongle_status.setText("Pipeline stopped")

    def update_progress(self, value: int, message: str):
        """Update deployment progress."""
        self.progress_bar.setValue(value)
        self.status_label.setText(message)
        self.deployment_log.append(f"[{value}%] {message}")

    def update_topology_results(self, results: Dict):
        """Update topology analysis results."""
        self.deployment_log.append(f"Topology Analysis: {results['total_stages']} stages detected")

    def on_conversion_completed(self, config):
        """Handle conversion completion."""
        self.deployment_log.append("Pipeline conversion completed successfully")

    def on_deployment_started(self):
        """Handle deployment start."""
        self.deployment_log.append("Connecting to dongles...")
        self.dongle_status.setText("Initializing dongles...")
        # Show stop button and hide deploy button
        self.stop_button.setEnabled(True)
        self.stop_button.setVisible(True)
        self.deploy_button.setEnabled(False)

    def on_deployment_completed(self, success: bool, message: str):
        """Handle deployment completion."""
        self.progress_bar.setValue(100)
        if success:
            self.deployment_log.append(f"SUCCESS: {message}")
            self.status_label.setText("Deployment completed successfully!")
            self.dongle_status.setText("Pipeline running on dongles")
            # Keep stop button visible for successful deployment
            self.stop_button.setEnabled(True)
            self.stop_button.setVisible(True)
            QMessageBox.information(self, "Deployment Success", message)
        else:
            self.deployment_log.append(f"FAILED: {message}")
            self.status_label.setText("Deployment failed")
            # Hide stop button for failed deployment
            self.stop_button.setEnabled(False)
            self.stop_button.setVisible(False)
            self.deploy_button.setEnabled(True)
            self.close_button.setText("Close")
            self.progress_bar.setVisible(False)

    def on_deployment_error(self, error: str):
        """Handle deployment error."""
        self.deployment_log.append(f"ERROR: {error}")
        self.status_label.setText("Deployment failed")
        QMessageBox.critical(self, "Deployment Error", error)
        # Hide stop button and show deploy button on error
        self._reset_deployment_controls()

    def update_live_view(self, frame):
        """Update the live view with a new frame.

        NOTE(review): assumes ``frame`` is a contiguous height x width x 3
        8-bit BGR array (OpenCV style) - confirm against the orchestrator.
        """
        try:
            if frame is None:
                return
            # Convert the OpenCV frame to a QImage
            height, width, _channels = frame.shape
            bytes_per_line = 3 * width
            q_image = QImage(frame.data, width, height, bytes_per_line, QImage.Format_RGB888).rgbSwapped()
            # Display the QImage in the QLabel
            self.live_view_label.setPixmap(QPixmap.fromImage(q_image))
        except Exception as e:
            print(f"Error updating live view: {e}")

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for real numbers (bool excluded)."""
        return isinstance(value, numbers.Real) and not isinstance(value, bool)

    @classmethod
    def _format_probability(cls, value) -> str:
        """Format with three decimals, or ``str(value)`` if not numeric."""
        return f"{value:.3f}" if cls._is_number(value) else str(value)

    @classmethod
    def _split_result_tuple(cls, result):
        """Return ``(probability, result_string)`` from a 2-tuple in either order.

        The element order is only swapped when the second item is numeric and
        the first is not; otherwise the tuple is taken as given.
        """
        first, second = result
        if cls._is_number(second) and not cls._is_number(first):
            return second, first
        return first, second

    def update_inference_results(self, result_dict):
        """Update the inference results display."""
        try:
            # Format the results for display
            timestamp = datetime.fromtimestamp(result_dict.get('timestamp', 0)).strftime("%H:%M:%S.%f")[:-3]
            stage_results = result_dict.get('stage_results', {})
            parts = [f"[{timestamp}] Pipeline ID: {result_dict.get('pipeline_id', 'Unknown')}\n"]

            # Display results from each stage
            for stage_id, result in stage_results.items():
                parts.append(f" {stage_id}:\n")
                if isinstance(result, tuple) and len(result) == 2:
                    # Tuple results carry a probability and a label.
                    probability, result_string = self._split_result_tuple(result)
                    parts.append(f" Result: {result_string}\n")
                    parts.append(f" Probability: {self._format_probability(probability)}\n")
                elif isinstance(result, dict):
                    # Handle dict results
                    for key, value in result.items():
                        if key == 'probability':
                            parts.append(f" Probability: {self._format_probability(value)}\n")
                        else:
                            parts.append(f" {key}: {value}\n")
                else:
                    parts.append(f" {result}\n")
            parts.append("-" * 50 + "\n")
            result_text = "".join(parts)

            # Bound the history: once it exceeds 100 lines, keep the last 50.
            current_text = self.results_text.toPlainText()
            lines = current_text.split('\n')
            if len(lines) > 100:
                current_text = '\n'.join(lines[-50:])
            self.results_text.setPlainText(current_text + result_text)

            # Auto-scroll to bottom
            scrollbar = self.results_text.verticalScrollBar()
            scrollbar.setValue(scrollbar.maximum())
        except Exception as e:
            print(f"Error updating inference results: {e}")

    def apply_theme(self):
        """Apply consistent theme to the dialog."""
        self.setStyleSheet("""
            QDialog {
                background-color: #1e1e2e;
                color: #cdd6f4;
            }
            QTabWidget::pane {
                border: 1px solid #45475a;
                background-color: #313244;
            }
            QTabWidget::tab-bar {
                alignment: center;
            }
            QTabBar::tab {
                background-color: #45475a;
                color: #cdd6f4;
                padding: 8px 16px;
                margin-right: 2px;
                border-top-left-radius: 4px;
                border-top-right-radius: 4px;
            }
            QTabBar::tab:selected {
                background-color: #89b4fa;
                color: #1e1e2e;
            }
            QTabBar::tab:hover {
                background-color: #585b70;
            }
            QGroupBox {
                font-weight: bold;
                border: 2px solid #45475a;
                border-radius: 5px;
                margin-top: 1ex;
                padding-top: 5px;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                left: 10px;
                padding: 0 10px 0 10px;
            }
            QPushButton {
                background-color: #45475a;
                color: #cdd6f4;
                border: 1px solid #6c7086;
                border-radius: 4px;
                padding: 8px 16px;
                font-weight: bold;
            }
            QPushButton:hover {
                background-color: #585b70;
            }
            QPushButton:pressed {
                background-color: #313244;
            }
            QPushButton:disabled {
                background-color: #313244;
                color: #6c7086;
            }
            QTextEdit, QLineEdit {
                background-color: #313244;
                color: #cdd6f4;
                border: 1px solid #45475a;
                border-radius: 4px;
                padding: 4px;
            }
            QTableWidget {
                background-color: #313244;
                alternate-background-color: #45475a;
                color: #cdd6f4;
                border: 1px solid #45475a;
            }
            QProgressBar {
                background-color: #313244;
                border: 1px solid #45475a;
                border-radius: 4px;
                text-align: center;
            }
            QProgressBar::chunk {
                background-color: #a6e3a1;
                border-radius: 3px;
            }
        """)

    def _confirm_stop_worker(self) -> bool:
        """Ask before abandoning a running deployment and stop it if agreed.

        Returns True when the dialog may go away (no worker running, or the
        user confirmed and the worker was told to stop), False to stay open.
        """
        worker = self.deployment_worker
        if not (worker and worker.isRunning()):
            return True
        reply = QMessageBox.question(self, "Cancel Deployment",
                                     "Deployment is in progress. Are you sure you want to cancel?",
                                     QMessageBox.Yes | QMessageBox.No)
        if reply != QMessageBox.Yes:
            return False
        worker.stop()
        worker.wait(3000)  # Wait up to 3 seconds
        return True

    def done(self, result):
        """Close with ``result`` unless the user keeps a running deployment.

        accept() and reject() (Close/Cancel button, Esc key) both end up
        here and do not deliver a close event, so the running-worker check
        has to happen in this override as well as in ``closeEvent``.
        """
        if self._confirm_stop_worker():
            super().done(result)

    def closeEvent(self, event):
        """Handle dialog close event."""
        if self._confirm_stop_worker():
            event.accept()
        else:
            event.ignore()