mirror of
https://github.com/bbenchoff/OrthoRoute.git
synced 2026-01-11 14:01:27 +00:00
4003 lines
177 KiB
Python
4003 lines
177 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
OrthoRoute Main Window - PyQt6 GUI for PCB visualization and routing
|
||
New architecture implementation of the rich GUI functionality
|
||
"""
|
||
|
||
import sys
|
||
import logging
|
||
from typing import Dict, Any, Optional, List
|
||
import os
|
||
import time
|
||
from pathlib import Path
|
||
|
||
# Add debug logging capability
|
||
try:
|
||
sys.path.append(os.path.join(os.path.dirname(__file__), '../../../..'))
|
||
from debug_logger import get_debug_logger
|
||
DEBUG_LOGGING_AVAILABLE = True
|
||
except ImportError:
|
||
DEBUG_LOGGING_AVAILABLE = False
|
||
|
||
from PyQt6.QtWidgets import (
|
||
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout,
|
||
QLabel, QPushButton, QTextEdit, QTreeWidget, QTreeWidgetItem,
|
||
QSplitter, QGroupBox, QScrollArea, QTabWidget, QProgressBar,
|
||
QStatusBar, QMenuBar, QToolBar, QApplication, QMessageBox,
|
||
QCheckBox, QSpinBox, QDoubleSpinBox, QComboBox, QSlider,
|
||
QFrame, QSizePolicy, QLineEdit, QFileDialog
|
||
)
|
||
from PyQt6.QtCore import (
|
||
Qt, QTimer, QThread, pyqtSignal, QSize, QRect, QPoint, QPointF,
|
||
QRectF, QPropertyAnimation, QEasingCurve, pyqtSlot, QMutex
|
||
)
|
||
from PyQt6.QtGui import (
|
||
QPainter, QPen, QBrush, QColor, QFont, QPixmap, QPalette,
|
||
QAction, QIcon, QPolygonF, QTransform, QWheelEvent, QMouseEvent,
|
||
QPaintEvent, QResizeEvent, QImage
|
||
)
|
||
|
||
from .kicad_colors import KiCadColorScheme
|
||
from .pathfinder_stats_widget import PathFinderStatsWidget
|
||
from ...algorithms.manhattan.manhattan_router_rrg import ManhattanRRGRoutingEngine
|
||
from ...algorithms.manhattan.rrg import RoutingConfig
|
||
from ...infrastructure.gpu.cuda_provider import CUDAProvider
|
||
from ...infrastructure.gpu.cpu_fallback import CPUProvider
|
||
from ...infrastructure.serialization import (
|
||
export_pcb_to_orp, import_solution_from_ors,
|
||
derive_orp_filename, derive_ors_filename, get_solution_summary
|
||
)
|
||
from kipy.board_types import Track, Via, PadStack, BoardLayer, ViaType
|
||
from kipy.common_types import Vector2
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class RoutingThread(QThread):
|
||
"""Background thread for running routing operations."""
|
||
|
||
# Define signals for progress updates and completion
|
||
progress_update = pyqtSignal(int, int, str, list, list) # current, total, status, new_tracks, new_vias
|
||
routing_completed = pyqtSignal(dict) # Routing results
|
||
routing_error = pyqtSignal(str) # Error message
|
||
|
||
def __init__(self, algorithm, board_data, config, gpu_provider=None, plugin_router=None):
|
||
"""Initialize routing thread."""
|
||
super().__init__()
|
||
self.algorithm = algorithm
|
||
self.board_data = board_data
|
||
self.config = config
|
||
self.gpu_provider = gpu_provider
|
||
self.plugin_router = plugin_router # STEP 1: Accept plugin's UnifiedPathFinder instance
|
||
self.router = None
|
||
self.is_cancelled = False
|
||
|
||
def run(self):
|
||
"""Run the routing operation in a background thread."""
|
||
try:
|
||
# SURGICAL FIX STEP 1: Use plugin's UnifiedPathFinder instance, don't create second router
|
||
if self.algorithm == "Manhattan RRG":
|
||
# STEP 1a: Use plugin's UnifiedPathFinder instance if provided
|
||
if self.plugin_router is not None:
|
||
logger.info("[GUI-ROUTER-WIRE] Using plugin's UnifiedPathFinder instance (no second router)")
|
||
self.router = self.plugin_router
|
||
logger.info(f"[GUI-ROUTER-WIRE] Using existing {self.router.__class__.__name__} with instance_tag={getattr(self.router, '_instance_tag', 'NO_TAG')}")
|
||
else:
|
||
# STEP 1b: Fallback - create new instance with warning
|
||
logger.warning("[GUI-ROUTER-WIRE] No plugin router provided, creating new UnifiedPathFinder (split-brain risk)")
|
||
|
||
# Import UnifiedPathFinder and GPUConfig
|
||
from ...algorithms.manhattan.unified_pathfinder import UnifiedPathFinder, PathFinderConfig, GPUConfig
|
||
|
||
# Create PathFinder config with strict DRC
|
||
pf_config = PathFinderConfig()
|
||
pf_config.strict_drc = True
|
||
|
||
# Create UnifiedPathFinder instance (not legacy ManhattanRRG)
|
||
# GPU mode is controlled by GPUConfig.GPU_MODE (hardcoded, no env vars)
|
||
self.router = UnifiedPathFinder(config=pf_config, use_gpu=GPUConfig.GPU_MODE)
|
||
logger.info(f"[GUI-ROUTER-WIRE] Created new {self.router.__class__.__name__} with instance_tag={getattr(self.router, '_instance_tag', 'NO_TAG')}")
|
||
|
||
# STEP 1c: Add guard assert to prove UnifiedPathFinder is used
|
||
assert self.router.__class__.__name__ == 'UnifiedPathFinder', f"Expected UnifiedPathFinder, got {self.router.__class__.__name__}"
|
||
# STEP 1c: Convert board_data to domain Board object for UnifiedPathFinder
|
||
from ...domain.models.board import Board
|
||
|
||
# Convert board_data dict to domain Board object
|
||
mock_board = self._convert_board_data_to_domain(self.board_data, None)
|
||
|
||
# STEP 1d: Use same three-step UnifiedPathFinder calls as plugin
|
||
logger.info(f"[GUI-UPF-STEP1] initialize_graph with {len(mock_board.nets)} nets")
|
||
self.router.initialize_graph(mock_board)
|
||
|
||
logger.info(f"[GUI-UPF-STEP2] map_all_pads with {len([p for n in mock_board.nets for p in n.pads])} pads")
|
||
self.router.map_all_pads(mock_board)
|
||
|
||
logger.info(f"[GUI-UPF-STEP3] route_multiple_nets")
|
||
|
||
# STEP 2: Wire PathFinder stats to GUI - capture real metrics
|
||
import time
|
||
routing_start_time = time.time()
|
||
|
||
# Emit start signal for PathFinder stats
|
||
max_iterations = getattr(self.router.config, 'max_iterations', 8)
|
||
net_count = len(mock_board.nets)
|
||
self.progress_update.emit(0, net_count, f"Starting PathFinder negotiation ({max_iterations} iterations max)...", [], [])
|
||
|
||
# Route with UnifiedPathFinder and capture real timing
|
||
logger.info(f"[GUI-STATS] Starting PathFinder with {net_count} nets, {max_iterations} max iterations")
|
||
routing_result = self.router.route_multiple_nets(mock_board.nets)
|
||
routing_elapsed = time.time() - routing_start_time
|
||
|
||
if routing_result:
|
||
logger.info("[GUI-UPF-STEP4] emit_geometry")
|
||
tracks_count, vias_count = self.router.emit_geometry(mock_board)
|
||
|
||
# Get geometry payload
|
||
geom = self.router.get_geometry_payload()
|
||
|
||
# Convert to result format
|
||
result = {
|
||
'success': tracks_count > 0 or vias_count > 0,
|
||
'tracks': geom.tracks if geom else [],
|
||
'vias': geom.vias if geom else [],
|
||
'routed_nets': len([n for n in mock_board.nets if n.name != 'unconnected']),
|
||
'failed_nets': 0,
|
||
'stats': {
|
||
'elapsed_time': routing_elapsed, # REAL routing time
|
||
'total_length': tracks_count,
|
||
'total_vias': vias_count,
|
||
'success_rate': 1.0 if tracks_count > 0 else 0.0,
|
||
'iterations_used': max_iterations, # PathFinder iterations
|
||
'negotiation_time': routing_elapsed
|
||
}
|
||
}
|
||
logger.info(f"[GUI-UPF-SUCCESS] Generated {tracks_count} tracks, {vias_count} vias")
|
||
else:
|
||
result = {
|
||
'success': False,
|
||
'tracks': [],
|
||
'vias': [],
|
||
'routed_nets': 0,
|
||
'failed_nets': len(mock_board.nets),
|
||
'stats': {
|
||
'elapsed_time': routing_elapsed, # REAL routing time even on failure
|
||
'total_length': 0, 'total_vias': 0, 'success_rate': 0.0,
|
||
'iterations_used': 0, # Failed before negotiation
|
||
'negotiation_time': routing_elapsed
|
||
}
|
||
}
|
||
logger.error("[GUI-UPF-FAIL] UnifiedPathFinder routing failed")
|
||
|
||
if self.is_cancelled:
|
||
return
|
||
|
||
# Emit completion signal with results
|
||
self.routing_completed.emit(result)
|
||
|
||
else:
|
||
self.routing_error.emit(f"Unknown routing algorithm: {self.algorithm}")
|
||
|
||
except Exception as e:
|
||
self.routing_error.emit(f"Routing error: {str(e)}")
|
||
logger.exception("Error in routing thread")
|
||
|
||
def cancel(self):
|
||
"""Cancel the routing operation."""
|
||
self.is_cancelled = True
|
||
|
||
def _convert_board_data_to_domain(self, board_data, drc_constraints):
|
||
"""Convert board_data dict to domain Board object for RRG router"""
|
||
from ...domain.models.board import Board, Net, Pad, Bounds, Coordinate, Component
|
||
|
||
# Create board bounds
|
||
bounds_data = board_data.get('bounds', (0, 0, 100, 100))
|
||
board_bounds = Bounds(
|
||
min_x=bounds_data[0],
|
||
min_y=bounds_data[1],
|
||
max_x=bounds_data[2],
|
||
max_y=bounds_data[3]
|
||
)
|
||
|
||
# Convert nets and pads
|
||
nets = []
|
||
nets_data = board_data.get('nets', {})
|
||
|
||
for net_name, net_data in nets_data.items():
|
||
if not net_name or net_name.strip() == "":
|
||
continue
|
||
|
||
pads_data = net_data.get('pads', [])
|
||
if len(pads_data) < 2:
|
||
continue # Skip single-pad nets
|
||
|
||
# Convert pads
|
||
net_pads = []
|
||
for pad_data in pads_data:
|
||
pad = Pad(
|
||
id=f"{net_name}_pad_{len(net_pads)}",
|
||
component_id=f"comp_{net_name}_{len(net_pads)}",
|
||
net_id=f"net_{len(nets)}",
|
||
position=Coordinate(
|
||
x=pad_data.get('x', 0.0),
|
||
y=pad_data.get('y', 0.0)
|
||
),
|
||
size=(
|
||
pad_data.get('width', 1.0),
|
||
pad_data.get('height', 1.0)
|
||
),
|
||
drill_size=pad_data.get('drill', None),
|
||
layer=pad_data.get('layers', ['F.Cu'])[0] if pad_data.get('layers') else 'F.Cu'
|
||
)
|
||
net_pads.append(pad)
|
||
|
||
# Create net
|
||
net = Net(
|
||
id=f"net_{len(nets)}",
|
||
name=net_name,
|
||
pads=net_pads
|
||
)
|
||
nets.append(net)
|
||
|
||
# Create mock components for proper bounds calculation
|
||
components = []
|
||
all_pads = []
|
||
for net in nets:
|
||
for i, pad in enumerate(net.pads):
|
||
all_pads.append(pad)
|
||
|
||
# Create a single mock component containing all pads
|
||
if all_pads:
|
||
# Calculate center position
|
||
avg_x = sum(pad.position.x for pad in all_pads) / len(all_pads)
|
||
avg_y = sum(pad.position.y for pad in all_pads) / len(all_pads)
|
||
|
||
mock_component = Component(
|
||
id="mock_comp_1",
|
||
reference="U1",
|
||
value="MOCK",
|
||
footprint="MOCK_FP",
|
||
position=Coordinate(avg_x, avg_y),
|
||
pads=all_pads
|
||
)
|
||
components.append(mock_component)
|
||
|
||
# Create board - handle filename robustly
|
||
filename = board_data.get('filename') or board_data.get('name') or 'TestBackplane.kicad_pcb'
|
||
board = Board(
|
||
id="board_1",
|
||
name=filename,
|
||
components=components,
|
||
nets=nets,
|
||
layer_count=board_data.get('layers', 2) # Dynamic from KiCad file
|
||
)
|
||
|
||
# Store airwires as a custom attribute for RRG routing
|
||
board._airwires = board_data.get('airwires', [])
|
||
# Store KiCad-calculated bounds for accurate routing area
|
||
board._kicad_bounds = board_data.get('bounds', None)
|
||
|
||
return board
|
||
|
||
|
||
# PathFinder GUI Integration Methods
|
||
def _update_pathfinder_status(self, status_text: str):
|
||
"""Update status bar with PathFinder instrumentation metrics"""
|
||
self.status_label.setText(status_text)
|
||
self.metrics_label.setText(status_text)
|
||
self.metrics_label.setVisible(True)
|
||
|
||
# Also print to terminal for console monitoring
|
||
print(f"[PathFinder]: {status_text}")
|
||
|
||
def _update_csv_export_status(self, csv_status: str):
|
||
"""Update status bar with CSV export information"""
|
||
self.csv_status_label.setText(csv_status)
|
||
self.csv_status_label.setVisible(True)
|
||
|
||
# Auto-hide CSV status after 10 seconds
|
||
QTimer.singleShot(10000, lambda: self.csv_status_label.setVisible(False))
|
||
|
||
def _display_instrumentation_summary(self):
|
||
"""Display instrumentation summary when routing completes"""
|
||
if hasattr(self.router, 'pathfinder') and hasattr(self.router.pathfinder, 'get_instrumentation_summary'):
|
||
summary = self.router.pathfinder.get_instrumentation_summary()
|
||
|
||
if summary:
|
||
summary_text = (f"Session: {summary.get('session_id', 'N/A')} | "
|
||
f"Iterations: {summary.get('total_iterations', 0)} | "
|
||
f"Success: {summary.get('final_success_rate', 0):.1f}% | "
|
||
f"Nets: {summary.get('successful_nets', 0)}/{summary.get('total_nets_processed', 0)} | "
|
||
f"Avg Time: {summary.get('avg_routing_time_ms', 0):.1f}ms")
|
||
|
||
self.metrics_label.setText(summary_text)
|
||
self.metrics_label.setVisible(True)
|
||
|
||
print(f"[SUMMARY] Routing Summary: {summary_text}")
|
||
logger.info(f"Instrumentation summary: {summary}")
|
||
|
||
|
||
|
||
class PCBViewer(QWidget):
|
||
"""PCB visualization widget for displaying board, components, tracks, and airwires"""
|
||
|
||
def __init__(self, parent=None):
|
||
super().__init__(parent)
|
||
self.board_data = None
|
||
self.zoom_factor = 1.0
|
||
self.pan_x = 0.0
|
||
self.pan_y = 0.0
|
||
self.last_pan_point = QPoint()
|
||
self.is_panning = False
|
||
|
||
# Initialize KiCad color scheme
|
||
self.color_scheme = KiCadColorScheme()
|
||
|
||
# Display option flags
|
||
self.show_components = True
|
||
self.show_tracks = True
|
||
self.show_vias = True
|
||
self.show_pads = True
|
||
self.show_airwires = True
|
||
self.show_zones = True
|
||
self.show_keepouts = True
|
||
|
||
# Layer visibility tracking (will be updated when board loads with actual layer names)
|
||
self.visible_layers = set() # Start empty, will be populated from board_data['layer_names']
|
||
|
||
self.setMinimumSize(800, 600)
|
||
self.setMouseTracking(True)
|
||
|
||
def set_board_data(self, board_data: Dict[str, Any]):
|
||
"""Set the board data to display"""
|
||
self.board_data = board_data
|
||
|
||
# Initialize visible_layers from board data (all layers visible by default)
|
||
if board_data and 'layer_names' in board_data:
|
||
self.visible_layers = set(board_data['layer_names'])
|
||
logger.info(f"PCBViewer: Initialized {len(self.visible_layers)} visible layers from board data")
|
||
elif not self.visible_layers:
|
||
# Fallback to 2-layer board
|
||
self.visible_layers = set(['F.Cu', 'B.Cu'])
|
||
|
||
self.fit_to_view()
|
||
self.update()
|
||
|
||
def fit_to_view(self):
|
||
"""Fit the board to the current view"""
|
||
if not self.board_data:
|
||
return
|
||
|
||
# Get board bounds
|
||
bounds = self.board_data.get('bounds', (0, 0, 100, 100))
|
||
board_width = bounds[2] - bounds[0]
|
||
board_height = bounds[3] - bounds[1]
|
||
|
||
if board_width <= 0 or board_height <= 0:
|
||
return
|
||
|
||
# Calculate zoom to fit
|
||
widget_width = self.width() - 40
|
||
widget_height = self.height() - 40
|
||
|
||
zoom_x = widget_width / board_width
|
||
zoom_y = widget_height / board_height
|
||
|
||
self.zoom_factor = min(zoom_x, zoom_y) * 0.9
|
||
|
||
# Center the board
|
||
self.pan_x = (bounds[0] + bounds[2]) / 2
|
||
self.pan_y = (bounds[1] + bounds[3]) / 2
|
||
|
||
# Trigger repaint
|
||
self.update()
|
||
|
||
def paintEvent(self, event: QPaintEvent):
|
||
"""Paint the PCB visualization"""
|
||
painter = QPainter(self)
|
||
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
|
||
|
||
# Fill background with KiCad background color
|
||
painter.fillRect(self.rect(), self.color_scheme.get_color('background'))
|
||
|
||
if not self.board_data:
|
||
painter.setPen(self.color_scheme.get_color('text'))
|
||
painter.drawText(self.rect(), Qt.AlignmentFlag.AlignCenter, "No board data loaded")
|
||
return
|
||
|
||
# Set up coordinate transformation
|
||
painter.translate(self.width() / 2, self.height() / 2)
|
||
painter.scale(self.zoom_factor, self.zoom_factor)
|
||
painter.translate(-self.pan_x, -self.pan_y)
|
||
|
||
# Skip artificial board outline - real boards should use Edge.Cuts layer
|
||
# self._draw_board_outline(painter)
|
||
|
||
# Draw airwires (behind everything)
|
||
if self.show_airwires:
|
||
self._draw_airwires(painter)
|
||
|
||
# Draw tracks
|
||
if self.show_tracks:
|
||
self._draw_tracks(painter)
|
||
|
||
# Draw pads
|
||
if self.show_pads:
|
||
self._draw_pads(painter)
|
||
|
||
# Draw components
|
||
if self.show_components:
|
||
self._draw_components(painter)
|
||
|
||
# Draw vias
|
||
if self.show_vias:
|
||
self._draw_vias(painter)
|
||
|
||
def _draw_board_outline(self, painter: QPainter):
|
||
"""Draw the board outline"""
|
||
bounds = self.board_data.get('bounds', (0, 0, 100, 100))
|
||
|
||
painter.setPen(QPen(self.color_scheme.get_color('edge_cuts'), 0.5))
|
||
painter.setBrush(QBrush())
|
||
|
||
rect = QRectF(bounds[0], bounds[1], bounds[2] - bounds[0], bounds[3] - bounds[1])
|
||
painter.drawRect(rect)
|
||
|
||
def _draw_airwires(self, painter: QPainter):
|
||
"""Draw airwires (unrouted connections) with performance optimization"""
|
||
airwires = self.board_data.get('airwires', [])
|
||
|
||
# Performance limit: only show airwires when zoomed in enough or limit count
|
||
zoom_level = painter.transform().m11() # Get current zoom scale
|
||
max_airwires = min(len(airwires), int(1000 * zoom_level) if zoom_level > 0.1 else 200)
|
||
|
||
if max_airwires <= 0:
|
||
return
|
||
|
||
painter.setPen(QPen(self.color_scheme.get_color('ratsnest'), 0.1))
|
||
|
||
# Get viewport bounds for culling
|
||
viewport = painter.viewport()
|
||
|
||
# More robust viewport calculation
|
||
try:
|
||
transform_inverted, invertible = painter.transform().inverted()
|
||
if invertible:
|
||
visible_rect = transform_inverted.mapRect(QRectF(viewport))
|
||
else:
|
||
# Fallback: render everything if transform can't be inverted
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
except:
|
||
# Fallback: render everything if viewport calculation fails
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
|
||
drawn_count = 0
|
||
for i, airwire in enumerate(airwires):
|
||
if i >= max_airwires: # Hard limit for performance
|
||
break
|
||
|
||
try:
|
||
x1 = airwire['start_x']
|
||
y1 = airwire['start_y']
|
||
x2 = airwire['end_x']
|
||
y2 = airwire['end_y']
|
||
|
||
# Viewport culling: only draw if line intersects visible area (disabled for now - needs debugging)
|
||
# line_rect = QRectF(min(x1, x2), min(y1, y2), abs(x2-x1), abs(y2-y1))
|
||
# if visible_rect.intersects(line_rect):
|
||
painter.drawLine(QPointF(x1, y1), QPointF(x2, y2))
|
||
drawn_count += 1
|
||
|
||
except (KeyError, TypeError):
|
||
continue
|
||
|
||
# Debug log airwire rendering stats
|
||
if DEBUG_LOGGING_AVAILABLE and drawn_count > 0:
|
||
debug_logger = get_debug_logger()
|
||
render_counts = {
|
||
'airwires_drawn': drawn_count,
|
||
'max_airwires_allowed': max_airwires,
|
||
'total_airwires_available': len(airwires),
|
||
'zoom_level': zoom_level
|
||
}
|
||
debug_logger.log_visualization_data(render_counts, zoom_level)
|
||
|
||
def _draw_tracks(self, painter: QPainter):
|
||
"""Draw existing tracks/traces with performance optimization"""
|
||
tracks = self.board_data.get('tracks', [])
|
||
|
||
logger.info(f"_draw_tracks called: board_data has {len(tracks)} tracks")
|
||
if tracks:
|
||
logger.info(f"First track: {tracks[0]}")
|
||
|
||
if not tracks:
|
||
logger.info("No tracks to draw - returning early")
|
||
return
|
||
|
||
# CRITICAL PERFORMANCE FIX: Viewport culling and LOD
|
||
zoom_level = painter.transform().m11() # Get current zoom from transform matrix
|
||
|
||
# Calculate visible viewport in world coordinates
|
||
try:
|
||
transform = painter.transform().inverted()[0]
|
||
viewport_rect = painter.viewport()
|
||
visible_rect = transform.mapRect(QRectF(viewport_rect))
|
||
|
||
# Expand visible rect slightly for smooth panning
|
||
margin = max(visible_rect.width(), visible_rect.height()) * 0.1
|
||
visible_rect = visible_rect.adjusted(-margin, -margin, margin, margin)
|
||
|
||
logger.info(f"VIEWPORT DEBUG: visible_rect = ({visible_rect.left():.1f}, {visible_rect.top():.1f}) -> ({visible_rect.right():.1f}, {visible_rect.bottom():.1f})")
|
||
logger.info(f"VIEWPORT DEBUG: zoom_level = {zoom_level:.3f}")
|
||
except Exception as e:
|
||
# Fallback: render everything if transform fails
|
||
logger.warning(f"VIEWPORT DEBUG: Transform failed ({e}), using fallback viewport")
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000)
|
||
|
||
# Smart viewport culling for performance with large track counts
|
||
max_tracks_per_frame = 50000 # Reasonable limit per frame
|
||
min_width = 0.0001
|
||
|
||
# Helper function to get layer z-order (back to front rendering)
|
||
def layer_order_key(track):
|
||
layer = track.get('layer', 'F.Cu')
|
||
if isinstance(layer, int):
|
||
return layer # 0=F.Cu (front), higher numbers are deeper
|
||
elif layer == 'F.Cu':
|
||
return 999 # Draw last (on top)
|
||
elif layer == 'B.Cu':
|
||
return -1 # Draw first (on bottom)
|
||
elif layer.startswith('In'):
|
||
# Extract layer number from "In1.Cu", "In2.Cu", etc.
|
||
try:
|
||
layer_num = int(layer[2:].split('.')[0])
|
||
return layer_num # Internal layers in order
|
||
except:
|
||
return 0
|
||
return 0
|
||
|
||
# Sort tracks back-to-front (B.Cu, internal layers, F.Cu)
|
||
tracks_sorted = sorted(tracks, key=layer_order_key)
|
||
|
||
drawn_tracks = 0
|
||
culled_tracks = 0
|
||
|
||
for track in tracks_sorted:
|
||
if drawn_tracks >= max_tracks_per_frame:
|
||
culled_tracks += 1
|
||
continue
|
||
|
||
try:
|
||
# Handle both coordinate key formats - updated for new track structure
|
||
if 'start' in track and 'end' in track:
|
||
x1, y1 = track['start']
|
||
x2, y2 = track['end']
|
||
else:
|
||
x1 = track.get('start_x', track.get('x1'))
|
||
y1 = track.get('start_y', track.get('y1'))
|
||
x2 = track.get('end_x', track.get('x2'))
|
||
y2 = track.get('end_y', track.get('y2'))
|
||
|
||
width = track.get('width', 0.1)
|
||
|
||
# Viewport culling - only draw tracks that intersect with visible area
|
||
track_rect = QRectF(min(x1, x2), min(y1, y2), abs(x2 - x1), abs(y2 - y1))
|
||
if not visible_rect.intersects(track_rect.adjusted(-1, -1, 1, 1)): # Small margin for line width
|
||
culled_tracks += 1
|
||
continue
|
||
|
||
# DEBUG: Check for None coordinates
|
||
if any(coord is None for coord in [x1, y1, x2, y2]):
|
||
logger.warning(f"TRACK SKIPPED: None coordinates in {track}")
|
||
continue
|
||
|
||
# DEBUG: Log coordinate verification for first track
|
||
if drawn_tracks == 0:
|
||
bounds = self.board_data.get('bounds', None)
|
||
if bounds:
|
||
board_min_x, board_max_x = bounds[0], bounds[2]
|
||
board_min_y, board_max_y = bounds[1], bounds[3]
|
||
in_bounds_x = board_min_x <= x1 <= board_max_x and board_min_x <= x2 <= board_max_x
|
||
in_bounds_y = board_min_y <= y1 <= board_max_y and board_min_y <= y2 <= board_max_y
|
||
logger.info(f"COORD VERIFY: Track ({x1:.1f},{y1:.1f})->({x2:.1f},{y2:.1f}) in_bounds=({in_bounds_x},{in_bounds_y})")
|
||
logger.info(f"COORD VERIFY: Board bounds ({board_min_x:.1f},{board_min_y:.1f})->({board_max_x:.1f},{board_max_y:.1f})")
|
||
|
||
# Handle both layer formats: integer and string
|
||
layer_raw = track.get('layer', 0)
|
||
if isinstance(layer_raw, int):
|
||
# Convert integer layer to KiCad layer name
|
||
if layer_raw == 0:
|
||
layer = 'F.Cu' # Front copper
|
||
elif layer_raw == 1:
|
||
layer = 'B.Cu' # Back copper
|
||
else:
|
||
layer = f'In{layer_raw-1}.Cu' # Internal layers In1.Cu, In2.Cu, etc.
|
||
else:
|
||
layer = layer_raw
|
||
|
||
# TEMPORARY: Disable viewport culling to debug coordinate system
|
||
# PERFORMANCE: Skip tracks outside visible viewport
|
||
# FIXED: Use proper line-viewport intersection instead of rectangles!
|
||
line_start = QPointF(x1, y1)
|
||
line_end = QPointF(x2, y2)
|
||
|
||
# DEBUG: Log first few tracks to see coordinate ranges
|
||
if drawn_tracks < 5:
|
||
logger.info(f"TRACK COORDS #{drawn_tracks+1}: ({x1:.1f},{y1:.1f})->({x2:.1f},{y2:.1f}) layer={layer}")
|
||
|
||
# DISABLED: Skip viewport culling for now
|
||
# if (visible_rect.contains(line_start) or
|
||
# visible_rect.contains(line_end) or
|
||
# self._line_intersects_rect(x1, y1, x2, y2, visible_rect)):
|
||
# # Line is visible, continue to draw
|
||
# pass
|
||
# else:
|
||
# # Log why tracks are being culled for first few tracks
|
||
# if drawn_tracks < 3:
|
||
# logger.info(f"TRACK CULLED #{drawn_tracks+1}: track ({x1:.1f},{y1:.1f})->({x2:.1f},{y2:.1f}) outside viewport ({visible_rect.left():.1f},{visible_rect.top():.1f})->({visible_rect.right():.1f},{visible_rect.bottom():.1f})")
|
||
# continue
|
||
|
||
# ALWAYS SHOW TRACKS - No width threshold for production visibility
|
||
# User requirement: tracks/vias visible at every zoom level
|
||
|
||
# Skip if layer is not visible
|
||
if layer not in self.visible_layers:
|
||
# Log layer visibility issue for first few tracks
|
||
if drawn_tracks < 3:
|
||
logger.info(f"TRACK LAYER FILTERED #{drawn_tracks+1}: track layer '{layer}' not in visible_layers {list(self.visible_layers)}")
|
||
continue
|
||
|
||
# Set color based on layer
|
||
if layer == 'F.Cu':
|
||
color = self.color_scheme.get_color('copper_front')
|
||
elif layer == 'B.Cu':
|
||
color = self.color_scheme.get_color('copper_back')
|
||
else:
|
||
# Handle internal layers with distinct colors
|
||
# Parse "In1.Cu", "In2.Cu" etc to get the correct color
|
||
try:
|
||
if layer.startswith('In') and layer.endswith('.Cu'):
|
||
layer_num = int(layer[2:-3]) # Extract number from "In{N}.Cu"
|
||
color_key = f'copper_in{layer_num}'
|
||
color = self.color_scheme.get_color(color_key)
|
||
else:
|
||
color = self.color_scheme.get_color('copper_inner')
|
||
except (ValueError, AttributeError):
|
||
color = self.color_scheme.get_color('copper_inner')
|
||
|
||
# Use actual track dimensions (just like footprints are drawn)
|
||
# Convert mm to scene coordinates - same as footprints use
|
||
line_width = width # Use actual track width in mm
|
||
painter.setPen(QPen(color, line_width))
|
||
painter.drawLine(QPointF(x1, y1), QPointF(x2, y2))
|
||
drawn_tracks += 1
|
||
|
||
# Log first few tracks drawn
|
||
if drawn_tracks <= 3:
|
||
logger.info(f"TRACK DRAWN #{drawn_tracks}: ({x1:.3f},{y1:.3f}) -> ({x2:.3f},{y2:.3f}) width={line_width} layer={layer}")
|
||
|
||
except (KeyError, TypeError):
|
||
continue
|
||
|
||
logger.info(f"_draw_tracks completed: drew {drawn_tracks} tracks, culled {culled_tracks} (viewport + limit), total {len(tracks)} available")
|
||
|
||
def _line_intersects_rect(self, x1, y1, x2, y2, rect):
|
||
"""Check if a line intersects with a rectangle (simple bounding box check)"""
|
||
# Simple bounding box intersection - good enough for viewport culling
|
||
line_left = min(x1, x2)
|
||
line_right = max(x1, x2)
|
||
line_top = min(y1, y2)
|
||
line_bottom = max(y1, y2)
|
||
|
||
return not (line_right < rect.left() or
|
||
line_left > rect.right() or
|
||
line_bottom < rect.top() or
|
||
line_top > rect.bottom())
|
||
|
||
def _draw_pads(self, painter: QPainter):
|
||
"""Draw component pads with viewport culling and LOD optimization"""
|
||
pads = self.board_data.get('pads', [])
|
||
|
||
# Get zoom level and viewport for optimization
|
||
zoom_level = painter.transform().m11()
|
||
viewport = painter.viewport()
|
||
|
||
# More robust viewport calculation
|
||
try:
|
||
transform_inverted, invertible = painter.transform().inverted()
|
||
if invertible:
|
||
visible_rect = transform_inverted.mapRect(QRectF(viewport))
|
||
else:
|
||
# Fallback: render everything if transform can't be inverted
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
except:
|
||
# Fallback: render everything if viewport calculation fails
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
|
||
# LOD: Skip very small pads when zoomed out
|
||
min_pad_size = 1.0 / zoom_level if zoom_level > 0 else 0.1 # Minimum size in world units
|
||
|
||
drawn_pads = 0
|
||
max_pads = min(len(pads), 5000) if zoom_level < 0.1 else len(pads) # Limit when zoomed way out
|
||
|
||
for i, pad in enumerate(pads):
|
||
if i >= max_pads:
|
||
break
|
||
try:
|
||
x = pad['x']
|
||
y = pad['y']
|
||
width = pad.get('width', 1.0)
|
||
height = pad.get('height', 1.0)
|
||
|
||
# Viewport culling: skip pads outside visible area (disabled for now - needs debugging)
|
||
# pad_rect = QRectF(x - width/2, y - height/2, width, height)
|
||
# if not visible_rect.intersects(pad_rect):
|
||
# continue
|
||
|
||
# LOD culling: skip very small pads when zoomed out
|
||
if max(width, height) < min_pad_size:
|
||
continue
|
||
|
||
pad_type = pad.get('type', 'smd')
|
||
layers = pad.get('layers', ['F.Cu'])
|
||
drill_size = pad.get('drill', 0.0)
|
||
|
||
# Skip if none of the pad's layers are visible
|
||
if not any(layer in self.visible_layers for layer in layers):
|
||
continue
|
||
|
||
drawn_pads += 1
|
||
|
||
# Debug logging for visualization (every 100th frame to avoid spam)
|
||
if DEBUG_LOGGING_AVAILABLE and drawn_pads == 50: # Log at 50th pad to get sample
|
||
debug_logger = get_debug_logger()
|
||
render_counts = {
|
||
'pads_processed': i,
|
||
'pads_drawn': drawn_pads,
|
||
'max_pads': max_pads,
|
||
'zoom_level': zoom_level,
|
||
'min_pad_size': min_pad_size,
|
||
'total_pads_available': len(pads)
|
||
}
|
||
debug_logger.log_visualization_data(render_counts, zoom_level)
|
||
|
||
# Get appropriate pad color based on type and layer
|
||
if pad_type == 'smd':
|
||
# SMD pads use layer-specific colors
|
||
if any('B.' in layer for layer in layers):
|
||
pad_color = self.color_scheme.get_color('pad_back')
|
||
else:
|
||
pad_color = self.color_scheme.get_color('pad_front')
|
||
elif pad_type == 'through_hole':
|
||
pad_color = self.color_scheme.get_color('pad_through_hole')
|
||
else:
|
||
pad_color = self.color_scheme.get_color('pad_front')
|
||
|
||
# Draw pad based on type
|
||
painter.setPen(QPen(pad_color, 0.05))
|
||
painter.setBrush(QBrush(pad_color))
|
||
|
||
if pad_type == 'through_hole':
|
||
# Draw through-hole pads as circles
|
||
pad_size = max(width, height) # Use larger dimension for circular pad
|
||
pad_rect = QRectF(x - pad_size/2, y - pad_size/2, pad_size, pad_size)
|
||
painter.drawEllipse(pad_rect)
|
||
|
||
# Draw drill hole for through-hole pads
|
||
if drill_size > 0:
|
||
# Use gold color for plated holes (more realistic)
|
||
hole_color = QColor(255, 215, 0) # Gold color
|
||
|
||
# Draw circular hole with gold fill and no outline
|
||
painter.setPen(QPen(hole_color, 0)) # No outline
|
||
painter.setBrush(QBrush(hole_color)) # Gold fill
|
||
|
||
# Draw circular hole
|
||
hole_rect = QRectF(x - drill_size/2, y - drill_size/2, drill_size, drill_size)
|
||
painter.drawEllipse(hole_rect)
|
||
else:
|
||
# Draw SMD pads as rectangles
|
||
pad_rect = QRectF(x - width/2, y - height/2, width, height)
|
||
painter.drawRect(pad_rect)
|
||
|
||
except (KeyError, TypeError):
|
||
continue
|
||
|
||
def _draw_components(self, painter: QPainter):
|
||
"""Draw component outlines and reference designators"""
|
||
components = self.board_data.get('components', [])
|
||
|
||
# Use silkscreen color for component text
|
||
painter.setPen(QPen(self.color_scheme.get_color('f_silks'), 0.1))
|
||
painter.setBrush(QBrush())
|
||
|
||
font = QFont("Arial", max(1, int(2.0 / self.zoom_factor)))
|
||
painter.setFont(font)
|
||
|
||
for component in components:
|
||
try:
|
||
x = component['x']
|
||
y = component['y']
|
||
ref_des = component.get('ref_des', 'Unknown')
|
||
|
||
# Draw component reference designator
|
||
painter.drawText(QPointF(x, y), ref_des)
|
||
|
||
except (KeyError, TypeError, ValueError) as e:
|
||
# Skip invalid component data
|
||
continue
|
||
|
||
def _draw_vias(self, painter: QPainter):
|
||
"""Draw vias with proper coloring and visibility control"""
|
||
vias = self.board_data.get('vias', [])
|
||
|
||
if not vias:
|
||
logger.info("_draw_vias: No vias to draw")
|
||
return # No vias to draw
|
||
|
||
logger.info(f"_draw_vias called: board_data has {len(vias)} vias")
|
||
|
||
# Get zoom level for LOD optimization
|
||
zoom_level = painter.transform().m11()
|
||
|
||
# Get viewport bounds for culling
|
||
viewport = painter.viewport()
|
||
|
||
try:
|
||
transform_inverted, invertible = painter.transform().inverted()
|
||
if invertible:
|
||
visible_rect = transform_inverted.mapRect(QRectF(viewport))
|
||
else:
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
except:
|
||
visible_rect = QRectF(-1000, -1000, 2000, 2000) # Large fallback area
|
||
|
||
# Helper function to get via z-order (internal vias drawn first, surface vias last)
|
||
def via_order_key(via):
|
||
start_layer = via.get('start_layer', via.get('from_layer', 'F.Cu'))
|
||
end_layer = via.get('end_layer', via.get('to_layer', 'B.Cu'))
|
||
|
||
# Convert to layer numbers
|
||
def layer_to_num(layer):
|
||
if layer == 'F.Cu':
|
||
return 0
|
||
elif layer == 'B.Cu':
|
||
return 999
|
||
elif layer.startswith('In'):
|
||
try:
|
||
return int(layer[2:].split('.')[0])
|
||
except:
|
||
return 50
|
||
return 50
|
||
|
||
# Vias are sorted by their deepest layer (higher = more internal)
|
||
return max(layer_to_num(start_layer), layer_to_num(end_layer))
|
||
|
||
# Sort vias back-to-front (internal vias first, surface vias last)
|
||
vias_sorted = sorted(vias, key=via_order_key, reverse=True)
|
||
|
||
# Draw vias
|
||
drawn_vias = 0
|
||
for via in vias_sorted:
|
||
try:
|
||
# Handle both coordinate formats
|
||
if 'position' in via:
|
||
x, y = via['position']
|
||
else:
|
||
x = via['x']
|
||
y = via['y']
|
||
|
||
# Skip if outside visible area
|
||
if not visible_rect.contains(QPointF(x, y)):
|
||
continue
|
||
|
||
# Determine via type and size
|
||
via_type = via.get('type', via.get('via_type', 'through'))
|
||
diameter = via.get('diameter', via.get('size', 0.25)) # Support both 'diameter' and 'size'
|
||
drill = via.get('drill', 0.3)
|
||
start_layer = via.get('start_layer', via.get('from_layer', 'F.Cu'))
|
||
end_layer = via.get('end_layer', via.get('to_layer', 'B.Cu'))
|
||
|
||
# ALWAYS SHOW VIAS - No LoD threshold for production visibility
|
||
# User requirement: tracks/vias visible at every zoom level
|
||
|
||
# Set color based on via type
|
||
if via_type == 'through':
|
||
color = self.color_scheme.get_color('via_through')
|
||
elif via_type in ['blind_buried', 'blind', 'buried']:
|
||
color = self.color_scheme.get_color('via_blind_buried')
|
||
elif via_type == 'micro':
|
||
color = self.color_scheme.get_color('via_micro')
|
||
else:
|
||
color = self.color_scheme.get_color('via_through') # Default
|
||
|
||
# Draw via ring
|
||
painter.setPen(QPen(color, 0.1))
|
||
painter.setBrush(QBrush(color))
|
||
|
||
# Draw via outer diameter
|
||
via_rect = QRectF(x - diameter/2, y - diameter/2, diameter, diameter)
|
||
painter.drawEllipse(via_rect)
|
||
|
||
# Draw via hole
|
||
hole_color = self.color_scheme.get_color('via_hole')
|
||
painter.setPen(QPen(hole_color, 0.1))
|
||
painter.setBrush(QBrush(hole_color))
|
||
|
||
hole_rect = QRectF(x - drill/2, y - drill/2, drill, drill)
|
||
painter.drawEllipse(hole_rect)
|
||
|
||
# Draw layer indicator for blind/buried vias
|
||
if via_type == 'blind_buried':
|
||
# Draw small indicators showing which layers this via connects
|
||
indicator_size = diameter / 4
|
||
painter.setPen(QPen(QColor(255, 255, 255), 0.1))
|
||
|
||
# This would be expanded for more sophisticated layer indicators
|
||
painter.drawText(QRectF(x + diameter/2, y - diameter/2,
|
||
indicator_size*2, indicator_size*2),
|
||
Qt.AlignmentFlag.AlignCenter,
|
||
f"{start_layer[0]}-{end_layer[0]}")
|
||
|
||
drawn_vias += 1
|
||
|
||
except (KeyError, TypeError) as e:
|
||
logger.warning(f"Invalid via data: {via}, error: {e}")
|
||
continue
|
||
|
||
logger.info(f"_draw_vias completed: drew {drawn_vias} vias out of {len(vias)} available")
|
||
|
||
def wheelEvent(self, event: QWheelEvent):
|
||
"""Handle mouse wheel for zooming"""
|
||
zoom_factor = 1.2 if event.angleDelta().y() > 0 else 1.0 / 1.2
|
||
self.zoom_factor *= zoom_factor
|
||
self.zoom_factor = max(0.01, min(100.0, self.zoom_factor))
|
||
self.update()
|
||
|
||
def mousePressEvent(self, event: QMouseEvent):
|
||
"""Handle mouse press for panning"""
|
||
if event.button() == Qt.MouseButton.LeftButton:
|
||
self.is_panning = True
|
||
self.last_pan_point = event.pos()
|
||
self.setCursor(Qt.CursorShape.ClosedHandCursor)
|
||
|
||
def mouseMoveEvent(self, event: QMouseEvent):
|
||
"""Handle mouse move for panning"""
|
||
if self.is_panning:
|
||
delta = event.pos() - self.last_pan_point
|
||
self.pan_x -= delta.x() / self.zoom_factor
|
||
self.pan_y -= delta.y() / self.zoom_factor
|
||
self.last_pan_point = event.pos()
|
||
self.update()
|
||
|
||
def mouseReleaseEvent(self, event: QMouseEvent):
|
||
"""Handle mouse release"""
|
||
if event.button() == Qt.MouseButton.LeftButton:
|
||
self.is_panning = False
|
||
self.setCursor(Qt.CursorShape.ArrowCursor)
|
||
|
||
def debug_screenshot(self, filename_prefix: str = "debug_routing", scale_factor: int = 1, output_dir: str = None):
|
||
"""Capture screenshot of the PCB viewer for debugging with optional high-res rendering"""
|
||
try:
|
||
import os
|
||
from datetime import datetime
|
||
|
||
# Determine output directory
|
||
if output_dir is None:
|
||
debug_dir = "debug_output"
|
||
os.makedirs(debug_dir, exist_ok=True)
|
||
else:
|
||
debug_dir = output_dir
|
||
os.makedirs(debug_dir, exist_ok=True)
|
||
|
||
# Generate filename with timestamp
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] # milliseconds
|
||
filename = f"{debug_dir}/{filename_prefix}_{timestamp}.png"
|
||
|
||
# Capture the widget at specified resolution
|
||
if scale_factor > 1:
|
||
# High-res rendering
|
||
widget_size = self.size()
|
||
scaled_size = widget_size * scale_factor
|
||
|
||
# Create high-res image
|
||
image = QImage(scaled_size, QImage.Format.Format_ARGB32)
|
||
image.fill(Qt.GlobalColor.transparent)
|
||
|
||
# Render widget to image with scaling
|
||
painter = QPainter(image)
|
||
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
|
||
painter.setRenderHint(QPainter.RenderHint.SmoothPixmapTransform)
|
||
painter.scale(scale_factor, scale_factor)
|
||
self.render(painter)
|
||
painter.end()
|
||
|
||
# Save the image
|
||
success = image.save(filename, "PNG")
|
||
else:
|
||
# Standard resolution
|
||
pixmap = self.grab()
|
||
success = pixmap.save(filename, "PNG")
|
||
|
||
if success:
|
||
print(f"DEBUG: Screenshot saved to {filename} (scale={scale_factor}x)")
|
||
return filename
|
||
else:
|
||
print(f"DEBUG: Failed to save screenshot to {filename}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"DEBUG: Screenshot error: {e}")
|
||
return None
|
||
|
||
def update_routing(self, tracks, vias):
|
||
"""Update the board data with new routing information"""
|
||
if tracks:
|
||
self.board_data['tracks'] = tracks
|
||
if vias:
|
||
self.board_data['vias'] = vias
|
||
# Trigger repaint
|
||
self.update()
|
||
|
||
|
||
class OrthoRouteMainWindow(QMainWindow):
|
||
"""Main OrthoRoute GUI window - faithful recreation of original interface"""
|
||
|
||
def __init__(self, board_data: Dict[str, Any], kicad_interface, plugin=None):
|
||
super().__init__()
|
||
self.board_data = board_data
|
||
self.kicad_interface = kicad_interface
|
||
self.plugin = plugin
|
||
self.pcb_viewer = None
|
||
self.algorithm_combo = None
|
||
self.display_checkboxes = {}
|
||
self.layer_actions = {}
|
||
self.route_preview_btn = None
|
||
self.commit_btn = None
|
||
self.rollback_btn = None
|
||
self.status_label = None
|
||
self.gpu_status = None
|
||
self.routing_result = None
|
||
|
||
# Initialize KiCad color scheme (needed for layers panel)
|
||
self.color_scheme = KiCadColorScheme()
|
||
|
||
# Initialize window
|
||
self.setWindowTitle("OrthoRoute - PCB Autorouter")
|
||
self.setMinimumSize(1200, 800)
|
||
self.resize(1800, 800)
|
||
|
||
# Detect GPU status
|
||
self.detect_gpu_status()
|
||
|
||
# Setup UI components
|
||
self.setup_ui()
|
||
self.setup_menus()
|
||
self.setup_status_bar()
|
||
|
||
# Load board data
|
||
self.load_board_data()
|
||
|
||
def detect_gpu_status(self):
|
||
"""Detect GPU capabilities for algorithm selection"""
|
||
try:
|
||
# Try to detect CUDA/GPU availability
|
||
import subprocess
|
||
result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
self.gpu_status = {'available': True, 'can_use_gpu_routing': True}
|
||
logger.info("GPU detected and available for routing")
|
||
else:
|
||
self.gpu_status = {'available': False, 'can_use_gpu_routing': False}
|
||
logger.info("GPU not available, using CPU routing")
|
||
except:
|
||
self.gpu_status = {'available': False, 'can_use_gpu_routing': False}
|
||
logger.info("GPU detection failed, using CPU routing")
|
||
|
||
def setup_ui(self):
|
||
"""Setup the main UI layout - three panel design like original"""
|
||
central_widget = QWidget()
|
||
self.setCentralWidget(central_widget)
|
||
|
||
# Main layout
|
||
main_layout = QHBoxLayout(central_widget)
|
||
|
||
# Create splitter for resizable panels
|
||
splitter = QSplitter(Qt.Orientation.Horizontal)
|
||
main_layout.addWidget(splitter)
|
||
|
||
# Left panel for controls
|
||
left_panel = self.create_left_panel()
|
||
splitter.addWidget(left_panel)
|
||
|
||
# Center panel for PCB viewer
|
||
self.pcb_viewer = PCBViewer()
|
||
splitter.addWidget(self.pcb_viewer)
|
||
|
||
# Right panel for information
|
||
right_panel = self.create_right_panel()
|
||
splitter.addWidget(right_panel)
|
||
|
||
# Set splitter proportions (left:center:right = 300:800:300)
|
||
splitter.setSizes([300, 800, 300])
|
||
|
||
def create_left_panel(self) -> QWidget:
|
||
"""Create the left control panel - matching original layout"""
|
||
panel = QWidget()
|
||
layout = QVBoxLayout(panel)
|
||
|
||
# Display options group
|
||
display_group = QGroupBox("Display Options")
|
||
display_layout = QVBoxLayout(display_group)
|
||
|
||
self.display_checkboxes = {}
|
||
display_options = [
|
||
('Components', 'show_components'),
|
||
('Tracks', 'show_tracks'),
|
||
('Vias', 'show_vias'),
|
||
('Pads', 'show_pads'),
|
||
('Airwires', 'show_airwires'),
|
||
('Zones', 'show_zones'),
|
||
('Keepouts', 'show_keepouts')
|
||
]
|
||
|
||
for label, attr in display_options:
|
||
checkbox = QCheckBox(label)
|
||
checkbox.setChecked(True) # Default to showing all
|
||
checkbox.toggled.connect(lambda checked, attr=attr: self.toggle_display_option(attr, checked))
|
||
self.display_checkboxes[attr] = checkbox
|
||
display_layout.addWidget(checkbox)
|
||
|
||
layout.addWidget(display_group)
|
||
|
||
# Routing controls group
|
||
routing_group = QGroupBox("Routing Controls")
|
||
routing_layout = QVBoxLayout(routing_group)
|
||
|
||
# Algorithm selection
|
||
algorithm_layout = QHBoxLayout()
|
||
algorithm_layout.addWidget(QLabel("Algorithm:"))
|
||
self.algorithm_combo = QComboBox()
|
||
|
||
# Available algorithms - simplified to single best option
|
||
algorithm_options = [
|
||
"Manhattan RRG"
|
||
]
|
||
|
||
self.algorithm_combo.addItems(algorithm_options)
|
||
self.algorithm_combo.setCurrentIndex(0) # Only option
|
||
|
||
if self.gpu_status and self.gpu_status.get('can_use_gpu_routing'):
|
||
logger.info("All algorithms will use GPU acceleration with CPU fallback")
|
||
else:
|
||
logger.info("All algorithms will use CPU (GPU not available)")
|
||
|
||
# Connect algorithm change signal
|
||
self.algorithm_combo.currentTextChanged.connect(self.on_algorithm_changed)
|
||
|
||
algorithm_layout.addWidget(self.algorithm_combo)
|
||
routing_layout.addLayout(algorithm_layout)
|
||
|
||
# Batch Size Control
|
||
batch_layout = QHBoxLayout()
|
||
batch_layout.addWidget(QLabel("Batch Size:"))
|
||
self.batch_size_spinner = QSpinBox()
|
||
self.batch_size_spinner.setMinimum(1)
|
||
self.batch_size_spinner.setMaximum(500)
|
||
self.batch_size_spinner.setValue(50) # Default batch size
|
||
self.batch_size_spinner.setToolTip("Number of nets to route in each PathFinder iteration")
|
||
batch_layout.addWidget(self.batch_size_spinner)
|
||
routing_layout.addLayout(batch_layout)
|
||
|
||
# GPU acceleration checkbox (beta feature)
|
||
self.gpu_checkbox = QCheckBox("Enable GPU acceleration (beta)")
|
||
|
||
# GPU mode is hardcoded via GPUConfig (no env vars for KiCad plugin compatibility)
|
||
try:
|
||
from ...algorithms.manhattan.unified_pathfinder import GPUConfig
|
||
self.gpu_checkbox.setChecked(GPUConfig.GPU_MODE)
|
||
except ImportError:
|
||
self.gpu_checkbox.setChecked(False) # Fallback if import fails
|
||
|
||
self.gpu_checkbox.setToolTip("Enable CUDA GPU acceleration for routing (experimental)")
|
||
|
||
# Disable if no GPU detected
|
||
if hasattr(self, 'gpu_status') and not self.gpu_status.get('available', False):
|
||
self.gpu_checkbox.setEnabled(False)
|
||
self.gpu_checkbox.setToolTip("GPU not available on this system")
|
||
|
||
routing_layout.addWidget(self.gpu_checkbox)
|
||
|
||
# Main routing buttons
|
||
self.route_preview_btn = QPushButton("Begin Autorouting")
|
||
self.route_preview_btn.clicked.connect(self.begin_autorouting)
|
||
routing_layout.addWidget(self.route_preview_btn)
|
||
|
||
# Solution control buttons (initially disabled)
|
||
solution_layout = QHBoxLayout()
|
||
self.commit_btn = QPushButton("Apply to KiCad")
|
||
self.commit_btn.clicked.connect(self.commit_routes)
|
||
self.commit_btn.setEnabled(False)
|
||
|
||
self.rollback_btn = QPushButton("Discard")
|
||
self.rollback_btn.clicked.connect(self.rollback_routes)
|
||
self.rollback_btn.setEnabled(False)
|
||
|
||
self.replay_btn = QPushButton("Replay")
|
||
self.replay_btn.clicked.connect(self.replay_routing)
|
||
self.replay_btn.setEnabled(False)
|
||
self.replay_btn.setToolTip("Re-run the same routing with clean state")
|
||
|
||
solution_layout.addWidget(self.commit_btn)
|
||
solution_layout.addWidget(self.rollback_btn)
|
||
solution_layout.addWidget(self.replay_btn)
|
||
routing_layout.addLayout(solution_layout)
|
||
|
||
layout.addWidget(routing_group)
|
||
|
||
# Debug controls group
|
||
debug_group = QGroupBox("Debug Controls")
|
||
debug_layout = QVBoxLayout(debug_group)
|
||
|
||
# Focus net debugging
|
||
focus_net_layout = QHBoxLayout()
|
||
focus_net_layout.addWidget(QLabel("Focus Net:"))
|
||
self.focus_net_input = QLineEdit()
|
||
self.focus_net_input.setPlaceholderText("e.g. B06B14_000")
|
||
self.focus_net_input.returnPressed.connect(self.focus_on_net)
|
||
focus_net_layout.addWidget(self.focus_net_input)
|
||
|
||
self.focus_net_btn = QPushButton("Highlight")
|
||
self.focus_net_btn.clicked.connect(self.focus_on_net)
|
||
focus_net_layout.addWidget(self.focus_net_btn)
|
||
debug_layout.addLayout(focus_net_layout)
|
||
|
||
# Show pad stubs toggle - check environment variable
|
||
import os
|
||
show_stubs_default = os.getenv("ORTHO_SHOW_PORTALS", "1").lower() in ("1", "true", "yes")
|
||
self.show_pad_stubs_checkbox = QCheckBox("Show pad stubs")
|
||
self.show_pad_stubs_checkbox.setChecked(show_stubs_default)
|
||
self.show_pad_stubs_checkbox.toggled.connect(self.toggle_pad_stubs)
|
||
debug_layout.addWidget(self.show_pad_stubs_checkbox)
|
||
|
||
# Portal visualization for first 50 nets
|
||
self.show_portal_dots_checkbox = QCheckBox("Show portal dots (first 50 nets)")
|
||
self.show_portal_dots_checkbox.setChecked(False)
|
||
self.show_portal_dots_checkbox.toggled.connect(self.toggle_portal_dots)
|
||
debug_layout.addWidget(self.show_portal_dots_checkbox)
|
||
|
||
layout.addWidget(debug_group)
|
||
|
||
# Nets statistics group
|
||
nets_stats_group = QGroupBox("Nets Statistics")
|
||
nets_stats_layout = QVBoxLayout(nets_stats_group)
|
||
|
||
self.nets_stats_label = QLabel("Loading nets...")
|
||
nets_stats_layout.addWidget(self.nets_stats_label)
|
||
|
||
layout.addWidget(nets_stats_group)
|
||
|
||
# Progress group - Overuse history table
|
||
progress_group = QGroupBox("Progress")
|
||
progress_layout = QVBoxLayout(progress_group)
|
||
|
||
# Overuse table showing last 3 iterations
|
||
from PyQt6.QtGui import QFont
|
||
overuse_font = QFont("Courier New", 9) # Monospace for table alignment
|
||
self.overuse_table_label = QLabel("Waiting for routing to start...")
|
||
self.overuse_table_label.setFont(overuse_font)
|
||
self.overuse_table_label.setStyleSheet("""
|
||
QLabel {
|
||
color: #222;
|
||
background-color: #f5f5f5;
|
||
padding: 8px;
|
||
border: 1px solid #999;
|
||
border-radius: 4px;
|
||
}
|
||
""")
|
||
self.overuse_table_label.setWordWrap(False)
|
||
progress_layout.addWidget(self.overuse_table_label)
|
||
|
||
layout.addWidget(progress_group)
|
||
|
||
return panel
|
||
|
||
def create_right_panel(self) -> QWidget:
|
||
"""Create the right information panel with Board Info and Layers"""
|
||
panel = QWidget()
|
||
layout = QVBoxLayout(panel)
|
||
|
||
# Board information group (at top)
|
||
board_info_group = QGroupBox("Board Information")
|
||
board_info_layout = QVBoxLayout(board_info_group)
|
||
|
||
self.board_info_label = QLabel("Loading board information...")
|
||
board_info_layout.addWidget(self.board_info_label)
|
||
|
||
layout.addWidget(board_info_group)
|
||
|
||
# Layers visibility group (dynamic, scrollable, fills remaining space)
|
||
layers_group = QGroupBox("Layers")
|
||
layers_layout = QVBoxLayout(layers_group)
|
||
|
||
# Scrollable area for layer checkboxes
|
||
layers_scroll = QScrollArea()
|
||
layers_scroll.setWidgetResizable(True)
|
||
layers_scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
|
||
layers_scroll.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
|
||
layers_scroll.setMinimumHeight(200) # Ensure reasonable minimum height
|
||
|
||
# Style the scrollbar to be thicker and more visible
|
||
layers_scroll.setStyleSheet("""
|
||
QScrollArea {
|
||
border: none;
|
||
}
|
||
QScrollBar:vertical {
|
||
background: #2b2b2b;
|
||
width: 16px;
|
||
margin: 0px;
|
||
}
|
||
QScrollBar::handle:vertical {
|
||
background: #555555;
|
||
min-height: 30px;
|
||
border-radius: 8px;
|
||
}
|
||
QScrollBar::handle:vertical:hover {
|
||
background: #666666;
|
||
}
|
||
QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {
|
||
height: 0px;
|
||
}
|
||
""")
|
||
|
||
# Container widget for checkboxes
|
||
self.layers_container = QWidget()
|
||
self.layers_container_layout = QVBoxLayout(self.layers_container)
|
||
self.layers_container_layout.setContentsMargins(5, 5, 5, 5)
|
||
self.layers_container_layout.setSpacing(2)
|
||
|
||
layers_scroll.setWidget(self.layers_container)
|
||
layers_layout.addWidget(layers_scroll)
|
||
|
||
layout.addWidget(layers_group, stretch=1) # Give it stretch factor to fill available space
|
||
|
||
# No addStretch() - let Layers panel expand to bottom
|
||
|
||
# Store reference to layers group for updating
|
||
self.layers_group = layers_group
|
||
self.layer_checkboxes = {}
|
||
|
||
# Create stub widgets that other code expects but hide them
|
||
self.nets_tree = QTreeWidget()
|
||
self.nets_tree.setVisible(False)
|
||
self.pathfinder_stats = PathFinderStatsWidget()
|
||
self.pathfinder_stats.setVisible(False)
|
||
self.routing_log = QTextEdit()
|
||
self.routing_log.setVisible(False)
|
||
|
||
return panel
|
||
|
||
def setup_status_bar(self):
|
||
"""Setup status bar with GPU status and instrumentation metrics"""
|
||
status_bar = QStatusBar()
|
||
self.setStatusBar(status_bar)
|
||
|
||
# Main status label for routing progress
|
||
self.status_label = QLabel("Ready")
|
||
status_bar.addWidget(self.status_label)
|
||
|
||
# Instrumentation metrics labels (initially hidden)
|
||
self.metrics_label = QLabel("")
|
||
self.metrics_label.setVisible(False)
|
||
status_bar.addWidget(self.metrics_label)
|
||
|
||
# CSV export status
|
||
self.csv_status_label = QLabel("")
|
||
self.csv_status_label.setVisible(False)
|
||
status_bar.addWidget(self.csv_status_label)
|
||
|
||
# GPU status indicator
|
||
gpu_text = "GPU Available" if self.gpu_status.get('available') else "CPU Only"
|
||
status_bar.addPermanentWidget(QLabel(f"{gpu_text} | OrthoRoute v1.0"))
|
||
|
||
def setup_menus(self):
|
||
"""Setup menu bar - matching original menus"""
|
||
menubar = self.menuBar()
|
||
|
||
# File menu
|
||
file_menu = menubar.addMenu("File")
|
||
|
||
refresh_action = QAction("Refresh Board", self)
|
||
refresh_action.triggered.connect(self.refresh_board)
|
||
file_menu.addAction(refresh_action)
|
||
|
||
file_menu.addSeparator()
|
||
|
||
# Checkpoint submenu
|
||
checkpoint_menu = file_menu.addMenu("Checkpoints")
|
||
|
||
save_checkpoint_action = QAction("Save Checkpoint Now", self)
|
||
save_checkpoint_action.setShortcut("Ctrl+S")
|
||
save_checkpoint_action.triggered.connect(self.save_checkpoint_manual)
|
||
checkpoint_menu.addAction(save_checkpoint_action)
|
||
|
||
load_checkpoint_action = QAction("Load Checkpoint...", self)
|
||
load_checkpoint_action.setShortcut("Ctrl+L")
|
||
load_checkpoint_action.triggered.connect(self.load_checkpoint_dialog)
|
||
checkpoint_menu.addAction(load_checkpoint_action)
|
||
|
||
resume_checkpoint_action = QAction("Resume from Latest", self)
|
||
resume_checkpoint_action.setShortcut("Ctrl+R")
|
||
resume_checkpoint_action.triggered.connect(self.resume_from_latest)
|
||
checkpoint_menu.addAction(resume_checkpoint_action)
|
||
|
||
checkpoint_menu.addSeparator()
|
||
|
||
self.auto_checkpoint_action = QAction("Auto-save Checkpoints", self)
|
||
self.auto_checkpoint_action.setCheckable(True)
|
||
self.auto_checkpoint_action.setChecked(True)
|
||
self.auto_checkpoint_action.triggered.connect(self.toggle_auto_checkpoint)
|
||
checkpoint_menu.addAction(self.auto_checkpoint_action)
|
||
|
||
file_menu.addSeparator()
|
||
|
||
# Cloud routing workflow menu items
|
||
self.export_pcb_action = QAction("Export PCB...", self)
|
||
self.export_pcb_action.setShortcut("Ctrl+E")
|
||
self.export_pcb_action.triggered.connect(self.export_pcb)
|
||
self.export_pcb_action.setEnabled(True) # Enabled when board is loaded
|
||
file_menu.addAction(self.export_pcb_action)
|
||
|
||
self.import_solution_action = QAction("Import Solution...", self)
|
||
self.import_solution_action.setShortcut("Ctrl+I")
|
||
self.import_solution_action.triggered.connect(self.import_solution)
|
||
self.import_solution_action.setEnabled(True)
|
||
file_menu.addAction(self.import_solution_action)
|
||
|
||
file_menu.addSeparator()
|
||
|
||
exit_action = QAction("Exit", self)
|
||
exit_action.triggered.connect(self.close)
|
||
file_menu.addAction(exit_action)
|
||
|
||
# View menu
|
||
view_menu = menubar.addMenu("View")
|
||
|
||
# Layer visibility submenu
|
||
self.layers_menu = view_menu.addMenu("Layers")
|
||
|
||
# Create layer visibility actions - will be updated when board loads
|
||
self.layer_actions = {}
|
||
|
||
view_menu.addSeparator()
|
||
|
||
fit_action = QAction("Fit to Window", self)
|
||
fit_action.setShortcut("Ctrl+0")
|
||
fit_action.triggered.connect(self.zoom_fit)
|
||
view_menu.addAction(fit_action)
|
||
|
||
# Tools menu
|
||
tools_menu = menubar.addMenu("Tools")
|
||
|
||
route_action = QAction("Auto Route All", self)
|
||
route_action.triggered.connect(self.auto_route_all)
|
||
tools_menu.addAction(route_action)
|
||
|
||
smoke_route_action = QAction("Quick Smoke Route", self)
|
||
smoke_route_action.triggered.connect(self.quick_smoke_route)
|
||
tools_menu.addAction(smoke_route_action)
|
||
|
||
tools_menu.addSeparator()
|
||
|
||
# Self-tests
|
||
lattice_test_action = QAction("Self-test: Lattice", self)
|
||
lattice_test_action.triggered.connect(self.self_test_lattice)
|
||
tools_menu.addAction(lattice_test_action)
|
||
|
||
tiny_route_test_action = QAction("Self-test: Tiny Route", self)
|
||
tiny_route_test_action.triggered.connect(self.self_test_tiny_route)
|
||
tools_menu.addAction(tiny_route_test_action)
|
||
|
||
def load_board_data(self):
|
||
"""Load and display the board data"""
|
||
if not self.board_data:
|
||
return
|
||
|
||
# Update board info
|
||
filename = self.board_data.get('filename', 'Unknown')
|
||
width = self.board_data.get('width', 0)
|
||
height = self.board_data.get('height', 0)
|
||
layers = self.board_data.get('layers', 0)
|
||
|
||
pads_count = len(self.board_data.get('pads', []))
|
||
nets_count = len(self.board_data.get('nets', {}))
|
||
components_count = len(self.board_data.get('components', []))
|
||
|
||
board_info = f"Board: {filename}\n"
|
||
board_info += f"Size: {width:.1f} × {height:.1f} mm\n"
|
||
board_info += f"Layers: {layers}\n"
|
||
board_info += f"Components: {components_count}\n"
|
||
board_info += f"Pads: {pads_count}\n"
|
||
board_info += f"Nets: {nets_count}"
|
||
|
||
self.board_info_label.setText(board_info)
|
||
|
||
# Load nets into tree
|
||
self.load_nets_tree()
|
||
|
||
# Set board data in PCB viewer
|
||
self.pcb_viewer.set_board_data(self.board_data)
|
||
|
||
# Fit to view after a short delay to ensure proper widget sizing
|
||
QTimer.singleShot(100, self.pcb_viewer.fit_to_view)
|
||
|
||
# Update layer visibility menu
|
||
self.update_layer_visibility_menu()
|
||
|
||
# Update status
|
||
self.status_label.setText(f"Loaded {nets_count} nets, {pads_count} pads")
|
||
|
||
logger.info("Board data loaded into GUI: %d components, %d tracks - starting progressive net processing", components_count, len(self.board_data.get('tracks', [])))
|
||
logger.info("Using %d airwires from KiCad interface (no regeneration needed)", len(self.board_data.get('airwires', [])))
|
||
logger.info("Displaying ALL %d nets (no performance limits)", nets_count)
|
||
|
||
def load_nets_tree(self):
|
||
"""Load nets into the tree widget"""
|
||
if not self.nets_tree:
|
||
return
|
||
|
||
self.nets_tree.clear()
|
||
|
||
nets = self.board_data.get('nets', {})
|
||
|
||
for net_name, net_data in nets.items():
|
||
if not net_name or net_name.strip() == "":
|
||
continue
|
||
|
||
pads = net_data.get('pads', [])
|
||
pad_count = len(pads)
|
||
|
||
if pad_count < 2:
|
||
continue # Skip single-pad nets
|
||
|
||
item = QTreeWidgetItem([net_name, str(pad_count), "Unrouted"])
|
||
self.nets_tree.addTopLevelItem(item)
|
||
|
||
def route_all_nets(self):
|
||
"""Route all nets"""
|
||
logger.info("Route All Nets button clicked")
|
||
self.status_label.setText("Routing all nets...")
|
||
|
||
# TODO: Implement actual routing logic
|
||
QMessageBox.information(self, "Routing", "Routing functionality not yet implemented")
|
||
|
||
self.status_label.setText("Ready")
|
||
|
||
def clear_routes(self):
|
||
"""Clear all existing routes"""
|
||
logger.info("Clear All Routes button clicked")
|
||
self.status_label.setText("Clearing routes...")
|
||
|
||
# TODO: Implement route clearing logic
|
||
QMessageBox.information(self, "Clear Routes", "Route clearing functionality not yet implemented")
|
||
|
||
self.status_label.setText("Ready")
|
||
|
||
def refresh_from_kicad(self):
|
||
"""Refresh board data from KiCad"""
|
||
logger.info("Refreshing board data from KiCad...")
|
||
self.status_label.setText("Refreshing from KiCad...")
|
||
|
||
try:
|
||
# Get fresh board data
|
||
new_board_data = self.kicad_interface.get_board_data()
|
||
|
||
if new_board_data:
|
||
self.board_data = new_board_data
|
||
self.load_board_data()
|
||
self.status_label.setText("Board data refreshed successfully")
|
||
logger.info("Board data refreshed from KiCad")
|
||
else:
|
||
self.status_label.setText("Failed to refresh board data")
|
||
QMessageBox.warning(self, "Refresh Failed", "Could not refresh board data from KiCad")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error refreshing board data: {e}")
|
||
self.status_label.setText("Refresh failed")
|
||
QMessageBox.critical(self, "Refresh Error", f"Error refreshing board data:\\n{e}")
|
||
|
||
# Additional methods for full original functionality
|
||
def update_layer_visibility_menu(self):
|
||
"""Update layer visibility menu and panel with actual board layers"""
|
||
if not hasattr(self, 'layers_menu'):
|
||
return
|
||
|
||
# Clear existing actions
|
||
self.layers_menu.clear()
|
||
self.layer_actions = {}
|
||
|
||
# Get layers from board data (dynamic from KiCad file)
|
||
layers = self.board_data.get('layer_names', ['F.Cu', 'B.Cu'])
|
||
|
||
# Update menu
|
||
for layer in layers:
|
||
action = QAction(layer, self)
|
||
action.setCheckable(True)
|
||
action.setChecked(True) # Default to visible
|
||
action.triggered.connect(lambda checked, l=layer: self.toggle_layer_visibility(l, checked))
|
||
self.layer_actions[layer] = action
|
||
self.layers_menu.addAction(action)
|
||
|
||
# Update layers panel checkboxes
|
||
self.update_layers_panel(layers)
|
||
|
||
def update_layers_panel(self, layers: list):
|
||
"""Populate the layers panel with checkboxes for each layer"""
|
||
if not hasattr(self, 'layers_container_layout'):
|
||
logger.warning(f"update_layers_panel: No layers_container_layout attribute")
|
||
return
|
||
|
||
logger.info(f"update_layers_panel: Creating checkboxes for {len(layers)} layers: {layers}")
|
||
|
||
# Clear existing checkboxes
|
||
while self.layers_container_layout.count():
|
||
child = self.layers_container_layout.takeAt(0)
|
||
if child.widget():
|
||
child.widget().deleteLater()
|
||
|
||
self.layer_checkboxes = {}
|
||
|
||
# Create checkbox for each layer with color indicator
|
||
for layer_name in layers:
|
||
# Create horizontal layout for checkbox + color indicator
|
||
layer_widget = QWidget()
|
||
layer_layout = QHBoxLayout(layer_widget)
|
||
layer_layout.setContentsMargins(0, 0, 0, 0)
|
||
layer_layout.setSpacing(8)
|
||
|
||
# Checkbox
|
||
checkbox = QCheckBox(layer_name)
|
||
checkbox.setChecked(True) # Default to visible
|
||
checkbox.toggled.connect(lambda checked, l=layer_name: self.toggle_layer_visibility(l, checked))
|
||
|
||
# Color indicator square
|
||
color = self.color_scheme.get_layer_color(layer_name)
|
||
color_label = QLabel(" ")
|
||
color_label.setStyleSheet(f"background-color: {color.name()}; border: 1px solid #888; min-width: 20px; min-height: 16px; max-width: 20px; max-height: 16px;")
|
||
color_label.setToolTip(f"Layer color: {color.name()}")
|
||
|
||
layer_layout.addWidget(color_label)
|
||
layer_layout.addWidget(checkbox)
|
||
layer_layout.addStretch()
|
||
|
||
self.layers_container_layout.addWidget(layer_widget)
|
||
self.layer_checkboxes[layer_name] = checkbox
|
||
|
||
# Add stretch at bottom
|
||
self.layers_container_layout.addStretch()
|
||
|
||
# Event handler methods
|
||
def on_algorithm_changed(self, algorithm_text: str):
|
||
"""Handle algorithm selection change"""
|
||
logger.info(f"INFO: Algorithm changed to: {algorithm_text}")
|
||
self.status_label.setText(f"Selected algorithm: {algorithm_text}")
|
||
|
||
def toggle_display_option(self, option: str, checked: bool):
|
||
"""Handle display option checkbox changes"""
|
||
logger.info(f"Display option {option}: {'enabled' if checked else 'disabled'}")
|
||
# Update PCB viewer display settings
|
||
if self.pcb_viewer:
|
||
setattr(self.pcb_viewer, option, checked)
|
||
self.pcb_viewer.update()
|
||
|
||
def toggle_layer_visibility(self, layer: str, checked: bool):
|
||
"""Handle layer visibility changes - sync between menu, panel, and viewer"""
|
||
logger.info(f"Layer {layer}: {'visible' if checked else 'hidden'}")
|
||
|
||
# Update PCB viewer layer visibility
|
||
if self.pcb_viewer:
|
||
if checked:
|
||
self.pcb_viewer.visible_layers.add(layer)
|
||
else:
|
||
self.pcb_viewer.visible_layers.discard(layer)
|
||
self.pcb_viewer.update()
|
||
|
||
# Sync menu action state (if exists)
|
||
if hasattr(self, 'layer_actions') and layer in self.layer_actions:
|
||
self.layer_actions[layer].blockSignals(True)
|
||
self.layer_actions[layer].setChecked(checked)
|
||
self.layer_actions[layer].blockSignals(False)
|
||
|
||
# Sync panel checkbox state (if exists)
|
||
if hasattr(self, 'layer_checkboxes') and layer in self.layer_checkboxes:
|
||
self.layer_checkboxes[layer].blockSignals(True)
|
||
self.layer_checkboxes[layer].setChecked(checked)
|
||
self.layer_checkboxes[layer].blockSignals(False)
|
||
|
||
def focus_on_net(self):
|
||
"""Focus on specific net for debugging"""
|
||
net_id = self.focus_net_input.text().strip()
|
||
if not net_id:
|
||
return
|
||
|
||
logger.info(f"[GUI-DEBUG] Focusing on net: {net_id}")
|
||
|
||
# Update PCB viewer to highlight the focused net
|
||
if self.pcb_viewer:
|
||
# Set focused net in viewer
|
||
self.pcb_viewer.focused_net = net_id
|
||
|
||
# Clear previous highlights and highlight this net
|
||
if hasattr(self.pcb_viewer, 'highlight_net'):
|
||
self.pcb_viewer.highlight_net(net_id)
|
||
self.pcb_viewer.update()
|
||
|
||
# Log debug info about this net if available
|
||
if hasattr(self.pcb_viewer, 'board_data') and self.pcb_viewer.board_data:
|
||
# Look for tracks for this net
|
||
tracks_for_net = []
|
||
if 'tracks' in self.pcb_viewer.board_data:
|
||
tracks_for_net = [t for t in self.pcb_viewer.board_data['tracks'] if t.get('net_id') == net_id]
|
||
|
||
# Look for stubs for this net
|
||
stubs_for_net = []
|
||
if hasattr(self.pcb_viewer, 'stub_tracks'):
|
||
stubs_for_net = [s for s in self.pcb_viewer.stub_tracks if s.get('net_id') == net_id]
|
||
|
||
logger.info(f"[GUI-DEBUG] Net {net_id}: {len(tracks_for_net)} tracks, {len(stubs_for_net)} stubs")
|
||
|
||
# Log first few track coordinates for debugging
|
||
for i, track in enumerate(tracks_for_net[:3]):
|
||
logger.info(f"[GUI-DEBUG] Track {i}: start={track.get('start')}, end={track.get('end')}, layer={track.get('layer')}")
|
||
|
||
def toggle_pad_stubs(self, checked: bool):
|
||
"""Toggle visibility of pad connection stubs"""
|
||
logger.info(f"[GUI-DEBUG] Pad stubs: {'visible' if checked else 'hidden'}")
|
||
|
||
# Update PCB viewer stub visibility
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.show_pad_stubs = checked
|
||
self.pcb_viewer.update()
|
||
|
||
def toggle_portal_dots(self, checked: bool):
|
||
"""Toggle visibility of portal dots for first 50 nets"""
|
||
logger.info(f"[GUI-DEBUG] Portal dots: {'visible' if checked else 'hidden'}")
|
||
|
||
# Update PCB viewer portal visualization
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.show_portal_dots = checked
|
||
self.pcb_viewer.update()
|
||
|
||
# Routing control methods
|
||
def begin_autorouting(self):
|
||
"""Begin autorouting with unified pipeline"""
|
||
if not self.plugin:
|
||
QMessageBox.critical(self, "Plugin Error", "No plugin instance available")
|
||
return
|
||
|
||
algorithm_text = self.algorithm_combo.currentText()
|
||
logger.info(f"Begin autorouting with {algorithm_text}")
|
||
|
||
# Set GPU environment variable based on checkbox
|
||
import os
|
||
gpu_enabled = self.gpu_checkbox.isChecked()
|
||
os.environ['ORTHO_GPU'] = '1' if gpu_enabled else '0'
|
||
gpu_mode = "GPU (beta)" if gpu_enabled else "CPU (safe default)"
|
||
self.log_to_gui(f"[GPU] Using {gpu_mode} acceleration", "INFO")
|
||
|
||
# Clear previous log and start fresh
|
||
self.clear_routing_log()
|
||
self.log_to_gui(f"[START] Starting unified autorouting pipeline", "SUCCESS")
|
||
|
||
# Create timestamped run folder for screenshots
|
||
import os
|
||
from datetime import datetime
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
run_folder = f"debug_output/run_{timestamp}"
|
||
os.makedirs(run_folder, exist_ok=True)
|
||
self.log_to_gui(f"📁 Created screenshot folder: {run_folder}", "DEBUG")
|
||
|
||
# Store run folder for later use
|
||
self._current_run_folder = run_folder
|
||
|
||
self._set_ui_busy(True, "Autorouting…")
|
||
|
||
# Take initial screenshots synchronously before routing starts
|
||
self._take_initial_screenshots()
|
||
|
||
# Start routing immediately after screenshots
|
||
self._continue_autorouting()
|
||
|
||
def _take_initial_screenshots(self):
|
||
"""Take screenshots 1 & 2 synchronously at the start of routing"""
|
||
if not self.pcb_viewer:
|
||
return
|
||
|
||
# Screenshot 1: Board with airwires
|
||
self.pcb_viewer.show_airwires = True
|
||
self.pcb_viewer.fit_to_view()
|
||
self.pcb_viewer.update()
|
||
QApplication.processEvents() # Force render
|
||
self.pcb_viewer.debug_screenshot("01_board_with_airwires", scale_factor=8, output_dir=self._current_run_folder)
|
||
self.log_to_gui("📸 Screenshot 1/3: Board with airwires (8x)", "DEBUG")
|
||
|
||
# Screenshot 2: Board without airwires or escapes
|
||
self.pcb_viewer.show_airwires = False
|
||
|
||
# Temporarily clear tracks and vias for clean board screenshot
|
||
old_tracks = self.board_data.get('tracks', [])
|
||
old_vias = self.board_data.get('vias', [])
|
||
self.board_data['tracks'] = []
|
||
self.board_data['vias'] = []
|
||
|
||
if hasattr(self.pcb_viewer, 'update_routing'):
|
||
self.pcb_viewer.update_routing([], [])
|
||
|
||
self.pcb_viewer.update()
|
||
QApplication.processEvents() # Force render
|
||
self.pcb_viewer.debug_screenshot("02_board_no_airwires", scale_factor=8, output_dir=self._current_run_folder)
|
||
self.log_to_gui("📸 Screenshot 2/3: Board without airwires or escapes (8x)", "DEBUG")
|
||
|
||
# Restore tracks/vias
|
||
self.board_data['tracks'] = old_tracks
|
||
self.board_data['vias'] = old_vias
|
||
if hasattr(self.pcb_viewer, 'update_routing'):
|
||
self.pcb_viewer.update_routing(old_tracks, old_vias)
|
||
self.pcb_viewer.update()
|
||
|
||
def _continue_autorouting(self):
|
||
"""Continue autorouting after screenshots are taken"""
|
||
try:
|
||
# Get pathfinder and board from plugin
|
||
pf = self.plugin.get_pathfinder()
|
||
board = self._create_board_from_data()
|
||
|
||
# NOTE: IterationMetricsLogger not available in 22eb7db baseline
|
||
# Metrics logging functionality removed during rollback to 22eb7db
|
||
# PathFinder will run without metrics logger (basic logging still works)
|
||
|
||
self.log_to_gui(f"[GUI] Starting unified pipeline with pf={pf._instance_tag}", "INFO")
|
||
|
||
# 1) Build lattice + CSR + preflight
|
||
self.log_to_gui("[PIPELINE] Step 1: Building lattice & CSR...", "INFO")
|
||
pf.initialize_graph(board)
|
||
self.log_to_gui("✓ Lattice initialization complete", "SUCCESS")
|
||
|
||
self.log_to_gui("[PIPELINE] Step 2: Mapping pads to lattice...", "INFO")
|
||
pf.map_all_pads(board)
|
||
|
||
# PORTAL STATUS: Check new portal system (from pad_escape_planner)
|
||
# The new system uses pf.portals dict (pad_id -> Portal), not the old _pad_portal_map
|
||
portal_count = len(getattr(pf, "portals", {}))
|
||
logger.info("[PORTAL] Pre-computed portals: %d (from pad_escape_planner)", portal_count)
|
||
|
||
# Note: Portals will be populated after precompute_all_pad_escapes() is called below
|
||
# This check just verifies the portal system is available
|
||
|
||
self.log_to_gui("✓ Pad mapping complete", "SUCCESS")
|
||
|
||
# PRECOMPUTE PAD ESCAPES (for debugging, before routing)
|
||
self.log_to_gui("[DEBUG] Precomputing pad escapes...", "INFO")
|
||
|
||
# Attach GUI pad data to board for DRC
|
||
board._gui_pads = self.board_data.get('pads', [])
|
||
logger.info(f"Attached {len(board._gui_pads)} GUI pads to board for DRC")
|
||
|
||
escape_tracks, escape_vias = pf.precompute_all_pad_escapes(board)
|
||
|
||
# Push escape geometry to preview
|
||
if escape_tracks or escape_vias:
|
||
if 'tracks' not in self.board_data:
|
||
self.board_data['tracks'] = []
|
||
if 'vias' not in self.board_data:
|
||
self.board_data['vias'] = []
|
||
|
||
self.board_data['tracks'].extend(escape_tracks)
|
||
self.board_data['vias'].extend(escape_vias)
|
||
|
||
if hasattr(self.pcb_viewer, 'update_routing'):
|
||
self.pcb_viewer.update_routing(self.board_data.get('tracks', []),
|
||
self.board_data.get('vias', []))
|
||
self.pcb_viewer.update()
|
||
|
||
self.log_to_gui(f"✓ Precomputed {len(escape_tracks)} escape stubs, {len(escape_vias)} vias", "SUCCESS")
|
||
|
||
# Screenshot 3: Board with escape planning (no airwires) - taken synchronously
|
||
self.pcb_viewer.show_airwires = False
|
||
self.pcb_viewer.fit_to_view()
|
||
self.pcb_viewer.update()
|
||
QApplication.processEvents() # Force render
|
||
self.pcb_viewer.debug_screenshot("03_board_with_escapes", scale_factor=8, output_dir=self._current_run_folder)
|
||
self.log_to_gui("📸 Screenshot 3/3: Board with escape planning (8x)", "SUCCESS")
|
||
|
||
# Re-enable airwires for normal viewing
|
||
self.pcb_viewer.show_airwires = True
|
||
self.pcb_viewer.update()
|
||
|
||
self.log_to_gui("[PIPELINE] Step 3: Preparing routing runtime...", "INFO")
|
||
pf.prepare_routing_runtime()
|
||
self.log_to_gui("✓ Runtime preparation complete", "SUCCESS")
|
||
|
||
# 2) Route nets with GUI progress callback
|
||
self.log_to_gui("[PIPELINE] Step 4: Routing nets...", "INFO")
|
||
def progress_cb(done, total, eta_s, *_, **__):
|
||
# Net routing progress (not used for overuse display)
|
||
pass
|
||
|
||
# Store count of escape geometry before routing starts
|
||
escape_track_count = len([t for t in self.board_data.get('tracks', []) if t])
|
||
escape_via_count = len([v for v in self.board_data.get('vias', []) if v])
|
||
escape_tracks_list = self.board_data.get('tracks', [])[:escape_track_count]
|
||
escape_vias_list = self.board_data.get('vias', [])[:escape_via_count]
|
||
|
||
# Track overuse history for last 3 iterations
|
||
overuse_history = []
|
||
|
||
def iteration_cb(iteration, routing_tracks, routing_vias, overuse_sum=0, overuse_edges=0):
|
||
"""Iteration callback: update board_data, capture screenshot, and update progress with overuse info"""
|
||
try:
|
||
# Track overuse for display in progress bar
|
||
overuse_history.append((iteration, overuse_sum, overuse_edges))
|
||
if len(overuse_history) > 3:
|
||
overuse_history.pop(0) # Keep only last 3
|
||
|
||
# Update overuse table with last 3 iterations
|
||
if len(overuse_history) > 0:
|
||
# Build ASCII table
|
||
lines = []
|
||
lines.append("┌─────────┬───────────┬─────────┐")
|
||
lines.append("│ Iter │ Overuse │ Edges │")
|
||
lines.append("├─────────┼───────────┼─────────┤")
|
||
|
||
for iter_num, over_sum, over_edges in overuse_history:
|
||
# Format with proper spacing for alignment
|
||
iter_str = f"{iter_num:>5}"
|
||
over_str = f"{over_sum:>9,}" if over_sum > 0 else f"{'0':>9}"
|
||
edges_str = f"{over_edges:>7,}" if over_edges > 0 else f"{'0':>7}"
|
||
lines.append(f"│ {iter_str} │ {over_str} │ {edges_str} │")
|
||
|
||
lines.append("└─────────┴───────────┴─────────┘")
|
||
|
||
table_text = "\n".join(lines)
|
||
self.overuse_table_label.setText(table_text)
|
||
|
||
# Update PathFinder stats widget with overuse data
|
||
if hasattr(self, 'pathfinder_stats') and self.pathfinder_stats:
|
||
max_iters = getattr(pf.config, 'max_iterations', 40)
|
||
self.pathfinder_stats.update_iteration(iteration, max_iters, overuse_sum, overuse_edges)
|
||
|
||
# Combine escape geometry with provisional routing geometry
|
||
self.board_data['tracks'] = escape_tracks_list + routing_tracks
|
||
self.board_data['vias'] = escape_vias_list + routing_vias
|
||
|
||
# Update viewer
|
||
if hasattr(self.pcb_viewer, 'update_routing'):
|
||
self.pcb_viewer.update_routing(self.board_data['tracks'], self.board_data['vias'])
|
||
self.pcb_viewer.update()
|
||
QApplication.processEvents()
|
||
|
||
# Memory-efficient screenshot controls
|
||
import os
|
||
disable_screenshots = os.environ.get('ORTHO_NO_SCREENSHOTS', '0') == '1'
|
||
screenshot_freq = int(os.environ.get('ORTHO_SCREENSHOT_FREQ', '1'))
|
||
screenshot_scale = int(os.environ.get('ORTHO_SCREENSHOT_SCALE', '8'))
|
||
|
||
# Only capture screenshots if enabled and at appropriate frequency
|
||
if not disable_screenshots and (iteration % screenshot_freq == 0):
|
||
screenshot_name = f"{iteration+3:02d}_iteration_{iteration:02d}"
|
||
self.pcb_viewer.show_airwires = False # Hide airwires for clarity
|
||
self.pcb_viewer.fit_to_view()
|
||
self.pcb_viewer.update()
|
||
QApplication.processEvents()
|
||
self.pcb_viewer.debug_screenshot(screenshot_name, scale_factor=screenshot_scale, output_dir=self._current_run_folder)
|
||
logger.info(f"📸 Iteration {iteration}: screenshot captured ({len(routing_tracks)} routing tracks, {len(routing_vias)} routing vias)")
|
||
else:
|
||
logger.info(f"⏩ Iteration {iteration}: {len(routing_tracks)} routing tracks, {len(routing_vias)} routing vias (screenshot skipped)")
|
||
except Exception as e:
|
||
logger.warning(f"Iteration callback failed: {e}")
|
||
|
||
# Capture routing result to check for convergence
|
||
routing_result = pf.route_multiple_nets(board.nets, progress_cb=progress_cb, iteration_cb=iteration_cb)
|
||
self.log_to_gui("✓ Net routing complete", "SUCCESS")
|
||
|
||
# 3) Emit geometry -> update viewer
|
||
tracks, vias = pf.emit_geometry(board)
|
||
geom = pf.get_geometry_payload()
|
||
self.log_to_gui(f"[GUI] Emitting geometry: tracks={tracks} vias={vias}", "SUCCESS")
|
||
|
||
if hasattr(self, 'pcb_viewer') and self.pcb_viewer:
|
||
self.pcb_viewer.update_routing(geom.tracks, geom.vias)
|
||
|
||
# 4) Check convergence status and show appropriate dialog
|
||
showed_convergence_dialog = False
|
||
if routing_result and not routing_result.get('success', True):
|
||
# UNCONVERGENCE: Show detailed failure dialog
|
||
failed_nets = routing_result.get('failed_nets', 0)
|
||
total_nets = len(board.nets)
|
||
fail_percentage = (failed_nets / total_nets * 100) if total_nets > 0 else 0
|
||
overuse_edges = routing_result.get('overuse_edges', 0)
|
||
overuse_sum = routing_result.get('overuse_sum', 0)
|
||
layer_rec = routing_result.get('layer_recommendation', {})
|
||
|
||
# Build dialog message
|
||
dialog_msg = f"Your board did not converge because there are too few routing layers.\n\n"
|
||
dialog_msg += f"ROUTING INCOMPLETE: {failed_nets}/{total_nets} nets failed ({fail_percentage:.1f}%)\n"
|
||
dialog_msg += f" Overuse: {overuse_edges} edges with {overuse_sum} total conflicts\n\n"
|
||
|
||
if layer_rec.get('needs_more', False):
|
||
dialog_msg += f" RECOMMENDATION: Add {layer_rec.get('additional', 0)} more layers "
|
||
dialog_msg += f"(→{layer_rec.get('recommended_total', 0)} total)\n\n"
|
||
dialog_msg += f" Reason: {layer_rec.get('reason', 'Insufficient routing capacity')}"
|
||
else:
|
||
dialog_msg += f" Note: Current layer count appears adequate.\n"
|
||
dialog_msg += f" Convergence may improve with tuning or may have reached practical limit."
|
||
|
||
QMessageBox.warning(
|
||
self,
|
||
"Unconvergence Alert",
|
||
dialog_msg
|
||
)
|
||
showed_convergence_dialog = True
|
||
elif tracks > 0 or vias > 0:
|
||
# SUCCESS: Show convergence success dialog
|
||
QMessageBox.information(
|
||
self,
|
||
"Converged!",
|
||
f"Routing completed successfully!\n\n"
|
||
f"Results:\n"
|
||
f" • {tracks} tracks placed\n"
|
||
f" • {vias} vias placed\n\n"
|
||
f"All nets routed with zero overuse."
|
||
)
|
||
showed_convergence_dialog = True
|
||
|
||
# 5) Handle no copper emitted case (different from unconvergence)
|
||
# Only show this if we didn't already explain the problem with an unconvergence dialog
|
||
if tracks == 0 and vias == 0 and not showed_convergence_dialog:
|
||
self.log_to_gui("⚠️ No copper emitted - analyzing reasons...", "WARNING")
|
||
|
||
# Analyze why no copper was generated
|
||
failure_reasons = self._analyze_routing_failures(pf, board)
|
||
|
||
for reason in failure_reasons:
|
||
self.log_to_gui(f" • {reason}", "WARNING")
|
||
|
||
# Offer smoke route as fallback
|
||
if failure_reasons:
|
||
self.log_to_gui("💡 Attempting smoke route fallback...", "INFO")
|
||
try:
|
||
success, smoke_tracks, smoke_vias = pf.smoke_route(board)
|
||
if success:
|
||
geom = pf.get_geometry_payload()
|
||
if hasattr(self, 'pcb_viewer') and self.pcb_viewer:
|
||
self.pcb_viewer.update_routing(geom.tracks, geom.vias)
|
||
self.log_to_gui(f"✅ Smoke route success: {smoke_tracks} tracks, {smoke_vias} vias", "SUCCESS")
|
||
self.status_label.setText(f"Smoke route fallback: {smoke_tracks} tracks, {smoke_vias} vias")
|
||
tracks, vias = smoke_tracks, smoke_vias # Update for button enabling logic
|
||
else:
|
||
self.log_to_gui("❌ Smoke route also failed", "ERROR")
|
||
except Exception as smoke_e:
|
||
self.log_to_gui(f"❌ Smoke route error: {smoke_e}", "ERROR")
|
||
|
||
# Show detailed dialog if still no copper (this is a setup issue, not convergence)
|
||
if tracks == 0 and vias == 0:
|
||
failure_text = "\n".join([f"• {reason}" for reason in failure_reasons])
|
||
QMessageBox.warning(
|
||
self,
|
||
"No Copper Generated",
|
||
f"Routing completed but no copper was generated.\n\n"
|
||
f"This indicates a board setup issue, not a convergence problem.\n\n"
|
||
f"Possible reasons:\n{failure_text}\n\n"
|
||
f"Try:\n"
|
||
f"• Check that nets have at least 2 pads\n"
|
||
f"• Verify components are properly connected\n"
|
||
f"• Use Tools → Quick Smoke Route for basic connectivity test"
|
||
)
|
||
else:
|
||
# 6) Log completion (dialog already shown above)
|
||
self.log_to_gui(f"✅ Routing completed: {tracks} tracks, {vias} vias", "SUCCESS")
|
||
|
||
self.status_label.setText(f"Autoroute complete: {tracks} tracks, {vias} vias")
|
||
|
||
# Enable commit/rollback buttons if we have results
|
||
if tracks > 0 or vias > 0:
|
||
self.commit_btn.setEnabled(True)
|
||
self.rollback_btn.setEnabled(True)
|
||
self.replay_btn.setEnabled(True)
|
||
|
||
except Exception as e:
|
||
logger.exception("Autoroute failed")
|
||
error_str = str(e)
|
||
|
||
# Downgrade zero-length track errors to warnings (don't kill autoroute)
|
||
if "zero-length" in error_str.lower() or "zero length" in error_str.lower():
|
||
logger.warning(f"[GUI] Zero-length track handling: {error_str}")
|
||
self.log_to_gui(f"⚠️ Zero-length tracks filtered (routing continues): {error_str}", "WARNING")
|
||
# Don't show critical dialog for zero-length issues - they're handled upstream
|
||
else:
|
||
self.log_to_gui(f"❌ Autoroute failed: {e}", "ERROR")
|
||
QMessageBox.critical(self, "Autoroute Error", f"Autoroute failed:\n{str(e)}")
|
||
finally:
|
||
self._set_ui_busy(False)
|
||
|
||
def _set_ui_busy(self, busy: bool, status_text: str = ""):
|
||
"""Set UI to busy state during routing"""
|
||
self.route_preview_btn.setEnabled(not busy)
|
||
if busy:
|
||
self.overuse_table_label.setText("Routing in progress...")
|
||
if status_text:
|
||
self.status_label.setText(status_text)
|
||
else:
|
||
self.status_label.setText("Ready" if not status_text else status_text)
|
||
|
||
def _create_board_from_data(self):
|
||
"""Create Board domain object from GUI board_data"""
|
||
from ...domain.models.board import Board, Net, Pad, Component, Coordinate
|
||
|
||
# DEBUG: Check what bounds data we have in board_data
|
||
logger.info(f"DEBUG: board_data keys: {list(self.board_data.keys())}")
|
||
if 'bounds' in self.board_data:
|
||
logger.info(f"DEBUG: board_data['bounds'] = {self.board_data['bounds']}")
|
||
else:
|
||
logger.info("DEBUG: 'bounds' key not found in board_data!")
|
||
|
||
# Extract layer count from board_data (set by plugin)
|
||
layer_count = self.board_data.get('layers', 2)
|
||
if isinstance(layer_count, int):
|
||
detected_layers = layer_count
|
||
else:
|
||
detected_layers = 2 # fallback
|
||
logger.info(f"DEBUG: Creating Board object with layer_count={detected_layers}")
|
||
|
||
board = Board(id="gui-board", name="GUI Board", layer_count=detected_layers)
|
||
board.nets = []
|
||
board.components = [] # Initialize components list
|
||
|
||
# Track components and their pads for proper Board structure
|
||
components_dict = {} # component_id -> Component object
|
||
all_pads = [] # Keep track of all pads for components
|
||
|
||
# First, create components from board_data components section
|
||
components_data = self.board_data.get('components', [])
|
||
logger.info(f"DEBUG: Found {len(components_data)} components in board_data")
|
||
for comp_data in components_data:
|
||
if isinstance(comp_data, dict):
|
||
comp_id = comp_data.get('name', comp_data.get('id', ''))
|
||
if comp_id:
|
||
component = Component(
|
||
id=comp_id,
|
||
reference=comp_id,
|
||
value=comp_data.get('value', ''),
|
||
footprint=comp_data.get('footprint', ''),
|
||
position=Coordinate(
|
||
x=comp_data.get('x', 0.0),
|
||
y=comp_data.get('y', 0.0)
|
||
),
|
||
angle=comp_data.get('rotation', 0.0)
|
||
)
|
||
components_dict[comp_id] = component
|
||
|
||
# Convert nets from GUI data format to domain objects
|
||
nets_data = self.board_data.get('nets', {})
|
||
logger.info(f"DEBUG: Processing {len(nets_data)} nets from board_data")
|
||
if isinstance(nets_data, dict):
|
||
for net_id, net_info in nets_data.items():
|
||
net = Net(id=net_id, name=net_info.get('name', net_id))
|
||
net.pads = []
|
||
|
||
# Add pads from net data
|
||
for pad_ref in net_info.get('pads', []):
|
||
if isinstance(pad_ref, dict):
|
||
# Extract component name from pad name (likely "ComponentName.PadNumber" format)
|
||
pad_name = pad_ref.get('name', '')
|
||
component_id = pad_ref.get('component', '') # First try explicit component field
|
||
|
||
# Fallback: extract from pad name if no explicit component
|
||
if not component_id and '.' in pad_name:
|
||
component_id = pad_name.split('.')[0]
|
||
|
||
# DEBUG: Check pad_ref structure
|
||
if len(all_pads) < 5: # Only log first few to avoid spam
|
||
logger.info(f"DEBUG: pad_ref keys: {list(pad_ref.keys())}")
|
||
logger.info(f"DEBUG: pad_ref.get('name') = '{pad_name}'")
|
||
logger.info(f"DEBUG: pad_ref.get('component') = '{pad_ref.get('component', 'N/A')}'")
|
||
logger.info(f"DEBUG: extracted component_id = '{component_id}'")
|
||
|
||
# Extract pad coordinates from KiCad data
|
||
pad_x = pad_ref.get('x', 0.0)
|
||
pad_y = pad_ref.get('y', 0.0)
|
||
|
||
pad = Pad(
|
||
id=pad_name or f"{component_id}.{pad_ref.get('pin', '')}",
|
||
component_id=component_id,
|
||
net_id=net_id,
|
||
position=Coordinate(x=pad_x, y=pad_y),
|
||
size=(0.2, 0.2), # Default pad size
|
||
layer=pad_ref.get('layer', 'F.Cu')
|
||
)
|
||
|
||
# DEBUG: Log pad position for first few pads
|
||
if len(all_pads) < 5:
|
||
logger.info(f"DEBUG: Creating pad '{pad.id}' at ({pad_x}, {pad_y}) from pad_ref x={pad_ref.get('x')}, y={pad_ref.get('y')}")
|
||
net.pads.append(pad)
|
||
all_pads.append(pad)
|
||
|
||
# Create component if it doesn't exist yet (fallback if not in components section)
|
||
if component_id and component_id not in components_dict:
|
||
component = Component(
|
||
id=component_id,
|
||
reference=component_id,
|
||
value="",
|
||
footprint="",
|
||
position=Coordinate(x=pad.position.x, y=pad.position.y), # Use pad position as reference
|
||
angle=0.0
|
||
)
|
||
components_dict[component_id] = component
|
||
|
||
if len(net.pads) >= 2: # Only add nets with at least 2 pads
|
||
board.nets.append(net)
|
||
|
||
# Populate components with their pads
|
||
pads_assigned = 0
|
||
pads_orphaned = 0
|
||
for pad in all_pads:
|
||
if pad.component_id and pad.component_id in components_dict:
|
||
components_dict[pad.component_id].pads.append(pad)
|
||
pads_assigned += 1
|
||
else:
|
||
pads_orphaned += 1
|
||
if pads_orphaned <= 5: # Log first few orphaned pads
|
||
logger.warning(f"DEBUG: Orphaned pad '{pad.id}' with component_id '{pad.component_id}' - not found in components_dict")
|
||
|
||
# FALLBACK: Create a generic component for orphaned pads
|
||
if pads_orphaned > 0:
|
||
logger.info(f"DEBUG: Creating generic component for {pads_orphaned} orphaned pads")
|
||
generic_component = Component(
|
||
id="GENERIC_COMPONENT",
|
||
reference="GENERIC_COMPONENT",
|
||
value="Generic",
|
||
footprint="Generic",
|
||
position=Coordinate(x=200.0, y=140.0), # Center of typical board
|
||
angle=0.0
|
||
)
|
||
|
||
# Add all orphaned pads to generic component
|
||
for pad in all_pads:
|
||
if not pad.component_id or pad.component_id not in components_dict:
|
||
pad.component_id = "GENERIC_COMPONENT" # Update pad's component reference
|
||
generic_component.pads.append(pad)
|
||
pads_assigned += 1
|
||
pads_orphaned -= 1
|
||
|
||
components_dict["GENERIC_COMPONENT"] = generic_component
|
||
|
||
# Add components to board
|
||
board.components = list(components_dict.values())
|
||
|
||
# DEBUG: Check component pad counts
|
||
component_pad_counts = [(comp.id, len(comp.pads)) for comp in board.components]
|
||
logger.info(f"DEBUG: Component pad counts: {component_pad_counts[:5]}...") # Show first 5
|
||
|
||
logger.info(f"Created board with {len(board.nets)} routable nets, {len(board.components)} components, {len(all_pads)} total pads")
|
||
logger.info(f"DEBUG: Pad assignment: {pads_assigned} assigned, {pads_orphaned} orphaned")
|
||
|
||
# Store KiCad-calculated bounds for accurate routing area
|
||
kicad_bounds = self.board_data.get('bounds', None)
|
||
board._kicad_bounds = kicad_bounds
|
||
|
||
# DEBUG: Verify bounds were set correctly
|
||
logger.info(f"DEBUG: Setting board._kicad_bounds = {kicad_bounds}")
|
||
logger.info(f"DEBUG: After setting, hasattr(board, '_kicad_bounds') = {hasattr(board, '_kicad_bounds')}")
|
||
if hasattr(board, '_kicad_bounds'):
|
||
logger.info(f"DEBUG: board._kicad_bounds = {board._kicad_bounds}")
|
||
|
||
return board
|
||
|
||
def quick_smoke_route(self):
|
||
"""Run quick smoke route test"""
|
||
if not self.plugin:
|
||
QMessageBox.critical(self, "Plugin Error", "No plugin instance available")
|
||
return
|
||
|
||
try:
|
||
pf = self.plugin.get_pathfinder()
|
||
board = self._create_board_from_data()
|
||
|
||
self.log_to_gui("🚀 Starting Quick Smoke Route test", "INFO")
|
||
self._set_ui_busy(True, "Running smoke route...")
|
||
|
||
# Initialize if needed
|
||
if not hasattr(pf, 'graph_state') or not pf.graph_state:
|
||
self.log_to_gui("[SMOKE] Initializing graph...", "INFO")
|
||
pf.initialize_graph(board)
|
||
pf.map_all_pads(board)
|
||
pf.prepare_routing_runtime()
|
||
|
||
# Run smoke route
|
||
success, tracks, vias = pf.smoke_route(board)
|
||
|
||
if success:
|
||
# Update viewer with smoke route results
|
||
geom = pf.get_geometry_payload()
|
||
if hasattr(self, 'pcb_viewer') and self.pcb_viewer:
|
||
self.pcb_viewer.update_routing(geom.tracks, geom.vias)
|
||
|
||
self.log_to_gui(f"✅ [SMOKE] Success: emitted {tracks} tracks, {vias} vias", "SUCCESS")
|
||
self.status_label.setText(f"Smoke route complete: {tracks} tracks, {vias} vias")
|
||
|
||
# Enable commit/rollback buttons
|
||
self.commit_btn.setEnabled(True)
|
||
self.rollback_btn.setEnabled(True)
|
||
self.replay_btn.setEnabled(True)
|
||
else:
|
||
self.log_to_gui("❌ [SMOKE] Failed: no copper generated", "ERROR")
|
||
self.status_label.setText("Smoke route failed")
|
||
QMessageBox.warning(self, "Smoke Route", "Smoke route failed - no routable pairs found")
|
||
|
||
except Exception as e:
|
||
logger.exception("Smoke route failed")
|
||
self.log_to_gui(f"❌ [SMOKE] Error: {e}", "ERROR")
|
||
QMessageBox.critical(self, "Smoke Route Error", f"Smoke route failed:\n{str(e)}")
|
||
finally:
|
||
self._set_ui_busy(False)
|
||
|
||
def _analyze_routing_failures(self, pf, board):
|
||
"""Analyze why routing failed to generate copper"""
|
||
reasons = []
|
||
|
||
try:
|
||
# Check basic board state
|
||
if not board or not board.nets:
|
||
reasons.append("No nets found in board data")
|
||
return reasons
|
||
|
||
nets_with_pads = [net for net in board.nets if len(net.pads) >= 2]
|
||
if len(nets_with_pads) == 0:
|
||
reasons.append("No nets with 2+ pads found")
|
||
|
||
# Check pathfinder state
|
||
if not hasattr(pf, 'graph_state') or not pf.graph_state:
|
||
reasons.append("Graph state not initialized")
|
||
return reasons
|
||
|
||
gs = pf.graph_state
|
||
|
||
# Check lattice
|
||
if not hasattr(gs, 'lattice_node_count') or gs.lattice_node_count == 0:
|
||
reasons.append("No lattice nodes generated")
|
||
|
||
# Check terminals
|
||
if not hasattr(gs, 'net_terminals') or not gs.net_terminals:
|
||
reasons.append("No net terminals mapped")
|
||
else:
|
||
terminal_count = sum(len(pins) for pins in gs.net_terminals.values())
|
||
if terminal_count == 0:
|
||
reasons.append("Net terminals mapped but empty")
|
||
|
||
# Check connectivity
|
||
if hasattr(pf, '_comp') and hasattr(pf, '_giant_label'):
|
||
giant_size = sum(1 for comp in pf._comp if comp == pf._giant_label)
|
||
total_nodes = len(pf._comp) if pf._comp else 0
|
||
if giant_size == 0:
|
||
reasons.append("No giant connected component found")
|
||
elif giant_size < total_nodes * 0.1:
|
||
reasons.append(f"Giant component too small: {giant_size}/{total_nodes} nodes")
|
||
|
||
# Check for specific routing failures
|
||
if hasattr(gs, 'committed_paths'):
|
||
if not gs.committed_paths:
|
||
reasons.append("No paths were successfully routed")
|
||
elif len(gs.committed_paths) == 0:
|
||
reasons.append("All routing attempts failed (no path found)")
|
||
|
||
# Default fallback
|
||
if not reasons:
|
||
reasons.append("Unknown routing failure - check logs for details")
|
||
|
||
except Exception as e:
|
||
reasons.append(f"Analysis error: {str(e)}")
|
||
|
||
return reasons
|
||
|
||
def self_test_lattice(self):
|
||
"""Self-test: Build lattice and verify connectivity"""
|
||
if not self.plugin:
|
||
QMessageBox.critical(self, "Plugin Error", "No plugin instance available")
|
||
return
|
||
|
||
try:
|
||
self.log_to_gui("🧪 Starting Lattice Self-Test...", "INFO")
|
||
self._set_ui_busy(True, "Running lattice test...")
|
||
|
||
pf = self.plugin.get_pathfinder()
|
||
board = self._create_board_from_data()
|
||
|
||
# Initialize lattice
|
||
pf.initialize_graph(board)
|
||
|
||
# Check lattice metrics
|
||
gs = pf.graph_state
|
||
lattice_count = getattr(gs, 'lattice_node_count', 0)
|
||
|
||
if hasattr(pf, '_comp') and hasattr(pf, '_giant_label'):
|
||
giant_size = sum(1 for comp in pf._comp if comp == pf._giant_label)
|
||
giant_ratio = giant_size / len(pf._comp) if pf._comp else 0
|
||
else:
|
||
pf._analyze_lattice_connectivity()
|
||
giant_size = sum(1 for comp in pf._comp if comp == pf._giant_label)
|
||
giant_ratio = giant_size / len(pf._comp) if pf._comp else 0
|
||
|
||
# Report results
|
||
self.log_to_gui(f"✓ Lattice nodes: {lattice_count}", "SUCCESS")
|
||
self.log_to_gui(f"✓ Giant component: {giant_size} nodes ({giant_ratio:.1%})", "SUCCESS")
|
||
|
||
if lattice_count > 0 and giant_ratio > 0.5:
|
||
self.log_to_gui("✅ LATTICE TEST PASSED", "SUCCESS")
|
||
self.status_label.setText("✓ Lattice test passed")
|
||
QMessageBox.information(self, "Lattice Test", f"✅ Lattice test PASSED\n\nLattice: {lattice_count} nodes\nGiant: {giant_size} nodes ({giant_ratio:.1%})")
|
||
else:
|
||
self.log_to_gui("❌ LATTICE TEST FAILED", "ERROR")
|
||
self.status_label.setText("❌ Lattice test failed")
|
||
QMessageBox.warning(self, "Lattice Test", f"❌ Lattice test FAILED\n\nLattice: {lattice_count} nodes\nGiant: {giant_size} nodes ({giant_ratio:.1%})")
|
||
|
||
except Exception as e:
|
||
logger.exception("Lattice test failed")
|
||
self.log_to_gui(f"❌ Lattice test error: {e}", "ERROR")
|
||
QMessageBox.critical(self, "Lattice Test Error", f"Lattice test failed:\n{str(e)}")
|
||
finally:
|
||
self._set_ui_busy(False)
|
||
|
||
def self_test_tiny_route(self):
|
||
"""Self-test: Create synthetic board and route nets"""
|
||
if not self.plugin:
|
||
QMessageBox.critical(self, "Plugin Error", "No plugin instance available")
|
||
return
|
||
|
||
try:
|
||
self.log_to_gui("🧪 Starting Tiny Route Self-Test...", "INFO")
|
||
self._set_ui_busy(True, "Running tiny route test...")
|
||
|
||
pf = self.plugin.get_pathfinder()
|
||
|
||
# Create synthetic 20x20x3 board
|
||
synthetic_board = self._create_synthetic_board()
|
||
|
||
# Route using unified pipeline
|
||
pf.initialize_graph(synthetic_board)
|
||
pf.map_all_pads(synthetic_board)
|
||
pf.prepare_routing_runtime()
|
||
pf.route_multiple_nets(synthetic_board.nets)
|
||
|
||
# Emit geometry
|
||
tracks, vias = pf.emit_geometry(synthetic_board)
|
||
geom = pf.get_geometry_payload()
|
||
|
||
# Update viewer with synthetic results
|
||
if hasattr(self, 'pcb_viewer') and self.pcb_viewer:
|
||
self.pcb_viewer.update_routing(geom.tracks, geom.vias)
|
||
|
||
# Report results
|
||
if tracks > 0 or vias > 0:
|
||
self.log_to_gui(f"✅ TINY ROUTE TEST PASSED: {tracks} tracks, {vias} vias", "SUCCESS")
|
||
self.status_label.setText("✓ Tiny route test passed")
|
||
QMessageBox.information(self, "Tiny Route Test", f"✅ Tiny Route test PASSED\n\nGenerated:\n{tracks} tracks\n{vias} vias")
|
||
else:
|
||
self.log_to_gui("❌ TINY ROUTE TEST FAILED: no copper generated", "ERROR")
|
||
self.status_label.setText("❌ Tiny route test failed")
|
||
QMessageBox.warning(self, "Tiny Route Test", "❌ Tiny Route test FAILED\n\nNo copper generated")
|
||
|
||
except Exception as e:
|
||
logger.exception("Tiny route test failed")
|
||
self.log_to_gui(f"❌ Tiny route test error: {e}", "ERROR")
|
||
QMessageBox.critical(self, "Tiny Route Test Error", f"Tiny route test failed:\n{str(e)}")
|
||
finally:
|
||
self._set_ui_busy(False)
|
||
|
||
def _create_synthetic_board(self):
|
||
"""Create a 20x20x3 synthetic board with 10 nets for testing"""
|
||
from ...domain.models.board import Board, Net, Pad, Component, Coordinate
|
||
import random
|
||
|
||
board = Board(id="synthetic", name="Synthetic Test Board 20x20x3")
|
||
board.nets = []
|
||
|
||
# Create 10 test nets, each with 2 pads
|
||
for net_id in range(10):
|
||
net = Net(id=f"net_{net_id}", name=f"TEST_NET_{net_id}")
|
||
net.pads = []
|
||
|
||
# Create 2 pads per net at random positions
|
||
for pad_id in range(2):
|
||
pad = Pad(
|
||
id=f"net_{net_id}_pad_{pad_id}",
|
||
component_id=f"U{net_id}",
|
||
net_id=f"net_{net_id}",
|
||
position=Coordinate(
|
||
x=random.uniform(0, 20), # 20mm x 20mm board
|
||
y=random.uniform(0, 20)
|
||
),
|
||
size=(0.2, 0.2), # Default pad size
|
||
layer="F.Cu" # All on top layer for simplicity
|
||
)
|
||
net.pads.append(pad)
|
||
|
||
board.nets.append(net)
|
||
|
||
self.log_to_gui(f"Created synthetic board: {len(board.nets)} nets, 20x20mm", "INFO")
|
||
return board
|
||
|
||
def commit_routes(self):
|
||
"""Apply routes to KiCad"""
|
||
logger.info("SUCCESS: Committing routes to KiCad")
|
||
self.status_label.setText("Applying routes to KiCad...")
|
||
|
||
try:
|
||
# Get routing solution from board_data
|
||
tracks = self.board_data.get('tracks', [])
|
||
vias = self.board_data.get('vias', [])
|
||
|
||
if not tracks and not vias:
|
||
QMessageBox.warning(self, "No Routes", "No routing solution to apply")
|
||
return
|
||
|
||
# Warn if applying provisional geometry with overuse
|
||
if hasattr(self, 'router') and self.router:
|
||
try:
|
||
over_sum, over_cnt = self.router.accounting.compute_overuse(router_instance=self.router)
|
||
if over_sum > 0:
|
||
reply = QMessageBox.warning(
|
||
self,
|
||
"Routing Has Conflicts",
|
||
f"⚠️ The routing did not fully converge.\n\n"
|
||
f"Overuse: {over_sum} (on {over_cnt} edges)\n\n"
|
||
f"This means some tracks/vias overlap and will cause DRC errors in KiCad.\n\n"
|
||
f"Do you want to apply anyway?",
|
||
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
|
||
QMessageBox.StandardButton.No
|
||
)
|
||
if reply == QMessageBox.StandardButton.No:
|
||
return
|
||
except Exception as e:
|
||
logger.warning(f"Could not check overuse status: {e}")
|
||
|
||
logger.info(f"Applying {len(tracks)} tracks and {len(vias)} vias to KiCad")
|
||
|
||
# Use existing KiCad connection
|
||
if not self.kicad_interface or not self.kicad_interface.board:
|
||
QMessageBox.critical(self, "Connection Error", "No active KiCad connection.")
|
||
logger.error("No kicad_interface.board available")
|
||
return
|
||
|
||
board = self.kicad_interface.board
|
||
|
||
# Get nets for lookup
|
||
board_nets = board.get_nets()
|
||
net_lookup = {net.name: net for net in board_nets}
|
||
|
||
# Layer string to BoardLayer enum
|
||
def layer_to_enum(layer_str):
|
||
layer_map = {'F.Cu': BoardLayer.BL_F_Cu, 'B.Cu': BoardLayer.BL_B_Cu}
|
||
for i in range(1, 31):
|
||
layer_map[f'In{i}.Cu'] = getattr(BoardLayer, f'BL_In{i}_Cu')
|
||
return layer_map.get(layer_str, BoardLayer.BL_F_Cu)
|
||
|
||
# Export geometry to JSON for debugging/comparison with DRC
|
||
import json
|
||
from datetime import datetime
|
||
|
||
json_export = {
|
||
'metadata': {
|
||
'timestamp': datetime.now().isoformat(),
|
||
'board_name': getattr(board, 'filename', 'unknown'),
|
||
'track_count': len(tracks),
|
||
'via_count': len(vias),
|
||
},
|
||
'tracks': [],
|
||
'vias': []
|
||
}
|
||
|
||
for track_data in tracks:
|
||
json_export['tracks'].append({
|
||
'net': track_data.get('net', ''),
|
||
'layer': track_data['layer'],
|
||
'start': {'x': track_data['x1'], 'y': track_data['y1']},
|
||
'end': {'x': track_data['x2'], 'y': track_data['y2']},
|
||
'width': track_data.get('width', 0.2),
|
||
'is_escape': track_data.get('escape', False)
|
||
})
|
||
|
||
for via_data in vias:
|
||
json_export['vias'].append({
|
||
'net': via_data.get('net', ''),
|
||
'position': {'x': via_data['x'], 'y': via_data['y']},
|
||
'from_layer': via_data.get('start_layer', via_data.get('from_layer', 'F.Cu')),
|
||
'to_layer': via_data.get('end_layer', via_data.get('to_layer', 'B.Cu')),
|
||
'diameter': via_data.get('diameter', 0.8),
|
||
'drill': via_data.get('drill', 0.4),
|
||
'is_escape': via_data.get('escape', False)
|
||
})
|
||
|
||
# Save to file
|
||
json_path = Path('debug_output') / f'kicad_export_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
|
||
json_path.parent.mkdir(exist_ok=True)
|
||
with open(json_path, 'w') as f:
|
||
json.dump(json_export, f, indent=2)
|
||
|
||
logger.info(f"📄 Exported geometry to JSON: {json_path}")
|
||
|
||
# Start transaction
|
||
commit = board.begin_commit()
|
||
|
||
# Create Track objects
|
||
items = []
|
||
for track_data in tracks:
|
||
track = Track()
|
||
track.start = Vector2.from_xy_mm(track_data['x1'], track_data['y1'])
|
||
track.end = Vector2.from_xy_mm(track_data['x2'], track_data['y2'])
|
||
track.layer = layer_to_enum(track_data['layer'])
|
||
track.width = int(track_data.get('width', 0.2) * 1_000_000) # Convert to nm
|
||
net_name = track_data.get('net', '')
|
||
if net_name in net_lookup:
|
||
track.net = net_lookup[net_name]
|
||
items.append(track)
|
||
|
||
# Create Via objects
|
||
for via_data in vias:
|
||
via = Via()
|
||
via.position = Vector2.from_xy_mm(via_data['x'], via_data['y'])
|
||
via.diameter = int(via_data.get('diameter', 0.8) * 1_000_000) # Convert to nm
|
||
via.drill_diameter = int(via_data.get('drill', 0.4) * 1_000_000) # Convert to nm
|
||
net_name = via_data.get('net', '')
|
||
if net_name in net_lookup:
|
||
via.net = net_lookup[net_name]
|
||
|
||
# Set blind/buried via layers
|
||
from_layer_str = via_data.get('start_layer', via_data.get('from_layer', 'F.Cu'))
|
||
to_layer_str = via_data.get('end_layer', via_data.get('to_layer', 'B.Cu'))
|
||
from_layer_enum = layer_to_enum(from_layer_str)
|
||
to_layer_enum = layer_to_enum(to_layer_str)
|
||
|
||
via.type = ViaType.VT_BLIND_BURIED
|
||
via.padstack.drill.start_layer = from_layer_enum
|
||
via.padstack.drill.end_layer = to_layer_enum
|
||
|
||
items.append(via)
|
||
|
||
# Batch create all items
|
||
board.create_items(items)
|
||
|
||
# Commit and save
|
||
board.push_commit(commit, f"OrthoRoute: {len(tracks)} tracks, {len(vias)} vias")
|
||
board.save()
|
||
|
||
QMessageBox.information(self, "Success",
|
||
f"Successfully applied routing to KiCad:\n"
|
||
f"• {len(tracks)} tracks\n"
|
||
f"• {len(vias)} vias\n\n"
|
||
f"You can apply again if needed.")
|
||
logger.info(f"✅ Applied {len(tracks)} tracks and {len(vias)} vias to KiCad")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error applying routes to KiCad: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
QMessageBox.critical(self, "Error", f"Failed to apply routes to KiCad:\n{e}")
|
||
|
||
# Keep button enabled for re-apply
|
||
self.overuse_table_label.setText("Routing completed - ready to apply")
|
||
self.status_label.setText("Routes applied - click 'Apply to KiCad' again if needed")
|
||
|
||
def rollback_routes(self):
|
||
"""Discard calculated routes"""
|
||
logger.info("Discarding routes")
|
||
self.status_label.setText("Discarding routes...")
|
||
|
||
# TODO: Implement route rollback
|
||
QMessageBox.information(self, "Discard Routes", "Route discarding not yet implemented")
|
||
|
||
self.commit_btn.setEnabled(False)
|
||
self.rollback_btn.setEnabled(False)
|
||
self.replay_btn.setEnabled(False)
|
||
self.route_preview_btn.setEnabled(True)
|
||
self.overuse_table_label.setText("Ready for routing")
|
||
self.status_label.setText("Routes discarded")
|
||
|
||
def replay_routing(self):
|
||
"""Re-run the same routing with clean state for demo repeatability"""
|
||
logger.info("Replaying last routing")
|
||
self.status_label.setText("Replaying routing...")
|
||
|
||
# Clear existing routes and restart
|
||
self.board_data['tracks'].clear()
|
||
self.board_data['vias'].clear()
|
||
|
||
# Refresh the viewer to clear previous routing
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.update_routing([], [])
|
||
|
||
# Reset button states and restart routing
|
||
self.commit_btn.setEnabled(False)
|
||
self.rollback_btn.setEnabled(False)
|
||
self.replay_btn.setEnabled(False)
|
||
self.route_preview_btn.setEnabled(False)
|
||
|
||
# Trigger autorouting again
|
||
self.begin_autorouting()
|
||
|
||
# Menu action methods
|
||
def refresh_board(self):
|
||
"""Refresh board data from KiCad"""
|
||
logger.info("Refreshing board data from KiCad...")
|
||
self.status_label.setText("Refreshing from KiCad...")
|
||
|
||
try:
|
||
# Get fresh board data
|
||
new_board_data = self.kicad_interface.get_board_data()
|
||
|
||
if new_board_data:
|
||
self.board_data = new_board_data
|
||
self.load_board_data()
|
||
self.status_label.setText("Board data refreshed successfully")
|
||
logger.info("Board data refreshed from KiCad")
|
||
else:
|
||
self.status_label.setText("Failed to refresh board data")
|
||
QMessageBox.warning(self, "Refresh Failed", "Could not refresh board data from KiCad")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error refreshing board data: {e}")
|
||
self.status_label.setText("Refresh failed")
|
||
QMessageBox.critical(self, "Refresh Error", f"Error refreshing board data:\\n{e}")
|
||
|
||
def zoom_fit(self):
|
||
"""Fit board to window"""
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.fit_to_view()
|
||
|
||
def auto_route_all(self):
|
||
"""Auto route all nets (menu action)"""
|
||
self.begin_autorouting()
|
||
|
||
def _ensure_router_exists(self):
|
||
"""Create router instance if it doesn't exist (for checkpoint loading)"""
|
||
if hasattr(self, 'router') and self.router is not None:
|
||
return # Router already exists
|
||
|
||
# Create router instance
|
||
from ...algorithms.manhattan.unified_pathfinder import UnifiedPathFinder, PathFinderConfig, GPUConfig
|
||
|
||
pf_config = PathFinderConfig()
|
||
pf_config.strict_drc = True
|
||
|
||
self.router = UnifiedPathFinder(config=pf_config, use_gpu=GPUConfig.GPU_MODE)
|
||
logger.info(f"[CHECKPOINT] Created new router instance for checkpoint loading")
|
||
|
||
def save_checkpoint_manual(self):
|
||
"""Manually save a checkpoint (Ctrl+S)"""
|
||
try:
|
||
if not hasattr(self, 'router') or self.router is None:
|
||
QMessageBox.warning(self, "No Router", "No active router to save. Start routing first.")
|
||
return
|
||
|
||
# Get current iteration and pres_fac (if available)
|
||
iteration = getattr(self.router, 'iteration', 0)
|
||
pres_fac = 1.0 # Default, would need to track actual value
|
||
|
||
metadata = {'manual_save': True}
|
||
filepath = self.router.checkpoint_manager.save_checkpoint(
|
||
self.router, iteration, pres_fac, metadata
|
||
)
|
||
|
||
QMessageBox.information(self, "Checkpoint Saved",
|
||
f"Saved checkpoint to:\n{filepath}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to save checkpoint: {e}")
|
||
QMessageBox.critical(self, "Save Failed", f"Failed to save checkpoint:\n{str(e)}")
|
||
|
||
def load_checkpoint_dialog(self):
|
||
"""Show dialog to select and load a checkpoint (Ctrl+L)"""
|
||
from PyQt6.QtWidgets import QFileDialog
|
||
|
||
try:
|
||
# Create router if it doesn't exist
|
||
self._ensure_router_exists()
|
||
|
||
# Get checkpoint directory
|
||
checkpoint_dir = str(self.router.checkpoint_manager.checkpoint_dir)
|
||
|
||
# Show file dialog
|
||
filepath, _ = QFileDialog.getOpenFileName(
|
||
self,
|
||
"Load Checkpoint",
|
||
checkpoint_dir,
|
||
"Checkpoint Files (*.pkl);;All Files (*)"
|
||
)
|
||
|
||
if filepath:
|
||
checkpoint = self.router.load_checkpoint(filepath)
|
||
metadata = checkpoint.get('metadata', {})
|
||
|
||
msg = (f"Loaded checkpoint from iteration {checkpoint['iteration']}\n"
|
||
f"Overuse: {metadata.get('overuse', 'N/A')}\n"
|
||
f"Routed nets: {metadata.get('routed_nets', 'N/A')}")
|
||
|
||
QMessageBox.information(self, "Checkpoint Loaded", msg)
|
||
|
||
# Refresh visualization with loaded routing
|
||
if hasattr(self, 'pcb_viewer'):
|
||
# Get geometry from router
|
||
geometry = self.router.get_provisional_geometry()
|
||
if geometry and (geometry.tracks or geometry.vias):
|
||
logger.info(f"[CHECKPOINT-VIS] Updating visualization: {len(geometry.tracks)} tracks, {len(geometry.vias)} vias")
|
||
self.pcb_viewer.update_routing(geometry.tracks, geometry.vias)
|
||
self.pcb_viewer.update()
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to load checkpoint: {e}")
|
||
QMessageBox.critical(self, "Load Failed", f"Failed to load checkpoint:\n{str(e)}")
|
||
|
||
def resume_from_latest(self):
|
||
"""Resume routing from the most recent checkpoint (Ctrl+R)"""
|
||
try:
|
||
# Check if we have an existing router with board already initialized
|
||
has_initialized_router = (hasattr(self, 'router') and
|
||
self.router is not None and
|
||
hasattr(self.router, 'lattice') and
|
||
self.router.lattice is not None)
|
||
|
||
if not has_initialized_router:
|
||
# Check if checkpoint has geometry (new format = instant resume)
|
||
latest = self.router.checkpoint_manager.get_latest_checkpoint() if hasattr(self, 'router') and self.router else None
|
||
if not latest:
|
||
from ...algorithms.manhattan.checkpoint import CheckpointManager
|
||
cm = CheckpointManager()
|
||
latest = cm.get_latest_checkpoint()
|
||
|
||
has_geometry = False
|
||
if latest:
|
||
try:
|
||
import pickle
|
||
with open(latest, 'rb') as f:
|
||
checkpoint = pickle.load(f)
|
||
has_geometry = 'geometry' in checkpoint
|
||
except:
|
||
pass
|
||
|
||
if has_geometry:
|
||
# New checkpoint format - instant resume!
|
||
reply = QMessageBox.information(
|
||
self,
|
||
"Instant Resume Ready",
|
||
"This checkpoint contains full board geometry.\n\n"
|
||
"✓ Resume will be INSTANT (no 1-hour wait)\n"
|
||
"✓ All routing state preserved\n\n"
|
||
"Click OK to load checkpoint.",
|
||
QMessageBox.StandardButton.Ok
|
||
)
|
||
else:
|
||
# Old checkpoint format - needs board init
|
||
reply = QMessageBox.question(
|
||
self,
|
||
"Board Initialization Required",
|
||
"This is an OLD checkpoint without board geometry.\n\n"
|
||
"⏳ Requires board initialization (~1 hour)\n"
|
||
"✓ New checkpoints will have instant resume\n\n"
|
||
"Do you want to continue?",
|
||
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
|
||
)
|
||
if reply == QMessageBox.StandardButton.No:
|
||
return
|
||
|
||
# Create router if it doesn't exist
|
||
self._ensure_router_exists()
|
||
|
||
latest = self.router.checkpoint_manager.get_latest_checkpoint()
|
||
|
||
if latest is None:
|
||
QMessageBox.information(self, "No Checkpoints",
|
||
"No checkpoints found. Run routing first to create checkpoints.")
|
||
return
|
||
|
||
checkpoint = self.router.load_checkpoint(latest)
|
||
metadata = checkpoint.get('metadata', {})
|
||
|
||
msg = (f"✓ Loaded checkpoint from iteration {checkpoint['iteration']}\n\n"
|
||
f"Overuse: {metadata.get('overuse', 'N/A'):,}\n"
|
||
f"Routed nets: {metadata.get('routed_nets', 'N/A')}\n"
|
||
f"Failed nets: {metadata.get('failed_nets', 'N/A')}\n\n"
|
||
f"Router is now initialized with checkpoint state.\n"
|
||
f"You can modify parameters and start routing to continue.")
|
||
|
||
# Mark that we need to resume (board init required first)
|
||
self._pending_checkpoint_resume = True
|
||
|
||
msg_updated = (f"✓ Loaded checkpoint from iteration {checkpoint['iteration']}\n\n"
|
||
f"Overuse: {metadata.get('overuse', 'N/A'):,}\n"
|
||
f"Routed nets: {metadata.get('routed_nets', 'N/A')}\n"
|
||
f"Failed nets: {metadata.get('failed_nets', 'N/A')}\n\n"
|
||
f"⚠️ Board initialization still needed (~1 hour)\n"
|
||
f"Click 'Auto Route All' to initialize board and resume routing.\n"
|
||
f"Routing will continue from iteration {checkpoint['iteration'] + 1}.")
|
||
|
||
QMessageBox.information(self, "Checkpoint Loaded - Resume Pending", msg_updated)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to resume from checkpoint: {e}")
|
||
QMessageBox.critical(self, "Resume Failed", f"Failed to resume from checkpoint:\n{str(e)}")
|
||
|
||
def toggle_auto_checkpoint(self, checked):
|
||
"""Toggle auto-checkpoint saving"""
|
||
if hasattr(self, 'router') and self.router is not None:
|
||
self.router.auto_checkpoint = checked
|
||
status = "enabled" if checked else "disabled"
|
||
logger.info(f"Auto-checkpoint {status}")
|
||
|
||
def export_pcb(self):
|
||
"""Export board to .ORP file for cloud routing (Ctrl+E)"""
|
||
try:
|
||
if not self.board_data:
|
||
QMessageBox.warning(self, "No Board", "No board loaded. Please load a board first.")
|
||
return
|
||
|
||
# Derive default filename from board filename
|
||
board_filename = self.board_data.get('filename', 'board.kicad_pcb')
|
||
default_filename = derive_orp_filename(board_filename)
|
||
|
||
# Show save dialog
|
||
filepath, _ = QFileDialog.getSaveFileName(
|
||
self,
|
||
"Export PCB to ORP",
|
||
default_filename,
|
||
"OrthoRoute PCB Files (*.ORP);;All Files (*)"
|
||
)
|
||
|
||
if not filepath:
|
||
return # User cancelled
|
||
|
||
# Ensure .ORP extension
|
||
if not filepath.upper().endswith('.ORP'):
|
||
filepath += '.ORP'
|
||
|
||
# Export board data
|
||
self.status_label.setText("Exporting board to ORP...")
|
||
logger.info(f"Exporting board to {filepath}")
|
||
|
||
export_pcb_to_orp(self.board_data, filepath, compress=True)
|
||
|
||
# If we got here, export succeeded (would have raised exception otherwise)
|
||
self.status_label.setText("Board exported successfully")
|
||
|
||
# Show success message with details
|
||
nets_count = len(self.board_data.get('nets', {}))
|
||
pads_count = len(self.board_data.get('pads', []))
|
||
layers = self.board_data.get('layers', 0)
|
||
|
||
msg = (f"Board exported successfully!\n\n"
|
||
f"File: {filepath}\n"
|
||
f"Nets: {nets_count}\n"
|
||
f"Pads: {pads_count}\n"
|
||
f"Layers: {layers}\n\n"
|
||
f"You can now upload this file to a cloud GPU instance for routing.")
|
||
|
||
QMessageBox.information(self, "Export Successful", msg)
|
||
logger.info(f"Successfully exported board: {nets_count} nets, {pads_count} pads, {layers} layers")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error exporting PCB: {e}", exc_info=True)
|
||
self.status_label.setText("Export failed")
|
||
QMessageBox.critical(self, "Export Error", f"Error exporting board:\n{str(e)}")
|
||
|
||
def import_solution(self):
|
||
"""Import routing solution from .ORS file (Ctrl+I)"""
|
||
try:
|
||
# Show open dialog
|
||
filepath, _ = QFileDialog.getOpenFileName(
|
||
self,
|
||
"Import Solution from ORS",
|
||
"",
|
||
"OrthoRoute Solution Files (*.ORS);;All Files (*)"
|
||
)
|
||
|
||
if not filepath:
|
||
return # User cancelled
|
||
|
||
# Import solution
|
||
self.status_label.setText("Importing routing solution...")
|
||
logger.info(f"Importing solution from {filepath}")
|
||
|
||
# import_solution_from_ors returns a tuple: (geometry_data, metadata)
|
||
geometry_data, metadata = import_solution_from_ors(filepath)
|
||
|
||
if not geometry_data or not metadata:
|
||
self.status_label.setText("Import failed")
|
||
QMessageBox.critical(self, "Import Failed",
|
||
"Failed to import routing solution from ORS file.\n"
|
||
"Check that the file is valid and not corrupted.")
|
||
return
|
||
|
||
# Store the imported solution (combine for compatibility)
|
||
self.routing_result = {
|
||
'geometry': geometry_data,
|
||
'metadata': metadata
|
||
}
|
||
self.status_label.setText("Solution imported successfully")
|
||
|
||
# Generate summary for display (pass the metadata dict which contains 'metadata' and 'statistics' keys)
|
||
summary = get_solution_summary(metadata)
|
||
|
||
# Display solution in the preview
|
||
self._display_imported_solution(geometry_data)
|
||
|
||
# Show success message with routing metrics
|
||
QMessageBox.information(self, "Import Successful",
|
||
f"Routing solution imported successfully!\n\n{summary}")
|
||
|
||
logger.info(f"Successfully imported solution from {filepath}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error importing solution: {e}", exc_info=True)
|
||
self.status_label.setText("Import failed")
|
||
QMessageBox.critical(self, "Import Error", f"Error importing solution:\n{str(e)}")
|
||
|
||
def _display_imported_solution(self, geometry_data: Dict[str, Any]):
|
||
"""Display imported routing solution in the PCB viewer"""
|
||
try:
|
||
if not self.pcb_viewer:
|
||
logger.warning("No PCB viewer available to display solution")
|
||
return
|
||
|
||
# Build net lookup from board (needed for Track/Via net assignment)
|
||
board = self.kicad_interface.board
|
||
board_nets = board.get_nets()
|
||
net_lookup = {net.name: net for net in board_nets}
|
||
|
||
# Layer conversion functions
|
||
def layer_to_name(layer):
|
||
"""Convert layer (int or string) to layer name string"""
|
||
# If already a string, return it
|
||
if isinstance(layer, str):
|
||
return layer
|
||
|
||
# Convert integer to layer name
|
||
layer_int = int(layer)
|
||
if layer_int == 0:
|
||
return "F.Cu"
|
||
elif layer_int == 31:
|
||
return "B.Cu"
|
||
elif 1 <= layer_int <= 30:
|
||
return f"In{layer_int}.Cu"
|
||
else:
|
||
logger.warning(f"Unknown layer int {layer_int}, defaulting to F.Cu")
|
||
return "F.Cu"
|
||
|
||
def layer_to_enum(layer_str):
|
||
"""Convert layer name string to BoardLayer enum"""
|
||
layer_map = {'F.Cu': BoardLayer.BL_F_Cu, 'B.Cu': BoardLayer.BL_B_Cu}
|
||
for i in range(1, 31):
|
||
layer_map[f'In{i}.Cu'] = getattr(BoardLayer, f'BL_In{i}_Cu')
|
||
return layer_map.get(layer_str, BoardLayer.BL_F_Cu)
|
||
|
||
# Extract geometry from ORS data (geometry_data has 'by_net', not 'nets')
|
||
nets = geometry_data.get('by_net', {})
|
||
|
||
# Convert ORS format to display format
|
||
# The PCB viewer expects tracks and vias as dictionaries
|
||
tracks = []
|
||
vias = []
|
||
|
||
for net_name, net_data in nets.items():
|
||
# Process tracks (ORS format uses 'tracks' not 'traces')
|
||
for track_data in net_data.get('tracks', []):
|
||
# DEBUG: Log first track to see actual format
|
||
if len(tracks) == 0:
|
||
logger.info(f"DEBUG: First track_data from ORS: {track_data}")
|
||
logger.info(f"DEBUG: track_data keys: {track_data.keys() if isinstance(track_data, dict) else 'not a dict'}")
|
||
|
||
# ORS track format: {"layer": int, "start": {"x": float, "y": float}, "end": {...}, "width": float}
|
||
if isinstance(track_data, dict):
|
||
layer = track_data.get('layer', 0)
|
||
start = track_data.get('start', {})
|
||
end = track_data.get('end', {})
|
||
width = track_data.get('width', 0.15)
|
||
|
||
if len(tracks) == 0:
|
||
logger.info(f"DEBUG: start = {start}, end = {end}")
|
||
|
||
x1 = start.get('x', 0.0)
|
||
y1 = start.get('y', 0.0)
|
||
x2 = end.get('x', 0.0)
|
||
y2 = end.get('y', 0.0)
|
||
|
||
# Convert layer to name string for viewer
|
||
layer_name = layer_to_name(layer)
|
||
|
||
# Create track dictionary for viewer (not Track object)
|
||
track = {
|
||
'x1': x1,
|
||
'y1': y1,
|
||
'x2': x2,
|
||
'y2': y2,
|
||
'layer': layer_name,
|
||
'width': width,
|
||
'net': net_name
|
||
}
|
||
tracks.append(track)
|
||
|
||
# Process vias
|
||
for via_data in net_data.get('vias', []):
|
||
# ORS via format: {"position": {"x": float, "y": float}, "from_layer": int, "to_layer": int, "diameter": float, "drill": float}
|
||
if isinstance(via_data, dict):
|
||
position = via_data.get('position', {})
|
||
x = position.get('x', 0.0)
|
||
y = position.get('y', 0.0)
|
||
layer_from = via_data.get('from_layer', 0)
|
||
layer_to = via_data.get('to_layer', 1)
|
||
diameter = via_data.get('diameter', 0.4)
|
||
drill = via_data.get('drill', 0.2)
|
||
|
||
# Convert layers to name strings for viewer
|
||
from_layer_name = layer_to_name(layer_from)
|
||
to_layer_name = layer_to_name(layer_to)
|
||
|
||
# Create via dictionary for viewer (not Via object)
|
||
via = {
|
||
'x': x,
|
||
'y': y,
|
||
'from_layer': from_layer_name,
|
||
'to_layer': to_layer_name,
|
||
'diameter': diameter,
|
||
'drill': drill,
|
||
'net': net_name
|
||
}
|
||
vias.append(via)
|
||
|
||
# CRITICAL: Clear any existing tracks/vias first (they might be Track objects from KiCad load)
|
||
logger.info(f"Before import: board_data has {len(self.board_data.get('tracks', []))} existing tracks")
|
||
|
||
# Store tracks/vias directly in board_data (same as routing does)
|
||
logger.info(f"Storing {len(tracks)} tracks and {len(vias)} vias in board_data")
|
||
if tracks:
|
||
logger.info(f"Sample track: {tracks[0]}")
|
||
logger.info(f"Sample track type: {type(tracks[0])}")
|
||
if vias:
|
||
logger.info(f"Sample via: {vias[0]}")
|
||
|
||
self.board_data['tracks'] = tracks
|
||
self.board_data['vias'] = vias
|
||
|
||
logger.info(f"After import: board_data now has {len(self.board_data.get('tracks', []))} tracks")
|
||
logger.info(f"After import track type: {type(self.board_data['tracks'][0]) if self.board_data.get('tracks') else 'none'}")
|
||
|
||
# CRITICAL: Also set on viewer's board_data (they should be the same object, but be explicit)
|
||
if hasattr(self, 'pcb_viewer') and self.pcb_viewer:
|
||
if self.pcb_viewer.board_data is not None:
|
||
self.pcb_viewer.board_data['tracks'] = tracks
|
||
self.pcb_viewer.board_data['vias'] = vias
|
||
logger.info(f"Set {len(tracks)} tracks on viewer.board_data")
|
||
else:
|
||
logger.warning("Viewer board_data is None, setting it now")
|
||
self.pcb_viewer.board_data = self.board_data
|
||
|
||
# Trigger viewer repaint
|
||
self.pcb_viewer.update()
|
||
logger.info("Viewer updated - solution should now be visible")
|
||
else:
|
||
logger.warning("No pcb_viewer available to display routing")
|
||
|
||
# Enable commit button so user can apply the imported solution
|
||
if hasattr(self, 'commit_btn'):
|
||
self.commit_btn.setEnabled(True)
|
||
logger.info("Commit button enabled")
|
||
if hasattr(self, 'rollback_btn'):
|
||
self.rollback_btn.setEnabled(True)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error displaying imported solution: {e}", exc_info=True)
|
||
|
||
def _route_manhattan_rrg(self):
|
||
"""Perform Manhattan RRG routing with live GUI updates"""
|
||
try:
|
||
logger.info("Starting Manhattan routing with live updates...")
|
||
|
||
# Reset and prepare statistics widget
|
||
self.pathfinder_stats.reset()
|
||
|
||
# Create routing configuration matching RRG PathFinder parameters
|
||
config = RoutingConfig(
|
||
grid_pitch=0.4,
|
||
track_width=0.0889,
|
||
clearance=0.0889,
|
||
via_diameter=0.25,
|
||
via_drill=0.15,
|
||
k_length=1.0,
|
||
k_via=10.0,
|
||
k_bend=2.0,
|
||
max_iterations=50,
|
||
pres_fac_init=0.5,
|
||
pres_fac_mult=1.3,
|
||
hist_cost_step=1.0,
|
||
alpha=2.0
|
||
)
|
||
|
||
# Calculate total nets for statistics
|
||
nets = self.board_data.get('nets', [])
|
||
if isinstance(nets, list) and len(nets) > 0 and isinstance(nets[0], dict):
|
||
total_nets = len([net for net in nets if len(net.get('pads', [])) >= 2])
|
||
else:
|
||
# Fallback: use airwires count as total nets approximation
|
||
total_nets = len(self.board_data.get('airwires', []))
|
||
self.pathfinder_stats.start_routing(total_nets, config.max_iterations)
|
||
|
||
# Initialize progressive routing state
|
||
self._setup_progressive_routing(config)
|
||
|
||
# Set up progress callback for live statistics
|
||
if hasattr(self.router, 'gpu_pathfinder') and hasattr(self.router.gpu_pathfinder, 'parallel_pathfinder'):
|
||
self.router.gpu_pathfinder.parallel_pathfinder.progress_callback = self._on_pathfinder_progress
|
||
|
||
# Set up PathFinder instrumentation callback if available
|
||
if hasattr(self.router, 'pathfinder') and hasattr(self.router.pathfinder, 'set_gui_status_callback'):
|
||
self.router.pathfinder.set_gui_status_callback(self._update_pathfinder_status)
|
||
|
||
except Exception as e:
|
||
logger.exception("Error starting Manhattan routing")
|
||
self.status_label.setText(f"Error: {str(e)}")
|
||
self._reset_routing_ui()
|
||
QMessageBox.critical(self, "Routing Error", f"Error starting Manhattan routing:\n{str(e)}")
|
||
|
||
def _setup_progressive_routing(self, config):
|
||
"""Initialize progressive routing with live GUI updates"""
|
||
# Create GPU provider for router
|
||
try:
|
||
gpu_provider = CUDAProvider()
|
||
if not gpu_provider.is_available():
|
||
gpu_provider = CPUFallbackProvider()
|
||
logger.warning("CUDA not available, using CPU fallback")
|
||
except Exception as e:
|
||
logger.error(f"GPU initialization error: {e}")
|
||
gpu_provider = CPUFallbackProvider()
|
||
|
||
# Create basic constraints and initialize router
|
||
from orthoroute.domain.models.constraints import DRCConstraints
|
||
|
||
# Create constraints with default values (dataclass)
|
||
constraints = DRCConstraints()
|
||
constraints.min_track_width = 0.0889 # 3.5 mil
|
||
constraints.min_track_spacing = 0.2 # 0.2mm clearance
|
||
constraints.min_via_diameter = 0.8
|
||
constraints.min_via_drill = 0.4
|
||
|
||
# Initialize router with correct signature
|
||
self.router = ManhattanRRGRoutingEngine(
|
||
constraints=constraints,
|
||
gpu_provider=gpu_provider
|
||
)
|
||
|
||
# Skip the complex board creation and use the existing router initialization
|
||
# The router will use the KiCad interface directly to get nets
|
||
logger.info("Progressive routing: Using KiCad interface for board initialization")
|
||
|
||
# Convert board data to domain objects and initialize router
|
||
mock_board = self._convert_board_data_to_domain(self.board_data, constraints)
|
||
self.router.initialize(mock_board)
|
||
|
||
# Use real nets from the board
|
||
self.routing_nets = [net for net in mock_board.nets if net.is_routable]
|
||
logger.info(f"Progressive routing: Found {len(self.routing_nets)} routable nets from board data")
|
||
self.current_net_index = 0
|
||
self.routed_count = 0
|
||
self.failed_count = 0
|
||
self.routing_start_time = time.time()
|
||
|
||
# Setup GUI update timer
|
||
self.routing_timer = QTimer()
|
||
self.routing_timer.timeout.connect(self._routing_step)
|
||
self.routing_timer.start(100) # Update every 100ms
|
||
|
||
logger.info(f"Progressive routing initialized: {len(self.routing_nets)} nets to route")
|
||
|
||
def _routing_step(self):
|
||
"""FIXED: Route ALL nets at once using PathFinder, then update GUI progressively"""
|
||
try:
|
||
# Check if this is the first call - route ALL nets at once
|
||
if not hasattr(self, '_routing_started'):
|
||
logger.info(f"PATHFINDER SINGLE-PASS: Starting routing of ALL {len(self.routing_nets)} nets with negotiated congestion")
|
||
self.log_to_gui(f"🚀 Starting PathFinder routing of {len(self.routing_nets)} nets", "INFO")
|
||
|
||
# Route ALL nets in a single PathFinder call for proper congestion negotiation
|
||
self.routing_result = self.router.route_all_nets(
|
||
self.routing_nets,
|
||
timeout_per_net=30.0,
|
||
total_timeout=1800.0
|
||
)
|
||
|
||
# Get final routing statistics
|
||
self.routing_stats = self.router.get_routing_statistics()
|
||
|
||
logger.info(f"PATHFINDER COMPLETE: Routed {self.routing_stats.nets_routed}/{self.routing_stats.nets_attempted} nets ({self.routing_stats.success_rate:.1%})")
|
||
self.log_to_gui(f"✅ PathFinder complete: {self.routing_stats.nets_routed}/{self.routing_stats.nets_attempted} nets routed", "SUCCESS")
|
||
|
||
self._routing_started = True
|
||
self.current_net_index = 0
|
||
|
||
# Update visualization with all routed tracks
|
||
self._update_routing_visualization()
|
||
|
||
# Progressive GUI updates for visual feedback
|
||
if self.current_net_index < len(self.routing_nets):
|
||
batch_size = min(50, len(self.routing_nets) - self.current_net_index) # Show 50 nets per update
|
||
|
||
# Progress tracking (overuse table updated via iteration callback)
|
||
pass
|
||
|
||
# Log nets in current batch for user feedback
|
||
for i in range(batch_size):
|
||
if self.current_net_index + i < len(self.routing_nets):
|
||
net = self.routing_nets[self.current_net_index + i]
|
||
# Check if this net was successfully routed by looking at router state
|
||
if hasattr(self.router, 'routed_nets') and net.name in self.router.routed_nets:
|
||
self.routed_count += 1
|
||
logger.info(f"PATHFINDER SUCCESS: Routed net {net.name}")
|
||
else:
|
||
self.failed_count += 1
|
||
|
||
self.current_net_index += batch_size
|
||
self.status_label.setText(f"Processing routing results: {self.current_net_index}/{len(self.routing_nets)}")
|
||
else:
|
||
# All nets processed - complete routing
|
||
self._complete_routing()
|
||
return
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in PathFinder routing: {e}")
|
||
self._complete_routing()
|
||
|
||
def _route_net_batch(self, net_batch):
|
||
"""Route a batch of nets using REAL PathFinder with negotiated congestion"""
|
||
try:
|
||
if not net_batch:
|
||
return []
|
||
|
||
logger.info(f"REAL PATHFINDER: Routing batch of {len(net_batch)} nets with negotiated congestion")
|
||
|
||
# CRITICAL: Use route_all_nets() for REAL PathFinder routing
|
||
# This enables negotiated congestion, ripup/reroute, proper grid-based routing
|
||
routing_result = self.router.route_all_nets(
|
||
net_batch,
|
||
timeout_per_net=30.0, # 30 second timeout per net for real PathFinder
|
||
total_timeout=1800.0 # 30 minute total timeout for batch
|
||
)
|
||
|
||
# Get routing statistics to check success rate
|
||
routing_stats = self.router.get_routing_statistics()
|
||
|
||
# Extract success status for each net
|
||
batch_results = []
|
||
if routing_result.success and routing_stats.success_rate > 0:
|
||
# If overall routing was successful, assume all nets in this batch succeeded
|
||
batch_results = [True] * len(net_batch)
|
||
else:
|
||
# If overall routing failed, assume all nets in this batch failed
|
||
batch_results = [False] * len(net_batch)
|
||
|
||
logger.info(f"REAL PATHFINDER BATCH: Completed {len(net_batch)} nets with success rate: {routing_stats.success_rate:.1%}")
|
||
|
||
# Log batch results to GUI
|
||
if hasattr(self, 'log_to_gui'):
|
||
success_count = sum(batch_results)
|
||
self.log_to_gui(f"✅ Batch complete: {success_count}/{len(net_batch)} nets routed ({routing_stats.success_rate:.1%})", "SUCCESS" if success_count > 0 else "WARNING")
|
||
|
||
return batch_results
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error in PathFinder batch routing: {e}")
|
||
import traceback
|
||
logger.error(f"Traceback: {traceback.format_exc()}")
|
||
# Return all failures for this batch
|
||
return [False] * len(net_batch)
|
||
|
||
def _route_single_net_step(self, net):
|
||
"""DEPRECATED: Route a single net (fallback only - use batch routing for real PathFinder)"""
|
||
try:
|
||
if not net or not hasattr(net, 'name'):
|
||
return False
|
||
|
||
# DEPRECATED: This uses basic Dijkstra, not real PathFinder
|
||
# Use _route_net_batch() for proper PathFinder routing
|
||
logger.warning(f"DEPRECATED: Using single-net Dijkstra for {net.name} - should use batch PathFinder")
|
||
result = self.router.route_net(net, timeout=10.0)
|
||
|
||
if result and result.success:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error routing net {getattr(net, 'name', 'unknown')}: {e}")
|
||
return False
|
||
|
||
def _update_routing_visualization(self):
|
||
"""Update GUI visualization with current routing progress"""
|
||
logger.info("=== _update_routing_visualization() CALLED ===")
|
||
if self.pcb_viewer:
|
||
# Get current routed tracks and vias
|
||
tracks = self.router.get_routed_tracks()
|
||
vias = self.router.get_routed_vias()
|
||
|
||
# ACCUMULATIVE FIX: Add new tracks to existing tracks, don't overwrite
|
||
if tracks:
|
||
if 'tracks' not in self.board_data:
|
||
self.board_data['tracks'] = []
|
||
|
||
# Get existing track count
|
||
existing_count = len(self.board_data['tracks'])
|
||
|
||
# Add new tracks to existing ones
|
||
self.board_data['tracks'].extend(tracks)
|
||
|
||
total_count = len(self.board_data['tracks'])
|
||
logger.info(f"GUI: Added {len(tracks)} new tracks to {existing_count} existing tracks = {total_count} total tracks")
|
||
else:
|
||
logger.warning(f"No tracks received from router!")
|
||
|
||
if vias:
|
||
if 'vias' not in self.board_data:
|
||
self.board_data['vias'] = []
|
||
|
||
# Get existing via count
|
||
existing_via_count = len(self.board_data['vias'])
|
||
|
||
# Add new vias to existing ones
|
||
self.board_data['vias'].extend(vias)
|
||
|
||
total_via_count = len(self.board_data['vias'])
|
||
logger.info(f"GUI: Added {len(vias)} new vias to {existing_via_count} existing vias = {total_via_count} total vias")
|
||
else:
|
||
logger.debug(f"No vias received from router")
|
||
|
||
# Update viewer with ALL accumulated routing data
|
||
all_tracks = self.board_data.get('tracks', [])
|
||
all_vias = self.board_data.get('vias', [])
|
||
|
||
if hasattr(self.pcb_viewer, 'update_routing'):
|
||
logger.info(f"GUI: Calling pcb_viewer.update_routing() with {len(all_tracks)} total tracks, {len(all_vias)} total vias")
|
||
self.pcb_viewer.update_routing(all_tracks, all_vias)
|
||
else:
|
||
logger.info(f"GUI: pcb_viewer has no update_routing method, calling update() instead")
|
||
# Force repaint to show tracks
|
||
self.pcb_viewer.update()
|
||
|
||
def _complete_routing(self):
|
||
"""Complete the progressive routing process"""
|
||
self.routing_timer.stop()
|
||
|
||
elapsed_time = time.time() - self.routing_start_time
|
||
logger.info(f"SUCCESS: Progressive routing complete: {self.routed_count}/{len(self.routing_nets)} nets routed in {elapsed_time:.1f}s")
|
||
|
||
# Final GUI updates
|
||
self.status_label.setText(f"Routing complete: {self.routed_count}/{len(self.routing_nets)} nets routed")
|
||
|
||
# Final visualization update
|
||
self._update_routing_visualization()
|
||
|
||
# DEBUG: Screenshot after routing
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.debug_screenshot("after_routing")
|
||
|
||
# Reset UI
|
||
self._reset_routing_ui()
|
||
|
||
# Enable commit/rollback if any routes were created
|
||
if self.routed_count > 0:
|
||
self.commit_btn.setEnabled(True)
|
||
self.rollback_btn.setEnabled(True)
|
||
|
||
def _on_routing_progress(self, current, total, status, tracks, vias):
|
||
"""Handle routing progress updates from the thread"""
|
||
# Update status
|
||
self.status_label.setText(f"{status} ({current}/{total})")
|
||
|
||
# Update preview with new tracks/vias
|
||
if tracks:
|
||
if 'tracks' not in self.board_data:
|
||
self.board_data['tracks'] = []
|
||
self.board_data['tracks'].extend(tracks)
|
||
|
||
if vias:
|
||
if 'vias' not in self.board_data:
|
||
self.board_data['vias'] = []
|
||
self.board_data['vias'].extend(vias)
|
||
|
||
# PERFORMANCE: Throttle GUI updates during routing
|
||
if self.pcb_viewer and hasattr(self, '_last_update_time'):
|
||
import time
|
||
current_time = time.time()
|
||
if current_time - self._last_update_time > 0.25: # Throttle to 4 FPS, smooth ~25 nets/250ms
|
||
self.pcb_viewer.update()
|
||
self._last_update_time = current_time
|
||
elif self.pcb_viewer:
|
||
import time
|
||
self.pcb_viewer.update()
|
||
self._last_update_time = time.time()
|
||
|
||
def _update_preview_after_batch(self):
|
||
"""Extract routing results and update preview with traces and vias"""
|
||
logger.info("=== _update_preview_after_batch() CALLED ===")
|
||
try:
|
||
if not self.router:
|
||
logger.warning("No router available for result extraction")
|
||
return
|
||
|
||
# Get the latest routing results from router
|
||
if hasattr(self.router, 'routed_nets') and self.router.routed_nets:
|
||
logger.info(f"PREVIEW UPDATE: Extracting visual data from {len(self.router.routed_nets)} routed nets")
|
||
|
||
new_tracks = []
|
||
new_vias = []
|
||
|
||
for net_id, route in self.router.routing_results.items():
|
||
if route and hasattr(route, 'segments') and hasattr(route, 'vias'):
|
||
# Convert segments to tracks
|
||
for segment in route.segments:
|
||
if hasattr(segment, 'start') and hasattr(segment, 'end') and hasattr(segment, 'layer'):
|
||
track = {
|
||
'start_x': segment.start.x,
|
||
'start_y': segment.start.y,
|
||
'end_x': segment.end.x,
|
||
'end_y': segment.end.y,
|
||
'layer': segment.layer,
|
||
'width': getattr(segment, 'width', 0.2), # Default trace width
|
||
'net': net_id
|
||
}
|
||
new_tracks.append(track)
|
||
|
||
# Convert vias
|
||
for via in route.vias:
|
||
if hasattr(via, 'position') and hasattr(via, 'from_layer') and hasattr(via, 'to_layer'):
|
||
via_data = {
|
||
'x': via.position.x,
|
||
'y': via.position.y,
|
||
'from_layer': via.from_layer,
|
||
'to_layer': via.to_layer,
|
||
'drill': getattr(via, 'drill_size', 0.3), # Default drill size
|
||
'size': getattr(via, 'diameter', 0.6), # Default via diameter
|
||
'net': net_id
|
||
}
|
||
new_vias.append(via_data)
|
||
|
||
# Update board_data with new tracks and vias
|
||
if new_tracks:
|
||
if 'tracks' not in self.board_data:
|
||
self.board_data['tracks'] = []
|
||
self.board_data['tracks'].extend(new_tracks)
|
||
logger.info(f"PREVIEW UPDATE: Added {len(new_tracks)} tracks to preview")
|
||
|
||
if new_vias:
|
||
if 'vias' not in self.board_data:
|
||
self.board_data['vias'] = []
|
||
self.board_data['vias'].extend(new_vias)
|
||
logger.info(f"PREVIEW UPDATE: Added {len(new_vias)} vias to preview")
|
||
|
||
# Force preview update
|
||
if self.pcb_viewer and (new_tracks or new_vias):
|
||
self.pcb_viewer.update()
|
||
logger.info("PREVIEW UPDATE: Forced PCB viewer refresh")
|
||
|
||
else:
|
||
logger.debug("PREVIEW UPDATE: No routed nets available for visualization")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error updating preview after batch: {e}")
|
||
import traceback
|
||
logger.error(f"Traceback: {traceback.format_exc()}")
|
||
|
||
def _on_routing_completed(self, result):
|
||
"""Handle routing completion"""
|
||
# Hide cancel button
|
||
if hasattr(self, 'cancel_btn'):
|
||
self.cancel_btn.setVisible(False)
|
||
|
||
# Update UI
|
||
self.commit_btn.setEnabled(True)
|
||
self.rollback_btn.setEnabled(True)
|
||
self.replay_btn.setEnabled(True)
|
||
self.route_preview_btn.setEnabled(True)
|
||
self.algorithm_combo.setEnabled(True)
|
||
|
||
# Store routing result for commit
|
||
self.routing_result = result
|
||
|
||
# Export demo artifacts (run_summary.json + GeoJSON)
|
||
self._export_demo_artifacts(result)
|
||
|
||
# DEBUG: Screenshot after routing completes
|
||
if self.pcb_viewer:
|
||
self.pcb_viewer.debug_screenshot("after_routing")
|
||
|
||
# Update status
|
||
routed_nets = result.get('routed_nets', 0)
|
||
failed_nets = result.get('failed_nets', 0)
|
||
self.status_label.setText(f"Manhattan routing completed: {routed_nets} nets routed")
|
||
|
||
def _on_pathfinder_progress(self, progress_data):
|
||
"""Handle PathFinder routing progress updates"""
|
||
if not hasattr(self, 'pathfinder_stats') or not self.pathfinder_stats:
|
||
return
|
||
|
||
progress_type = progress_data.get('type', '')
|
||
|
||
if progress_type == 'iteration_start':
|
||
# Update iteration progress
|
||
iteration = progress_data.get('iteration', 0)
|
||
max_iterations = progress_data.get('max_iterations', 50)
|
||
overuse_sum = progress_data.get('overuse_sum', 0)
|
||
overuse_edges = progress_data.get('overuse_edges', 0)
|
||
self.pathfinder_stats.update_iteration(iteration, max_iterations, overuse_sum, overuse_edges)
|
||
|
||
elif progress_type == 'routing_update':
|
||
# Update routing statistics
|
||
successful = progress_data.get('successful_routes', 0)
|
||
failed = progress_data.get('failed_routes', 0)
|
||
self.pathfinder_stats.update_routing_stats(successful, failed)
|
||
|
||
# Update congestion statistics
|
||
congested_edges = progress_data.get('congested_edges', 0)
|
||
total_edges = progress_data.get('total_edges', 1)
|
||
self.pathfinder_stats.update_congestion(congested_edges, total_edges)
|
||
|
||
elif progress_type == 'convergence':
|
||
# PathFinder converged
|
||
iteration = progress_data.get('iteration', 0)
|
||
max_iterations = 50 # Default, will be updated by iteration_start
|
||
overuse_sum = progress_data.get('overuse_sum', 0)
|
||
overuse_edges = progress_data.get('overuse_edges', 0)
|
||
self.pathfinder_stats.update_iteration(iteration, max_iterations, overuse_sum, overuse_edges)
|
||
|
||
elif progress_type == 'completion':
|
||
# Routing completed
|
||
successful = progress_data.get('successful_routes', 0)
|
||
total = progress_data.get('total_routes', 0)
|
||
converged = progress_data.get('converged', False)
|
||
|
||
final_message = f"Completed: {successful}/{total} routes"
|
||
if converged:
|
||
final_message += " (Converged)"
|
||
|
||
self.pathfinder_stats.finish_routing(successful > 0, final_message)
|
||
|
||
# Clean up GPU resources if they were used
|
||
try:
|
||
if hasattr(self.routing_thread, 'router') and hasattr(self.routing_thread.router, 'gpu_provider'):
|
||
if self.routing_thread.router.gpu_provider and hasattr(self.routing_thread.router.gpu_provider, 'cleanup'):
|
||
logger.info("Cleaning up GPU resources")
|
||
self.routing_thread.router.gpu_provider.cleanup()
|
||
except Exception as e:
|
||
logger.warning(f"Error cleaning up GPU resources: {e}")
|
||
|
||
def _on_routing_error(self, error_message):
|
||
"""Handle routing errors"""
|
||
logger.error(f"Routing error: {error_message}")
|
||
self.status_label.setText(f"Error: {error_message}")
|
||
self._reset_routing_ui()
|
||
|
||
# Clean up GPU resources if they were used
|
||
try:
|
||
if hasattr(self.routing_thread, 'router') and hasattr(self.routing_thread.router, 'gpu_provider'):
|
||
if self.routing_thread.router.gpu_provider and hasattr(self.routing_thread.router.gpu_provider, 'cleanup'):
|
||
logger.info("Cleaning up GPU resources")
|
||
self.routing_thread.router.gpu_provider.cleanup()
|
||
except Exception as e:
|
||
logger.warning(f"Error cleaning up GPU resources: {e}")
|
||
|
||
# Hide cancel button
|
||
if hasattr(self, 'cancel_btn'):
|
||
self.cancel_btn.setVisible(False)
|
||
|
||
QMessageBox.critical(self, "Routing Error", f"Error during routing:\n{error_message}")
|
||
|
||
def _cancel_routing(self):
|
||
"""Cancel current routing operation"""
|
||
if hasattr(self, 'routing_thread') and self.routing_thread.isRunning():
|
||
logger.info("Cancelling routing...")
|
||
self.status_label.setText("Cancelling routing...")
|
||
self.routing_thread.cancel()
|
||
|
||
# Clean up GPU resources if they were used
|
||
try:
|
||
if hasattr(self.routing_thread, 'router') and hasattr(self.routing_thread.router, 'gpu_provider'):
|
||
if self.routing_thread.router.gpu_provider and hasattr(self.routing_thread.router.gpu_provider, 'cleanup'):
|
||
logger.info("Cleaning up GPU resources")
|
||
self.routing_thread.router.gpu_provider.cleanup()
|
||
except Exception as e:
|
||
logger.warning(f"Error cleaning up GPU resources: {e}")
|
||
|
||
self.routing_thread.wait() # Wait for thread to finish
|
||
|
||
self._reset_routing_ui()
|
||
self.status_label.setText("Routing cancelled")
|
||
|
||
# Hide cancel button
|
||
if hasattr(self, 'cancel_btn'):
|
||
self.cancel_btn.setVisible(False)
|
||
|
||
def _reset_routing_ui(self):
|
||
"""Reset routing UI to initial state"""
|
||
self.route_preview_btn.setEnabled(True)
|
||
self.algorithm_combo.setEnabled(True)
|
||
self.commit_btn.setEnabled(False)
|
||
self.rollback_btn.setEnabled(False)
|
||
self.replay_btn.setEnabled(False)
|
||
self.overuse_table_label.setText("Waiting for routing to start...")
|
||
self.status_label.setText("Ready")
|
||
|
||
def log_to_gui(self, message: str, level: str = "INFO"):
|
||
"""Add real-time log message to GUI routing log"""
|
||
from datetime import datetime
|
||
|
||
# Add timestamp and format message
|
||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||
formatted_message = f"[{timestamp}] {level}: {message}"
|
||
|
||
# Color code by level
|
||
color_map = {
|
||
"INFO": "#d4d4d4", # Light gray
|
||
"SUCCESS": "#4EC9B0", # Teal
|
||
"WARNING": "#DCDCAA", # Yellow
|
||
"ERROR": "#F44747", # Red
|
||
"DEBUG": "#808080" # Gray
|
||
}
|
||
color = color_map.get(level, "#d4d4d4")
|
||
|
||
# Add colored message to log widget
|
||
if hasattr(self, 'routing_log'):
|
||
self.routing_log.append(f'<span style="color: {color};">{formatted_message}</span>')
|
||
|
||
# Auto-scroll to bottom
|
||
cursor = self.routing_log.textCursor()
|
||
cursor.movePosition(cursor.MoveOperation.End)
|
||
self.routing_log.setTextCursor(cursor)
|
||
|
||
# Limit log to last 1000 lines for performance
|
||
if self.routing_log.document().lineCount() > 1000:
|
||
cursor.movePosition(cursor.MoveOperation.Start)
|
||
cursor.movePosition(cursor.MoveOperation.Down, cursor.MoveMode.KeepAnchor, 200)
|
||
cursor.removeSelectedText()
|
||
|
||
def clear_routing_log(self):
|
||
"""Clear the routing log window"""
|
||
if hasattr(self, 'routing_log'):
|
||
self.routing_log.clear()
|
||
|
||
def _convert_board_data_to_domain(self, board_data, drc_constraints):
|
||
"""Convert board_data dict to domain Board object for RRG router"""
|
||
from orthoroute.domain.models.board import Board, Net, Pad, Bounds, Coordinate, Component
|
||
|
||
# Create board bounds
|
||
bounds_data = board_data.get('bounds', (0, 0, 100, 100))
|
||
board_bounds = Bounds(
|
||
min_x=bounds_data[0],
|
||
min_y=bounds_data[1],
|
||
max_x=bounds_data[2],
|
||
max_y=bounds_data[3]
|
||
)
|
||
|
||
# Convert nets and pads
|
||
nets = []
|
||
nets_data = board_data.get('nets', {})
|
||
|
||
for net_name, net_data in nets_data.items():
|
||
if not net_name or net_name.strip() == "":
|
||
continue
|
||
|
||
pads_data = net_data.get('pads', [])
|
||
if len(pads_data) < 2:
|
||
continue # Skip single-pad nets
|
||
|
||
# Convert pads
|
||
net_pads = []
|
||
for pad_data in pads_data:
|
||
pad = Pad(
|
||
id=f"{net_name}_pad_{len(net_pads)}",
|
||
component_id=f"comp_{net_name}_{len(net_pads)}",
|
||
net_id=f"net_{len(nets)}",
|
||
position=Coordinate(
|
||
x=pad_data.get('x', 0.0),
|
||
y=pad_data.get('y', 0.0)
|
||
),
|
||
size=(
|
||
pad_data.get('width', 1.0),
|
||
pad_data.get('height', 1.0)
|
||
),
|
||
drill_size=pad_data.get('drill', None),
|
||
layer=pad_data.get('layers', ['F.Cu'])[0] if pad_data.get('layers') else 'F.Cu'
|
||
)
|
||
net_pads.append(pad)
|
||
|
||
# Create net
|
||
net = Net(
|
||
id=f"net_{len(nets)}",
|
||
name=net_name,
|
||
pads=net_pads
|
||
)
|
||
nets.append(net)
|
||
|
||
# Create mock components for proper bounds calculation
|
||
components = []
|
||
all_pads = []
|
||
for net in nets:
|
||
for i, pad in enumerate(net.pads):
|
||
all_pads.append(pad)
|
||
|
||
# Create a single mock component containing all pads
|
||
if all_pads:
|
||
# Calculate center position
|
||
avg_x = sum(pad.position.x for pad in all_pads) / len(all_pads)
|
||
avg_y = sum(pad.position.y for pad in all_pads) / len(all_pads)
|
||
|
||
mock_component = Component(
|
||
id="mock_comp_1",
|
||
reference="U1",
|
||
value="MOCK",
|
||
footprint="MOCK_FP",
|
||
position=Coordinate(avg_x, avg_y),
|
||
pads=all_pads
|
||
)
|
||
components.append(mock_component)
|
||
|
||
# Create board - handle filename robustly
|
||
filename = board_data.get('filename') or board_data.get('name') or 'TestBackplane.kicad_pcb'
|
||
board = Board(
|
||
id="board_1",
|
||
name=filename,
|
||
components=components,
|
||
nets=nets,
|
||
layer_count=board_data.get('layers', 2) # Dynamic from KiCad file
|
||
)
|
||
|
||
# Store airwires as a custom attribute for RRG routing
|
||
board._airwires = board_data.get('airwires', [])
|
||
# Store KiCad-calculated bounds for accurate routing area
|
||
board._kicad_bounds = board_data.get('bounds', None)
|
||
|
||
return board
|
||
|
||
|
||
# PathFinder GUI Integration Methods
|
||
def _update_pathfinder_status(self, status_text: str):
|
||
"""Update status bar with PathFinder instrumentation metrics"""
|
||
self.status_label.setText(status_text)
|
||
self.metrics_label.setText(status_text)
|
||
self.metrics_label.setVisible(True)
|
||
|
||
# Also print to terminal for console monitoring
|
||
print(f"[PathFinder]: {status_text}")
|
||
|
||
def _update_csv_export_status(self, csv_status: str):
|
||
"""Update status bar with CSV export information"""
|
||
self.csv_status_label.setText(csv_status)
|
||
self.csv_status_label.setVisible(True)
|
||
|
||
# Auto-hide CSV status after 10 seconds
|
||
QTimer.singleShot(10000, lambda: self.csv_status_label.setVisible(False))
|
||
|
||
def _display_instrumentation_summary(self):
|
||
"""Display instrumentation summary when routing completes"""
|
||
if hasattr(self.router, 'pathfinder') and hasattr(self.router.pathfinder, 'get_instrumentation_summary'):
|
||
summary = self.router.pathfinder.get_instrumentation_summary()
|
||
|
||
if summary:
|
||
summary_text = (f"Session: {summary.get('session_id', 'N/A')} | "
|
||
f"Iterations: {summary.get('total_iterations', 0)} | "
|
||
f"Success: {summary.get('final_success_rate', 0):.1f}% | "
|
||
f"Nets: {summary.get('successful_nets', 0)}/{summary.get('total_nets_processed', 0)} | "
|
||
f"Avg Time: {summary.get('avg_routing_time_ms', 0):.1f}ms")
|
||
|
||
self.metrics_label.setText(summary_text)
|
||
self.metrics_label.setVisible(True)
|
||
|
||
print(f"[SUMMARY] Routing Summary: {summary_text}")
|
||
logger.info(f"Instrumentation summary: {summary}")
|
||
|
||
|
||
def _export_demo_artifacts(self, result):
|
||
"""Export run summary and GeoJSON artifacts for demo"""
|
||
try:
|
||
import json
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# Create artifacts directory
|
||
artifacts_dir = Path("demo_artifacts")
|
||
artifacts_dir.mkdir(exist_ok=True)
|
||
|
||
# Generate run summary
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
||
# Get GPU status from GPUConfig (hardcoded, no env vars)
|
||
try:
|
||
from ...algorithms.manhattan.unified_pathfinder import GPUConfig
|
||
gpu_enabled = GPUConfig.GPU_MODE
|
||
except ImportError:
|
||
gpu_enabled = False
|
||
|
||
run_summary = {
|
||
"timestamp": timestamp,
|
||
"routing_engine": "UnifiedPathFinder",
|
||
"gpu_enabled": gpu_enabled,
|
||
"total_nets": getattr(result, 'total_nets', 0),
|
||
"successful_nets": getattr(result, 'successful_nets', 0),
|
||
"failed_nets": getattr(result, 'failed_nets', 0),
|
||
"success_rate": f"{(getattr(result, 'successful_nets', 0) / getattr(result, 'total_nets', 1) * 100):.1f}%",
|
||
"total_tracks": len(self.board_data.get('tracks', [])),
|
||
"total_vias": len(self.board_data.get('vias', [])),
|
||
"board_bounds": getattr(self.board_data, 'bounds', None),
|
||
"grid_pitch": 0.4,
|
||
}
|
||
|
||
# Export run_summary.json
|
||
summary_file = artifacts_dir / f"run_summary_{timestamp}.json"
|
||
with open(summary_file, 'w') as f:
|
||
json.dump(run_summary, f, indent=2)
|
||
|
||
# Export GeoJSON (simplified track/via representation)
|
||
geojson = {
|
||
"type": "FeatureCollection",
|
||
"features": []
|
||
}
|
||
|
||
# Add tracks as LineString features
|
||
for track in self.board_data.get('tracks', []):
|
||
feature = {
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "LineString",
|
||
"coordinates": [[track.get('start_x', 0), track.get('start_y', 0)],
|
||
[track.get('end_x', 0), track.get('end_y', 0)]]
|
||
},
|
||
"properties": {
|
||
"type": "track",
|
||
"width": track.get('width', 0.2),
|
||
"layer": track.get('layer', 0),
|
||
"net_id": track.get('net_id', 'unknown')
|
||
}
|
||
}
|
||
geojson["features"].append(feature)
|
||
|
||
# Add vias as Point features
|
||
for via in self.board_data.get('vias', []):
|
||
feature = {
|
||
"type": "Feature",
|
||
"geometry": {
|
||
"type": "Point",
|
||
"coordinates": [via.get('x', 0), via.get('y', 0)]
|
||
},
|
||
"properties": {
|
||
"type": "via",
|
||
"diameter": via.get('diameter', 0.3),
|
||
"drill": via.get('drill', 0.15),
|
||
"layers": f"L{via.get('from_layer', 0)}-L{via.get('to_layer', 1)}",
|
||
"net_id": via.get('net_id', 'unknown')
|
||
}
|
||
}
|
||
geojson["features"].append(feature)
|
||
|
||
# Export GeoJSON
|
||
geojson_file = artifacts_dir / f"routing_geometry_{timestamp}.geojson"
|
||
with open(geojson_file, 'w') as f:
|
||
json.dump(geojson, f, indent=2)
|
||
|
||
# Log success
|
||
logger.info(f"[EXPORT] Demo artifacts exported:")
|
||
logger.info(f"[EXPORT] - Summary: {summary_file}")
|
||
logger.info(f"[EXPORT] - GeoJSON: {geojson_file}")
|
||
self.log_to_gui(f"[EXPORT] Artifacts saved to demo_artifacts/", "SUCCESS")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Failed to export demo artifacts: {e}")
|
||
self.log_to_gui(f"[EXPORT] Warning: Could not save artifacts", "WARNING")
|
||
|
||
|
||
# Add methods to OrthoRouteMainWindow class
|
||
OrthoRouteMainWindow._update_pathfinder_status = _update_pathfinder_status
|
||
OrthoRouteMainWindow._update_csv_export_status = _update_csv_export_status
|
||
OrthoRouteMainWindow._display_instrumentation_summary = _display_instrumentation_summary
|
||
OrthoRouteMainWindow._export_demo_artifacts = _export_demo_artifacts |