# Cluster/core/nodes/output_node.py
# 2025-07-17 17:04:56 +08:00

# 370 lines
# 13 KiB
# Python

"""
Output node implementation for data sink operations.
This module provides the OutputNode class which handles various output destinations
including files, databases, APIs, and display systems for pipeline results.
Main Components:
- OutputNode: Core output data sink node implementation
- Output destination configuration and validation
- Format conversion and export functionality
Usage:
from cluster4npu_ui.core.nodes.output_node import OutputNode
node = OutputNode()
node.set_property('output_type', 'File')
node.set_property('destination', '/path/to/output.json')
"""
from .base_node import BaseNodeWithProperties
class OutputNode(BaseNodeWithProperties):
    """
    Output data sink node for pipeline result export.

    This node describes where processed pipeline results go: files, API
    endpoints, databases, MQTT brokers, WebSockets, display systems or the
    console. It only declares and validates configuration properties; it adds
    a single input port (no output ports are added here).
    """

    __identifier__ = 'com.cluster.output_node'
    NODE_NAME = 'Output Node'

    # Selectable destinations, in the order offered by the ``output_type``
    # property.
    _OUTPUT_TYPES = (
        'File', 'API Endpoint', 'Database', 'Display', 'MQTT', 'WebSocket',
        'Console',
    )
    # Serialization formats. Shared by the ``format`` property and
    # get_supported_formats() so the two lists cannot drift apart.
    _SUPPORTED_FORMATS = (
        'JSON', 'XML', 'CSV', 'Binary', 'MessagePack', 'YAML', 'Parquet',
    )
    # Output types reported by requires_network() and supports_real_time().
    _NETWORK_OUTPUT_TYPES = ('API Endpoint', 'Database', 'MQTT', 'WebSocket')
    _REAL_TIME_OUTPUT_TYPES = (
        'Display', 'Console', 'MQTT', 'WebSocket', 'API Endpoint',
    )
    # Hard-coded heuristic throughput estimates (items per second), keyed by
    # output type and then by format. Combinations not listed here (e.g. any
    # 'WebSocket' output, or 'YAML' to a database) use _DEFAULT_THROUGHPUT.
    _THROUGHPUT_ESTIMATES = {
        'File': {
            'JSON': 1000,
            'XML': 800,
            'CSV': 2000,
            'Binary': 5000,
            'MessagePack': 3000,
            'YAML': 600,
            'Parquet': 1500,
        },
        'API Endpoint': {
            'JSON': 100,
            'XML': 80,
            'CSV': 120,
            'Binary': 150,
        },
        'Database': {
            'JSON': 500,
            'XML': 400,
            'CSV': 800,
            'Binary': 1200,
        },
        'MQTT': {
            'JSON': 2000,
            'XML': 1500,
            'CSV': 3000,
            'Binary': 5000,
        },
        'Display': {
            'JSON': 100,
            'XML': 80,
            'CSV': 120,
            'Binary': 150,
        },
        'Console': {
            'JSON': 50,
            'XML': 40,
            'CSV': 60,
            'Binary': 80,
        },
    }
    _DEFAULT_THROUGHPUT = 100
    # Throughput multiplier applied when ``enable_buffering`` is truthy.
    _BUFFERING_THROUGHPUT_MULTIPLIER = 1.5

    def __init__(self):
        super().__init__()

        # Setup node connections (only input)
        self.add_input('input', multi_input=False, color=(255, 140, 0))
        self.set_color(255, 140, 0)

        # Initialize properties
        self.setup_properties()

    def setup_properties(self):
        """Declare all output-destination properties with their defaults and UI hints."""
        # Output type configuration
        self.create_business_property('output_type', 'File', list(self._OUTPUT_TYPES))

        # File output configuration
        self.create_business_property('destination', '', {
            'type': 'file_path',
            'filter': 'Output files (*.json *.xml *.csv *.txt *.log)',
            'description': 'Output file path or URL'
        })

        self.create_business_property('format', 'JSON', list(self._SUPPORTED_FORMATS))

        self.create_business_property('save_interval', 1.0, {
            'min': 0.1,
            'max': 60.0,
            'step': 0.1,
            'description': 'Save interval in seconds'
        })

        # File management
        self.create_business_property('enable_rotation', False, {
            'description': 'Enable file rotation based on size or time'
        })

        self.create_business_property('rotation_type', 'size', [
            'size', 'time', 'count'
        ])

        self.create_business_property('rotation_size_mb', 100, {
            'min': 1,
            'max': 1000,
            'description': 'Rotation size in MB'
        })

        self.create_business_property('rotation_time_hours', 24, {
            'min': 1,
            'max': 168,
            'description': 'Rotation time in hours'
        })

        # API endpoint configuration
        self.create_business_property('api_url', '', {
            'placeholder': 'https://api.example.com/data',
            'description': 'API endpoint URL'
        })

        self.create_business_property('api_method', 'POST', [
            'POST', 'PUT', 'PATCH'
        ])

        # The placeholder shows the literal two-character "\n" separator;
        # _parse_headers() accepts both that form and real newlines.
        self.create_business_property('api_headers', '', {
            'placeholder': 'Authorization: Bearer token\\nContent-Type: application/json',
            'description': 'API headers (one per line)'
        })

        self.create_business_property('api_timeout', 30, {
            'min': 1,
            'max': 300,
            'description': 'API request timeout in seconds'
        })

        # Database configuration
        self.create_business_property('db_connection_string', '', {
            'placeholder': 'postgresql://user:pass@host:port/db',
            'description': 'Database connection string'
        })

        self.create_business_property('db_table', '', {
            'placeholder': 'results',
            'description': 'Database table name'
        })

        self.create_business_property('db_batch_size', 100, {
            'min': 1,
            'max': 1000,
            'description': 'Batch size for database inserts'
        })

        # MQTT configuration
        self.create_business_property('mqtt_broker', '', {
            'placeholder': 'mqtt://broker.example.com:1883',
            'description': 'MQTT broker URL'
        })

        self.create_business_property('mqtt_topic', '', {
            'placeholder': 'cluster4npu/results',
            'description': 'MQTT topic for publishing'
        })

        self.create_business_property('mqtt_qos', 0, [
            0, 1, 2
        ])

        # Display configuration
        self.create_business_property('display_type', 'console', [
            'console', 'window', 'overlay', 'web'
        ])

        self.create_business_property('display_format', 'pretty', [
            'pretty', 'compact', 'raw'
        ])

        # Buffer and queuing
        self.create_business_property('enable_buffering', True, {
            'description': 'Enable output buffering'
        })

        self.create_business_property('buffer_size', 1000, {
            'min': 1,
            'max': 10000,
            'description': 'Buffer size in number of results'
        })

        self.create_business_property('flush_interval', 5.0, {
            'min': 0.1,
            'max': 60.0,
            'step': 0.1,
            'description': 'Buffer flush interval in seconds'
        })

        # Error handling
        self.create_business_property('retry_on_error', True, {
            'description': 'Retry on output errors'
        })

        self.create_business_property('max_retries', 3, {
            'min': 0,
            'max': 10,
            'description': 'Maximum number of retries'
        })

        self.create_business_property('retry_delay', 1.0, {
            'min': 0.1,
            'max': 10.0,
            'step': 0.1,
            'description': 'Delay between retries in seconds'
        })

    @staticmethod
    def _is_blank(value) -> bool:
        """Return True for None, falsy values and whitespace-only strings."""
        if isinstance(value, str):
            return not value.strip()
        return not value

    @staticmethod
    def _is_http_url(url) -> bool:
        """Return True if *url* is a string with an http/https scheme and a host."""
        from urllib.parse import urlparse

        if not isinstance(url, str):
            return False
        parsed = urlparse(url.strip())
        # urlparse lowercases the scheme; a bare "https://" has an empty netloc.
        return parsed.scheme in ('http', 'https') and bool(parsed.netloc)

    def validate_configuration(self) -> tuple[bool, str]:
        """
        Validate the current node configuration.

        Only the properties relevant to the selected ``output_type`` are
        checked ('Display', 'WebSocket' and 'Console' have no type-specific
        requirements); ``save_interval`` is checked for every output type.

        Returns:
            Tuple of (is_valid, error_message); error_message is "" when valid.
        """
        output_type = self.get_property('output_type')

        if output_type == 'File':
            if self._is_blank(self.get_property('destination')):
                return False, "Destination path is required for file output"

        elif output_type == 'API Endpoint':
            api_url = self.get_property('api_url')
            if self._is_blank(api_url):
                return False, "API URL is required for API endpoint output"
            if not self._is_http_url(api_url):
                return False, "Invalid API URL format"

        elif output_type == 'Database':
            if self._is_blank(self.get_property('db_connection_string')):
                return False, "Database connection string is required"
            if self._is_blank(self.get_property('db_table')):
                return False, "Database table name is required"

        elif output_type == 'MQTT':
            if self._is_blank(self.get_property('mqtt_broker')):
                return False, "MQTT broker URL is required"
            if self._is_blank(self.get_property('mqtt_topic')):
                return False, "MQTT topic is required"

        # Validate save interval. bool is a subclass of int, so it must be
        # rejected explicitly or True would count as a 1-second interval.
        save_interval = self.get_property('save_interval')
        if (isinstance(save_interval, bool)
                or not isinstance(save_interval, (int, float))
                or save_interval <= 0):
            return False, "Save interval must be greater than 0"

        return True, ""

    def get_output_config(self) -> dict:
        """
        Get output configuration for pipeline execution.

        Every output-related property is copied as-is, except ``api_headers``,
        which is parsed from its text form into a dict by _parse_headers().

        Returns:
            Dictionary containing output configuration
        """
        return {
            'node_id': self.id,
            'node_name': self.name(),
            'output_type': self.get_property('output_type'),
            'destination': self.get_property('destination'),
            'format': self.get_property('format'),
            'save_interval': self.get_property('save_interval'),
            'enable_rotation': self.get_property('enable_rotation'),
            'rotation_type': self.get_property('rotation_type'),
            'rotation_size_mb': self.get_property('rotation_size_mb'),
            'rotation_time_hours': self.get_property('rotation_time_hours'),
            'api_url': self.get_property('api_url'),
            'api_method': self.get_property('api_method'),
            'api_headers': self._parse_headers(self.get_property('api_headers')),
            'api_timeout': self.get_property('api_timeout'),
            'db_connection_string': self.get_property('db_connection_string'),
            'db_table': self.get_property('db_table'),
            'db_batch_size': self.get_property('db_batch_size'),
            'mqtt_broker': self.get_property('mqtt_broker'),
            'mqtt_topic': self.get_property('mqtt_topic'),
            'mqtt_qos': self.get_property('mqtt_qos'),
            'display_type': self.get_property('display_type'),
            'display_format': self.get_property('display_format'),
            'enable_buffering': self.get_property('enable_buffering'),
            'buffer_size': self.get_property('buffer_size'),
            'flush_interval': self.get_property('flush_interval'),
            'retry_on_error': self.get_property('retry_on_error'),
            'max_retries': self.get_property('max_retries'),
            'retry_delay': self.get_property('retry_delay')
        }

    def _parse_headers(self, headers_str: str) -> dict:
        """
        Parse API headers from "Name: value" lines into a dict.

        Lines may be separated by real newlines (multi-line editors) or by the
        literal two-character sequence backslash-n (as shown in the
        ``api_headers`` placeholder). Only the first colon splits name from
        value. Lines without a colon or with an empty name are ignored; a
        repeated header name keeps the last value.
        """
        headers: dict[str, str] = {}
        if not headers_str:
            return headers

        normalized = str(headers_str).replace('\\n', '\n')
        for raw_line in normalized.splitlines():
            name, sep, value = raw_line.partition(':')
            name = name.strip()
            if sep and name:
                headers[name] = value.strip()
        return headers

    def get_supported_formats(self) -> list[str]:
        """Get list of supported output formats (a fresh list on each call)."""
        return list(self._SUPPORTED_FORMATS)

    def get_estimated_throughput(self) -> dict:
        """
        Estimate output throughput capabilities.

        The estimate is a lookup in the hard-coded _THROUGHPUT_ESTIMATES table
        (falling back to _DEFAULT_THROUGHPUT), scaled by 1.5 when buffering is
        enabled. It is a heuristic, not a measurement.

        Returns:
            Dictionary with throughput information
        """
        output_type = self.get_property('output_type')
        format_type = self.get_property('format')
        buffering_enabled = self.get_property('enable_buffering')

        per_format = self._THROUGHPUT_ESTIMATES.get(output_type, {})
        base_throughput = per_format.get(format_type, self._DEFAULT_THROUGHPUT)
        multiplier = self._BUFFERING_THROUGHPUT_MULTIPLIER if buffering_enabled else 1.0

        return {
            'estimated_throughput': base_throughput * multiplier,
            'output_type': output_type,
            'format': format_type,
            'buffering_enabled': buffering_enabled,
            'buffer_size': self.get_property('buffer_size')
        }

    def requires_network(self) -> bool:
        """Check if the current output type requires network connectivity."""
        return self.get_property('output_type') in self._NETWORK_OUTPUT_TYPES

    def supports_real_time(self) -> bool:
        """Check if the current output type supports real-time output."""
        return self.get_property('output_type') in self._REAL_TIME_OUTPUT_TYPES