# OrthoRoute/orthoroute/algorithms/manhattan/unified_pathfinder.py
"""
═══════════════════════════════════════════════════════════════════════════════
UNIFIED HIGH-PERFORMANCE PATHFINDER - PCB ROUTING ENGINE WITH PORTAL ESCAPES
═══════════════════════════════════════════════════════════════════════════════
ALGORITHM OVERVIEW:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
This implements the PathFinder negotiated congestion routing algorithm for
multi-layer PCB routing with full blind/buried via support and portal-based
pad escapes. PathFinder is an iterative refinement algorithm that resolves
resource conflicts through economic pressure.
CORE PATHFINDER LOOP:
───────────────────────────────────────────────────────────────────────────────
1. Initialize: Build 3D lattice graph with H/V layer constraints + portal escapes
2. For iteration = 1 to MAX_ITERATIONS:
a) REFRESH: Rebuild usage from committed net paths (clean accounting)
b) UPDATE_COSTS: Apply congestion penalties (present + historical)
c) HOTSET: Select only nets touching overused edges (adaptive cap)
d) ROUTE: Route hotset nets using heap-based Dijkstra on ROI subgraphs
e) COMMIT: Update edge usage and tracking structures
f) CHECK: If no overuse → SUCCESS, exit
g) ESCALATE: Increase present_factor pressure for next iteration
3. If max iterations reached: Run detail refinement pass on conflict zones
KEY INSIGHT: PathFinder uses economics - overused edges get expensive, forcing
nets to find alternatives. Historical cost prevents oscillation. Portal escapes
provide cheap entry to inner layers to spread routing across all 18 layers.
═══════════════════════════════════════════════════════════════════════════════
PRECOMPUTED PAD ESCAPE ARCHITECTURE (THE BREAKTHROUGH)
═══════════════════════════════════════════════════════════════════════════════
THE PROBLEM:
───────────────────────────────────────────────────────────────────────────────
SMD pads are physically on F.Cu (layer 0). Without precomputed escapes:
• All nets start on F.Cu → massive congestion on top layer
• Via cost (3.0) discourages layer changes → router fights on F.Cu/In1.Cu
• 16 inner layers sit idle while top layers are saturated
• Routing completion: 16% (only 73/464 nets route successfully)
THE SOLUTION: PRECOMPUTED DRC-CLEAN PAD ESCAPES
───────────────────────────────────────────────────────────────────────────────
Before routing begins, we precompute escape routing for EVERY pad attached to
a net. This completely eliminates F.Cu as a bottleneck by distributing traffic
across 9 horizontal routing layers (In1, In3, In5, In7, In9, In11, In13, In15, B.Cu).
The routing problem transforms from "route 3200 pads on F.Cu" to "route between
portal landing points on horizontal layers" - a pure grid routing problem that
PathFinder excels at.
PRECOMPUTATION PIPELINE:
───────────────────────────────────────────────────────────────────────────────
1. PORTAL PLANNING (per pad):
• X-alignment: Snap to nearest lattice column (±½ pitch = 0.2mm tolerance)
• Length: Random 1.2mm - 4.8mm (3-12 grid steps @ 0.4mm pitch)
• Direction: Random ±Y (up or down from pad)
• Bounds checking: Flip direction if out of board bounds
• DRC Pass 1: Check 0.15mm clearance from ALL OTHER PADS
- Via position must maintain clearance
- Stub path (sampled at 25%, 50%, 75%) must maintain clearance
- Up to 10 random attempts per pad
• Result: Portal landing point (x_grid, y_grid ± delta_steps); see the sketch after this pipeline
2. ESCAPE GEOMETRY GENERATION (first pass):
• Vertical stub: F.Cu track from pad center to portal landing point
• Portal via: F.Cu → random horizontal layer (odd index: 1, 3, 5, ...)
- Via spec: 0.15mm hole, 0.25mm diameter (0.05mm annular ring)
• Entry layer: Randomly selected from In1, In3, In5, In7, In9, In11, In13, In15, B.Cu
• All 3200 escapes generated in parallel
3. DRC CONFLICT RESOLUTION (second pass - up to 3 iterations):
• Via-to-via checking: Ensure escapes maintain 0.4mm clearance from each other
• Track-to-via checking: Ensure escape stubs don't violate via clearances
• Conflict detection: Point-to-segment distance calculations
• Retry logic: Regenerate conflicting escapes with new random parameters
• Result: DRC-clean escape geometry for all routable pads
4. ROUTING STARTS FROM PORTAL LANDINGS:
• Source terminal: (x_src_portal, y_src_portal, L_horizontal_i)
• Dest terminal: (x_dst_portal, y_dst_portal, L_horizontal_j)
• PathFinder routes between portal landing points on horizontal layers
• No F.Cu congestion - traffic is pre-distributed across 9 layers
• Pure Manhattan grid routing with alternating H/V discipline
5. FINAL GEOMETRY EMISSION:
• Precomputed escape stubs (vertical F.Cu tracks)
• Precomputed portal vias (F.Cu → entry layer)
• Routed paths (between portal landing points)
• Regular routing vias (layer transitions during pathfinding)
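The portal-planning step (1) above reduces to a few lines. The sketch below uses
illustrative names and the default pitch/offset values from this file's config, and
omits the DRC clearance pass and the 10-attempt retry loop:

    import random

    def plan_portal(pad_x_mm, pad_y_mm, origin_x_mm, origin_y_mm,
                    pitch=0.4, y_steps=482, delta_min=3, delta_max=12):
        # X-alignment: snap the pad to the nearest lattice column (within ±half a pitch)
        x_grid = round((pad_x_mm - origin_x_mm) / pitch)
        y_grid = round((pad_y_mm - origin_y_mm) / pitch)
        # Random stub length (3-12 grid steps) and random ±Y direction
        delta = random.randint(delta_min, delta_max)
        direction = random.choice((+1, -1))
        landing_y = y_grid + direction * delta
        # Bounds check: flip direction if the landing point falls off the board
        if not (0 <= landing_y < y_steps):
            landing_y = y_grid - direction * delta
        return x_grid, landing_y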
KEY ADVANTAGES:
───────────────────────────────────────────────────────────────────────────────
✓ F.Cu bottleneck eliminated: Traffic distributed before routing starts
✓ Deterministic layer spreading: Random layer selection ensures even distribution
✓ DRC-clean from the start: No escape geometry violates clearance rules
✓ Parallel precomputation: All 3200 escapes generated simultaneously
✓ Pure grid routing problem: PathFinder works on its optimal problem class
✓ Minimal via count: Only one via per pad (escape via), rest is grid routing
✓ Retry resilience: Conflicts automatically resolved through regeneration
RESULTS:
───────────────────────────────────────────────────────────────────────────────
• Before: 16% completion (73/464 nets), F.Cu saturated, inner layers idle
• After: Expected 80-90%+ completion, even layer utilization, clean geometry
═══════════════════════════════════════════════════════════════════════════════
GRAPH REPRESENTATION & DATA STRUCTURES
═══════════════════════════════════════════════════════════════════════════════
3D LATTICE:
───────────────────────────────────────────────────────────────────────────────
• Grid: (x_steps × y_steps × layers) nodes
- x_steps, y_steps: Board dimensions ÷ grid_pitch (default 0.4mm)
- layers: Copper layer count (6-18 typical, supports up to 32)
• Node indexing: flat_idx = layer × (x_steps × y_steps) + y × x_steps + x
- Fast arithmetic: layer = idx ÷ plane_size
- Enables O(1) coordinate lookups without function calls
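As a minimal sketch of the indexing arithmetic (helper names here are illustrative,
not the exact Lattice3D methods):

    def node_index(x, y, layer, x_steps, y_steps):
        # flat_idx = layer*(x_steps*y_steps) + y*x_steps + x
        return layer * (x_steps * y_steps) + y * x_steps + x

    def index_to_coords(idx, x_steps, y_steps):
        # Inverse: peel off layer, then y, then x with integer arithmetic only
        layer, rem = divmod(idx, x_steps * y_steps)
        y, x = divmod(rem, x_steps)
        return x, y, layer

    assert index_to_coords(node_index(5, 7, 2, 183, 482), 183, 482) == (5, 7, 2)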
LAYER DISCIPLINE (H/V Manhattan Routing):
───────────────────────────────────────────────────────────────────────────────
• F.Cu (L0): Vertical routing only (for portal escapes)
• Inner layers: Alternating H/V polarity
- L1 (In1.Cu): Horizontal
- L2 (In2.Cu): Vertical
- L3 (In3.Cu): Horizontal
- ... continues alternating
• B.Cu (L17): Opposite polarity of F.Cu
CSR GRAPH (Compressed Sparse Row):
───────────────────────────────────────────────────────────────────────────────
• Format: indptr[N+1], indices[E], base_costs[E]
- indptr[i] to indptr[i+1]: edge index range for node i
- indices[j]: destination node for edge j
- base_costs[j]: base cost for edge j (before congestion)
• Construction (memory-efficient for 30M edges):
- Pre-allocate numpy structured array with edge count
- Fill array directly (no Python list intermediate)
- Sort by source node in-place
- Extract indices/costs components
- Immediately free temporary arrays
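A short sketch of how neighbors are read out of this layout (illustrative only, not
the CSRGraph API defined below):

    import numpy as np

    def neighbors(u, indptr, indices, base_costs):
        # Edges of node u occupy the contiguous slice indptr[u]:indptr[u+1]
        s, e = indptr[u], indptr[u + 1]
        return list(zip(indices[s:e].tolist(), base_costs[s:e].tolist()))

    # Tiny 3-node example: 0→1 (cost 1.0), 0→2 (2.0), 1→2 (1.0)
    indptr = np.array([0, 2, 3, 3], dtype=np.int32)
    indices = np.array([1, 2, 2], dtype=np.int32)
    base_costs = np.array([1.0, 2.0, 1.0], dtype=np.float32)
    assert neighbors(0, indptr, indices, base_costs) == [(1, 1.0), (2, 2.0)]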
EDGE TYPES:
───────────────────────────────────────────────────────────────────────────────
1. Lateral edges (H/V movement):
• Cost: grid_pitch (0.4mm base unit)
• Enforces Manhattan discipline per layer
• Count: ~3M edges for 183×482×18 lattice
2. Via edges (layer transitions):
• Constraint: Same (x,y), different layers
• Full blind/buried: ALL layer pairs allowed
• Cost: via_cost × (1 + span_alpha × (span-1))
• Count: ~27M edges for full blind/buried
• Storage: Boolean numpy array (~30MB, not Python set)
3. Portal escape edges:
• Special via edges at terminal nodes
• Cost: via_cost × portal_via_discount × span_cost
• Applied only to first hop from pad terminals
EDGE ACCOUNTING (EdgeAccountant):
───────────────────────────────────────────────────────────────────────────────
• canonical: Dict[edge_idx → usage_count] - persistent ground truth
• present: Array[E] - current iteration usage (REBUILT each iteration)
• history: Array[E] - accumulated historical congestion
• total_cost: Array[E] - final cost for routing
FORMULA: total_cost[e] = base[e] + pres_fac × overuse[e] + hist_weight × history[e]
COST EVOLUTION:
Iteration 1: pres_fac=1.0 → Light penalties, natural shortest paths
Iteration 2: pres_fac=1.8 → Moderate penalties on overused edges
Iteration 7: pres_fac=34.0 → Strong penalties, forced alternatives
Iteration 11: pres_fac=357 → Extreme penalties, via annealing kicks in
Iteration 16+: pres_fac=1000 (capped) → Near-infinite cost on overuse
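The cost formula in code form (a minimal NumPy sketch; the real update in
EdgeAccountant adds tie-breaking jitter, layer bias and via annealing on top):

    import numpy as np

    def total_cost(base, present, capacity, history, pres_fac, hist_weight):
        # total[e] = base[e] + pres_fac*overuse[e] + hist_weight*history[e]
        overuse = np.maximum(0.0, present - capacity)
        return base + pres_fac * overuse + hist_weight * history

    base     = np.array([0.4, 0.4, 3.0])   # two lateral edges + one via
    present  = np.array([1.0, 3.0, 1.0])   # edge 1 currently carries three nets
    capacity = np.ones(3)
    history  = np.array([0.0, 2.0, 0.0])
    # → [0.4, 0.4 + 1.8*2 + 2.0, 3.0] = [0.4, 6.0, 3.0]
    print(total_cost(base, present, capacity, history, pres_fac=1.8, hist_weight=1.0))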
═══════════════════════════════════════════════════════════════════════════════
HOTSET MECHANISM (PREVENTS THRASHING)
═══════════════════════════════════════════════════════════════════════════════
PROBLEM (without hotsets):
• Re-routing ALL 464 nets every iteration takes minutes
• 90% of nets are clean, re-routing them wastes time and risks new conflicts
SOLUTION (adaptive hotsets):
• Iteration 1: Route all nets (initial solution)
• Iteration 2+: Only re-route nets that touch overused edges
HOTSET BUILDING (O(1) via edge-to-nets tracking):
───────────────────────────────────────────────────────────────────────────────
1. Find overused edges: over_idx = {e | present[e] > capacity[e]}
2. Find offending nets: offenders = (edge_to_nets[e] for e in over_idx)
3. Score by impact: impact[net] = Σ(overuse[e] for e in net_to_edges[net] ∩ over_idx)
4. Adaptive cap: min(hotset_cap, max(64, 3 × |over_idx|))
• 26 overused edges → hotset ~78 nets (not 418)
• 500 overused edges → hotset capped at 150
NET-TO-EDGE TRACKING:
• _net_to_edges: Dict[net_id → [edge_indices]] - cached when paths committed
• _edge_to_nets: Dict[edge_idx → {net_ids}] - reverse mapping
• Updated on: commit, clear, rip operations
• Enables O(1) hotset building instead of O(N×E) path scanning
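Putting the two blocks above together, hotset selection is roughly the following
sketch (names are illustrative; the production code also adds unrouted and explicitly
ripped nets to the hotset):

    import numpy as np

    def build_hotset(present, capacity, edge_to_nets, net_to_edges, hotset_cap=150):
        over = np.maximum(0.0, present - capacity)
        over_idx = set(np.flatnonzero(over > 0).tolist())
        # Offending nets: anything whose committed path uses an overused edge
        offenders = set()
        for e in over_idx:
            offenders |= edge_to_nets.get(e, set())
        # Score each offender by the overuse its own edges contribute
        impact = {net: sum(over[e] for e in net_to_edges.get(net, []) if e in over_idx)
                  for net in offenders}
        cap = min(hotset_cap, max(64, 3 * len(over_idx)))
        return sorted(offenders, key=lambda n: -impact[n])[:cap]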
TYPICAL EVOLUTION:
• Iter 1: Route 464 nets → 81 succeed, 514 overused edges
• Iter 2: Hotset 150 nets → 81 succeed, 275 overused edges
• Iter 7: Hotset 150 nets → 81 succeed, 143 overused edges
• Iter 12: Hotset 96 nets → 61 succeed, 29 overused edges (rip event)
• Iter 27: Hotset 64 nets → 73 succeed, 22 overused edges
• Detail pass: Hotset 8 nets, 6 iters → 0 overuse (SUCCESS)
═══════════════════════════════════════════════════════════════════════════════
PATHFINDER NEGOTIATION - ITERATION DETAIL
═══════════════════════════════════════════════════════════════════════════════
STEP 0: CLEAN ACCOUNTING (iter 2+)
• _rebuild_usage_from_committed_nets()
• Clear canonical and present arrays
• Rebuild from all currently routed nets using net_to_edges cache
• Prevents ghost usage from rip/re-route cycles
STEP 1: UPDATE COSTS (once per iteration, not per net)
• Check via annealing policy:
- If pres_fac ≥ 200 and via_overuse > 70%: via_cost × 1.5 (penalize vias)
- Else if pres_fac ≥ 200: via_cost × 0.5 (encourage layer hopping)
• Compute: total_cost[e] = base[e] + pres_fac × overuse[e] + hist_weight × history[e]
• Costs reused for all nets in this iteration (major speedup)
STEP 2: BUILD HOTSET
• Find overused edges using edge_to_nets
• Adaptive cap prevents thrashing
• Log: overuse_edges, offenders, unrouted, cap, hotset_size
STEP 3: ROUTE NETS IN HOTSET
• For each net:
a) Clear old path from accounting (if exists)
b) Extract ROI: Typically 5K-50K nodes from 1.6M total
c) Run heap-based Dijkstra on ROI: O(E_roi × log V_roi)
d) Fallback to larger ROI if needed (max 5 per iteration)
e) Commit path: Update canonical, present, net_to_edges, edge_to_nets
STEP 4: COMPUTE OVERUSE & METRICS
• overuse_sum, overused_edge_count
• via_overuse percentage (for annealing policy)
• Every 3 iterations: Log top-10 overused channels with coordinates
STEP 5: UPDATE HISTORY
• history[e] += hist_gain × overuse[e]
• Prevents oscillation
STEP 6: TERMINATION & STAGNATION
• SUCCESS: If overuse == 0 → exit
• STAGNATION: If no improvement for 5 iterations:
- Rip top-K offenders (k=13-20)
- Hold pres_fac for 2 iterations
- Grow ROI margin (+0.6mm)
• CONTINUE: pres_fac × 1.8, next iteration
STEP 7: DETAIL REFINEMENT (after 30 iters if overuse remains)
• Extract conflict zone (nets touching overused edges)
• Run focused negotiation with pres_fac=500-1000
• 10 iteration limit
• Often achieves zero overuse on final 8-20 nets
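Condensed into one loop, the iteration above looks roughly like this. The method
names approximate the private methods listed later under FILE ORGANIZATION; this is
a sketch of the control flow, not the actual implementation:

    def negotiate(router, cfg):
        pres_fac = cfg.pres_fac_init
        for it in range(1, cfg.max_iterations + 1):
            router.rebuild_usage_from_committed_nets()       # step 0: clean accounting
            router.update_costs(pres_fac)                    # step 1: one cost sweep
            hotset = router.build_hotset()                   # step 2
            for net in hotset:                               # step 3
                router.rip(net)
                router.route_in_roi(net)
            overuse, _ = router.compute_overuse()            # step 4
            router.update_history()                          # step 5
            if overuse == 0:                                 # step 6: converged
                return True
            pres_fac = min(cfg.pres_fac_max, pres_fac * cfg.pres_fac_mult)
        return router.detail_refinement()                    # step 7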
═══════════════════════════════════════════════════════════════════════════════
ROI EXTRACTION & SHORTEST PATH SOLVING
═══════════════════════════════════════════════════════════════════════════════
ROI EXTRACTION (Region of Interest):
───────────────────────────────────────────────────────────────────────────────
• Problem: Full graph is 1.6M nodes, 30M edges - too large for per-net Dijkstra
• Solution: Extract subgraph containing only nodes near src/dst
• Method: BFS expansion from src and dst simultaneously
• Result: ROI typically 5K-50K nodes (100-1000× smaller than full graph)
ADAPTIVE ROI SIZING:
• initial_radius: 24 steps (~10mm @ 0.4mm pitch)
• Stagnation bonus: +0.6mm per stagnation event (grows when stuck)
• Fallback: If ROI fails, retry with radius=60 (limit 5 fallbacks/iteration)
SimpleDijkstra: HEAP-BASED O(E log V) SSSP
───────────────────────────────────────────────────────────────────────────────
• Priority queue: Python heapq with (distance, node) tuples
• Operates on ROI subgraph (not full graph)
• Early termination when destination reached
• Visited tracking prevents re-expansion
• Typical performance: 0.1-0.5s per net on 18-layer board
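The heap-based SSSP boils down to the standard pattern below (a self-contained sketch
over CSR arrays, not the SimpleDijkstra class itself):

    import heapq

    def dijkstra_csr(src, dst, indptr, indices, costs):
        dist, prev, visited = {src: 0.0}, {}, set()
        heap = [(0.0, src)]
        while heap:
            d, u = heapq.heappop(heap)
            if u in visited:
                continue
            visited.add(u)
            if u == dst:                              # early termination at the target
                break
            for j in range(indptr[u], indptr[u + 1]):
                v, nd = int(indices[j]), d + float(costs[j])
                if nd < dist.get(v, float("inf")):
                    dist[v], prev[v] = nd, u
                    heapq.heappush(heap, (nd, v))
        # Walk predecessors back from dst to recover the path
        path, node = [], dst
        while node == src or node in prev:
            path.append(node)
            if node == src:
                break
            node = prev[node]
        return dist.get(dst), path[::-1]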
MULTI-SOURCE/MULTI-SINK (for portal routing - TO BE IMPLEMENTED):
• Initialize heap with multiple (distance, node) entries for all portal layers
• Terminate when ANY destination portal layer reached
• Choose best entry/exit layers dynamically per net
GPU SUPPORT (currently disabled):
───────────────────────────────────────────────────────────────────────────────
• config.use_gpu defaults to False
• GPU arrays available but SimpleDijkstra runs on CPU
• Avoids host↔device copy overhead without GPU SSSP kernel
• Future: GPU near-far/delta-stepping when fully vectorized
═══════════════════════════════════════════════════════════════════════════════
BLIND/BURIED VIA SUPPORT
═══════════════════════════════════════════════════════════════════════════════
VIA POLICY: ALL LAYER PAIRS ALLOWED
───────────────────────────────────────────────────────────────────────────────
• Any layer can connect to any other layer at same (x,y)
• Examples:
- F.Cu ↔ In1.Cu (microvia)
- In5.Cu ↔ In12.Cu (buried via)
- F.Cu ↔ B.Cu (through via)
- F.Cu ↔ In10.Cu (blind via)
VIA COSTING (encourages short spans but allows long):
───────────────────────────────────────────────────────────────────────────────
• Base cost: via_cost = 3.0
• Span penalty: cost = via_cost × (1 + 0.15 × (span - 1))
- span=1 (adjacent): 3.0
- span=5: 4.8
- span=10: 7.05
- span=17 (through): 10.2
• Portal discount (applied after graph build):
- First hop from pad terminals: cost × 0.4
- Escape via F.Cu → In1.Cu: 3.0 × 0.4 = 1.2 (cheap)
- Makes entering grid economical, encourages immediate layer spreading
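In code, the span-penalized via cost (with the optional portal discount) is simply:

    def via_edge_cost(span, via_cost=3.0, span_alpha=0.15, portal_discount=1.0):
        # span = |to_layer - from_layer|; portal_discount=0.4 applies to the first
        # hop out of a pad terminal, 1.0 everywhere else
        return via_cost * (1.0 + span_alpha * (span - 1)) * portal_discount

    assert via_edge_cost(1) == 3.0                                    # adjacent layers
    assert abs(via_edge_cost(17) - 10.2) < 1e-9                       # through via, 18 layers
    assert abs(via_edge_cost(1, portal_discount=0.4) - 1.2) < 1e-9    # escape via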
VIA EDGE REPRESENTATION:
───────────────────────────────────────────────────────────────────────────────
• Count: C(18,2) = 153 via pairs/cell × 88,206 cells × 2 directions ≈ 27M directed edges
• Storage: Boolean numpy array (30MB) marks which edges are vias
• Used for: via-specific overuse tracking and annealing policy
COORDINATE SYSTEMS:
───────────────────────────────────────────────────────────────────────────────
• World: (x_mm, y_mm, layer) - Physical PCB coordinates in millimeters
• Lattice: (x_idx, y_idx, layer) - Grid indices (0..x_steps, 0..y_steps)
• Node: flat_index - Single integer for CSR: layer×(x_steps×y_steps) + y×x_steps + x
CONVERSIONS:
• world_to_lattice(): (x_mm, y_mm) → (x_idx, y_idx) via floor + clamp
• lattice_to_world(): (x_idx, y_idx) → (x_mm, y_mm) via pitch×idx + offset
• node_idx(): (x_idx, y_idx, layer) → flat_index for CSR indexing
• Arithmetic layer lookup: layer = flat_idx ÷ (x_steps × y_steps)
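A minimal sketch of the two millimeter conversions (approximating the KiCadGeometry
helpers; exact rounding and clamping may differ):

    def world_to_lattice(x_mm, y_mm, origin_x, origin_y, pitch, x_steps, y_steps):
        # Floor to the containing cell, then clamp to the board
        xi = min(max(int((x_mm - origin_x) // pitch), 0), x_steps - 1)
        yi = min(max(int((y_mm - origin_y) // pitch), 0), y_steps - 1)
        return xi, yi

    def lattice_to_world(xi, yi, origin_x, origin_y, pitch):
        return origin_x + xi * pitch, origin_y + yi * pitch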
═══════════════════════════════════════════════════════════════════════════════
CRITICAL INVARIANTS
═══════════════════════════════════════════════════════════════════════════════
INVARIANT 1: Edge capacity = 1 per edge
• No edge sharing allowed
• Multiple nets on same edge = overuse = must resolve
INVARIANT 2: Present usage rebuilt from committed nets each iteration
• Never carry stale present_usage between iterations
• Prevents ghost usage accumulation
INVARIANT 3: Hotset contains ONLY nets touching overused edges
• Plus unrouted nets + explicitly ripped nets
• Prevents thrashing (re-routing clean nets wastes time)
INVARIANT 4: Costs updated once per iteration, before routing
• All nets in iteration see same cost landscape
• Enables fair negotiation
INVARIANT 5: Portal escape stubs are private (no congestion)
• Not in global routing graph
• Emitted directly to geometry
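Invariant 2 in concrete terms (a minimal sketch; `canonical` is the per-edge usage
dict described under EDGE ACCOUNTING):

    import numpy as np

    def rebuild_present(canonical, num_edges):
        # Present usage is always derived from committed paths, never carried over
        present = np.zeros(num_edges, dtype=np.float32)
        for edge_idx, count in canonical.items():
            present[edge_idx] = count
        return present

    assert rebuild_present({3: 2, 7: 1}, 10).tolist() == [0, 0, 0, 2, 0, 0, 0, 1, 0, 0]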
═══════════════════════════════════════════════════════════════════════════════
COMMON FAILURE MODES & FIXES
═══════════════════════════════════════════════════════════════════════════════
"Stuck at 81/464 routed for many iterations"
• CAUSE: All pads on F.Cu, via cost too high, router fights on top layers
• FIX: Portal escapes with discounted vias (IMPLEMENTED)
"Hotset contains 400+ nets when only 26 edges overused"
• CAUSE: Hotset not capped adaptively
• FIX: adaptive_cap = min(150, max(64, 3 × overused_edges)) (FIXED)
"Overuse jumps: 193 → 265 → 318"
• CAUSE: Ghost usage from dirty accounting
• FIX: Rebuild present from scratch each iteration (FIXED)
"MemoryError during graph construction"
• CAUSE: Python list of 30M tuples exhausts memory
• FIX: Pre-allocate numpy structured array (FIXED)
"Only F.Cu and In1.Cu show overuse, 16 layers idle"
• CAUSE: Portal escapes not implemented yet
• FIX: Portal discounts + multi-layer seeding (TO BE IMPLEMENTED)
"48 nets unmapped (dropped during parsing)"
• CAUSE: Pad key mismatch between mapping and lookup
• FIX: Consistent key generation with coordinates for orphaned pads (FIXED)
═══════════════════════════════════════════════════════════════════════════════
PERFORMANCE OPTIMIZATIONS
═══════════════════════════════════════════════════════════════════════════════
1. NO EDGE LOOKUP DICT:
• OLD: 30M-entry Python dict (u,v) → edge_idx (~several GB)
• NEW: On-the-fly CSR scan (degree ~4-6 in Manhattan lattice)
• Saves: ~3GB memory + ~10s startup time
2. NUMPY VIA TRACKING:
• OLD: Python set with 27M edge indices (~750MB)
• NEW: Boolean array (~30MB)
• 25× memory reduction
3. BINARY SEARCH IN LOGGING:
• OLD: O(N) linear scan to find source node for edge
• NEW: np.searchsorted(indptr, edge_idx) → O(log N)
4. ARITHMETIC VIA DETECTION:
• OLD: idx_to_coord() calls for each edge
• NEW: layer = idx ÷ plane_size (arithmetic)
• Millions of function calls eliminated
5. HEAP-BASED DIJKSTRA:
• OLD: O(V²) np.argmin() scan per iteration
• NEW: O(E log V) priority queue
• 10-100× speedup on ROI pathfinding
6. COST COMPUTED ONCE PER ITERATION:
• OLD: ~464 full-graph cost sweeps per iteration
• NEW: 1 cost sweep per iteration
• Eliminated 14 billion operations per iteration
TYPICAL PERFORMANCE (18-layer backplane, 512 nets, 3200 pads):
───────────────────────────────────────────────────────────────────────────────
• Graph build: ~5-10s (with optimizations)
• Portal planning: ~1s (to be implemented)
• Iter 1 (route all 464 nets): ~2-3 minutes
• Iter 2+ (hotset 64-150 nets): ~30-60s each
• Detail pass (8 nets): ~5-10s
• Expected convergence: 15-25 iterations
MEMORY USAGE:
• CSR graph: ~360MB (30M edges × 12 bytes)
• Via tracking: ~30MB (boolean array)
• Edge accounting: ~120MB (3 float32 arrays)
• Net tracking: ~50MB (dicts)
• Total: ~600MB for 18-layer board
═══════════════════════════════════════════════════════════════════════════════
FILE ORGANIZATION
═══════════════════════════════════════════════════════════════════════════════
CLASSES (in order):
1. PathFinderConfig (line ~380): Configuration dataclass
2. CSRGraph (line ~430): Compressed sparse row graph with memory-efficient construction
3. EdgeAccountant (line ~490): Edge usage/cost accounting
4. Lattice3D (line ~550): 3D grid geometry with H/V discipline
5. ROIExtractor (line ~720): Region-of-interest extraction
6. SimpleDijkstra (line ~780): Heap-based O(E log V) shortest path solver
7. PathFinderRouter (line ~860): Main routing engine
KEY METHODS:
• initialize_graph(): Build lattice, graph, accounting structures
• route_multiple_nets(): Main entry, calls negotiation
• _pathfinder_negotiation(): Core PathFinder (30 iteration limit)
• _route_all(): Route hotset nets with ROI-based Dijkstra
• _build_hotset(): Identify nets touching overused edges (adaptive)
• _rebuild_usage_from_committed_nets(): Clean accounting
• _apply_portal_discount(): Reduce via cost at terminals
PORTAL METHODS (TO BE IMPLEMENTED):
• _plan_portal_for_pad(): Choose escape point 1.2-4.8mm from pad
• _get_portal_seeds(): Multi-layer entry points with discounted costs
• _route_with_portals(): Multi-source/multi-sink Dijkstra
• _emit_portal_geometry(): Vertical escape stubs + trimmed via stacks
• _retarget_failed_portals(): Adjust portals when nets fail repeatedly
• _gpu_roi_near_far_sssp_with_metrics(): GPU shortest path solver
• emit_geometry(): Converts paths to KiCad tracks/vias
═══════════════════════════════════════════════════════════════════════════════
Environment variables:
- SEQUENTIAL_ALL=1: Force sequential routing (cost update after every net)
- USE_GPU=1: Enable GPU acceleration
- INCREMENTAL_COST_UPDATE=1: Only update costs for edges that changed
- ORTHO_CPU_ONLY=1: Force CPU-only mode (no GPU)
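These flags are read in the usual truthy-string style; the sketch below shows the
common pattern (the module's actual parsing may differ):

    import os

    def _env_flag(name: str) -> bool:
        # Treat "1", "true", "yes" (any case) as enabled
        return os.environ.get(name, "").strip().lower() in ("1", "true", "yes")

    sequential_all = _env_flag("SEQUENTIAL_ALL")
    cpu_only = _env_flag("ORTHO_CPU_ONLY")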
"""
# Standard library
import logging
import time
import random
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Set
from collections import defaultdict
# Third-party
import numpy as np
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
cp = np # Fallback to numpy if cupy not available
CUPY_AVAILABLE = False
# Local config
from .pathfinder.config import PAD_CLEARANCE_MM
from .pad_escape_planner import PadEscapePlanner, Portal
from .board_analyzer import analyze_board_characteristics, BoardCharacteristics
from .parameter_derivation import derive_routing_parameters, apply_derived_parameters, DerivedRoutingParameters
from .pathfinder.via_kernels import ViaKernelManager, convert_via_metadata_to_gpu, ensure_gpu_array
# Optional GPU
try:
import cupy as cp
GPU_AVAILABLE = True
except ImportError:
cp = None
GPU_AVAILABLE = False
# Local imports
from ...domain.models.board import Board
from .pathfinder.kicad_geometry import KiCadGeometry
# GPU pathfinding
try:
from .pathfinder.cuda_dijkstra import CUDADijkstra
CUDA_DIJKSTRA_AVAILABLE = True
except ImportError:
CUDADijkstra = None
CUDA_DIJKSTRA_AVAILABLE = False
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ═══════════════════════════════════════════════════════════════════════════════
class GeometryPayload:
"""Wrapper for geometry with attribute access"""
def __init__(self, tracks, vias):
self.tracks = tracks
self.vias = vias
class GPUConfig:
"""GPU pathfinding algorithm configuration"""
GPU_MODE = True # Enable GPU acceleration (set to False for CPU-only)
DEBUG_INVARIANTS = True
USE_PERSISTENT_KERNEL = False # DISABLED: Hangs on cooperative kernel launch; using wavefront with atomic keys instead
USE_GPU_COMPACTION = True
USE_DELTA_STEPPING = False # DISABLED: Causes OOM from bucket allocation on large graphs
DELTA_VALUE = 0.5 # Bucket width in mm (0.5mm ≈ 1.25 × 0.4mm grid pitch)
# Recommended delta values:
# - 0.4: Same as grid pitch (many buckets, high precision)
# - 0.5: 1.25× grid pitch (good balance) ← DEFAULT
# - 0.8: 2× grid pitch (fewer buckets, faster but less precise)
# - 1.6: 4× grid pitch (degenerates toward Dijkstra)
@dataclass
class PathFinderConfig:
"""PathFinder algorithm parameters - TUNED FOR FAST CONVERGENCE
Core convergence mechanism: Pathfinder uses negotiated congestion routing where
edge costs increase based on usage (present) and history. The cost function is:
total_cost = base + pres_fac*overuse + hist_gain*history
Key parameters for tuning:
- pres_fac: Present congestion penalty (increases each iteration)
- hist_gain: Historical congestion penalty (permanent after each iteration)
- via_cost: Vertical movement penalty (lower = more layer exploration)
WARNING: Cost function modifications are HIGH-RISK. Small changes can cause
20%+ convergence regression. Prefer infrastructure improvements over tuning.
"""
max_iterations: int = 40 # Extended to give convergence more time
# CONVERGENCE SCHEDULE (BALANCED BASED ON DIAGNOSTICS):
pres_fac_init: float = 1.0 # Start gentle (iteration 1)
pres_fac_mult: float = 1.10 # Gentler exponential to keep history competitive (was 1.15)
pres_fac_max: float = 64.0 # Allow higher pressure to resolve late-stage congestion (was 8.0)
hist_gain: float = 0.2 # Lowered for raw present (was 0.8 with present_ema)
# CRITICAL: Length vs Completion Trade-off
base_cost_weight: float = 0.3 # Weight for path length penalty (1.0=optimize length, 0.01=optimize completion)
# Setting this to 0.01-0.1 makes router prefer completion over short paths,
# enabling use of empty vertical channels. Lower values = more detours, higher completion.
grid_pitch: float = 0.4
via_cost: float = 0.7 # Cheaper vias to encourage layer hopping and redistribute load (was 1.0)
portal_discount: float = 0.4 # 60% discount on first escape via from terminals
span_alpha: float = 0.15 # Span penalty: cost *= (1 + alpha*(span-1))
# Iteration 1 policy: always-connect mode for maximum connectivity
iter1_always_connect: bool = True # Use soft costs in iteration 1 instead of hard blocks
# Portal escape configuration
portal_enabled: bool = True
portal_delta_min: int = 3 # Min vertical offset (1.2mm @ 0.4mm pitch)
portal_delta_max: int = 12 # Max vertical offset (4.8mm)
portal_delta_pref: int = 6 # Preferred offset (2.4mm)
portal_x_snap_max: float = 0.5 # Max x-snap in steps (½ pitch)
portal_via_discount: float = 0.15 # Escape via multiplier (85% discount)
portal_retarget_patience: int = 3 # Iters before retargeting
stagnation_patience: int = 5
use_gpu: bool = True # GPU algorithm fixed, validation will catch ROI construction issues
batch_size: int = 32
layer_count: int = 6
strict_drc: bool = False # Legacy compatibility
mode: str = "near_far"
roi_parallel: bool = False
per_net_budget_s: float = 5.0
max_roi_nodes: int = 750000 # Increased from 500K to 750K to accommodate large inter-bank nets
delta_multiplier: float = 4.0
adaptive_delta: bool = True
strict_capacity: bool = True
reroute_only_offenders: bool = True
layer_shortfall_percentile: float = 95.0
layer_shortfall_cap: int = 16
enable_profiling: bool = False
enable_instrumentation: bool = False
strict_overuse_block: bool = False
hist_cost_weight: float = 10.0 # Boost history weight to compete with present (was 2.0)
log_iteration_details: bool = False
acc_fac: float = 0.0
phase_block_after: int = 2
congestion_multiplier: float = 1.0
max_search_nodes: int = 2000000
layer_names: List[str] = field(default_factory=lambda: ['F.Cu', 'In1.Cu', 'In2.Cu', 'In3.Cu', 'In4.Cu', 'B.Cu'])
hotset_cap: int = 512 # Allow all nets to be rerouted if needed (was 150, bottleneck!)
allowed_via_spans: Optional[Set[Tuple[int, int]]] = None # None = all layer pairs allowed (blind/buried)
# Legacy constants
DEFAULT_GRID_PITCH = 0.4
GRID_PITCH = 0.4
LAYER_COUNT = 6
# ═══════════════════════════════════════════════════════════════════════════════
# CSR GRAPH
# ═══════════════════════════════════════════════════════════════════════════════
class CSRGraph:
"""Compressed Sparse Row graph"""
def __init__(self, use_gpu=False, edge_capacity=None):
self.use_gpu = use_gpu and GPU_AVAILABLE
self.xp = cp if self.use_gpu else np
self.indptr = None
self.indices = None
self.base_costs = None
# Pre-allocate numpy array if capacity known (memory efficient)
if edge_capacity:
self._edge_array = np.zeros(edge_capacity, dtype=[('src', 'i4'), ('dst', 'i4'), ('cost', 'f4')])
self._edge_idx = 0
self._use_array = True
else:
self._edges = []
self._use_array = False
def add_edge(self, u: int, v: int, cost: float):
"""Add directed edge"""
if self._use_array:
self._edge_array[self._edge_idx] = (u, v, cost)
self._edge_idx += 1
else:
self._edges.append((u, v, cost))
def finalize(self, num_nodes: int, num_layers: int = 0):
"""Build CSR from edge list (memory-efficient)"""
import time
start = time.time()
if self._use_array:
# Already in numpy array (pre-allocated)
E = self._edge_idx
edge_array = self._edge_array[:E] # Trim to actual size
logger.info(f"Finalizing CSR: {E:,} edges from pre-allocated array")
else:
if not self._edges:
raise ValueError("No edges")
E = len(self._edges)
# Convert to numpy array for memory-efficient sorting
logger.info(f"Converting {E:,} edges to numpy array...")
edge_array = np.array(self._edges, dtype=[('src', 'i4'), ('dst', 'i4'), ('cost', 'f4')])
# Free memory immediately
self._edges.clear()
# Sort by source node - GPU accelerated if available!
logger.info(f"Sorting {E:,} edges by source node...")
sort_start = time.time()
# OPTIMIZATION: Use GPU sort if available (8-10× faster for large arrays)
if self.use_gpu and GPU_AVAILABLE:
try:
logger.info(f"[GPU-SORT] Using CuPy GPU radix sort (expected ~3-5s for 54M edges)")
# Extract 'src' field as contiguous array (CuPy doesn't support structured arrays)
src_keys = edge_array['src'].copy()
# Transfer just the sort keys to GPU
src_keys_gpu = cp.asarray(src_keys)
# GPU radix sort to get indices (much faster than CPU quicksort/mergesort)
sorted_idx = cp.argsort(src_keys_gpu, kind='stable')
# Transfer indices back to CPU
sorted_idx_cpu = sorted_idx.get()
# Reorder the structured array using GPU-computed indices
edge_array = edge_array[sorted_idx_cpu]
sort_time = time.time() - sort_start
logger.info(f"[GPU-SORT] GPU sort completed in {sort_time:.1f} seconds ({E/sort_time/1e6:.1f}M edges/sec)")
except Exception as e:
logger.warning(f"[GPU-SORT] GPU sort failed: {e}, falling back to CPU")
# CPU fallback
edge_array.sort(order='src', kind='mergesort')
sort_time = time.time() - sort_start
logger.info(f"[CPU-SORT] CPU sort completed in {sort_time:.1f} seconds")
else:
# CPU sort (stable mergesort)
edge_array.sort(order='src', kind='mergesort')
sort_time = time.time() - sort_start
logger.info(f"[CPU-SORT] Sort completed in {sort_time:.1f} seconds")
# Extract components
indices = edge_array['dst'].astype(np.int32)
costs = edge_array['cost'].astype(np.float32)
indptr = np.zeros(num_nodes + 1, dtype=np.int32)
# Free edge array memory
if self._use_array:
del self._edge_array
# Build indptr
curr_src = -1
for i, u in enumerate(edge_array['src']):
while curr_src < u:
curr_src += 1
indptr[curr_src] = i
while curr_src < num_nodes:
curr_src += 1
indptr[curr_src] = E
# Build edge-to-layer mapping for layer balancing (vectorized, one-time cost)
# CRITICAL: Must do this BEFORE transferring to GPU, while indptr is still on CPU
if num_layers > 0:
plane_size = num_nodes // num_layers
# For each edge, determine its layer from source node
# Vectorized approach: build array of source nodes for each edge
edge_sources = np.zeros(E, dtype=np.int32)
edge_targets = np.zeros(E, dtype=np.int32)
for u in range(num_nodes):
start, end = indptr[u], indptr[u+1]
if end > start:
edge_sources[start:end] = u
edge_targets[start:end] = indices[start:end]
# Compute layer for each edge: layer = source_node // plane_size
self.edge_layer = (edge_sources // plane_size).astype(np.uint8)
# Compute edge_kind: 0 = horizontal/vertical (same layer), 1 = via (different layers)
source_layers = edge_sources // plane_size
target_layers = edge_targets // plane_size
self.edge_kind = (source_layers != target_layers).astype(np.uint8)
via_count = int(np.sum(self.edge_kind))
horiz_vert_count = E - via_count
logger.info(f"[LAYER-MAP] Built edge→layer mapping: {E} edges, {num_layers} layers")
logger.info(f"[EDGE-KIND] Horizontal/Vertical={horiz_vert_count}, Via={via_count}")
else:
self.edge_layer = None
self.edge_kind = None
if self.use_gpu:
self.indptr = cp.asarray(indptr)
self.indices = cp.asarray(indices)
self.base_costs = cp.asarray(costs)
# Transfer edge_layer and edge_kind to GPU if they exist
if self.edge_layer is not None:
self.edge_layer_gpu = cp.asarray(self.edge_layer)
if self.edge_kind is not None:
self.edge_kind_gpu = cp.asarray(self.edge_kind)
else:
self.indptr = indptr
self.indices = indices
self.base_costs = costs
self._edges = []
logger.info(f"CSR: {num_nodes} nodes, {E} edges")
# ═══════════════════════════════════════════════════════════════════════════════
# EDGE ACCOUNTING
# ═══════════════════════════════════════════════════════════════════════════════
class EdgeAccountant:
"""Edge usage tracking"""
def __init__(self, num_edges: int, use_gpu=False):
self.E = num_edges
self.use_gpu = use_gpu and GPU_AVAILABLE
self.xp = cp if self.use_gpu else np
self.canonical: Dict[int, int] = {}
self.present = self.xp.zeros(num_edges, dtype=self.xp.float32)
self.present_ema = self.xp.zeros(num_edges, dtype=self.xp.float32) # Smoothed present for stable convergence
self.history = self.xp.zeros(num_edges, dtype=self.xp.float32)
self.capacity = self.xp.ones(num_edges, dtype=self.xp.float32)
self.total_cost = None
def refresh_from_canonical(self):
"""Rebuild present"""
self.present.fill(0)
for idx, count in self.canonical.items():
if 0 <= idx < self.E:
self.present[idx] = float(count)
def commit_path(self, edge_indices: List[int]):
"""Add path and keep present in sync"""
for idx in edge_indices:
self.canonical[idx] = self.canonical.get(idx, 0) + 1
# Keep present in sync during iteration
self.present[idx] = self.present[idx] + 1
def clear_path(self, edge_indices: List[int]):
"""Remove path and keep present in sync"""
for idx in edge_indices:
if idx in self.canonical:
self.canonical[idx] -= 1
if self.canonical[idx] <= 0:
del self.canonical[idx]
# Reflect in present
self.present[idx] = self.xp.maximum(0, self.present[idx] - 1)
def compute_overuse(self, router_instance=None) -> Tuple[int, int]:
"""
Compute total overuse including edge AND via spatial overuse.
Args:
router_instance: Optional PathFinderRouter instance for via spatial checks
Returns:
(total_overuse_sum, edge_overuse_count)
"""
# Edge overuse (existing)
usage = self.present.get() if self.use_gpu else self.present
cap = self.capacity.get() if self.use_gpu else self.capacity
edge_over = np.maximum(0, usage - cap)
edge_over_sum = int(edge_over.sum())
edge_over_count = int((edge_over > 0).sum())
# Via spatial overuse (NEW)
via_col_over_sum = 0
via_seg_over_sum = 0
if router_instance is not None:
# Check via column overuse
if hasattr(router_instance, 'via_col_use') and hasattr(router_instance, 'via_col_cap'):
via_col_over = np.maximum(0, router_instance.via_col_use - router_instance.via_col_cap)
via_col_over_sum = int(via_col_over.sum())
# Check via segment overuse
if hasattr(router_instance, 'via_seg_use') and hasattr(router_instance, 'via_seg_cap'):
via_seg_over = np.maximum(0, router_instance.via_seg_use - router_instance.via_seg_cap)
via_seg_over_sum = int(via_seg_over.sum())
total_over = edge_over_sum + via_col_over_sum + via_seg_over_sum
# Log via violations if present (helps with debugging)
if via_col_over_sum > 0 or via_seg_over_sum > 0:
logger.info(f"[OVERUSE] edge={edge_over_sum} via_col={via_col_over_sum} via_seg={via_seg_over_sum} total={total_over}")
return (total_over, edge_over_count)
def verify_present_matches_canonical(self) -> bool:
"""Sanity check: verify present usage matches canonical store"""
recomputed = self.xp.zeros(self.E, dtype=self.xp.float32)
for idx, count in self.canonical.items():
if 0 <= idx < self.E:
recomputed[idx] = float(count)
if self.use_gpu:
present_cpu = self.present.get()
recomputed_cpu = recomputed.get()
else:
present_cpu = self.present
recomputed_cpu = recomputed
mismatch = np.sum(np.abs(present_cpu - recomputed_cpu))
if mismatch > 0.01:
logger.error(f"[ACCOUNTING] Present/canonical mismatch: {mismatch:.2f}")
return False
return True
def update_history(self, gain: float, base_costs=None, history_cap_multiplier=10.0, decay_factor=0.98, use_raw_present=False):
"""
Update history with:
- Gentle decay: history *= 0.98 before adding increment (decay_factor param)
- Clamping: increment capped at history_cap = 10 * base_cost
- Uses present_ema (smoothed) by default, or raw present if use_raw_present=True
"""
# DIAGNOSTIC: Log what's actually happening
if not hasattr(self, '_hist_update_count'):
self._hist_update_count = 0
self._hist_update_count += 1
# Always log first 5 calls
if self._hist_update_count <= 5:
# Before update
hist_before_max = float(self.history.max()) if self.history.size > 0 else 0.0
logger.debug(f"[UPDATE-HISTORY CALLED] Call #{self._hist_update_count} START gain={gain:.3f}")
# Apply gentle decay before adding new history
self.history *= decay_factor
# Use smoothed present_ema by default, or raw present if requested
present_for_history = self.present if use_raw_present else self.present_ema
over = self.xp.maximum(0, present_for_history - self.capacity)
increment = gain * over
# Clamp per-edge history increment
if base_costs is not None:
history_cap = history_cap_multiplier * base_costs
increment_before_cap = increment.copy()
increment = self.xp.minimum(increment, history_cap)
if self._hist_update_count <= 5:
# Check how many edges are being capped
capped_mask = increment_before_cap > history_cap
capped_count = int(self.xp.sum(capped_mask))
if capped_count > 0:
logger.debug(f" [HIST-CAP] {capped_count} edges capped! avg_cap={float(history_cap.mean()):.3f}")
self.history += increment
if self._hist_update_count <= 5:
# After update
hist_after_max = float(self.history.max())
incr_max = float(increment.max())
over_max = float(over.max())
over_mean = float(over[over > 0].mean()) if (over > 0).any() else 0.0
pres_max = float(present_for_history.max())
pres_ema_max = float(self.present_ema.max())
pres_raw_max = float(self.present.max())
logger.debug(f"[UPDATE-HISTORY #{self._hist_update_count}]")
logger.debug(f" gain={gain:.3f} decay={decay_factor:.3f} cap_mult={history_cap_multiplier:.1f}")
logger.debug(f" use_raw_present={use_raw_present}")
logger.debug(f" present_raw_max={pres_raw_max:.1f} present_ema_max={pres_ema_max:.1f}")
logger.debug(f" overuse: max={over_max:.2f} mean={over_mean:.3f}")
logger.debug(f" increment: max={incr_max:.3f}")
logger.debug(f" history: before={hist_before_max:.3f} → after={hist_after_max:.3f}")
if base_costs is not None:
logger.debug(f" base_cost: mean={float(base_costs.mean()):.4f} max={float(base_costs.max()):.4f}")
def update_present_ema(self, beta: float = 0.60):
"""
Update exponential moving average of present usage for stability.
Smooths bang-bang oscillations in overuse detection.
Args:
beta: EMA smoothing factor (higher = more smoothing, typically 0.6)
"""
self.present_ema = beta * self.present + (1.0 - beta) * self.present_ema
def update_costs(
self,
base_costs,
pres_fac: float,
hist_weight: float = 1.0,
add_jitter: bool = True,
via_cost_multiplier: float = 1.0,
base_cost_weight: float = 0.01,
*,
edge_layer=None, # np/cp array [E] with source layer per edge
layer_bias_per_layer=None, # np/cp array [L] with multiplicative bias
edge_kind=None # np/cp array [E] with 0=horiz/vert, 1=via
):
"""
total = (base * via_multiplier * base_weight * layer_bias) + pres_fac*overuse + hist_weight*history + epsilon_jitter
Jitter breaks ties and prevents oscillation in equal-cost paths.
Via cost multiplier enables late-stage via annealing.
Base cost weight controls length vs completion trade-off (lower = prefer completion over short paths).
Layer bias: applied only to horizontal/vertical edges (not vias) to rebalance layer usage.
Uses present_ema (smoothed) instead of raw present to prevent bang-bang oscillation.
"""
xp = self.xp
# Use smoothed present (EMA) to prevent oscillation - critical for convergence
over = xp.maximum(0, self.present_ema - self.capacity)
# Vectorized per-edge layer bias (single gather operation)
# Only apply to horizontal/vertical edges (edge_kind==0), not vias (edge_kind==1)
per_edge_bias = 1.0
if (edge_layer is not None) and (layer_bias_per_layer is not None) and (edge_kind is not None):
if self.use_gpu:
# Ensure arrays are on GPU
layer_bias = cp.asarray(layer_bias_per_layer) if not hasattr(layer_bias_per_layer, "get") else layer_bias_per_layer
edge_layer_arr = cp.asarray(edge_layer) if not hasattr(edge_layer, "get") else edge_layer
edge_kind_arr = cp.asarray(edge_kind) if not hasattr(edge_kind, "get") else edge_kind
else:
# NumPy arrays
layer_bias = layer_bias_per_layer
edge_layer_arr = edge_layer
edge_kind_arr = edge_kind
# Gather bias for each edge's layer
bias_factors = layer_bias[edge_layer_arr]
# Apply bias only to horizontal/vertical edges (edge_kind==0), set via edges to 1.0
per_edge_bias = xp.where(edge_kind_arr == 0, bias_factors, 1.0)
# Apply both via multiplier, base weight, and layer bias to base costs
# base_cost_weight < 1.0 makes router prefer completion over short paths
adjusted_base = base_costs * via_cost_multiplier * base_cost_weight * per_edge_bias
# Apply INVERTED layer bias to present term to directly pressure hot layers
# Base term uses per_edge_bias (hot layers cheaper for length optimization)
# Present term uses INVERSE (hot layers more expensive for congestion avoidance)
# For vias, keep bias at 1.0 (no layer-specific present penalty)
if (edge_layer is not None) and (layer_bias_per_layer is not None) and (edge_kind is not None):
# Invert bias for present: if bias=0.9 (cheap base), use 1/0.9=1.11 (expensive present)
# Clamp to prevent extreme values
inverted_bias = xp.where(per_edge_bias != 0, 1.0 / xp.maximum(per_edge_bias, 0.5), 1.0)
inverted_bias = xp.where(edge_kind_arr == 0, inverted_bias, 1.0) # Only H/V edges
present_term = (pres_fac * inverted_bias) * over
else:
present_term = pres_fac * over
self.total_cost = adjusted_base + present_term + hist_weight * self.history
# Add per-edge epsilon jitter to break ties (stable across iterations)
if add_jitter:
E = len(self.total_cost)
# Use edge index modulo prime for deterministic jitter
jitter = xp.arange(E, dtype=xp.float32) % 9973
jitter = jitter * 1e-6 # tiny epsilon
self.total_cost += jitter
def update_present_cost_only(self, pres_fac: float, base_costs):
"""
FAST per-net cost update: Only recomputes present cost term (not history).
Called after EACH net routes to update costs for the NEXT net.
History cost only updates at END of iteration.
This is critical for PathFinder convergence!
Formula: total_cost = base_cost + pres_fac * overuse + history
"""
# Recompute overuse with current occupancy (using smoothed present_ema for consistency)
over = self.xp.maximum(0, self.present_ema - self.capacity)
# Update total_cost: base + present_penalty + history
# Don't modify history here - only update present penalty based on current occupancy
self.total_cost = base_costs + pres_fac * over + self.history
# Log first few updates for debugging
if not hasattr(self, '_pernet_update_count'):
self._pernet_update_count = 0
self._pernet_update_count += 1
if self._pernet_update_count <= 3:
overuse_count = int(self.xp.sum(over > 0))
logger.info(f"[PER-NET-UPDATE #{self._pernet_update_count}] Overuse edges: {overuse_count}, pres_fac={pres_fac:.2f}")
# ═══════════════════════════════════════════════════════════════════════════════
# 3D LATTICE
# ═══════════════════════════════════════════════════════════════════════════════
class Lattice3D:
"""3D routing lattice with H/V discipline"""
def __init__(self, bounds: Tuple[float, float, float, float], pitch: float, layers: int):
self.bounds = bounds
self.pitch = pitch
self.layers = layers
self.geom = KiCadGeometry(bounds, pitch, layer_count=layers)
self.x_steps = self.geom.x_steps
self.y_steps = self.geom.y_steps
self.num_nodes = self.x_steps * self.y_steps * layers
self.layer_dir = self._assign_directions()
logger.info(f"Lattice: {self.x_steps}×{self.y_steps}×{layers} = {self.num_nodes:,} nodes")
def _assign_directions(self) -> List[str]:
"""F.Cu=V (vertical escape routing), internal layers alternate H/V"""
dirs = []
for z in range(self.layers):
if z == 0:
# F.Cu has vertical routing for escape stubs
dirs.append('v')
else:
# Internal layers alternate: In1.Cu=H, In2.Cu=V, In3.Cu=H, etc.
dirs.append('h' if z % 2 == 1 else 'v')
return dirs
def get_legal_axis(self, layer: int) -> str:
"""Return 'h' or 'v' for which axis this layer allows."""
if layer >= len(self.layer_dir):
return 'h' if layer % 2 == 1 else 'v'
return self.layer_dir[layer]
def is_legal_planar_edge(self, from_x: int, from_y: int, from_layer: int,
to_x: int, to_y: int, to_layer: int) -> bool:
"""Check if planar edge follows H/V discipline."""
if from_layer != to_layer:
return True # Vias always legal (checked separately)
dx = abs(to_x - from_x)
dy = abs(to_y - from_y)
# Must be adjacent (Manhattan distance 1)
if dx + dy != 1:
return False
# Check layer direction
direction = self.get_legal_axis(from_layer)
if direction == 'h':
return dy == 0 and dx == 1 # Horizontal: only ±X
else:
return dx == 0 and dy == 1 # Vertical: only ±Y
def get_legal_via_pairs(self, layer_count: int) -> set:
"""
Return set of legal (from_layer, to_layer) via pairs.
CRITICAL: Must include F.Cu (layer 0) → internal layer transitions!
The escape planner creates stubs on F.Cu, and PathFinder must be able
to create vias from F.Cu to whatever internal layer it chooses.
"""
# Check config for via policy (default to FULL blind/buried)
allow_any = True # ALWAYS allow full blind/buried for convergence
# Internal routing layers (exclude B.Cu which is layer_count-1)
routing_layers = list(range(1, layer_count - 1))
logger.info(f"[VIA-PAIRS] layer_count={layer_count}, routing_layers={len(routing_layers)}, "
f"allow_any={allow_any}")
if allow_any:
# FULL BLIND/BURIED: Any routing layer to any other routing layer
legal_pairs = set()
for z1 in routing_layers:
for z2 in routing_layers:
if z1 != z2:
legal_pairs.add((z1, z2))
# CRITICAL: Add F.Cu (layer 0) → internal layer transitions
# This allows PathFinder to create escape vias from F.Cu to any internal layer
for z in routing_layers:
legal_pairs.add((0, z)) # F.Cu → internal layer
legal_pairs.add((z, 0)) # internal layer → F.Cu (bidirectional)
logger.info(f"[VIA-PAIRS] Generated {len(legal_pairs)} pairs: {len(routing_layers)}×{len(routing_layers)} internal + {len(routing_layers)}×2 F.Cu transitions")
return legal_pairs
# FALLBACK: Adjacent routing layers only
legal_pairs = set()
for i in range(len(routing_layers) - 1):
z1, z2 = routing_layers[i], routing_layers[i+1]
legal_pairs.add((z1, z2))
legal_pairs.add((z2, z1))
logger.info(f"[VIA-PAIRS] Generated {len(legal_pairs)} adjacent-only pairs (fallback mode)")
return legal_pairs
def node_idx(self, x: int, y: int, z: int) -> int:
"""(x,y,z) → flat"""
return self.geom.node_index(x, y, z)
def idx_to_coord(self, idx: int) -> Tuple[int, int, int]:
"""flat → (x,y,z)"""
return self.geom.index_to_coords(idx)
def world_to_lattice(self, x_mm: float, y_mm: float) -> Tuple[int, int]:
"""mm → lattice"""
return self.geom.world_to_lattice(x_mm, y_mm)
def build_graph(self, via_cost: float, allowed_via_spans: Optional[Set[Tuple[int, int]]] = None, use_gpu=False) -> CSRGraph:
"""
Build graph with H/V constraints and flexible via spans.
Args:
via_cost: Base cost for via transitions
allowed_via_spans: Set of (from_layer, to_layer) pairs allowed for vias.
If None, all layer pairs are allowed (full blind/buried support).
Layers are indexed 0..N-1.
use_gpu: Enable GPU acceleration
"""
# Count edges to pre-allocate array (avoids MemoryError with 30M edges)
edge_count = 0
# Count H/V edges (exclude outer layers 0 and self.layers-1)
for z in range(1, self.layers - 1):
if self.layer_dir[z] == 'h':
edge_count += 2 * self.y_steps * (self.x_steps - 1)
else: # 'v'
edge_count += 2 * self.x_steps * (self.y_steps - 1)
# Count via edges using ACTUAL legal pairs (not parameter guess)
legal_via_pairs_set = self.get_legal_via_pairs(self.layers)
via_edge_count = 2 * self.x_steps * self.y_steps * len(legal_via_pairs_set)
edge_count += via_edge_count
logger.info(f"Pre-allocating for {edge_count:,} edges ({via_edge_count:,} via edges for {len(legal_via_pairs_set)} pairs)")
graph = CSRGraph(use_gpu, edge_capacity=edge_count)
# Build lateral edges (H/V discipline, exclude outer layers 0 and self.layers-1)
for z in range(1, self.layers - 1):
direction = self.layer_dir[z]
if direction == 'h':
for y in range(self.y_steps):
for x in range(self.x_steps - 1):
u = self.node_idx(x, y, z)
v = self.node_idx(x+1, y, z)
# MANHATTAN VALIDATION
if not self.is_legal_planar_edge(x, y, z, x+1, y, z):
logger.error(f"[MANHATTAN-VIOLATION] Illegal H edge on layer {z}: ({x},{y}) → ({x+1},{y})")
continue # Skip illegal edge
graph.add_edge(u, v, self.pitch)
graph.add_edge(v, u, self.pitch)
else: # direction == 'v'
for x in range(self.x_steps):
for y in range(self.y_steps - 1):
u = self.node_idx(x, y, z)
v = self.node_idx(x, y+1, z)
# MANHATTAN VALIDATION
if not self.is_legal_planar_edge(x, y, z, x, y+1, z):
logger.error(f"[MANHATTAN-VIOLATION] Illegal V edge on layer {z}: ({x},{y}) → ({x},{y+1})")
continue # Skip illegal edge
graph.add_edge(u, v, self.pitch)
graph.add_edge(v, u, self.pitch)
# Build via edges using the SAME legal pairs as pre-allocation
via_count = 0
for x in range(self.x_steps):
for y in range(self.y_steps):
for (z_from, z_to) in legal_via_pairs_set:
# Only add if this specific pair is legal
span = abs(z_to - z_from)
span_alpha = 0.15
cost = via_cost * (1.0 + span_alpha * (span - 1))
u = self.node_idx(x, y, z_from)
v = self.node_idx(x, y, z_to)
graph.add_edge(u, v, cost)
graph.add_edge(v, u, cost)
via_count += 2
# LOG what was built
logger.info(f"Vias: {via_count} edges created")
logger.info(f"Via policy: {len(legal_via_pairs_set)} layer pairs (FULL BLIND/BURIED ENABLED!)")
for pair in sorted(list(legal_via_pairs_set))[:10]:
logger.info(f" Legal via: {pair[0]}{pair[1]}")
if len(legal_via_pairs_set) > 20:
logger.info(f" ... and {len(legal_via_pairs_set) - 10} more pairs (showing first 10 only)")
# Finalize the graph before validation (converts edge list to CSR format)
graph.finalize(self.num_nodes, num_layers=self.layers)
# MANHATTAN VALIDATION: Sample 1000 random edges and verify they're legal
edge_count = len(graph.indices) if hasattr(graph, 'indices') else 0
sample_size = min(1000, edge_count)
if sample_size > 0:
logger.info(f"[MANHATTAN-VALIDATION] Sampling {sample_size} edges to verify H/V discipline...")
violations = 0
import numpy as np
# Convert indptr to CPU for validation (if it's on GPU)
indptr_cpu = graph.indptr if isinstance(graph.indptr, np.ndarray) else graph.indptr.get()
for _ in range(sample_size):
# Pick random edge from CSR structure
edge_idx = random.randint(0, edge_count - 1)
# Get source node (find which node this edge belongs to) using binary search
# indptr[u] <= edge_idx < indptr[u+1], so searchsorted gives us u+1
u = int(np.searchsorted(indptr_cpu, edge_idx, side='right')) - 1
# Get target node
v = int(graph.indices[edge_idx]) if isinstance(graph.indices[edge_idx], (int, np.integer)) else int(graph.indices[edge_idx].get())
# Convert to coordinates
x_u, y_u, z_u = self.idx_to_coord(u)
x_v, y_v, z_v = self.idx_to_coord(v)
# Convert to Python ints for set membership testing
z_u, z_v = int(z_u), int(z_v)
# Check if it's a via (different layers)
if z_u != z_v:
# Via edge - check if it's in legal pairs
if (z_u, z_v) not in legal_via_pairs_set and (z_v, z_u) not in legal_via_pairs_set:
logger.error(f"[MANHATTAN-VIOLATION] Illegal via: layer {z_u}{z_v} at ({x_u},{y_u})")
violations += 1
else:
# Planar edge - check H/V discipline
if not self.is_legal_planar_edge(x_u, y_u, z_u, x_v, y_v, z_v):
logger.error(f"[MANHATTAN-VIOLATION] Illegal planar edge on layer {z_u}: ({x_u},{y_u}) → ({x_v},{y_v})")
violations += 1
if violations > 0:
logger.error(f"[MANHATTAN-VALIDATION] Found {violations} illegal edges in graph!")
raise RuntimeError("Graph contains non-Manhattan edges")
else:
logger.info(f"[MANHATTAN-VALIDATION] All {sample_size} sampled edges are legal ✓")
return graph
# ═══════════════════════════════════════════════════════════════════════════════
# ROI EXTRACTION (GPU-Accelerated BFS)
# ═══════════════════════════════════════════════════════════════════════════════
class ROIExtractor:
"""Extract Region of Interest subgraph using GPU-vectorized BFS"""
def __init__(self, graph: CSRGraph, use_gpu: bool = False, lattice=None):
self.graph = graph
self.xp = graph.xp
self.N = len(graph.indptr) - 1
self.lattice = lattice # Need lattice for geometric ROI
def extract_roi_geometric(self, src: int, dst: int, corridor_buffer: int = 30, layer_margin: int = 3, portal_seeds: list = None) -> tuple:
"""
Geometric bounding box ROI extraction for long nets.
Much more efficient than BFS for point-to-point routing over long distances.
Strategy:
1. Calculate 3D bounding box between src and dst
2. Add corridor buffer perpendicular to the main routing direction
3. Limit vertical layers to ±layer_margin from entry/exit layers
Args:
src: Source node index
dst: Destination node index
corridor_buffer: Perpendicular buffer in grid steps (default: 30 steps = 12mm @ 0.4mm pitch)
layer_margin: Vertical layer margin from entry/exit layers (default: ±3 layers)
Returns: (roi_nodes, global_to_roi)
"""
import numpy as np
if not self.lattice:
# Fallback to BFS if no lattice available
logger.warning("Geometric ROI requires lattice, falling back to BFS")
return self.extract_roi_bfs(src, dst, initial_radius=40)
# Get 3D coordinates
src_x, src_y, src_z = self.lattice.idx_to_coord(src)
dst_x, dst_y, dst_z = self.lattice.idx_to_coord(dst)
# Calculate axis-aligned bounding box
min_x = min(src_x, dst_x)
max_x = max(src_x, dst_x)
min_y = min(src_y, dst_y)
max_y = max(src_y, dst_y)
min_z = min(src_z, dst_z)
max_z = max(src_z, dst_z)
# Include portal seeds in bounding box if provided
if portal_seeds:
seed_layers = [self.lattice.idx_to_coord(n)[2] for (n, _) in portal_seeds]
if seed_layers:
min_z = min(min_z, min(seed_layers))
max_z = max(max_z, max(seed_layers))
# Add corridor buffer perpendicular to main direction
min_x = int(max(0, min_x - corridor_buffer))
max_x = int(min(self.lattice.x_steps - 1, max_x + corridor_buffer))
min_y = int(max(0, min_y - corridor_buffer))
max_y = int(min(self.lattice.y_steps - 1, max_y + corridor_buffer))
# Add layer margin (clamp to inner layers only - exclude outer layers 0 and layers-1)
min_z = max(1, min_z - layer_margin)
max_z = min(self.lattice.layers - 2, max_z + layer_margin)
# Generate L-shaped corridor using SET for O(1) deduplication
x_steps = self.lattice.x_steps
y_steps = self.lattice.y_steps
plane_size = x_steps * y_steps
roi_nodes_set = set() # Use set for O(1) lookups
# SYMMETRIC L-CORRIDOR: Include BOTH possible L-paths to avoid directional bias
# L-Path 1: Horizontal first, then vertical (src → (dst.x, src.y) → dst)
# Horizontal segment: from src.x to dst.x (at src.y with buffer)
horiz1_min_y = int(max(0, src_y - corridor_buffer))
horiz1_max_y = int(min(y_steps - 1, src_y + corridor_buffer))
for z in range(min_z, max_z + 1):
for y in range(horiz1_min_y, horiz1_max_y + 1):
for x in range(min_x, max_x + 1):
node_idx = z * plane_size + y * x_steps + x
roi_nodes_set.add(node_idx)
# Vertical segment: from src.y to dst.y (at dst.x with buffer)
vert1_min_x = int(max(0, dst_x - corridor_buffer))
vert1_max_x = int(min(x_steps - 1, dst_x + corridor_buffer))
for z in range(min_z, max_z + 1):
for y in range(min_y, max_y + 1):
for x in range(vert1_min_x, vert1_max_x + 1):
node_idx = z * plane_size + y * x_steps + x
roi_nodes_set.add(node_idx)
# L-Path 2: Vertical first, then horizontal (src → (src.x, dst.y) → dst)
# Vertical segment: from src.y to dst.y (at src.x with buffer)
vert2_min_x = int(max(0, src_x - corridor_buffer))
vert2_max_x = int(min(x_steps - 1, src_x + corridor_buffer))
for z in range(min_z, max_z + 1):
for y in range(min_y, max_y + 1):
for x in range(vert2_min_x, vert2_max_x + 1):
node_idx = z * plane_size + y * x_steps + x
roi_nodes_set.add(node_idx)
# Horizontal segment: from src.x to dst.x (at dst.y with buffer)
horiz2_min_y = int(max(0, dst_y - corridor_buffer))
horiz2_max_y = int(min(y_steps - 1, dst_y + corridor_buffer))
for z in range(min_z, max_z + 1):
for y in range(horiz2_min_y, horiz2_max_y + 1):
for x in range(min_x, max_x + 1):
node_idx = z * plane_size + y * x_steps + x
roi_nodes_set.add(node_idx)
roi_nodes = np.array(list(roi_nodes_set), dtype=np.int32)
logger.info(f"Symmetric L-corridor ROI: {len(roi_nodes):,} nodes (both L-paths included), Z {min_z}-{max_z}")
# CRITICAL: Ensure src, dst, AND all portal seeds are in ROI BEFORE truncation
must_keep_nodes = [src, dst]
if portal_seeds:
# Add all portal seed node IDs to must-keep list
for node_id, _ in portal_seeds:
if node_id not in must_keep_nodes:
must_keep_nodes.append(node_id)
# Add any missing must-keep nodes
for node in must_keep_nodes:
if node not in roi_nodes_set:
roi_nodes_set.add(node)
roi_nodes = np.array(list(roi_nodes_set), dtype=np.int32)
# Cap ROI size if enormous (500K is ~96% of graph, allows access to empty channels)
max_nodes = getattr(self, "max_roi_nodes", 500_000) # Balanced: prevents worst hangs but allows long-distance routing
if roi_nodes.size > max_nodes:
logger.warning(f"Geometric ROI {roi_nodes.size:,} > {max_nodes:,}, truncating to {max_nodes} (keeping {len(must_keep_nodes)} critical nodes)")
# CONNECTIVITY-PRESERVING TRUNCATION: BFS growth from src/dst
# This ensures src's neighbors are included so GPU wavefront can expand
from collections import deque
# Build candidate set and neighbor lookup
cand_set = set(roi_nodes.tolist())
def get_neighbors(node_idx):
"""Get 4-way (+ via) neighbors for this node"""
neighbors = []
x_steps = self.lattice.x_steps
y_steps = self.lattice.y_steps
plane_size = x_steps * y_steps
z, r = divmod(node_idx, plane_size)
y, x = divmod(r, x_steps)
# 4-way horizontal
if x > 0:
neighbors.append(node_idx - 1)
if x + 1 < x_steps:
neighbors.append(node_idx + 1)
if y > 0:
neighbors.append(node_idx - x_steps)
if y + 1 < y_steps:
neighbors.append(node_idx + x_steps)
# Vias (vertical)
if z > 0:
neighbors.append(node_idx - plane_size)
if z + 1 < self.lattice.layers:
neighbors.append(node_idx + plane_size)
return neighbors
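# Worked example (illustrative dimensions): with x_steps=10 and y_steps=8
# (plane_size=80), the node at (x=3, y=2, z=1) has index 103; its x/y
# neighbours differ by ±1 / ±10 and its via neighbours by ±80.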
# BFS from src and dst alternating (to keep both regions connected)
selected = set(must_keep_nodes)
q_src = deque([src])
q_dst = deque([dst])
# Add portal seeds to their appropriate queues
if portal_seeds:
for node_id, _ in portal_seeds:
if node_id != src and node_id != dst:
# Add to src queue (arbitrary choice)
q_src.append(node_id)
toggle = 0
while len(selected) < max_nodes and (q_src or q_dst):
# Alternate between src and dst fronts for balanced growth
q = q_src if toggle == 0 else q_dst
toggle ^= 1
if not q:
continue
# Process one node from this frontier
u = q.popleft()
for v in get_neighbors(u):
if v in selected:
continue
if v not in cand_set:
continue # Must be in geometric corridor
selected.add(v)
q.append(v)
if len(selected) >= max_nodes:
break
if len(selected) >= max_nodes:
break
roi_nodes = np.array(list(selected), dtype=np.int32)
logger.info(f"After BFS truncation: {len(roi_nodes)} nodes (connectivity-preserving from src/dst) vs {max_nodes} budget")
# DEBUG: Verify src's immediate neighbors are included
# DISABLED: This BFS reachability check is too expensive (4s per net)
# and causes test timeouts. The wavefront will naturally fail if ROI is disconnected.
#
# src_neighbors = set(get_neighbors(src))
# neighbors_in_roi = src_neighbors & selected
# logger.info(f"[BFS-DEBUG] Src {src} has {len(src_neighbors)} neighbors, {len(neighbors_in_roi)} are in BFS-selected ROI")
#
# # ROI REACHABILITY CHECK: Verify dst is reachable from src within truncated ROI
# logger.info(f"[ROI-REACHABILITY] Testing if dst {dst} is reachable from src {src} within truncated ROI...")
#
# # Quick BFS to check connectivity (ignoring costs)
# from collections import deque
# queue = deque([src])
# visited_bfs = {src}
# found_dst = False
# hop_count = 0
# max_hops = 10000 # Safety limit
#
# while queue and hop_count < max_hops:
# u = queue.popleft()
# u = int(u) # CRITICAL: Cast to Python int
# hop_count += 1
#
# if u == dst:
# found_dst = True
# logger.info(f"[ROI-REACHABILITY] ✓ dst {dst} REACHABLE from src {src} in {hop_count} hops")
# break
#
# # Get neighbors from graph - CAST TO INT
# u_start = int(self.graph.indptr[u])
# u_end = int(self.graph.indptr[u + 1])
#
# for e in range(u_start, u_end):
# v = int(self.graph.indices[e]) # CAST neighbor to int
#
# # Only expand within ROI
# if v not in selected:
# continue
#
# if v not in visited_bfs:
# visited_bfs.add(v)
# queue.append(v)
#
# if not found_dst:
# logger.error(f"[ROI-REACHABILITY] ✗ dst {dst} NOT REACHABLE from src {src} within truncated ROI!")
# logger.error(f"[ROI-REACHABILITY] ROI has {len(roi_nodes)} nodes but src/dst are DISCONNECTED")
# logger.error(f"[ROI-REACHABILITY] Need to expand ROI budget or fix truncation logic")
# Build global_to_roi mapping
global_to_roi = np.full(self.N, -1, dtype=np.int32)
global_to_roi[roi_nodes] = np.arange(len(roi_nodes), dtype=np.int32)
# Log statistics
x_span = max_x - min_x + 1
y_span = max_y - min_y + 1
z_span = max_z - min_z + 1
logger.debug(f"Geometric ROI: {len(roi_nodes):,} nodes ({x_span}×{y_span}×{z_span} box, buffer={corridor_buffer}, layer_margin={layer_margin})")
return roi_nodes, global_to_roi
def extract_roi(self, src: int, dst: int, initial_radius: int = 40, stagnation_bonus: float = 0.0, portal_seeds: list = None) -> tuple:
"""BFS ROI extraction with adaptive radius"""
import numpy as np
# Radius scales with Manhattan distance (75% of the src→dst distance, grown from both ends)
if self.lattice:
src_x, src_y, src_z = self.lattice.idx_to_coord(src)
dst_x, dst_y, dst_z = self.lattice.idx_to_coord(dst)
manhattan_dist = abs(dst_x - src_x) + abs(dst_y - src_y)
# Ensure radius covers full path length for wavefront to succeed
radius = max(60, int(manhattan_dist * 0.75 + stagnation_bonus * 2.0))
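# e.g. (illustrative numbers) manhattan_dist=200 steps, stagnation_bonus=5
#      → radius = max(60, int(200*0.75 + 5*2.0)) = max(60, 160) = 160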
logger.debug(f"BFS ROI: dist={manhattan_dist}, radius={radius}")
else:
radius = initial_radius
return self.extract_roi_bfs(src, dst, initial_radius=radius, stagnation_bonus=stagnation_bonus, portal_seeds=portal_seeds)
def extract_roi_bfs(self, src: int, dst: int, initial_radius: int = 40, stagnation_bonus: float = 0.0, portal_seeds: list = None) -> tuple:
"""
Bidirectional BFS ROI extraction - expands until both src and dst are covered.
Good for short/medium nets with complex obstacle navigation.
Returns: (roi_nodes, global_to_roi)
"""
import numpy as np
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
N = self.N
seen = np.zeros(N, dtype=np.uint8) # 0=unseen, 1=src-wave, 2=dst-wave, 3=both
q_src = [src]
q_dst = [dst]
seen[src] = 1
seen[dst] = 2
depth = 0
# Apply stagnation bonus: each stagnation mark widens the search by 2 extra BFS steps (0.8mm @ 0.4mm pitch)
# CRITICAL: Limit max_depth to prevent full-board expansion
# With radius=60, max_depth=80 gives ~32mm radius (covers most routes)
# Board is 244×227mm, so depth=800 would cover ENTIRE board!
max_depth = min(int(initial_radius + stagnation_bonus * 2.0), 80)
met = False
# Limit ROI size for efficiency - smaller ROIs converge MUCH faster on GPU!
# 50K nodes = ~60×60×12 region (24mm × 24mm × 12 layers @ 0.4mm pitch)
# This is large enough for most routes but small enough for fast wavefront expansion
# Target: <50 iterations instead of 500+ on full graph
max_nodes = getattr(self, "max_roi_nodes", 50_000)
while depth < max_depth and (q_src or q_dst) and not met:
def step(queue, mark):
next_q = []
met_flag = False
for u in queue:
s, e = int(indptr[u]), int(indptr[u+1])
for ei in range(s, e):
v = int(indices[ei])
if seen[v] == 0:
seen[v] = mark
next_q.append(v)
elif seen[v] != mark:
# Visited by the other wave → mark as both
seen[v] = 3
met_flag = True
return next_q, met_flag
q_src, met_src = step(q_src, 1)
if met_src:
met = True
if not met:
q_dst, met_dst = step(q_dst, 2)
if met_dst:
met = True
depth += 1
# Early stop if ROI exceeds max size (will be truncated anyway)
if (seen > 0).sum() > max_nodes * 1.5:
logger.debug(f"BFS early stop at depth {depth}: ROI size {(seen > 0).sum():,} exceeds {max_nodes * 1.5:,.0f}")
break
roi_mask = seen > 0
roi_nodes = np.where(roi_mask)[0]
# CRITICAL: Ensure src, dst, AND all portal seeds are in ROI BEFORE truncation
must_keep_nodes = [src, dst]
if portal_seeds:
for node_id, _ in portal_seeds:
if node_id not in must_keep_nodes:
must_keep_nodes.append(node_id)
# Add any missing must-keep nodes
missing_nodes = [n for n in must_keep_nodes if n not in roi_nodes]
if missing_nodes:
roi_nodes = np.append(roi_nodes, missing_nodes)
# Truncate if needed (max_nodes defined above)
if roi_nodes.size > max_nodes:
logger.warning(f"BFS ROI {roi_nodes.size:,} > {max_nodes:,}, truncating (preserving {len(must_keep_nodes)} critical nodes)")
# Move ALL must-keep nodes to the front, then fill the remaining budget.
# (The previous pairwise swap could evict a must-keep node that was already
# inside the budget; building the array explicitly avoids that.)
must_keep_arr = np.array(must_keep_nodes, dtype=roi_nodes.dtype)
rest = roi_nodes[~np.isin(roi_nodes, must_keep_arr)]
roi_nodes = np.concatenate([must_keep_arr, rest])[:max_nodes]
logger.info(f"Verified {sum(1 for n in must_keep_nodes if n in roi_nodes)}/{len(must_keep_nodes)} critical nodes preserved")
global_to_roi = np.full(N, -1, dtype=np.int32)
global_to_roi[roi_nodes] = np.arange(len(roi_nodes), dtype=np.int32)
return roi_nodes, global_to_roi
def _check_roi_connectivity(self, src: int, dst: int, roi_nodes, roi_indptr, roi_indices) -> bool:
"""Fast BFS to check if src can reach dst in ROI (~1ms check vs 50-100ms failed GPU routing)."""
from collections import deque
import numpy as np
# Convert to set for O(1) lookup
if isinstance(roi_nodes, np.ndarray):
roi_set = set(roi_nodes.tolist() if hasattr(roi_nodes, 'tolist') else roi_nodes)
elif hasattr(roi_nodes, 'get'): # CuPy
roi_set = set(roi_nodes.get().tolist())
else:
roi_set = set(roi_nodes) if not isinstance(roi_nodes, set) else roi_nodes
if src not in roi_set or dst not in roi_set:
return False
# Convert to NumPy if CuPy
if hasattr(roi_indptr, 'get'):
roi_indptr = roi_indptr.get()
if hasattr(roi_indices, 'get'):
roi_indices = roi_indices.get()
# BFS from src to dst
visited = {src}
queue = deque([src])
nodes_explored = 0
while queue:
u = queue.popleft()
nodes_explored += 1
if u == dst:
return True
# Early-termination heuristic: give up after exploring half the ROI.
# This can report "disconnected" for large-but-connected ROIs, so the result
# is a cheap pre-filter rather than a definitive connectivity answer.
if nodes_explored > len(roi_set) * 0.5:
    break
for ei in range(int(roi_indptr[u]), int(roi_indptr[u + 1])):
v = int(roi_indices[ei])
if v in roi_set and v not in visited:
visited.add(v)
queue.append(v)
return False
# ═══════════════════════════════════════════════════════════════════════════════
# DIJKSTRA WITH ROI
# ═══════════════════════════════════════════════════════════════════════════════
class SimpleDijkstra:
"""Dijkstra SSSP with ROI support (CPU only; copies from GPU if needed)"""
def __init__(self, graph: CSRGraph, lattice=None):
# Copy CSR to CPU if they live on GPU
self.indptr = graph.indptr.get() if hasattr(graph.indptr, "get") else graph.indptr
self.indices = graph.indices.get() if hasattr(graph.indices, "get") else graph.indices
self.N = len(self.indptr) - 1
# Store plane_size for layer calculation
self.plane_size = lattice.x_steps * lattice.y_steps if lattice else None
# Initialize path counters
self._gpu_path_count = 0
self._cpu_path_count = 0
def find_path_roi(self, src: int, dst: int, costs, roi_nodes, global_to_roi) -> Optional[List[int]]:
"""Find shortest path within ROI subgraph using heap-based Dijkstra (O(E log V))"""
import numpy as np
import heapq
# Use GPU if ROI is large enough and GPU solver available
roi_size = len(roi_nodes) if hasattr(roi_nodes, '__len__') else roi_nodes.shape[0]
gpu_threshold = getattr(getattr(self, 'config', None), 'gpu_roi_min_nodes', 1000)
use_gpu = hasattr(self, 'gpu_solver') and self.gpu_solver and roi_size > gpu_threshold
if use_gpu:
logger.info(f"[GPU] Using GPU pathfinding for ROI size={roi_size} (threshold={gpu_threshold})")
try:
path = self.gpu_solver.find_path_roi_gpu(src, dst, costs, roi_nodes, global_to_roi)
if path:
self._gpu_path_count += 1
return path
# If GPU returned None, fall through to CPU
except Exception as e:
logger.warning(f"[GPU] Pathfinding failed: {e}, using CPU")
# Fall through to CPU
# Track CPU pathfinding usage
self._cpu_path_count += 1
# Ensure arrays are CPU NumPy
costs = costs.get() if hasattr(costs, "get") else costs
roi_nodes = roi_nodes.get() if hasattr(roi_nodes, "get") else roi_nodes
global_to_roi = global_to_roi.get() if hasattr(global_to_roi, "get") else global_to_roi
# Map src/dst to ROI space
roi_src = int(global_to_roi[src])
roi_dst = int(global_to_roi[dst])
if roi_src < 0 or roi_dst < 0:
logger.warning("src or dst not in ROI")
return None
roi_size = len(roi_nodes)
dist = np.full(roi_size, np.inf, dtype=np.float32)
parent = np.full(roi_size, -1, dtype=np.int32)
visited = np.zeros(roi_size, dtype=bool)
dist[roi_src] = 0.0
# Heap-based Dijkstra: O(E log V) instead of O(V²)
heap = [(0.0, roi_src)]
while heap:
du, u_roi = heapq.heappop(heap)
# Skip if already visited (stale heap entry)
if visited[u_roi]:
continue
visited[u_roi] = True
# Early exit if we reached destination
if u_roi == roi_dst:
break
u_global = int(roi_nodes[u_roi])
s, e = int(self.indptr[u_global]), int(self.indptr[u_global + 1])
for ei in range(s, e):
v_global = int(self.indices[ei])
v_roi = int(global_to_roi[v_global])
if v_roi < 0 or visited[v_roi]:
continue
alt = du + float(costs[ei])
if alt < dist[v_roi]:
dist[v_roi] = alt
parent[v_roi] = u_roi
heapq.heappush(heap, (alt, v_roi))
if not np.isfinite(dist[roi_dst]):
return None
# Reconstruct path in global coordinates
path, cur = [], roi_dst
while cur != -1:
path.append(int(roi_nodes[cur]))
cur = int(parent[cur])
path.reverse()
return path if len(path) > 1 else None
def find_path_multisource_multisink(self, src_seeds: List[Tuple[int, float]],
dst_targets: List[Tuple[int, float]],
costs, roi_nodes, global_to_roi) -> Optional[Tuple[List[int], int, int]]:
"""
Find shortest path from any source to any destination with portal entry costs.
Returns: (path, entry_layer, exit_layer) or None
"""
import numpy as np
import heapq
# Ensure arrays are CPU NumPy
costs = costs.get() if hasattr(costs, "get") else costs
roi_nodes = roi_nodes.get() if hasattr(roi_nodes, "get") else roi_nodes
global_to_roi = global_to_roi.get() if hasattr(global_to_roi, "get") else global_to_roi
roi_size = len(roi_nodes)
dist = np.full(roi_size, np.inf, dtype=np.float32)
parent = np.full(roi_size, -1, dtype=np.int32)
visited = np.zeros(roi_size, dtype=bool)
# Initialize heap with all source seeds
heap = []
src_roi_nodes = set()
for global_node, initial_cost in src_seeds:
roi_idx = int(global_to_roi[global_node])
if roi_idx >= 0:
dist[roi_idx] = initial_cost
heapq.heappush(heap, (initial_cost, roi_idx))
src_roi_nodes.add(roi_idx)
# Build target set
dst_roi_nodes = {} # roi_idx -> (global_node, initial_cost)
for global_node, initial_cost in dst_targets:
roi_idx = int(global_to_roi[global_node])
if roi_idx >= 0:
dst_roi_nodes[roi_idx] = (global_node, initial_cost)
if not heap or not dst_roi_nodes:
return None
# Multi-source Dijkstra
reached_target = None
final_dist = np.inf
while heap:
du, u_roi = heapq.heappop(heap)
if visited[u_roi]:
continue
visited[u_roi] = True
# Check if we reached any target
if u_roi in dst_roi_nodes:
target_global, target_cost = dst_roi_nodes[u_roi]
total_dist = du + target_cost
if total_dist < final_dist:
final_dist = total_dist
reached_target = u_roi
# Don't break - might find better target
continue
u_global = int(roi_nodes[u_roi])
s, e = int(self.indptr[u_global]), int(self.indptr[u_global + 1])
for ei in range(s, e):
v_global = int(self.indices[ei])
v_roi = int(global_to_roi[v_global])
if v_roi < 0 or visited[v_roi]:
continue
alt = du + float(costs[ei])
if alt < dist[v_roi]:
dist[v_roi] = alt
parent[v_roi] = u_roi
heapq.heappush(heap, (alt, v_roi))
if reached_target is None:
return None
# Reconstruct path
path, cur = [], reached_target
while cur != -1:
path.append(int(roi_nodes[cur]))
cur = int(parent[cur])
path.reverse()
if len(path) <= 1:
return None
# Determine entry and exit layers
if self.plane_size:
entry_layer = path[0] // self.plane_size
exit_layer = path[-1] // self.plane_size
else:
# Fallback if plane_size not set
entry_layer = exit_layer = 0
return (path, entry_layer, exit_layer)
# ═══════════════════════════════════════════════════════════════════════════════
# PATHFINDER ROUTER
# ═══════════════════════════════════════════════════════════════════════════════
class PathFinderRouter:
"""PathFinder negotiated congestion routing"""
def __init__(self, config: PathFinderConfig = None, use_gpu: bool = None):
self.config = config or PathFinderConfig()
# Legacy API: accept use_gpu as kwarg
if use_gpu is not None:
self.config.use_gpu = use_gpu
# Environment variable overrides
env_use_gpu = os.getenv("USE_GPU")
if env_use_gpu is not None:
self.config.use_gpu = env_use_gpu == "1"
env_sequential = os.getenv("SEQUENTIAL_ALL")
if env_sequential is not None:
if hasattr(self.config, 'use_gpu_sequential'):
self.config.use_gpu_sequential = env_sequential == "1"
else:
setattr(self.config, 'use_gpu_sequential', env_sequential == "1")
env_incremental = os.getenv("INCREMENTAL_COST_UPDATE")
if env_incremental is not None:
if hasattr(self.config, 'use_incremental_cost_update'):
self.config.use_incremental_cost_update = env_incremental == "1"
else:
setattr(self.config, 'use_incremental_cost_update', env_incremental == "1")
# Log final configuration
logger.info(f"[CONFIG] use_gpu={self.config.use_gpu}")
logger.info(f"[CONFIG] use_gpu_sequential={getattr(self.config, 'use_gpu_sequential', True)}")
logger.info(f"[CONFIG] use_incremental_cost_update={getattr(self.config, 'use_incremental_cost_update', False)}")
self.lattice: Optional[Lattice3D] = None
self.graph: Optional[CSRGraph] = None
self.accounting: Optional[EdgeAccountant] = None
self.solver: Optional[SimpleDijkstra] = None
self.roi_extractor: Optional[ROIExtractor] = None
self.pad_to_node: Dict[str, int] = {}
self.net_paths: Dict[str, List[int]] = {}
self.iteration = 0
self._negotiation_ran = False
self._geometry_payload = GeometryPayload([], [])
self._provisional_geometry = GeometryPayload([], []) # For GUI feedback during routing
# Hotset management: locked nets and clean streak tracking
self.locked_nets: Set[str] = set()
self.net_clean_streak: Dict[str, int] = defaultdict(int) # iterations since last overuse
self.locked_freeze_threshold: int = 3 # Lock after K clean iterations
self.clean_nets_count: int = 0 # Track clean nets for sanity checking
# Edge-to-nets tracking for efficient hotset building
self._net_to_edges: Dict[str, List[int]] = {} # net_id -> [edge_indices]
self._edge_to_nets: Dict[int, Set[str]] = defaultdict(set) # edge_idx -> {net_ids}
# Portal escape tracking
self.portals: Dict[str, Portal] = {} # pad_id -> Portal
self.net_portal_failures: Dict[str, int] = defaultdict(int) # net_id -> failure count
self.net_pad_ids: Dict[str, Tuple[str, str]] = {} # net_id -> (src_pad_id, dst_pad_id)
self.net_portal_layers: Dict[str, Tuple[int, int]] = {} # net_id -> (entry_layer, exit_layer)
# Pad escape planner (initialized after lattice is created)
self.escape_planner: Optional[PadEscapePlanner] = None
# ROI policy: track stagnation and fallback usage
self.stagnation_counter: int = 0 # increments each stagnation event
self.full_graph_fallback_count: int = 0 # limit to 5 per iteration
# Rip tracking and pres_fac freezing (Fix 5)
self._last_ripped: Set[str] = set()
self._freeze_pres_fac_until: int = 0
# Connectivity check cache (optimization to avoid redundant BFS checks)
self._connectivity_cache: Dict[Tuple[int, int, int], bool] = {} # (src, dst, roi_hash) -> is_connected
self._connectivity_stats = {
'checks_performed': 0,
'cache_hits': 0,
'disconnected_found': 0,
'time_saved_ms': 0.0
}
# GPU vs CPU pathfinding usage tracking
self._gpu_path_count = 0 # Number of paths found using GPU
self._cpu_path_count = 0 # Number of paths found using CPU
# Legacy attributes for compatibility
self._instance_tag = f"PF-{int(time.time() * 1000) % 100000}"
logger.info(f"PathFinder (GPU={self.config.use_gpu and GPU_AVAILABLE}, Portals={self.config.portal_enabled})")
def initialize_graph(self, board: Board) -> bool:
"""Build routing graph"""
logger.info("=" * 80)
logger.info("PATHFINDER NEGOTIATED CONGESTION ROUTER - RUNTIME CONFIGURATION")
logger.info("=" * 80)
logger.info(f"[CONFIG] pres_fac_init = {self.config.pres_fac_init}")
logger.info(f"[CONFIG] pres_fac_mult = {self.config.pres_fac_mult}")
logger.info(f"[CONFIG] pres_fac_max = {self.config.pres_fac_max}")
logger.info(f"[CONFIG] hist_gain = {self.config.hist_gain}")
logger.info(f"[CONFIG] via_cost = {self.config.via_cost}")
logger.info(f"[CONFIG] grid_pitch = {self.config.grid_pitch} mm")
logger.info(f"[CONFIG] max_iterations = {self.config.max_iterations}")
logger.info(f"[CONFIG] stagnation_patience = {self.config.stagnation_patience}")
logger.info("=" * 80)
bounds = self._calc_bounds(board)
# Use board's real layer count (critical for dense boards)
layers_from_board = getattr(board, "layer_count", None) or len(getattr(board, "layers", [])) or self.config.layer_count
self.config.layer_count = int(layers_from_board)
# Ensure we have enough layer names
existing_names = getattr(self.config, "layer_names", None)
# Handle dataclass Field objects
if hasattr(existing_names, '__len__'):
has_enough_layers = len(existing_names) >= self.config.layer_count
else:
has_enough_layers = False
if not existing_names or not has_enough_layers:
self.config.layer_names = (
getattr(board, "layers", None)
or (["F.Cu"] + [f"In{i}.Cu" for i in range(1, self.config.layer_count-1)] + ["B.Cu"])
)
logger.info(f"Using {self.config.layer_count} layers from board")
self.lattice = Lattice3D(bounds, self.config.grid_pitch, self.config.layer_count)
self.graph = self.lattice.build_graph(
self.config.via_cost,
allowed_via_spans=self.config.allowed_via_spans,
use_gpu=self.config.use_gpu and GPU_AVAILABLE
)
# Note: graph.finalize() is now called inside build_graph() before validation
# Set N for ROI checks (number of nodes in full graph)
self.N = self.lattice.num_nodes
E = len(self.graph.indices)
self.accounting = EdgeAccountant(E, use_gpu=self.config.use_gpu and GPU_AVAILABLE)
# Via pooling arrays (GPU-accelerated for performance!)
Nx, Ny, Nz = self.lattice.x_steps, self.lattice.y_steps, self.lattice.layers
self._Nx, self._Ny, self._Nz = Nx, Ny, Nz
# Layer balancing (EWMA of per-layer horizontal overuse)
self.layer_bias = np.ones(Nz, dtype=np.float32) # Index by z (0..Nz-1), 1.0 = neutral
logger.info(f"[LAYER-BALANCE] Initialized for {Nz} layers")
# Determine if we should use GPU for via arrays
use_via_gpu = self.config.use_gpu and GPU_AVAILABLE
if getattr(self.config, "via_column_pooling", True):
# Create arrays on GPU if available, CPU otherwise
if use_via_gpu:
self.via_col_cap = cp.full((Nx, Ny), int(getattr(self.config, "via_column_capacity", 4)), dtype=cp.int16)
self.via_col_use = cp.zeros((Nx, Ny), dtype=cp.int16)
self.via_col_pres = cp.zeros((Nx, Ny), dtype=cp.float32)
logger.info(f"[VIA-POOL] Column pooling enabled (GPU): capacity={int(self.via_col_cap[0,0])} per (x,y)")
else:
self.via_col_cap = np.full((Nx, Ny), int(getattr(self.config, "via_column_capacity", 4)), dtype=np.int16)
self.via_col_use = np.zeros((Nx, Ny), dtype=np.int16)
self.via_col_pres = np.zeros((Nx, Ny), dtype=np.float32)
logger.info(f"[VIA-POOL] Column pooling enabled (CPU): capacity={self.via_col_cap[0,0]} per (x,y)")
if getattr(self.config, "via_segment_pooling", True):
# Segments between routing layers (1..Nz-2): segment z→z+1 stored at index z-1
self._segZ = Nz - 2 # Number of tracked via segments (one per z→z+1 transition, z = 1..Nz-2)
if use_via_gpu:
self.via_seg_cap = cp.full((Nx, Ny, self._segZ), int(getattr(self.config, "via_segment_capacity", 2)), dtype=cp.int8)
self.via_seg_use = cp.zeros((Nx, Ny, self._segZ), dtype=cp.int16)
self.via_seg_pres = cp.zeros((Nx, Ny, self._segZ), dtype=cp.float32)
self.via_seg_prefix = cp.zeros((Nx, Ny, self._segZ), dtype=cp.float32)
logger.info(f"[VIA-POOL] Segment pooling enabled (GPU): {self._segZ} segments (z=1..{Nz-2}), capacity={int(self.via_seg_cap[0,0,0])} per segment")
else:
self.via_seg_cap = np.full((Nx, Ny, self._segZ), int(getattr(self.config, "via_segment_capacity", 2)), dtype=np.int8)
self.via_seg_use = np.zeros((Nx, Ny, self._segZ), dtype=np.int16)
self.via_seg_pres = np.zeros((Nx, Ny, self._segZ), dtype=np.float32)
self.via_seg_prefix = np.zeros((Nx, Ny, self._segZ), dtype=np.float32)
logger.info(f"[VIA-POOL] Segment pooling enabled (CPU): {self._segZ} segments (z=1..{Nz-2}), capacity={self.via_seg_cap[0,0,0]} per segment")
# Initialize ViaKernelManager for GPU-accelerated via operations
self.via_kernel_manager = ViaKernelManager(use_gpu=use_via_gpu)
logger.info(f"[VIA-KERNELS] Manager initialized (GPU={'YES' if use_via_gpu else 'NO'})")
# NODE OWNERSHIP TRACKING: Track which net owns each node (via barrels)
# -1 = free, otherwise net_id (mapped to integer)
# This is THE solution to via barrel conflicts - enforce at node level, not edge level!
self.node_owner = np.full(self.lattice.num_nodes, -1, dtype=np.int32)
self.net_id_map = {} # net_name -> integer ID
self.next_net_id = 0
logger.info(f"[NODE-OWNER] Initialized node ownership tracking for {self.lattice.num_nodes:,} nodes")
self.solver = SimpleDijkstra(self.graph, self.lattice)
# Add GPU solver if available
use_gpu_solver = self.config.use_gpu and GPU_AVAILABLE and CUDA_DIJKSTRA_AVAILABLE
# Enhanced debug logging
logger.info(f"[GPU-INIT] config.use_gpu={self.config.use_gpu}, GPU_AVAILABLE={GPU_AVAILABLE}, CUDA_DIJKSTRA_AVAILABLE={CUDA_DIJKSTRA_AVAILABLE}")
logger.info(f"[GPU-INIT] use_gpu_solver={use_gpu_solver}")
if use_gpu_solver:
try:
self.solver.gpu_solver = CUDADijkstra(self.graph, self.lattice)
logger.info("[GPU] CUDA Near-Far Dijkstra enabled (ROI > 5K nodes) with lattice dims")
# Log GPU details
device = cp.cuda.Device()
mem_free, mem_total = device.mem_info
logger.info(f"[GPU] GPU Compute Capability: {device.compute_capability}")
logger.info(f"[GPU] GPU Memory: {mem_free / 1e9:.1f} GB free / {mem_total / 1e9:.1f} GB total")
except Exception as e:
logger.warning(f"[GPU] Failed to initialize CUDA Dijkstra: {e}")
self.solver.gpu_solver = None
else:
self.solver.gpu_solver = None
reasons = []
if not self.config.use_gpu:
reasons.append("config.use_gpu=False")
if not GPU_AVAILABLE:
reasons.append("CuPy not installed")
if not CUDA_DIJKSTRA_AVAILABLE:
reasons.append("CUDADijkstra import failed")
logger.info(f"[GPU] CPU-only mode: {', '.join(reasons)}")
self.roi_extractor = ROIExtractor(self.graph, use_gpu=self.config.use_gpu and GPU_AVAILABLE, lattice=self.lattice)
# Identify via edges for via-specific accounting
self._identify_via_edges()
# Build via edge metadata for vectorized penalty application
self._build_via_edge_metadata()
self._map_pads(board)
# Initialize escape planner after pads are mapped
# Use deterministic random seed for reproducible escape planning (default: 42)
escape_seed = getattr(self.config, 'escape_random_seed', 42)
self.escape_planner = PadEscapePlanner(self.lattice, self.config, self.pad_to_node, random_seed=escape_seed)
# NOTE: Portal planning is now done by PadEscapePlanner.precompute_all_pad_escapes()
# which is called from main_window.py AFTER initialization.
# The old _plan_portals() method is disabled in favor of the column-based algorithm.
# Portals will be copied from escape_planner in precompute_all_pad_escapes().
logger.info(f"Portal planning delegated to PadEscapePlanner (column-based, seed={escape_seed})")
# Note: Portal discounts are applied at seed level in _get_portal_seeds()
# No need for graph-level discount modification
logger.info("=== Init complete ===")
return True
def _calc_bounds(self, board: Board) -> Tuple[float, float, float, float]:
"""
Compute routing grid bounds from pads extracted via board.nets.
This is called during initialize_graph() BEFORE escape planning,
so we extract pads from board.nets (which IS available) rather than
board.components (which may be incomplete).
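Example (illustrative numbers): pads spanning x = 10..110 mm and y = 20..220 mm
give, with the 3 mm routing margin below, bounds of (7.0, 17.0, 113.0, 223.0).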
"""
pads_with_nets = []
ROUTING_MARGIN = 3.0 # mm
# Extract pads from board.nets (reliable during initialization)
try:
if hasattr(board, 'nets') and board.nets:
for net in board.nets:
# Only consider nets with 2+ pads (routable nets)
if hasattr(net, 'pads') and len(net.pads) >= 2:
for pad in net.pads:
if hasattr(pad, 'position') and pad.position is not None:
pads_with_nets.append(pad)
if pads_with_nets:
xs = [p.position.x for p in pads_with_nets]
ys = [p.position.y for p in pads_with_nets]
min_x, max_x = min(xs), max(xs)
min_y, max_y = min(ys), max(ys)
logger.info(f"[BOUNDS] Extracted {len(pads_with_nets)} pads from {len(board.nets)} nets")
logger.info(f"[BOUNDS] Pad area: ({min_x:.1f}, {min_y:.1f}) to ({max_x:.1f}, {max_y:.1f})")
# Add routing margin
bounds = (
min_x - ROUTING_MARGIN,
min_y - ROUTING_MARGIN,
max_x + ROUTING_MARGIN,
max_y + ROUTING_MARGIN
)
logger.info(f"[BOUNDS] Final with {ROUTING_MARGIN}mm margin: ({bounds[0]:.1f}, {bounds[1]:.1f}) to ({bounds[2]:.1f}, {bounds[3]:.1f})")
return bounds
except Exception as e:
logger.warning(f"[BOUNDS] Failed to extract pads from board.nets: {e}")
# Fallback: Use full board bounds + margin (suboptimal but safe)
logger.warning(f"[BOUNDS] No pads found via board.nets, falling back to board._kicad_bounds + {ROUTING_MARGIN}mm")
if hasattr(board, "_kicad_bounds"):
b = board._kicad_bounds
return (b[0] - ROUTING_MARGIN, b[1] - ROUTING_MARGIN,
b[2] + ROUTING_MARGIN, b[3] + ROUTING_MARGIN)
# Ultimate fallback
logger.error("[BOUNDS] No bounds available, using default 100x100mm")
return (0, 0, 100, 100)
def _pad_key(self, pad, comp=None):
"""Generate unique pad key with coordinates for orphaned pads"""
comp_id = getattr(pad, "component_id", None) or (getattr(comp, "id", None) if comp else None) or "GENERIC_COMPONENT"
# For orphaned pads (all in GENERIC_COMPONENT), include coordinates to ensure uniqueness
# since pad IDs like "1", "2", "3" will collide across multiple components
if comp_id == "GENERIC_COMPONENT" and hasattr(pad, 'position'):
xq = int(round(pad.position.x * 1000))
yq = int(round(pad.position.y * 1000))
return f"{comp_id}_{pad.id}@{xq},{yq}"
return f"{comp_id}_{pad.id}"
def _get_pad_layer(self, pad) -> int:
"""Get the layer index for a pad with fallback handling"""
# Check if pad has explicit layer information
if hasattr(pad, 'layer') and pad.layer:
layer_name = str(pad.layer)
if layer_name in self.config.layer_names:
return self.config.layer_names.index(layer_name)
logger.debug(f"Pad layer '{layer_name}' not in layer_names, using fallback")
# Check if pad has layers list (multi-layer pads)
if hasattr(pad, 'layers') and pad.layers:
# Use first layer in the list
layer_name = str(pad.layers[0])
if layer_name in self.config.layer_names:
return self.config.layer_names.index(layer_name)
logger.debug(f"Pad layers[0] '{layer_name}' not in layer_names, using fallback")
# Check drill attribute to determine if through-hole
drill = getattr(pad, 'drill', 0.0)
if drill > 0:
# Through-hole pad - default to F.Cu (layer 0)
return 0 # F.Cu
# Default to F.Cu for SMD pads
return 0
def _map_pads(self, board: Board):
"""Map every pad to a lattice node with unique keys."""
count_components = 0
count_board_level = 0
sample_ids = []
oob_count = 0
layer_fallback_count = 0
def _snap_to_node(x_mm, y_mm, layer=0):
x_idx, y_idx = self.lattice.world_to_lattice(x_mm, y_mm)
# Clamp to valid range (prevents OOB)
x_idx = max(0, min(x_idx, self.lattice.x_steps - 1))
y_idx = max(0, min(y_idx, self.lattice.y_steps - 1))
return self.lattice.node_idx(x_idx, y_idx, layer)
# Pads that come via components - keep on physical layers
for comp in getattr(board, "components", []):
for pad in getattr(comp, "pads", []):
pad_id = self._pad_key(pad, comp)
layer = self._get_pad_layer(pad)
node = _snap_to_node(pad.position.x, pad.position.y, layer)
self.pad_to_node[pad_id] = node
count_components += 1
if len(sample_ids) < 5:
sample_ids.append(pad_id)
# Pads that might live at board level (GUI created "generic component")
for pad in getattr(board, "pads", []):
pad_id = self._pad_key(pad, comp=None)
if pad_id not in self.pad_to_node:
layer = self._get_pad_layer(pad)
node = _snap_to_node(pad.position.x, pad.position.y, layer)
self.pad_to_node[pad_id] = node
count_board_level += 1
logger.info(f"Mapped {len(self.pad_to_node)} pads (from ~{count_components + count_board_level})")
logger.info(f"[VERIFY] Sample pad IDs: {sample_ids[:5]}")
def _plan_portals(self, board: Board):
"""Plan portal escape points for all pads"""
if not self.config.portal_enabled:
logger.info("Portal escapes disabled")
return
portal_count = 0
tht_skipped = 0
# Plan portals for component pads
for comp in getattr(board, "components", []):
for pad in getattr(comp, "pads", []):
# Skip through-hole pads (they already span all layers)
drill = getattr(pad, 'drill', 0.0)
if drill > 0:
tht_skipped += 1
continue
pad_id = self._pad_key(pad, comp)
if pad_id in self.pad_to_node:
portal = self._plan_portal_for_pad(pad, pad_id)
if portal:
self.portals[pad_id] = portal
portal_count += 1
# Plan portals for board-level pads
for pad in getattr(board, "pads", []):
drill = getattr(pad, 'drill', 0.0)
if drill > 0:
tht_skipped += 1
continue
pad_id = self._pad_key(pad, comp=None)
if pad_id in self.pad_to_node and pad_id not in self.portals:
portal = self._plan_portal_for_pad(pad, pad_id)
if portal:
self.portals[pad_id] = portal
portal_count += 1
logger.info(f"Planned {portal_count} portals (skipped {tht_skipped} THT pads)")
def _plan_portal_for_pad(self, pad, pad_id: str) -> Optional[Portal]:
"""Plan portal escape point for a single pad"""
# Get pad position and layer
pad_x, pad_y = pad.position.x, pad.position.y
pad_layer = self._get_pad_layer(pad)
# Snap pad x to nearest lattice column (within ½ pitch)
x_idx_nearest, _ = self.lattice.world_to_lattice(pad_x, pad_y)
x_idx_nearest = max(0, min(x_idx_nearest, self.lattice.x_steps - 1))
# Check if snap is within tolerance
x_mm_snapped, _ = self.lattice.geom.lattice_to_world(x_idx_nearest, 0)
x_snap_dist_steps = abs(pad_x - x_mm_snapped) / self.config.grid_pitch
if x_snap_dist_steps > self.config.portal_x_snap_max:
logger.debug(f"Pad {pad_id}: x-snap {x_snap_dist_steps:.2f} exceeds max {self.config.portal_x_snap_max}")
return None
x_idx = x_idx_nearest
# Get pad y index
_, y_idx_pad = self.lattice.world_to_lattice(pad_x, pad_y)
y_idx_pad = max(0, min(y_idx_pad, self.lattice.y_steps - 1))
# Score all candidate portal offsets
candidates = []
cfg = self.config
for delta_steps in range(cfg.portal_delta_min, cfg.portal_delta_max + 1):
for direction in [+1, -1]:
y_idx_portal = y_idx_pad + direction * delta_steps
# Check bounds
if y_idx_portal < 0 or y_idx_portal >= self.lattice.y_steps:
continue
# Score this candidate
# Component 1: Delta preference (prefer portal_delta_pref)
delta_penalty = abs(delta_steps - cfg.portal_delta_pref)
# Component 2: X-snap penalty
x_snap_penalty = x_snap_dist_steps * 2.0 # Weight x-snap errors
# Component 3: Congestion avoidance (sample history at portal location)
# (Skip for now - history not populated yet at init time)
congestion_penalty = 0.0
total_score = delta_penalty + x_snap_penalty + congestion_penalty
candidates.append((total_score, x_idx, y_idx_portal, delta_steps, direction))
if not candidates:
return None
# Pick best candidate (lowest score)
score, x_idx, y_idx, delta, direction = min(candidates)
return Portal(
x_idx=x_idx,
y_idx=y_idx,
pad_layer=pad_layer,
delta_steps=delta,
direction=direction,
pad_x=pad_x,
pad_y=pad_y,
score=score,
retarget_count=0
)
def _get_portal_seeds(self, portal: Portal) -> List[Tuple[int, float]]:
"""
Get portal entry point seeds for routing.
NEW ARCHITECTURE: Escape planner creates F.Cu stubs only (no portal vias).
PathFinder MUST route from F.Cu where stub ends.
PathFinder will create the escape via (F.Cu → internal layer) based on routing needs.
CRITICAL: Only seed on F.Cu (layer 0)! Multi-layer seeding would bypass F.Cu.
"""
seeds = []
# ONLY seed on F.Cu (layer 0) where the escape stub ends
# PathFinder will create via from F.Cu to whatever internal layer it needs
node_idx = self.lattice.node_idx(portal.x_idx, portal.y_idx, 0)
cost = 0.0 # F.Cu portal - where escape stub ends
seeds.append((node_idx, cost))
return seeds
def _build_routing_seeds(self, portal_seeds_list):
"""
Convert portal seeds from (node_idx, cost) tuples to plain node_idx arrays.
Args:
portal_seeds_list: List of (node_idx, cost) tuples from _get_portal_seeds()
Returns:
np.int32 array of node indices
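Example (illustrative seed values):
    >>> import numpy as np
    >>> seeds = [(101, 0.0), (205, 1.5)]
    >>> np.array([s[0] for s in seeds], dtype=np.int32)
    array([101, 205], dtype=int32)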
"""
import numpy as np
if not portal_seeds_list:
return np.array([], dtype=np.int32)
# Extract just the node indices, ignore costs
return np.array([seed[0] for seed in portal_seeds_list], dtype=np.int32)
def route_multiple_nets(self, requests: List, progress_cb=None, iteration_cb=None) -> Dict:
"""
Main entry for routing multiple nets.
Args:
requests: List of net routing requests
progress_cb: Callback(done, total, eta) called after each net routed
iteration_cb: Callback(iteration, tracks, vias) called after each pathfinder iteration
Used for screenshot capture and progress visualization
Returns:
Dict of routing results
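Example (illustrative callbacks; exact payloads depend on the GUI integration):
    def progress_cb(done, total, eta):
        print(f"routed {done}/{total} nets (ETA {eta})")
    def iteration_cb(iteration, tracks, vias):
        print("iteration", iteration, tracks, vias)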
"""
logger.info(f"=== Route {len(requests)} nets ===")
gpu_threshold = getattr(self.config, 'gpu_roi_min_nodes', 1000)
logger.info(f"[GPU-THRESHOLD] GPU pathfinding enabled for ROIs with > {gpu_threshold} nodes")
tasks = self._parse_requests(requests)
if not tasks:
self._negotiation_ran = True
return {}
result = self._pathfinder_negotiation(tasks, progress_cb, iteration_cb)
return result
def _parse_requests(self, requests: List) -> Dict[str, Tuple[int, int]]:
"""Parse to (net: (src, dst))"""
tasks = {}
# Track why nets are dropped
unmapped_pads = 0
same_cell_trivial = 0
kept = 0
for req in requests:
if hasattr(req, 'name') and hasattr(req, 'pads'):
net_name = req.name
pads = req.pads
if len(pads) < 2:
continue
p1, p2 = pads[0], pads[1]
# Use same key scheme as mapping
p1_id = self._pad_key(p1)
p2_id = self._pad_key(p2)
if p1_id in self.pad_to_node and p2_id in self.pad_to_node:
# Route from PORTAL positions (escape vias), not from pads
# Portals are pre-computed and stored in self.portals
if p1_id in self.portals and p2_id in self.portals:
p1_portal = self.portals[p1_id]
p2_portal = self.portals[p2_id]
# CRITICAL FIX: Use portal's computed entry_layer (routing layer, NOT F.Cu!)
# Portals land on internal routing layers (1-17), NOT on F.Cu (0)
# The escape planner already handles F.Cu → portal transitions
entry_layer = p1_portal.entry_layer # Use portal's routing layer
exit_layer = p2_portal.entry_layer # Use portal's routing layer
# Convert portal positions to node indices
src = self.lattice.node_idx(p1_portal.x_idx, p1_portal.y_idx, entry_layer)
dst = self.lattice.node_idx(p2_portal.x_idx, p2_portal.y_idx, exit_layer)
if src != dst:
tasks[net_name] = (src, dst)
self.net_pad_ids[net_name] = (p1_id, p2_id) # Track pad IDs
self.net_portal_layers[net_name] = (entry_layer, exit_layer) # Track layers
kept += 1
else:
same_cell_trivial += 1
self.net_paths[net_name] = [src]
logger.debug(f"Net {net_name}: trivial route (portals at same position)")
else:
# Portals not found - skip this net
if p1_id not in self.portals:
logger.debug(f"Net {net_name}: no portal for {p1_id}")
if p2_id not in self.portals:
logger.debug(f"Net {net_name}: no portal for {p2_id}")
unmapped_pads += 1
else:
unmapped_pads += 1
if unmapped_pads <= 3: # Log first 3 examples
logger.warning(f"Net {net_name}: pads {p1_id}, {p2_id} not in pad_to_node")
logger.warning(f" Available keys sample: {list(self.pad_to_node.keys())[:5]}")
logger.warning(f" p1 attrs: {dir(p1)[:10] if hasattr(p1, '__dir__') else 'N/A'}")
else:
logger.debug(f"Net {net_name}: pads {p1_id}, {p2_id} not in pad_to_node")
elif isinstance(req, tuple) and len(req) == 3:
net_name, src, dst = req
if isinstance(src, int) and isinstance(dst, int):
if src != dst:
tasks[net_name] = (src, dst)
kept += 1
else:
same_cell_trivial += 1
# Mark as trivially routed
self.net_paths[net_name] = [src]
routed_trivial = same_cell_trivial
dropped = unmapped_pads
logger.info(f"[VERIFY] Parsed {len(tasks)} tasks from {len(requests)} requests")
logger.info(f"[VERIFY] routable={kept}, trivial={routed_trivial}, unmapped={unmapped_pads}, total_handled={kept+routed_trivial}")
return tasks
def _accumulate_via_usage_for_path(self, node_path: List[int], net_id: str = None):
"""
Accumulate via column and segment usage for a committed path.
Also registers via keepouts to prevent other nets from routing tracks through via locations.
"""
if not hasattr(self, 'via_col_use') and not hasattr(self, 'via_seg_use'):
return # Pooling not enabled
# Ensure via_keepouts_map exists
if not hasattr(self, '_via_keepouts_map'):
self._via_keepouts_map = {}
idx_to_coord = self.lattice.idx_to_coord
col_pool = hasattr(self, 'via_col_use')
seg_pool = hasattr(self, 'via_seg_use')
for u, v in zip(node_path, node_path[1:]):
xu, yu, zu = idx_to_coord(u)
xv, yv, zv = idx_to_coord(v)
# Check if it's a vertical transition (same x,y, different z)
if xu == xv and yu == yv and zu != zv:
# Column pooling: increment total vias at this (x,y)
if col_pool:
self.via_col_use[xu, yu] += 1
# Segment pooling: increment each segment crossed
if seg_pool:
z_lo, z_hi = (zu, zv) if zu < zv else (zv, zu)
# Clamp to routing layers: 1..Nz-2
z_lo = max(1, min(z_lo, self._Nz - 2))
z_hi = max(1, min(z_hi, self._Nz - 2))
# Increment each segment z→z+1 in range [z_lo, z_hi)
for z in range(z_lo, z_hi):
seg_idx = z - 1 # Segment z→z+1 stored at index z-1
if 0 <= seg_idx < self._segZ:
self.via_seg_use[xu, yu, seg_idx] += 1
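# Worked example (illustrative): a via spanning z=2→z=5 crosses segments
# 2→3, 3→4 and 4→5, i.e. seg indices 1, 2 and 3.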
# Register via keepouts for ALL layers the via touches (including endpoints!)
# This prevents other nets from routing tracks through via locations
if net_id:
z_lo, z_hi = (zu, zv) if zu < zv else (zv, zu)
for z in range(z_lo, z_hi + 1): # Include both endpoints!
key = (z, xu, yu)
# First owner wins
if key not in self._via_keepouts_map:
self._via_keepouts_map[key] = net_id
def _rebuild_via_usage_from_committed(self):
"""Rebuild via column/segment usage from all currently committed net paths"""
if not hasattr(self, 'via_col_use') and not hasattr(self, 'via_seg_use'):
return
# Clear counters
if hasattr(self, 'via_col_use'):
self.via_col_use.fill(0)
if hasattr(self, 'via_seg_use'):
self.via_seg_use.fill(0)
# Clear routing via keepouts but PRESERVE portal keepouts
# Portal keepouts were pre-registered to protect escape via columns
if hasattr(self, '_via_keepouts_map'):
# Save portal keepouts (any keepout not from a routed path)
portal_keepouts = {k: v for k, v in self._via_keepouts_map.items()
if v not in self.net_paths}
self._via_keepouts_map.clear()
self._via_keepouts_map.update(portal_keepouts)
if portal_keepouts:
logger.debug(f"[VIA-REBUILD] Preserved {len(portal_keepouts)} portal keepouts during rebuild")
# Rebuild from all committed paths (including keepout registration)
for net_id, node_path in self.net_paths.items():
if node_path and len(node_path) > 1:
self._accumulate_via_usage_for_path(node_path, net_id=net_id)
# Also re-track escape vias (they're not in net_paths but occupy space)
self._track_escape_vias_in_via_usage()
# Log keepout statistics
if hasattr(self, '_via_keepouts_map'):
logger.info(f"[VIA-KEEPOUTS] Registered {len(self._via_keepouts_map)} via keepout cells")
# REBUILD NODE OWNERSHIP (the correct solution!)
self._rebuild_node_owner()
def _rebuild_node_owner(self):
"""
Rebuild node ownership map from portal reservations and committed vias.
This is THE solution to via barrel conflicts:
- Track which net owns each node (via barrels occupy nodes, not just edges)
- Enforce via ROI bitmap masking (O(ROI_size) per net, not O(N×M)!)
- Works for both full-graph and ROI routing
Performance: O(portals + via_count × avg_span) = milliseconds, not minutes!
"""
if not hasattr(self, 'node_owner'):
return
# Reset to all free
self.node_owner.fill(-1)
owned_count = 0
# 1. PORTAL RESERVATIONS: DISABLED - causing frontier empty issues
# TODO: Debug why portal reservations block source seeds
# if hasattr(self, 'portals') and self.portals:
# for pad_id, portal in self.portals.items():
# net_name = pad_id.rsplit('-', 1)[0] if '-' in pad_id else pad_id
# net_id = self._get_net_id(net_name)
# x_idx, y_idx = portal.x_idx, portal.y_idx
# for z in range(1, self.lattice.layers - 1):
# node_idx = self.lattice.node_idx(x_idx, y_idx, z)
# self.node_owner[node_idx] = net_id
# owned_count += 1
# 2. COMMITTED VIA BARRELS: Mark nodes occupied by routed vias
for net_name, path in self.net_paths.items():
if not path or len(path) < 2:
continue
net_id = self._get_net_id(net_name)
# Walk path and find layer transitions (vias)
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
xu, yu, zu = self.lattice.idx_to_coord(u)
xv, yv, zv = self.lattice.idx_to_coord(v)
# Via: same (x,y), different z
if xu == xv and yu == yv and zu != zv:
# Mark ALL nodes in the via barrel span
z_lo, z_hi = (zu, zv) if zu < zv else (zv, zu)
for z in range(z_lo, z_hi + 1):
node_idx = self.lattice.node_idx(xu, yu, z)
self.node_owner[node_idx] = net_id
owned_count += 1
# 3. ESCAPE VIA COLUMNS: Mark internal-layer nodes from via keepouts (escape vias!)
# This is THE missing piece - escape vias in _via_keepouts_map weren't in node_owner!
if hasattr(self, '_via_keepouts_map') and self._via_keepouts_map:
escape_owned = 0
for (z, xu, yu), owner_net in self._via_keepouts_map.items():
# Skip F.Cu (z=0) so source seeds aren't blocked
if z <= 0:
continue
node_idx = self.lattice.node_idx(xu, yu, z)
net_id_int = self._get_net_id(owner_net)
# First owner wins (don't thrash ownership)
if self.node_owner[node_idx] == -1:
self.node_owner[node_idx] = net_id_int
owned_count += 1
escape_owned += 1
if escape_owned > 0:
logger.info(f"[NODE-OWNER] Escape via columns marked: {escape_owned:,} internal layer nodes")
logger.info(f"[NODE-OWNER] Marked {owned_count:,} nodes as owned ({owned_count*100//self.lattice.num_nodes}% of graph)")
# NOTE: Node ownership enforced via bitmap filtering in GPU kernels, not cost penalties!
def _get_net_id(self, net_name: str) -> int:
"""Map net name to integer ID for node ownership"""
if net_name not in self.net_id_map:
self.net_id_map[net_name] = self.next_net_id
self.next_net_id += 1
return self.net_id_map[net_name]
def _ensure_edge_src_map(self):
"""
Build mapping from edge index to source node (once per routing).
This is critical for barrel conflict detection - we need to know both
endpoints of each edge. The CSR graph gives us destinations (indices),
but we need to reconstruct sources from indptr.
Performance: O(num_edges), done once at start of routing.
"""
if hasattr(self, "_edge_src"):
return
import numpy as np
# Get indptr from graph (handle both CPU and GPU)
indptr = self.graph.indptr
if hasattr(indptr, 'get'):
indptr = indptr.get()
# Total number of edges
num_edges = len(self.graph.indices)
# Build reverse mapping: edge_idx → source_node.
# Vectorized: repeat each source node id by its out-degree
# (equivalent to edge_src[indptr[u]:indptr[u+1]] = u for every u).
edge_src = np.repeat(np.arange(len(indptr) - 1, dtype=np.int32), np.diff(indptr))
self._edge_src = edge_src
logger.info(f"[EDGE-SRC-MAP] Built mapping for {num_edges:,} edges")
def _mark_via_barrel_ownership_for_path(self, net_name: str, path: List[int]) -> None:
"""
Mark via barrel nodes as owned by this net IMMEDIATELY after commit.
CRITICAL: This must be called AFTER each net commits, not just at iteration start!
Without this, later nets in the same iteration don't see earlier nets' via barrels.
"""
if not path or len(path) < 2:
return
net_id = self._get_net_id(net_name)
# Walk path and find via transitions
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
xu, yu, zu = self.lattice.idx_to_coord(u)
xv, yv, zv = self.lattice.idx_to_coord(v)
# Via: same (x,y), different z
if xu == xv and yu == yv and zu != zv:
# Mark ALL nodes in via barrel span
z_lo, z_hi = (zu, zv) if zu < zv else (zv, zu)
for z in range(z_lo, z_hi + 1):
node_idx = self.lattice.node_idx(xu, yu, z)
self.node_owner[node_idx] = net_id
def _build_owner_bitmap_for_fullgraph(self, current_net: str, force_allow_nodes=None) -> np.ndarray:
"""
Build owner-aware bitmap for full-graph routing.
Returns uint32 bitmap where bit=1 if node is free OR owned by current net.
This allows the GPU wavefront kernel to skip nodes owned by other nets.
CRITICAL: Force-allows source/dest seeds even if ownership bookkeeping lags!
Performance: O(N/32) vectorized bitmap operations = milliseconds per net
Memory: ~14k uint32 words (~56KB) per bitmap
Args:
current_net: Net currently being routed
force_allow_nodes: Source/dest nodes to force-allow (even if owned by others)
Returns:
uint32 bitmap array (words = ceil(num_nodes/32))
"""
net_id = self._get_net_id(current_net)
owners = self.node_owner # np.int32[num_nodes]
# Vectorized: allowed = (free OR owned by current net)
allowed = (owners == -1) | (owners == net_id)
# CRITICAL: Force-allow seeds (prevents frontier empty!)
if force_allow_nodes is not None and len(force_allow_nodes) > 0:
allowed[force_allow_nodes] = True
n = int(allowed.size)
words = (n + 31) // 32
bitmap = np.zeros(words, dtype=np.uint32)
# Pack bits into words (vectorized)
idx = np.nonzero(allowed)[0].astype(np.int64)
if len(idx) > 0:
word_indices = (idx >> 5).astype(np.int32) # idx // 32
bit_positions = (idx & 31).astype(np.int32) # idx % 32
bit_values = (1 << bit_positions).astype(np.uint32)
# OR bits into words (add.at is equivalent to OR here: each node index appears once and contributes a distinct power-of-two bit)
np.add.at(bitmap, word_indices, bit_values)
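# Worked example (illustrative): node 37 → word 37>>5 = 1, bit 37&31 = 5,
# so it contributes 1<<5 = 32 to bitmap[1].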
return bitmap
def _filter_roi_by_ownership(self, roi_nodes: np.ndarray, current_net: str) -> np.ndarray:
"""
Filter ROI nodes to exclude nodes owned by OTHER nets (owner-aware).
This prevents routing through via barrels occupied by other nets.
Fast vectorized operation: O(ROI_size) instead of O(N×M)!
Args:
roi_nodes: Array of node indices in ROI
current_net: Net currently being routed
Returns:
Filtered roi_nodes array (nodes owned by other nets removed)
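Example (illustrative): with node_owner values [-1, 7, 3] for three ROI nodes
and the current net mapped to id 3, the keep mask is [True, False, True].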
"""
if not hasattr(self, 'node_owner'):
return roi_nodes
current_net_id = self._get_net_id(current_net)
# Vectorized ownership check (FAST!)
owners = self.node_owner[roi_nodes]
# Keep nodes that are free (-1) OR owned by current net
keep_mask = (owners == -1) | (owners == current_net_id)
filtered_roi = roi_nodes[keep_mask]
blocked = len(roi_nodes) - len(filtered_roi)
if blocked > 0:
logger.debug(f"[NODE-OWNER] Net {current_net}: filtered {blocked:,}/{len(roi_nodes):,} ROI nodes owned by other nets")
return filtered_roi
def _track_escape_vias_in_via_usage(self):
"""
Register escape vias in via spatial tracking arrays.
This ensures that via columns and segments used by pad escape vias
are properly tracked, preventing routing conflicts.
"""
if not hasattr(self, '_escape_vias') or not self._escape_vias:
return
if not hasattr(self, 'via_col_use') and not hasattr(self, 'via_seg_use'):
# Via spatial tracking not enabled
return
tracked_count = 0
for via_dict in self._escape_vias:
# Extract via information
x_mm = via_dict.get('x')
y_mm = via_dict.get('y')
from_layer_name = via_dict.get('from_layer')
to_layer_name = via_dict.get('to_layer')
if x_mm is None or y_mm is None or from_layer_name is None or to_layer_name is None:
continue
# Convert world coordinates to lattice indices
xu, yu = self.lattice.world_to_lattice(x_mm, y_mm)
if xu < 0 or xu >= self.lattice.x_steps or yu < 0 or yu >= self.lattice.y_steps:
continue
# Convert layer names to indices
z_lo = self._layer_name_to_index(from_layer_name)
z_hi = self._layer_name_to_index(to_layer_name)
if z_lo is None or z_hi is None:
continue
# Ensure z_lo < z_hi
if z_lo > z_hi:
z_lo, z_hi = z_hi, z_lo
# Track in column usage
if hasattr(self, 'via_col_use'):
self.via_col_use[xu, yu] += 1
# Track in segment usage
if hasattr(self, 'via_seg_use'):
for z in range(z_lo, z_hi):
seg_idx = z - 1 # Segments indexed from 0
if 0 <= seg_idx < self._segZ:
self.via_seg_use[xu, yu, seg_idx] += 1
# Register via keepouts for ALL layers (including endpoints) to block tracks
if not hasattr(self, '_via_keepouts_map'):
self._via_keepouts_map = {}
net_id = via_dict.get('net', 'escape_via')
for z in range(z_lo, z_hi + 1): # Include both endpoints!
key = (z, xu, yu)
self._via_keepouts_map.setdefault(key, net_id)
tracked_count += 1
if tracked_count > 0:
logger.info(f"[ESCAPE-VIA] Tracked {tracked_count} escape vias in via spatial arrays")
def _layer_name_to_index(self, layer_name: str) -> Optional[int]:
"""Convert layer name to layer index, or None if not found"""
if not hasattr(self.config, 'layer_names'):
return None
try:
return self.config.layer_names.index(layer_name)
except (ValueError, AttributeError):
# Try numeric layer format like "L5"
if layer_name.startswith('L') and layer_name[1:].isdigit():
return int(layer_name[1:])
return None
def _apply_via_keepouts_to_graph(self):
"""
Apply via keepouts to the graph by blocking planar routing at via locations.
This prevents tracks from routing through via locations when using full-graph routing.
The via keepouts are stored in _via_keepouts_map as (z, x, y) -> net_id.
For each keepout location, we block PLANAR edges (same-layer horizontal/vertical edges)
but allow VIA edges (inter-layer edges) so that other vias can still use the same column.
Note: This is a global blocking approach. For per-net owner-aware keepouts, see ROI extraction.
This method is called after via usage tracking is rebuilt, ensuring all current vias are blocked.
"""
if not hasattr(self, '_via_keepouts_map') or not self._via_keepouts_map:
logger.debug("[VIA-KEEPOUT] No via keepouts to apply")
return
if not hasattr(self, 'graph') or self.graph is None:
logger.warning("[VIA-KEEPOUT] Graph not initialized, cannot apply keepouts")
return
# Get the base cost array (this is where we'll block edges)
if not hasattr(self.graph, 'base_costs') or self.graph.base_costs is None:
logger.warning("[VIA-KEEPOUT] Base cost array not available, cannot apply keepouts")
return
base_cost = self.graph.base_costs
is_gpu = hasattr(base_cost, 'get') # Check if it's a GPU array
# Convert to CPU for modification if needed
if is_gpu:
base_cost_cpu = base_cost.get()
else:
base_cost_cpu = base_cost
# Get graph structure
indptr = self.graph.indptr
indices = self.graph.indices
# Convert to CPU if on GPU
if hasattr(indptr, 'get'):
indptr = indptr.get()
if hasattr(indices, 'get'):
indices = indices.get()
blocked_planar_edges = 0
keepout_block_cost = 1e9 # Very high cost to effectively block the edge
# For each via keepout location, block PLANAR edges only (not via edges)
for (z, x, y), owner_net in self._via_keepouts_map.items():
# Convert lattice coordinates to node index
node_idx = self.lattice.node_idx(x, y, z)
# Get source coordinates
src_x, src_y, src_z = self.lattice.idx_to_coord(node_idx)
# Block planar outgoing edges from this node
start = int(indptr[node_idx])
end = int(indptr[node_idx + 1])
for edge_idx in range(start, end):
dst_node = int(indices[edge_idx])
dst_x, dst_y, dst_z = self.lattice.idx_to_coord(dst_node)
# Only block PLANAR edges (same layer), allow VIA edges
if src_z == dst_z:
# This is a planar edge (horizontal or vertical track)
base_cost_cpu[edge_idx] = keepout_block_cost
blocked_planar_edges += 1
# Via edges (src_z != dst_z) are NOT blocked to allow other vias in same column
# Update the base cost array (copy back to GPU if needed)
if is_gpu:
try:
import cupy as cp
self.graph.base_costs = cp.asarray(base_cost_cpu)
except ImportError:
self.graph.base_costs = base_cost_cpu
else:
self.graph.base_costs = base_cost_cpu
logger.info(f"[VIA-KEEPOUT] Applied {len(self._via_keepouts_map)} via keepouts, blocked {blocked_planar_edges} planar edges in full graph")
def _apply_owner_aware_via_keepouts(self, current_net_id: str, costs) -> int:
"""
Apply via keepouts for full-graph routing (owner-aware).
Temporarily blocks planar edges at via locations owned by OTHER nets.
This prevents the current net from routing tracks through other nets' via barrels.
Args:
current_net_id: Net currently being routed
costs: Cost array (CuPy or NumPy)
Returns:
Number of edges blocked
"""
if not hasattr(self, '_via_keepouts_map') or not self._via_keepouts_map:
return 0
# Store original costs for restoration
if not hasattr(self, '_via_keepout_backup'):
self._via_keepout_backup = {}
is_gpu = hasattr(costs, 'device')
xp = cp if is_gpu else np
# Get graph structure (already on appropriate device)
indptr = self.graph.indptr
indices = self.graph.indices
blocked_count = 0
keepout_block_cost = 1e9
# Convert indptr/indices to CPU for indexing (they're small, cached once)
if is_gpu:
if not hasattr(self, '_indptr_cpu_cache'):
self._indptr_cpu_cache = indptr.get()
self._indices_cpu_cache = indices.get()
indptr_cpu = self._indptr_cpu_cache
indices_cpu = self._indices_cpu_cache
else:
indptr_cpu = indptr
indices_cpu = indices
# Block via locations owned by OTHER nets
# CRITICAL: Via occupies NODE, so block ALL edges to/from that node!
for (z, x, y), owner_net in self._via_keepouts_map.items():
# Skip vias owned by current net (owner-aware!)
if owner_net == current_net_id:
continue
# Get node index - this node is OCCUPIED by another net's via barrel
node_idx = self.lattice.node_idx(x, y, z)
# Block ALL OUTGOING edges from this via node (no routing through via!)
start, end = int(indptr_cpu[node_idx]), int(indptr_cpu[node_idx + 1])
for edge_idx in range(start, end):
dst_node = int(indices_cpu[edge_idx])
dst_x, dst_y, dst_z = self.lattice.idx_to_coord(dst_node)
# Block ALL edges except vias in the same column (allow stacked vias)
# Only allow via edges if both nodes in same (x,y) column
if not (dst_x == x and dst_y == y and dst_z != z):
# This is either a planar edge OR a via to different (x,y) - block it!
if edge_idx not in self._via_keepout_backup:
# Scalar indexing works for both CuPy and NumPy arrays, so no GPU/CPU branch is needed
self._via_keepout_backup[edge_idx] = float(costs[edge_idx])
costs[edge_idx] = keepout_block_cost
blocked_count += 1
# NOTE: This per-net owner-aware blocking is O(N×M) and does not scale to 16k+ keepouts:
# it adds 6+ seconds of overhead per net (512 nets ≈ 50+ minutes), so the per-net logging
# below stays disabled.
#if blocked_count > 0:
# logger.info(f"[VIA-KEEPOUT-OWNER] Blocked {blocked_count} edges for net {current_net_id}")
return blocked_count
def _restore_via_keepout_costs(self, costs):
"""Restore original costs after owner-aware via keepout blocking"""
if not hasattr(self, '_via_keepout_backup') or not self._via_keepout_backup:
return
is_gpu = hasattr(costs, 'device')
for edge_idx, original_cost in self._via_keepout_backup.items():
costs[edge_idx] = original_cost
# Clear backup for next net
self._via_keepout_backup.clear()
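# Illustrative usage sketch (comment only; the call sites below are hypothetical): the two
# methods above are meant to bracket a single net's search so the cost overlay never leaks
# into other nets:
#
#     blocked = self._apply_owner_aware_via_keepouts(net_id, costs)   # temporary overlay
#     path = self.solver.find_path_roi(src, dst, costs, roi_nodes, global_to_roi)
#     self._restore_via_keepout_costs(costs)                          # undo before next net
#
# The backup dict keyed by edge index keeps restore O(blocked edges) instead of O(all edges).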
def _apply_via_pooling_penalties(self, pres_fac: float):
"""
Apply via column and segment pooling penalties to vertical edge costs.
Uses GPU-accelerated CUDA kernel when available (800ms → <2ms speedup!)
Falls back to CPU vectorized implementation if GPU unavailable.
"""
import time
t0 = time.perf_counter()
if not hasattr(self, 'via_col_pres') and not hasattr(self, 'via_seg_pres'):
return
# Check if metadata is available
if not hasattr(self, '_via_edge_metadata') or self._via_edge_metadata is None:
logger.warning("[VIA-POOL] Metadata not built, falling back to sequential")
self._apply_via_pooling_penalties_sequential(pres_fac)
return
# Try GPU kernel first if available
if hasattr(self, 'via_kernel_manager') and self.via_kernel_manager.use_gpu:
try:
# Check if there are any penalties to apply (GPU can check this fast)
xp = cp if hasattr(getattr(self, 'via_col_pres', None), 'device') else np
if hasattr(self, 'via_col_pres'):
col_max = float(xp.max(self.via_col_pres))
if col_max == 0 and hasattr(self, 'via_seg_pres'):
seg_max = float(xp.max(self.via_seg_pres))
if seg_max == 0:
return # No penalties needed
col_weight = float(getattr(self.config, "via_column_weight", 1.0))
seg_weight = float(getattr(self.config, "via_segment_weight", 1.0))
penalty_count = self.via_kernel_manager.apply_via_penalties(
via_metadata=self._via_edge_metadata,
via_col_pres_gpu=self.via_col_pres,
via_seg_pres_gpu=self.via_seg_pres if hasattr(self, 'via_seg_pres') else None,
col_weight=col_weight * pres_fac,
seg_weight=seg_weight * pres_fac,
total_cost_gpu=self.accounting.total_cost,
Ny=self._Ny,
segZ=self._segZ if hasattr(self, '_segZ') else 0
)
return # GPU kernel succeeded
except Exception as e:
logger.warning(f"[VIA-POOL] GPU kernel failed: {e}, falling back to CPU")
# CPU fallback - original vectorized implementation
col_weight = float(getattr(self.config, "via_column_weight", 1.0))
seg_weight = float(getattr(self.config, "via_segment_weight", 1.0))
# Get cost array
total_cost = self.accounting.total_cost
if self.accounting.use_gpu:
total_cost_cpu = total_cost.get()
else:
total_cost_cpu = total_cost
# Get precomputed metadata
via_edge_indices = self._via_edge_metadata['indices']
via_xy_coords = self._via_edge_metadata['xy_coords']
z_lo = self._via_edge_metadata['z_lo']
z_hi = self._via_edge_metadata['z_hi']
num_via_edges = len(via_edge_indices)
if num_via_edges == 0:
return
# Initialize penalties array
penalties = np.zeros(num_via_edges, dtype=np.float32)
# Vectorized column penalty computation
if hasattr(self, 'via_col_pres'):
col_penalties = self.via_col_pres[via_xy_coords[:, 0], via_xy_coords[:, 1]]
penalties += col_weight * col_penalties
# Vectorized segment penalty computation (using prefix sums)
if hasattr(self, 'via_seg_prefix'):
# Compute prefix indices for range queries
# Segment index mapping: z-1→z is stored at index z-2
hi_idx = z_hi - 2 # Index for upper bound
lo_idx = z_lo - 2 # Index for lower bound
# Create masks for valid indices
valid_mask = z_hi > z_lo # Only process edges spanning multiple layers
hi_valid = (hi_idx >= 0) & (hi_idx < self._segZ)
lo_valid = (lo_idx >= 0) & (lo_idx < self._segZ)
# Fetch prefix values with bounds checking
pref_hi = np.zeros(num_via_edges, dtype=np.float32)
pref_lo = np.zeros(num_via_edges, dtype=np.float32)
# Use advanced indexing for valid entries
if np.any(hi_valid):
valid_hi_edges = hi_valid
pref_hi[valid_hi_edges] = self.via_seg_prefix[
via_xy_coords[valid_hi_edges, 0],
via_xy_coords[valid_hi_edges, 1],
hi_idx[valid_hi_edges]
]
if np.any(lo_valid):
valid_lo_edges = lo_valid
pref_lo[valid_lo_edges] = self.via_seg_prefix[
via_xy_coords[valid_lo_edges, 0],
via_xy_coords[valid_lo_edges, 1],
lo_idx[valid_lo_edges]
]
# Compute segment penalties: prefix[hi] - prefix[lo]
seg_penalties = (pref_hi - pref_lo) * valid_mask
penalties += seg_weight * seg_penalties
# STEP 2.7: Apply "leave-hot-layer" via discount using layer bias
if hasattr(self, 'layer_bias'):
k = float(getattr(self.config, 'via_hot_layer_discount', 0.20))
# Get source and destination layer biases for each via edge
src_bias = self.layer_bias[z_lo]
dst_bias = self.layer_bias[z_hi]
# Cheaper to leave hot layers, more expensive to land on hot layers
via_discount = (1.0 - k * np.maximum(src_bias, 0.0)) * (1.0 + 0.5 * k * np.maximum(dst_bias, 0.0))
penalties *= via_discount # Apply discount/markup to penalties
# Log discount statistics if significant
avg_discount = float(np.mean(via_discount))
if abs(avg_discount - 1.0) > 0.05:
logger.debug(f"[VIA-DISCOUNT] Average via discount factor: {avg_discount:.3f}")
# Apply penalties to cost array (vectorized)
penalty_mask = penalties > 0
total_cost_cpu[via_edge_indices[penalty_mask]] += pres_fac * penalties[penalty_mask]
penalties_applied = np.sum(penalty_mask)
# Update GPU if needed
if self.accounting.use_gpu:
self.accounting.total_cost[:] = cp.asarray(total_cost_cpu)
elapsed = time.perf_counter() - t0
if penalties_applied > 0:
logger.info(f"[VIA-POOL-PERF] Vectorized penalty application: {num_via_edges} edges, {penalties_applied} penalties in {elapsed:.3f}s")
else:
logger.debug(f"[VIA-POOL-PERF] No penalties applied ({num_via_edges} edges checked in {elapsed:.3f}s)")
def _apply_via_pooling_penalties_sequential(self, pres_fac: float):
"""Sequential fallback for via pooling penalties (for debugging/comparison)"""
col_weight = float(getattr(self.config, "via_column_weight", 1.0))
seg_weight = float(getattr(self.config, "via_segment_weight", 1.0))
# Get cost array
total_cost = self.accounting.total_cost
if self.accounting.use_gpu:
total_cost_cpu = total_cost.get()
else:
total_cost_cpu = total_cost
# Get graph data
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
idx_to_coord = self.lattice.idx_to_coord
penalties_applied = 0
# Find via edge indices (where _via_edges is True)
via_edge_indices = np.where(self._via_edges)[0]
for ei in via_edge_indices:
u = int(np.searchsorted(indptr, ei, side='right') - 1)
if 0 <= u < len(indptr) - 1 and indptr[u] <= ei < indptr[u + 1]:
v = int(indices[ei])
xu, yu, zu = idx_to_coord(u)
xv, yv, zv = idx_to_coord(v)
penalty = 0.0
# Column penalty
if hasattr(self, 'via_col_pres'):
penalty += col_weight * self.via_col_pres[xu, yu]
# Segment penalty (use prefix for fast range sum)
if hasattr(self, 'via_seg_prefix'):
z_lo, z_hi = (zu, zv) if zu < zv else (zv, zu)
z_lo = max(1, min(z_lo, self._Nz - 2))
z_hi = max(1, min(z_hi, self._Nz - 2))
if z_hi >= z_lo: # Allow equal (single-segment vias)
hi_idx = z_hi - 2
lo_idx = z_lo - 2
pref_hi = self.via_seg_prefix[xu, yu, hi_idx] if 0 <= hi_idx < self._segZ else 0.0
pref_lo = self.via_seg_prefix[xu, yu, lo_idx] if 0 <= lo_idx < self._segZ else 0.0
seg_sum = pref_hi - pref_lo
penalty += seg_weight * seg_sum
if penalty > 0:
total_cost_cpu[ei] += pres_fac * penalty
penalties_applied += 1
# Update GPU if needed
if self.accounting.use_gpu:
self.accounting.total_cost[:] = cp.asarray(total_cost_cpu)
if penalties_applied > 0:
logger.debug(f"[VIA-POOL] Sequential: Applied pooling penalties to {penalties_applied} via edges")
def _block_via_edges_with_collisions(self):
"""
Hard-block via edges with spatial collisions by setting costs to infinity.
This prevents PathFinder from using via edges where the column or any
spanned segment is already at capacity. Combined with soft penalties,
this ensures no via spatial violations in the final routing.
Uses GPU-accelerated CUDA kernel when available (30s → <1ms speedup!)
"""
if not hasattr(self, '_via_edge_metadata') or self._via_edge_metadata is None:
logger.warning("[HARD-BLOCK] No via edge metadata, skipping")
return
if not hasattr(self, 'via_col_use') and not hasattr(self, 'via_seg_use'):
# No via spatial tracking enabled
return
# Try GPU kernel first if available
if hasattr(self, 'via_kernel_manager') and self.via_kernel_manager.use_gpu:
try:
blocked_count = self.via_kernel_manager.hard_block_via_edges(
via_metadata=self._via_edge_metadata,
via_col_use_gpu=self.via_col_use,
via_col_cap_gpu=self.via_col_cap,
via_seg_use_gpu=self.via_seg_use if hasattr(self, 'via_seg_use') else None,
via_seg_cap_gpu=self.via_seg_cap if hasattr(self, 'via_seg_cap') else None,
total_cost_gpu=self.accounting.total_cost,
Ny=self._Ny,
segZ=self._segZ if hasattr(self, '_segZ') else 0
)
return # GPU kernel succeeded
except Exception as e:
logger.warning(f"[HARD-BLOCK] GPU kernel failed: {e}, falling back to CPU")
# CPU fallback
via_edges = self._via_edge_metadata
edge_indices = via_edges['indices']
xy_coords = via_edges['xy_coords']
z_lo = via_edges['z_lo']
z_hi = via_edges['z_hi']
# Convert from GPU to CPU if needed
if hasattr(edge_indices, 'get'):
edge_indices = edge_indices.get()
if hasattr(xy_coords, 'get'):
xy_coords = xy_coords.get()
if hasattr(z_lo, 'get'):
z_lo = z_lo.get()
if hasattr(z_hi, 'get'):
z_hi = z_hi.get()
# Get cost array
total_cost = self.accounting.total_cost
if self.accounting.use_gpu:
total_cost_cpu = total_cost.get()
else:
total_cost_cpu = total_cost
# Get via arrays (convert from GPU if needed)
via_col_use = self.via_col_use.get() if hasattr(self.via_col_use, 'get') else self.via_col_use
via_col_cap = self.via_col_cap.get() if hasattr(self.via_col_cap, 'get') else self.via_col_cap
via_seg_use = self.via_seg_use.get() if hasattr(self, 'via_seg_use') and hasattr(self.via_seg_use, 'get') else getattr(self, 'via_seg_use', None)
via_seg_cap = self.via_seg_cap.get() if hasattr(self, 'via_seg_cap') and hasattr(self.via_seg_cap, 'get') else getattr(self, 'via_seg_cap', None)
blocked_count = 0
for i in range(len(edge_indices)):
xu, yu = int(xy_coords[i, 0]), int(xy_coords[i, 1])
z_start, z_end = int(z_lo[i]), int(z_hi[i])
edge_idx = edge_indices[i]
# Check column capacity
col_blocked = False
if via_col_use is not None and via_col_cap is not None:
if via_col_use[xu, yu] >= via_col_cap[xu, yu]:
col_blocked = True
# Check segment capacity for all spanned segments
seg_blocked = False
if via_seg_use is not None and via_seg_cap is not None:
for z in range(z_start, z_end):
seg_idx = z - 1 # Segments indexed from 0
if 0 <= seg_idx < self._segZ:
if via_seg_use[xu, yu, seg_idx] >= via_seg_cap[xu, yu, seg_idx]:
seg_blocked = True
break
# Hard-block if at capacity
if col_blocked or seg_blocked:
total_cost_cpu[edge_idx] = np.float32('inf')
blocked_count += 1
# Copy back to GPU if needed
if self.accounting.use_gpu:
total_cost[:len(total_cost_cpu)] = cp.asarray(total_cost_cpu)
if blocked_count > 0:
logger.info(f"[HARD-BLOCK-CPU] Blocked {blocked_count} via edges at capacity (using CPU fallback)")
def _pathfinder_negotiation(self, tasks: Dict[str, Tuple[int, int]], progress_cb=None, iteration_cb=None) -> Dict:
"""CORE PATHFINDER ALGORITHM WITH AUTO-CONFIGURATION"""
cfg = self.config
# ====================================================================
# STEP 0: AUTO-CONFIGURE FROM BOARD CHARACTERISTICS
# ====================================================================
# Analyze board and derive optimal parameters (no manual tuning!)
board_chars = analyze_board_characteristics(self.lattice, tasks)
derived_params = derive_routing_parameters(board_chars)
# Apply derived parameters to config (can be overridden by env vars)
apply_derived_parameters(cfg, derived_params)
# Store H/V layer assignments for later use
self.h_layers = board_chars.h_layers
self.v_layers = board_chars.v_layers
# Load params into local variables with defaults (ensures new config values used)
# Scale parameters by layer count for self-tuning
n_sig_layers = self._Nz - 2 # Exclude F.Cu and B.Cu (layers 0 and Nz-1)
# Base parameters from config
pres_fac = float(getattr(cfg, 'pres_fac_init', 1.0))
pres_fac_mult = float(getattr(cfg, 'pres_fac_mult', 1.15))
pres_fac_max_base = float(getattr(cfg, 'pres_fac_max', 8.0))
hist_gain = float(getattr(cfg, 'hist_gain', 0.8))
# Scale by layer count (fewer layers = stronger penalties needed)
if n_sig_layers <= 12:
pres_fac_max = 32.0 # Increased from 6.0 to allow convergence
hist_cost_weight_mult = 1.2 # 12.0 for few layers
elif n_sig_layers <= 20:
pres_fac_max = 64.0 # Increased from 8.0 to allow convergence
hist_cost_weight_mult = 1.0 # 10.0
else:
pres_fac_max = 128.0 # Increased from 10.0 to allow convergence
hist_cost_weight_mult = 0.8 # 8.0 for many layers
# Allow env overrides for testing
pres_fac_mult = float(os.getenv('ORTHO_PRES_FAC_MULT', pres_fac_mult))
pres_fac_max = float(os.getenv('ORTHO_PRES_FAC_MAX', pres_fac_max))
hist_gain = float(os.getenv('ORTHO_HIST_GAIN', hist_gain))
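# Schedule note (illustrative): absent plateau kicks or post-rip freezes, the present-cost
# pressure follows a capped geometric ramp, pres_fac(it) ~= min(pres_fac_init * pres_fac_mult**(it-1), pres_fac_max),
# e.g. 1.00, 1.15, 1.32, 1.52, ... with the defaults above, saturating at the layer-scaled cap (32/64/128).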
best_overuse = float('inf')
stagnant = 0
prev_over_sum = float('inf')
self._negotiation_ran = True
logger.info(f"[NEGOTIATE] {len(tasks)} nets, {cfg.max_iterations} iters")
logger.info(f"[PARAMS] layers={n_sig_layers} pres_fac_init={pres_fac:.2f} pres_fac_mult={pres_fac_mult:.2f} pres_fac_max={pres_fac_max:.0f} hist_gain={hist_gain:.2f} hist_weight={cfg.hist_cost_weight * hist_cost_weight_mult:.1f}")
# Build edge→src mapping for barrel conflict detection
logger.warning("[BARREL-CONFLICT-INIT] Building edge_src_map once before routing")
self._ensure_edge_src_map()
for it in range(1, cfg.max_iterations + 1):
self.iteration = it
logger.info(f"[ITER {it}] pres_fac={pres_fac:.2f}")
# Log iteration 1 always-connect policy
if it == 1 and cfg.iter1_always_connect:
logger.info("[ITER-1-POLICY] Always-connect mode: soft costs only (no hard blocks)")
# STEP 0: Clean accounting rebuild (iter 2+)
if it > 1 and cfg.reroute_only_offenders:
# Rebuild usage from all currently routed nets before building hotset
committed_nets = {nid for nid, path in self.net_paths.items() if path}
self._rebuild_usage_from_committed_nets(committed_nets)
else:
# STEP 1: Refresh (iter 1 only, or if not using hotsets)
self.accounting.refresh_from_canonical()
# STEP 1.5: Rebuild via column/segment usage from committed paths
self._rebuild_via_usage_from_committed()
# STEP 1.6: Smooth via present costs and build segment prefix
alpha = float(getattr(cfg, "via_present_alpha", 0.6))
beta = float(getattr(cfg, "via_present_beta", 0.4))
if hasattr(self, 'via_col_use'):
over = np.maximum(0, self.via_col_use - self.via_col_cap).astype(np.float32)
self.via_col_pres = alpha * over + beta * self.via_col_pres
if hasattr(self, 'via_seg_use'):
over = np.maximum(0, self.via_seg_use - self.via_seg_cap).astype(np.float32)
self.via_seg_pres = alpha * over + beta * self.via_seg_pres
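# Worked example (illustrative): with via_present_alpha=0.6 and via_present_beta=0.4, a column
# whose use exceeds its cap by 2 on consecutive iterations accumulates pres = 0.6*2 = 1.20,
# then 0.6*2 + 0.4*1.20 = 1.68, i.e. an exponentially weighted memory of recent via overuse
# that decays once the column drops back under capacity.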
# Build cumulative prefix along z axis for fast range queries
np.cumsum(self.via_seg_pres, axis=2, out=self.via_seg_prefix)
# OPTIMIZATION: Cache GPU arrays to avoid redundant transfers (200MB × 4-5 times = slow!)
# This saves 2-4 seconds per iteration by transferring once instead of 4-5 times
if self.accounting.use_gpu:
present_cpu_cache = self.accounting.present.get()
cap_cpu_cache = self.accounting.capacity.get()
else:
present_cpu_cache = self.accounting.present
cap_cpu_cache = self.accounting.capacity
# STEP 1.7: Update present EMA for stable convergence
# Initialize present_ema on first iteration
if it == 1:
self.accounting.present_ema = self.accounting.present.copy()
# Heavy smoothing for stability (40% new, 60% old)
present_ema_beta = float(getattr(cfg, 'present_ema_beta', 0.40))
self.accounting.update_present_ema(beta=present_ema_beta)
# STEP 1.8: Compute layer bias for layer balancing
layer_bias = self._compute_layer_bias(
self.accounting, self.graph,
num_layers=self.lattice.layers,
alpha=0.88, # Balanced smoothing
max_boost=1.80 # Conservative penalty (baseline from document)
)
# STEP 2: Update costs (with history weight and via annealing)
# Via policy: anneal via cost when pres_fac >= 64 (lowered to trigger earlier)
via_cost_mult = 1.0
if pres_fac >= 64:
# Check if >70% of overuse is on vias
# OPTIMIZATION: Use cached GPU transfers
present = present_cpu_cache
cap = cap_cpu_cache
over = np.maximum(0, present - cap)
# Use numpy boolean indexing for efficient via overuse calculation
via_overuse = float(over[self._via_edges[:len(over)]].sum())
total_overuse = float(over.sum())
if total_overuse > 0:
via_ratio = via_overuse / total_overuse
if via_ratio > 0.7:
# Most overuse is on vias: increase via cost to widen horizontal corridors
via_cost_mult = 1.5
logger.info(f"[VIA POLICY] {via_ratio*100:.1f}% via overuse → increasing via cost by 1.5x")
else:
# Normal case: reduce via cost to enable layer hopping
via_cost_mult = 0.5
logger.info(f"[VIA POLICY] Late-stage annealing: via_cost *= 0.5")
# Get edge_layer and edge_kind for layer balancing
edge_layer_arr = (self.graph.edge_layer_gpu if self.accounting.use_gpu
else self.graph.edge_layer) if hasattr(self.graph, 'edge_layer') else None
edge_kind_arr = (self.graph.edge_kind_gpu if self.accounting.use_gpu
else self.graph.edge_kind) if hasattr(self.graph, 'edge_kind') else None
# CRITICAL: Cost update ONCE per iteration (PathFinder design)
# Toggle incremental updates via env var INCREMENTAL_COST_UPDATE=1
if os.getenv("INCREMENTAL_COST_UPDATE") == "1" and hasattr(self, '_changed_edges_previous_iteration'):
# INCREMENTAL: Only update edges that changed in previous iteration
changed_edges = self._changed_edges_previous_iteration
if hasattr(self.accounting, 'update_costs_incremental'):
self.accounting.update_costs_incremental(
changed_edges,
self.graph.base_costs, pres_fac,
hist_weight=cfg.hist_cost_weight,
via_cost_multiplier=via_cost_mult,
base_cost_weight=cfg.base_cost_weight,
edge_layer=edge_layer_arr,
layer_bias_per_layer=layer_bias,
edge_kind=edge_kind_arr
)
logger.debug(f"[ITER {it}] Incremental cost update: {len(changed_edges)} edges")
else:
# Fallback if incremental not implemented
self.accounting.update_costs(
self.graph.base_costs, pres_fac, cfg.hist_cost_weight,
via_cost_multiplier=via_cost_mult,
base_cost_weight=cfg.base_cost_weight,
edge_layer=edge_layer_arr,
layer_bias_per_layer=layer_bias,
edge_kind=edge_kind_arr
)
else:
# FULL: Update all edges (default PathFinder behavior) - use hist_cost_weight with layer scaling
hist_weight_scaled = cfg.hist_cost_weight * hist_cost_weight_mult
self.accounting.update_costs(
self.graph.base_costs, pres_fac, hist_weight_scaled,
via_cost_multiplier=via_cost_mult,
base_cost_weight=cfg.base_cost_weight,
edge_layer=edge_layer_arr,
layer_bias_per_layer=layer_bias,
edge_kind=edge_kind_arr
)
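# Note (assumption about the accounting layer, which lives outside this file): the arguments
# passed above correspond to the conventional PathFinder blend of a base term, a present-
# congestion term scaled by pres_fac, and a historical term scaled by the history weight,
# roughly cost_e ≈ base_cost_weight*b_e + pres_fac*present_e + hist_weight*hist_e, with the
# via multiplier and per-layer bias applied on top. The accounting module's update_costs is
# the authoritative formula.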
# STEP 2.5: Apply via column/segment pooling penalties
# NOTE: Layer balancing now enabled (vectorized, applied in update_costs)
self._apply_via_pooling_penalties(pres_fac)
# STEP 2.6: Hard-block via edges with spatial collisions (iteration 2+)
# This prevents PathFinder from using edges that would cause via collisions
if it > 1: # Allow greedy first pass without hard blocking
self._block_via_edges_with_collisions()
# STEP 2.9: Exclude persistently failing nets (CRITICAL for large boards)
# Track nets that fail repeatedly and exclude them to prevent thrashing
# ONLY ACTIVATE AFTER ITERATION 4 to allow greedy routing to complete
if not hasattr(self, '_net_failure_count'):
self._net_failure_count = {} # net_id -> consecutive failures
self._excluded_nets = set() # nets we've given up on
# Every 10 iterations, give excluded nets another chance (conditions may have improved)
# This prevents permanent exclusion of temporarily unroutable nets
if it > 10 and it % 10 == 0 and self._excluded_nets:
num_to_retry = len(self._excluded_nets)
logger.warning(f"[EXCLUDE-RETRY] Iteration {it}: Giving {num_to_retry} excluded nets another chance")
self._excluded_nets.clear()
self._net_failure_count.clear()
# Only start tracking failures after iteration 4
if it > 4:
# Update failure counts based on which nets have paths
for net_id in tasks.keys():
has_path = bool(self.net_paths.get(net_id))
if not has_path:
# Net failed to route
self._net_failure_count[net_id] = self._net_failure_count.get(net_id, 0) + 1
else:
# Net successfully routed - reset failure count
self._net_failure_count[net_id] = 0
# Exclude nets that have failed 5+ consecutive times
FAILURE_THRESHOLD = 5
newly_excluded = set()
for net_id, failures in self._net_failure_count.items():
if failures >= FAILURE_THRESHOLD and net_id not in self._excluded_nets:
self._excluded_nets.add(net_id)
newly_excluded.add(net_id)
if newly_excluded:
logger.warning(f"[EXCLUDE] Giving up on {len(newly_excluded)} nets after {FAILURE_THRESHOLD} failed attempts: {list(newly_excluded)[:5]}...")
if self._excluded_nets:
logger.info(f"[EXCLUDE] {len(self._excluded_nets)} nets permanently excluded from routing")
# STEP 3: Route (hotset incremental after iter 1)
# DIAGNOSTIC OVERRIDE: Test with hotset disabled
FORCE_ROUTE_ALL = False # Diagnostic switch: True = route ALL nets every iteration (hotset disabled), False = normal hotset behavior
if FORCE_ROUTE_ALL:
sub_tasks = {k: v for k, v in tasks.items() if k not in self._excluded_nets}
logger.info(f" [DIAGNOSTIC] Routing ALL {len(sub_tasks)} nets every iteration (hotset DISABLED for testing)")
elif cfg.reroute_only_offenders and it > 1:
# Pass ripped set to _build_hotset (Fix 2)
offenders = self._build_hotset(tasks, ripped=getattr(self, "_last_ripped", set()))
# Exclude permanently failed nets from offenders
offenders = offenders - self._excluded_nets
sub_tasks = {k: v for k, v in tasks.items() if k in offenders and k not in self._excluded_nets}
logger.info(f" Hotset: {len(offenders)}/{len(tasks)} nets")
# Clear _last_ripped after use
self._last_ripped = set()
else:
sub_tasks = {k: v for k, v in tasks.items() if k not in self._excluded_nets}
# ANTI-OSCILLATION: Shuffle net order each iteration to break deterministic patterns
# This prevents the same nets from always winning/losing in the same order
if it > 1:
import random
net_ids = list(sub_tasks.keys())
random.seed(42 + it) # Deterministic but different each iteration
random.shuffle(net_ids)
sub_tasks = {nid: sub_tasks[nid] for nid in net_ids}
logger.debug(f"[SHUFFLE] Randomized net order for iteration {it}")
routed, failed = self._route_all(sub_tasks, all_tasks=tasks, pres_fac=pres_fac, iteration=it)
# Track changed edges for next iteration's incremental cost update
# Collect all edges used by nets routed in this iteration
changed_edges = set()
for net_id in sub_tasks.keys():
if net_id in self._net_to_edges:
changed_edges.update(self._net_to_edges[net_id])
# Store for next iteration (will be used by incremental cost update)
self._changed_edges_previous_iteration = changed_edges
# CRITICAL: Refresh present to reflect committed paths
self.accounting.refresh_from_canonical()
# ACCOUNTING SANITY CHECK: Verify present matches canonical
if not self.accounting.verify_present_matches_canonical():
logger.warning(f"[ITER {it}] Accounting mismatch detected - potential bug")
# STEP 3.5: CRITICAL - Detect via barrel conflicts BEFORE computing overuse
# TWO-PHASE APPROACH:
# Phase 1 (iters 1-50): Aggressively penalize barrel conflicts - check every iteration
# Phase 2 (iters 51+): Disable penalties - only check every 10 iterations (for reporting)
BARREL_PHASE_1_ITERS = 50
# Detect barrel conflicts (expensive on large boards - skip in Phase 2 except every 10 iters)
if it <= BARREL_PHASE_1_ITERS or it % 10 == 0:
conflict_edge_indices, conflict_count = self._detect_barrel_conflicts()
else:
# Skip detection, use last known count
conflict_edge_indices = np.array([], dtype=np.int32)
conflict_count = getattr(self, '_last_barrel_conflict_count', 0)
# Store barrel conflict count for iteration summary
self._last_barrel_conflict_count = conflict_count
if conflict_count > 0:
logger.debug(f"[BARREL-CONFLICT] Detected {conflict_count} barrel conflicts in iteration {it}")
if it <= BARREL_PHASE_1_ITERS:
# PHASE 1: Apply penalty to reduce barrel conflicts
pres = self.accounting.present
conflict_penalty = min(10.0 * pres_fac, 100.0) # Cap at 100
if hasattr(pres, 'get'):
# GPU array
import cupy as cp
conflict_indices_gpu = cp.asarray(conflict_edge_indices)
pres[conflict_indices_gpu] += conflict_penalty
else:
# CPU array
pres[conflict_edge_indices] += conflict_penalty
logger.debug(f"[BARREL-CONFLICT] PHASE 1: Applied penalty {conflict_penalty:.1f} to {conflict_count} edges")
else:
# PHASE 2: Detection only, no penalty - let PathFinder optimize
if it == BARREL_PHASE_1_ITERS + 1:
logger.info(f"[BARREL-CONFLICT] PHASE 2: Barrel penalties disabled (iter > {BARREL_PHASE_1_ITERS})")
# STEP 4: Overuse (now includes barrel conflicts as usage)
over_sum, over_cnt = self.accounting.compute_overuse(router_instance=self)
# Instrumentation: via overuse ratio
# OPTIMIZATION: Use cached GPU transfers
present = present_cpu_cache
cap = cap_cpu_cache
over = np.maximum(0, present - cap)
# Use numpy boolean indexing for efficient via overuse calculation
via_overuse = float(over[self._via_edges[:len(over)]].sum())
via_ratio = (via_overuse / over_sum * 100) if over_sum > 0 else 0.0
# Check barrel conflicts
barrel_conflicts = getattr(self, '_last_barrel_conflict_count', 0)
# Clean consolidated iteration summary (WARNING level so it shows in console)
status = "✓ CONVERGED" if over_sum == 0 else f"overuse={over_sum}"
barrel_info = f" barrel={barrel_conflicts}" if barrel_conflicts > 0 else ""
logger.warning(f"[ITER {it:3d}] nets={routed}/{routed+failed} {status} edges={over_cnt} via_overuse={via_ratio:.0f}%{barrel_info}")
# DIAGNOSTIC: Verify history is growing (not capped at 1.0) - only first 3 iterations
if it <= 3:
hist_max = float(self.accounting.history.max())
if hist_max <= 1.1:
logger.warning(f"[HISTORY] Iter {it}: hist_max={hist_max:.1f} (still capped?)")
else:
logger.debug(f"[HISTORY] Iter {it}: hist_max={hist_max:.1f} (growing)")
# Convergence diagnostics (every 10 iterations, reduced from every 5)
if it % 10 == 0:
hist_sum = float(self.accounting.history.sum())
pres_ema_sum = float(self.accounting.present_ema.sum())
cost_ratio = hist_sum / (pres_fac * pres_ema_sum + 1e-9)
logger.debug(f"[CONVERGENCE] pres_fac={pres_fac:.2f} hist_gain={hist_gain:.2f} balance={cost_ratio:.2f}")
if cost_ratio < 0.1 or cost_ratio > 10.0:
logger.warning(f"[CONVERGENCE] Cost imbalance: ratio={cost_ratio:.3f} (target: 0.5-2.0)")
# INSTRUMENTATION: Report hard wall count for iter-1 (must be 0)
if it == 1 and hasattr(self, '_iter1_inf_writes'):
inf_total = self._iter1_inf_writes
if inf_total == 0:
logger.info(f"[ITER-1-HARDWALLS] ✓ count=0 (no infinite costs in iteration 1)")
else:
logger.error(f"[ITER-1-HARDWALLS] ✗ count={inf_total} (BUG: infinite costs found in iteration 1!)")
self._iter1_inf_writes = 0 # Reset for next test
# Instrumentation: Via pooling statistics
# OPTIMIZATION: Reduced frequency to every 10 iterations (0.5-1s speedup)
if it % 10 == 0 and (hasattr(self, 'via_col_use') or hasattr(self, 'via_seg_use')):
if hasattr(self, 'via_col_use'):
cols_used = np.sum(self.via_col_use > 0)
cols_over = np.sum(self.via_col_use > self.via_col_cap)
max_col_use = int(np.max(self.via_col_use))
max_col_pres = float(np.max(self.via_col_pres))
mean_col_pres = float(np.mean(self.via_col_pres[self.via_col_pres > 0])) if np.any(self.via_col_pres > 0) else 0.0
logger.info(f"[VIA-POOL] Columns: used={cols_used}, over_cap={cols_over}, max_use={max_col_use}, max_pres={max_col_pres:.2f}, mean_pres={mean_col_pres:.2f}")
if hasattr(self, 'via_seg_use'):
segs_used = np.sum(self.via_seg_use > 0)
segs_over = np.sum(self.via_seg_use > self.via_seg_cap)
max_seg_use = int(np.max(self.via_seg_use))
max_seg_pres = float(np.max(self.via_seg_pres))
max_seg_prefix = float(np.max(self.via_seg_prefix))
logger.info(f"[VIA-POOL] Segments: used={segs_used}, over_cap={segs_over}, max_use={max_seg_use}, max_pres={max_seg_pres:.2f}, max_prefix={max_seg_prefix:.2f}")
# Instrumentation: Per-layer congestion breakdown (GPU-vectorized - fast!)
overuse_by_layer = None
if over_sum > 0:
overuse_by_layer = self._log_per_layer_congestion(over)
# Top-10 channels only every 10 iters (still expensive due to coordinate conversion)
if over_sum > 0 and it % 10 == 0:
self._log_top_overused_channels(over, top_k=10)
# Update layer bias from horizontal overuse (EWMA for stability)
if hasattr(self, 'layer_bias') and hasattr(self.graph, 'edge_layer'):
if overuse_by_layer is None and over_sum > 0:
overuse_by_layer = self._log_per_layer_congestion(over)
if overuse_by_layer and sum(overuse_by_layer.values()) > 0:
# Compute pressure per layer (normalized to mean)
layer_overuse = np.array([overuse_by_layer.get(z, 0.0) for z in range(self._Nz)])
mean_overuse = np.mean(layer_overuse[layer_overuse > 0]) + 1e-9
# Pressure ratio: >1.0 = hotter than average, <1.0 = cooler
pressure = layer_overuse / mean_overuse
# Target bias (1.0 = neutral, <1.0 = cheaper, >1.0 = more expensive)
# Scale alpha by layer count: fewer layers need stronger balancing
n_sig_layers = self._Nz - 2 # Exclude F.Cu and B.Cu
if n_sig_layers <= 12:
alpha = 0.20 # Strong balancing for sparse stacks
bias_min, bias_max = 0.60, 1.80
elif n_sig_layers <= 20:
alpha = 0.12
bias_min, bias_max = 0.75, 1.50
else:
alpha = 0.08
bias_min, bias_max = 0.85, 1.20
target_bias = 1.0 + alpha * (pressure - 1.0)
# EWMA smoothing
self.layer_bias = 0.85 * self.layer_bias + 0.15 * target_bias
self.layer_bias = np.clip(self.layer_bias, bias_min, bias_max)
# Log top biases
top_layers = sorted(enumerate(self.layer_bias), key=lambda x: x[1], reverse=True)[:3]
if any(abs(bias - 1.0) > 0.03 for _, bias in top_layers):
logger.info(f"[LAYER-BIAS] Hot layers: " +
", ".join([f"L{z}:{bias:.3f}" for z, bias in top_layers if 1 <= z < self._Nz-1]))
# Layer jam breaker: Detect if one layer dominates overuse
total_layer_overuse = sum(overuse_by_layer.values())
if total_layer_overuse > 0:
# Find layer with highest overuse percentage
max_layer = max(overuse_by_layer.items(), key=lambda x: x[1])
max_layer_idx, max_layer_overuse = max_layer
jam_percentage = (max_layer_overuse / total_layer_overuse) * 100
# Track consecutive jams
if not hasattr(self, '_layer_jam_tracker'):
self._layer_jam_tracker = {'layer': None, 'count': 0, 'boost_until': 0}
if jam_percentage >= 60.0:
if self._layer_jam_tracker['layer'] == max_layer_idx:
self._layer_jam_tracker['count'] += 1
else:
self._layer_jam_tracker = {'layer': max_layer_idx, 'count': 1, 'boost_until': 0}
# If jammed for 3+ iterations, temporarily boost that layer's bias
if self._layer_jam_tracker['count'] >= 3 and it > self._layer_jam_tracker['boost_until']:
# Boost toward bias_max to make layer more expensive
# Since bias < 1.0 means cheaper, we need to increase it toward bias_max
old_bias = self.layer_bias[max_layer_idx]
self.layer_bias[max_layer_idx] = min(old_bias * 1.8, bias_max) # Stronger boost
self._layer_jam_tracker['boost_until'] = it + 3 # Hold longer
logger.warning(f"[LAYER-JAM] Layer {max_layer_idx} jammed at {jam_percentage:.1f}% for {self._layer_jam_tracker['count']} iters → boosting bias {old_bias:.3f}{self.layer_bias[max_layer_idx]:.3f} for 3 iters")
else:
# Reset if no longer jammed
self._layer_jam_tracker = {'layer': None, 'count': 0, 'boost_until': 0}
# Clean-phase: if overuse==0, freeze good nets and finish stragglers
if over_sum == 0:
unrouted = {nid for nid in tasks.keys() if not self.net_paths.get(nid)}
if not unrouted:
logger.info("[CLEAN] All nets routed with zero overuse")
break
# Freeze placed nets and lower pressure for stragglers
placed = {nid for nid in tasks.keys() if self.net_paths.get(nid)}
pres_fac = min(pres_fac, 10.0)
logger.info(f"[CLEAN] Overuse=0, {len(unrouted)} unrouted left → freeze {len(placed)} nets, pres_fac={pres_fac:.2f}")
# STEP 5: History (use local hist_gain variable, optionally boost after iter 8)
hist_gain_eff = hist_gain
if it >= 8 and over_sum > 0:
# Strengthen history memory after learning phase
hist_gain_eff = min(1.2, hist_gain * 1.25)
# TEST: Allow env override to disable history cap and use raw present
use_history_cap = os.getenv('ORTHO_NO_HISTORY_CAP', '0') == '0'
hist_base_costs = self.graph.base_costs if use_history_cap else None
hist_cap_mult = 15.0 if use_history_cap else 1.0 # Reduced from 100.0 to prevent history dominance
# FIX: Use raw present for history (present_ema lags significantly in early iterations)
use_raw_present = True # was: os.getenv('ORTHO_RAW_PRESENT_FOR_HIST', '0') == '1'
# Apply history decay after iteration 10 to prevent oscillation
# Early iterations (1-10): No decay (1.0) to build up history
# Later iterations (11+): Gentle decay (0.98) allows route redistribution
history_decay = 0.98 if it >= 10 else 1.0
self.accounting.update_history(
hist_gain_eff,
base_costs=hist_base_costs,
history_cap_multiplier=hist_cap_mult,
decay_factor=history_decay,
use_raw_present=use_raw_present
)
# Progress callback
if progress_cb:
try:
progress_cb(it, cfg.max_iterations, f"Iteration {it}")
except:
pass
# Iteration callback for screenshots (generate provisional geometry)
if iteration_cb:
try:
# Generate current routing state for visualization
provisional_tracks, provisional_vias = self._generate_geometry_from_paths()
# CRITICAL: Merge escape geometry for complete board state visualization
# Without this, iteration screenshots only show routed nets, not pad escapes
if hasattr(self, '_escape_tracks') and self._escape_tracks:
# Deduplicate helper
def _dedupe(items, key_fn):
seen, out = set(), []
for item in items:
k = key_fn(item)
if k not in seen:
seen.add(k)
out.append(item)
return out
# Merge escapes + routed geometry
combined_tracks = self._escape_tracks + provisional_tracks
combined_vias = self._escape_vias + provisional_vias
# Deduplicate by geometric signature (safe key access to avoid None subscripting)
def safe_track_key(t):
# Parenthesize the coordinate fallbacks so an existing "start"/"end" tuple is used as-is
start = t.get("start") or (t.get("x1", 0), t.get("y1", 0))
end = t.get("end") or (t.get("x2", 0), t.get("y2", 0))
return (t.get("net"), t.get("layer"),
round(start[0] if isinstance(start, (list, tuple)) else 0, 3),
round(start[1] if isinstance(start, (list, tuple)) else 0, 3),
round(end[0] if isinstance(end, (list, tuple)) else 0, 3),
round(end[1] if isinstance(end, (list, tuple)) else 0, 3),
round(t.get("width", 0), 3))
def safe_via_key(v):
at_pos = v.get("at") or (v.get("x", 0), v.get("y", 0))
return (v.get("net"),
round(at_pos[0] if isinstance(at_pos, (list, tuple)) else 0, 3),
round(at_pos[1] if isinstance(at_pos, (list, tuple)) else 0, 3),
v.get("layers") or (v.get("from_layer"), v.get("to_layer")),
round(v.get("size", 0), 3),
round(v.get("drill", 0), 3),
round(v.get("diameter", 0), 3))
provisional_tracks = _dedupe(combined_tracks, safe_track_key)
provisional_vias = _dedupe(combined_vias, safe_via_key)
logger.debug(f"[ITER {it}] Screenshot: escapes={len(self._escape_tracks)} + "
f"routed={len(provisional_tracks) - len(self._escape_tracks)}"
f"total={len(provisional_tracks)} tracks, {len(provisional_vias)} vias")
iteration_cb(it, provisional_tracks, provisional_vias, over_sum, over_cnt)
except Exception as e:
logger.warning(f"[ITER {it}] Iteration callback failed: {e}")
# STEP 6: Terminate?
if failed == 0 and over_sum == 0:
barrel_conflicts = getattr(self, '_last_barrel_conflict_count', 0)
if barrel_conflicts == 0:
logger.info("[SUCCESS] Zero overuse, zero failed nets, AND zero barrel conflicts!")
else:
logger.info(f"[SUCCESS] Zero overuse and zero failed nets ({barrel_conflicts} barrel conflicts remain)")
# Final collision detection validation
edges_over_capacity = [(e, usage) for e, usage in self.accounting.edge_usage.items() if usage > 1]
if edges_over_capacity:
logger.error(f"[COLLISION] {len(edges_over_capacity)} edges over capacity (SHOULD BE ZERO!)")
for e, usage in edges_over_capacity[:10]: # Show first 10
logger.error(f"[COLLISION] Edge {e}: {usage} nets (capacity=1)")
else:
logger.info("[COLLISION] 0 edges over capacity ✓ PERFECT!")
# Check barrel conflicts before declaring full convergence
if barrel_conflicts > 0:
logger.warning(f"[CONVERGENCE] Edge overuse=0 but {barrel_conflicts} barrel conflicts remain")
logger.warning(f"[CONVERGENCE] Continuing to iteration {it+1} to resolve barrel conflicts...")
# Don't return yet - continue iterating to resolve barrel conflicts
else:
# TRUE convergence: zero edge overuse AND zero barrel conflicts
logger.info("[COLLISION] 0 edges over capacity ✓ PERFECT!")
# Log GPU vs CPU pathfinding statistics
total_paths = self._gpu_path_count + self._cpu_path_count
if total_paths > 0:
gpu_pct = (self._gpu_path_count / total_paths) * 100
logger.info(f"[GPU-STATS] GPU: {self._gpu_path_count} paths ({gpu_pct:.1f}%), CPU: {self._cpu_path_count} paths ({100-gpu_pct:.1f}%)")
return {'success': True, 'paths': self.net_paths, 'converged': True}
if over_sum < best_overuse:
best_overuse = over_sum
stagnant = 0
else:
stagnant += 1
# Plateau kicker: If stuck for 2 iterations, boost present pressure
if stagnant == 2:
old_pres_fac = pres_fac
pres_fac = min(pres_fac * 1.5, pres_fac_max)
logger.info(f"[PLATEAU-KICK] Stagnant for 2 iters, boosting pres_fac: {old_pres_fac:.2f}{pres_fac:.2f}")
if stagnant >= cfg.stagnation_patience:
self.stagnation_counter += 1 # Track cumulative stagnation events
victims = self._rip_top_k_offenders(k=20) # Rip only the ~20 worst offender nets
self._last_ripped = victims # Store for next hotset build (Fix 2)
# Freeze pres_fac for next 2 iterations to let smaller hotset settle (Fix 4)
self._freeze_pres_fac_until = it + 2
logger.warning(f"[STAGNATION {self.stagnation_counter}] Ripped {len(victims)} nets, "
f"holding pres_fac for 2 iters, ROI margin now +{self.stagnation_counter*0.6:.1f}mm")
stagnant = 0
continue
# STEP 7: Escalate with anti-thrash damper
if it <= getattr(self, "_freeze_pres_fac_until", 0):
logger.debug(f"[ITER {it}] Holding pres_fac={pres_fac:.2f} post-rip")
else:
# DISABLED: Anti-thrash backoff creates oscillation - use simple exponential growth
# if over_sum >= 0.995 * prev_over_sum:
# stagnant += 1
# else:
# stagnant = 0
# if stagnant >= 2:
# pres_fac = max(1.0, pres_fac * 0.90)
# stagnant = 0
# logger.info(f"[ANTI-THRASH] stagnant=2 → pres_fac reduced to {pres_fac:.2f}")
# else:
# pres_fac = min(pres_fac * pres_fac_mult, pres_fac_max)
# Simple exponential growth - no backoff
pres_fac = min(pres_fac * pres_fac_mult, pres_fac_max)
logger.debug(f"[ESCALATE] pres_fac={pres_fac:.2f}")
# CRITICAL: Update prev_over_sum for next iteration's anti-thrash check
prev_over_sum = over_sum
# If we exited with low overuse (<100), run detail pass
if 0 < over_sum <= 100:
logger.info(f"[DETAIL PASS] Overuse={over_sum} at max_iters, running detail refinement")
detail_result = self._detail_pass(tasks, over_sum, over_cnt)
if detail_result['success']:
return detail_result
# SOFT-FAIL: Analyze if more layers needed
layer_recommendation = self._analyze_layer_requirements(failed, over_cnt, over_sum)
# Log GPU vs CPU pathfinding statistics
total_paths = self._gpu_path_count + self._cpu_path_count
if total_paths > 0:
gpu_pct = (self._gpu_path_count / total_paths) * 100
logger.info(f"[GPU-STATS] GPU: {self._gpu_path_count} paths ({gpu_pct:.1f}%), CPU: {self._cpu_path_count} paths ({100-gpu_pct:.1f}%)")
# Only show warning if routing is actually incomplete
if failed > 0 or over_sum > 0:
logger.warning("="*80)
logger.warning(f"ROUTING INCOMPLETE: {failed}/{len(tasks)} nets failed ({failed/len(tasks)*100:.1f}%)")
logger.warning(f" Overuse: {over_cnt} edges with {over_sum} total conflicts")
if layer_recommendation['needs_more']:
logger.warning(f" RECOMMENDATION: Add {layer_recommendation['additional']} more layers (→{layer_recommendation['recommended_total']} total)")
logger.warning(f" Reason: {layer_recommendation['reason']}")
else:
logger.warning(f" Current layer count ({self.lattice.layers}) appears adequate")
logger.warning(f" Convergence may improve with tuning or may have reached practical limit")
logger.warning("="*80)
else:
logger.info("="*80)
logger.info(f"ROUTING COMPLETE: All {len(tasks)} nets routed successfully with zero overuse!")
logger.info("="*80)
# Determine success based on edge convergence
# Note: Barrel conflicts are reported but don't affect success/convergence
final_barrel_conflicts = getattr(self, '_last_barrel_conflict_count', 0)
excluded_nets = getattr(self, '_excluded_nets', set())
success = (failed == 0 and over_sum == 0)
if success and final_barrel_conflicts > 0:
logger.info(f"[FINAL] Edge routing converged ({final_barrel_conflicts} barrel conflicts remain - acceptable)")
if excluded_nets:
logger.warning(f"[FINAL] {len(excluded_nets)} nets excluded as unroutable: {list(excluded_nets)[:10]}...")
return {
'success': success,
'converged': success, # Edge convergence = success
'barrel_conflicts': final_barrel_conflicts,
'excluded_nets': len(excluded_nets),
'excluded_net_ids': list(excluded_nets),
'error_code': None if success else 'ROUTING-FAILED',
'message': 'Complete' if success else f'{failed} unrouted, {over_cnt} overused',
'overuse_sum': over_sum,
'overuse_edges': over_cnt,
'failed_nets': failed,
'layer_recommendation': layer_recommendation
}
def _analyze_layer_requirements(self, failed_nets: int, overuse_edges: int, overuse_sum: int) -> Dict:
"""Analyze if board needs more layers based on routing failures"""
current_layers = self.lattice.layers
total_nets = len(self.net_pad_ids)
# Calculate failure rate and congestion density
fail_rate = failed_nets / max(1, total_nets)
congestion_per_edge = overuse_sum / max(1, overuse_edges) if overuse_edges > 0 else 0
# Heuristics for layer requirement
if fail_rate > 0.4 and overuse_edges > 200:
# High failure rate with significant congestion
# Estimate: 1 layer per 50 failed nets
additional = max(4, int(failed_nets / 50))
return {
'needs_more': True,
'additional': additional,
'recommended_total': current_layers + additional,
'reason': f'High failure rate ({fail_rate*100:.1f}%) with {overuse_edges} congested edges suggests insufficient layer capacity'
}
elif overuse_sum > 800 and overuse_edges > 400:
# Severe congestion even with partial routing
return {
'needs_more': True,
'additional': 6,
'recommended_total': current_layers + 6,
'reason': f'Severe congestion ({overuse_sum} conflicts across {overuse_edges} edges) indicates board density exceeds layer capacity'
}
elif fail_rate > 0.3:
# Moderate failure, try 2-4 more layers
additional = 4 if fail_rate > 0.35 else 2
return {
'needs_more': True,
'additional': additional,
'recommended_total': current_layers + additional,
'reason': f'Moderate routing failure ({fail_rate*100:.1f}%) suggests {additional} additional layers may help'
}
else:
return {
'needs_more': False,
'additional': 0,
'recommended_total': current_layers,
'reason': f'Failure rate ({fail_rate*100:.1f}%) and overuse ({overuse_edges} edges) within acceptable range for current layer count'
}
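# Worked example (illustrative): 200 failed nets out of 464 (43.1% failure) with 250 congested
# edges trips the first heuristic and recommends max(4, 200 // 50) = 4 additional layers; a run
# with a 10% failure rate and modest overuse falls through to the "current layer count appears
# adequate" branch.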
def _detail_pass(self, tasks: Dict[str, Tuple[int, int]], initial_overuse: int, initial_edges: int) -> Dict:
"""
Detail pass: extract conflict subgraph and route only affected nets
with fine ROI, lower via cost, higher history gain, and max 60 hotset.
"""
logger.info("[DETAIL] Extracting conflict subgraph...")
present = self.accounting.present.get() if self.accounting.use_gpu else self.accounting.present
cap = self.accounting.capacity.get() if self.accounting.use_gpu else self.accounting.capacity
over = np.maximum(0, present - cap)
# Find overused edges and their neighborhoods (radius ~5-10 edges)
conflict_edges = set(int(ei) for ei in range(len(over)) if over[ei] > 0)
# Collect nets that use any conflict edge
conflict_nets = set()
for net_id, path in self.net_paths.items():
if path and any(ei in conflict_edges for ei in self._path_to_edges(path)):
conflict_nets.add(net_id)
logger.info(f"[DETAIL] Found {len(conflict_nets)} nets in conflict zone")
if not conflict_nets:
return {'success': False, 'error_code': 'NO-CONFLICT-NETS'}
# Build subset of tasks for conflict nets
conflict_tasks = {nid: tasks[nid] for nid in conflict_nets if nid in tasks}
# Detail loop: max 10 iterations with aggressive settings
cfg = self.config
pres_fac = cfg.pres_fac_max * 0.5 # Start high
best_overuse = initial_overuse
for detail_it in range(1, 11):
logger.info(f"[DETAIL {detail_it}/10] pres_fac={pres_fac:.1f}")
self.accounting.refresh_from_canonical()
# Update costs with lower via cost and higher history gain
via_cost_mult = 0.3 # Much lower via cost for detail pass
self.accounting.update_costs(
self.graph.base_costs, pres_fac, cfg.hist_cost_weight * 1.5,
via_cost_multiplier=via_cost_mult,
base_cost_weight=cfg.base_cost_weight
)
# Build hotset (capped at 60 for detail pass)
detail_hotset = self._build_hotset(conflict_tasks)
detail_hotset = set(list(detail_hotset)[:60]) # Hard cap at 60
if not detail_hotset:
detail_hotset = conflict_nets # Route all if hotset empty
detail_sub_tasks = {k: v for k, v in conflict_tasks.items() if k in detail_hotset}
# Route with wider ROI (stagnation bonus for fine search)
old_stagnation = self.stagnation_counter
self.stagnation_counter += 3 # Temporarily increase for wider ROI
routed, failed = self._route_all(detail_sub_tasks, all_tasks=tasks, pres_fac=pres_fac)
self.stagnation_counter = old_stagnation
self.accounting.refresh_from_canonical()
over_sum, over_cnt = self.accounting.compute_overuse(router_instance=self)
logger.info(f"[DETAIL {detail_it}/10] overuse={over_sum} edges={over_cnt}")
if over_sum == 0:
logger.info("[DETAIL] SUCCESS: Zero overuse achieved")
# Log GPU vs CPU pathfinding statistics
total_paths = self._gpu_path_count + self._cpu_path_count
if total_paths > 0:
gpu_pct = (self._gpu_path_count / total_paths) * 100
logger.info(f"[GPU-STATS] GPU: {self._gpu_path_count} paths ({gpu_pct:.1f}%), CPU: {self._cpu_path_count} paths ({100-gpu_pct:.1f}%)")
return {'success': True, 'paths': self.net_paths}
if over_sum < best_overuse:
best_overuse = over_sum
else:
# No improvement: escalate and continue
pass
# Update history for conflict edges only
self.accounting.update_history(
cfg.hist_gain * 2.0, # Double history gain in detail pass
base_costs=self.graph.base_costs,
history_cap_multiplier=15.0,
decay_factor=0.98 # Use decay in detail pass to allow redistribution
)
pres_fac = min(pres_fac * 1.5, cfg.pres_fac_max)
# Detail pass exhausted
logger.warning(f"[DETAIL] Failed to reach zero: final overuse={best_overuse}")
return {'success': False, 'error_code': 'DETAIL-INCOMPLETE', 'overuse_sum': best_overuse}
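# Schedule note (illustrative): the detail pass starts at half the configured pressure ceiling
# and climbs by 1.5x per pass, e.g. with pres_fac_max=8.0 it runs 4.0 → 6.0 → 8.0 (capped),
# while via cost is held at 0.3x and history gain is doubled, trading wirelength for conflict
# resolution in the last few congested pockets.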
def _order_nets_by_difficulty(self, tasks: Dict[str, Tuple[int, int]]) -> List[str]:
"""
Order nets by difficulty score = distance * (pin_degree + 1) * (congestion + 1).
Route hardest first. Apply slight shuffle each iteration.
"""
import random
present = self.accounting.present.get() if self.accounting.use_gpu else self.accounting.present
cap = self.accounting.capacity.get() if self.accounting.use_gpu else self.accounting.capacity
over = np.maximum(0, present - cap)
scores = []
for net_id, (src, dst) in tasks.items():
# Distance estimate (Manhattan in lattice space)
sx, sy, sz = self.lattice.idx_to_coord(src)
dx, dy, dz = self.lattice.idx_to_coord(dst)
distance = abs(dx - sx) + abs(dy - sy) + abs(dz - sz)
# Pin degree (for point-to-point, degree=2; could be extended for multi-pin)
pin_degree = 2
# Congestion: average overuse along prior path (if exists)
congestion = 0.0
if net_id in self._net_to_edges:
# Use cached edge mapping (O(1) lookup) instead of recomputing (O(M) scan)
edges = self._net_to_edges[net_id]
congestion = sum(float(over[ei]) for ei in edges) / max(1, len(edges))
difficulty = distance * (pin_degree + 1) * (congestion + 1)
scores.append((difficulty, net_id))
scores.sort(reverse=True) # hardest first
# Apply slight shuffle: rotate order by small random amount
rotation = random.randint(0, min(5, len(scores) // 10))
ordered = [nid for _, nid in scores]
if rotation > 0:
ordered = ordered[rotation:] + ordered[:rotation]
return ordered
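# Worked example (illustrative): a net spanning 30 lattice steps with pin_degree=2 and an
# average overuse of 0.5 along its previous path scores 30 * (2+1) * (0.5+1) = 135, so it is
# routed before an uncongested 30-step net scoring 90; the small rotation keeps ties from
# replaying in the same order every iteration.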
def _route_all(self, tasks: Dict[str, Tuple[int, int]], all_tasks: Dict[str, Tuple[int, int]] = None, pres_fac: float = 1.0, iteration: int = 0) -> Tuple[int, int]:
"""Route nets with adaptive ROI extraction and intra-iteration cost updates"""
if all_tasks is None:
all_tasks = tasks
routed_this_pass = 0
failed_this_pass = 0
total = len(tasks)
cfg = self.config
# Reset full-graph fallback counter at start of iteration
self.full_graph_fallback_count = 0
# ROI margin grows with stagnation: +0.6mm per stagnation mark
roi_margin_bonus = self.stagnation_counter * 0.6
# Order nets by difficulty: hardest first, with slight shuffle per iteration
ordered_nets = self._order_nets_by_difficulty(tasks)
# Start with costs from iteration-level update
# NOTE: For GPU, keep as CuPy array (don't .get() to CPU)
# Per-net updates will refresh this reference
costs = self.accounting.total_cost # Keep on GPU if use_gpu=True
# PathFinder design: Sequential routing with FIXED costs throughout iteration
# Costs are updated ONCE per iteration (in _pathfinder_negotiation), not per net
# This allows all nets to route with same congestion view, preventing unfair advantages
# CRITICAL: Verify sequential mode (SEQUENTIAL_ALL env var check)
force_sequential = os.getenv("SEQUENTIAL_ALL") == "1"
if force_sequential:
logger.info(f"[SEQUENTIAL_ALL] Sequential mode ENABLED via environment variable")
logger.info(f"[ROUTING MODE] Using sequential routing for {total} nets (PathFinder algorithm)")
for idx, net_id in enumerate(ordered_nets):
net_start_time = time.time()
src, dst = tasks[net_id]
# Show progress more frequently in iteration 1 (greedy routing)
if iteration == 1:
# Show every 25 nets in iteration 1
if (idx + 1) % 25 == 0 or idx == 0:
logger.warning(f"[ITER 1 - GREEDY] Routing {idx+1}/{total} nets...")
elif idx % 50 == 0 and total > 50:
logger.debug(f" Routing net {idx+1}/{total}")
# Only clear if we're actually re-routing this net
if net_id in self.net_paths and self.net_paths[net_id]:
# Use cached edges if available, otherwise compute
if net_id in self._net_to_edges:
old_edges = self._net_to_edges[net_id]
else:
old_edges = self._path_to_edges(self.net_paths[net_id])
self.accounting.clear_path(old_edges)
# Clear old tracking before re-routing
self._clear_net_edge_tracking(net_id)
# Calculate adaptive ROI radius based on net length
src_x, src_y, src_z = self.lattice.idx_to_coord(src)
dst_x, dst_y, dst_z = self.lattice.idx_to_coord(dst)
manhattan_dist = abs(dst_x - src_x) + abs(dst_y - src_y)
# Adaptive radius: 120% of Manhattan distance + minimum 10-step buffer
# Increased max from 150 to 800 to handle full-board routes
adaptive_radius = max(40, min(int(manhattan_dist * 1.2) + 10, 800))
if manhattan_dist > 800:
logger.warning(f"Net {net_id}: manhattan_dist={manhattan_dist} exceeds max radius=800!")
# Check if we have portals for this net
use_portals = cfg.portal_enabled and net_id in self.net_pad_ids
src_seeds = []
dst_targets = []
if use_portals:
src_pad_id, dst_pad_id = self.net_pad_ids[net_id]
src_portal = self.portals.get(src_pad_id)
dst_portal = self.portals.get(dst_pad_id)
if src_portal and dst_portal:
# Get multi-layer seeds at both portals
src_seeds = self._get_portal_seeds(src_portal)
dst_targets = self._get_portal_seeds(dst_portal)
else:
use_portals = False
# ═══════════════════════════════════════════════════════════════
# GPU SUPERSOURCE FAST PATH (ATTEMPT BEFORE ROI EXTRACTION)
# ═══════════════════════════════════════════════════════════════
# Try full-graph GPU pathfinding with supersource/supersink seeds
# This skips ROI extraction entirely and routes on full graph
# If it fails or GPU not available, fall back to standard ROI routing
gpu_fast_path_used = False
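# Conceptual note (illustrative): "supersource/supersink" here means the GPU search is seeded
# from every portal landing node at once and may terminate on any target node, which is
# equivalent to adding a virtual source wired with zero-cost edges to all seeds; no actual
# virtual node is inserted into the CSR graph.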
# Full-graph routing with owner-aware bitmap filtering
if use_portals and hasattr(self.solver, 'gpu_solver') and self.solver.gpu_solver:
try:
import numpy as np
import cupy as cp
# Check if costs are on GPU
costs_on_gpu = hasattr(costs, 'device')
if costs_on_gpu:
logger.info(f"[GPU-SEEDS] Attempting GPU supersource routing for net {net_id}")
# Convert portal seeds to plain node arrays
src_node_array = self._build_routing_seeds(src_seeds)
dst_node_array = self._build_routing_seeds(dst_targets)
# Build owner-aware bitmap - FORCE-ALLOW source/dest seeds!
force_allow = np.unique(np.concatenate([src_node_array, dst_node_array])).astype(np.int32)
owner_bitmap = self._build_owner_bitmap_for_fullgraph(net_id, force_allow_nodes=force_allow)
if len(src_node_array) > 0 and len(dst_node_array) > 0:
# Call GPU supersource pathfinding with owner-aware bitmap
gpu_start = time.time()
path = self.solver.gpu_solver.find_path_fullgraph_gpu_seeds(
costs=costs,
src_seeds=src_node_array,
dst_targets=dst_node_array,
ub_hint=None,
allowed_bitmap=owner_bitmap,
use_bitmap=True
)
gpu_time = time.time() - gpu_start
if path and len(path) > 1:
logger.info(f"[GPU-SEEDS] SUCCESS! Path found in {gpu_time:.3f}s ({len(path)} nodes)")
gpu_fast_path_used = True
# Determine entry and exit layers from path
if self.solver.plane_size:
entry_layer = path[0] // self.solver.plane_size
exit_layer = path[-1] // self.solver.plane_size
else:
entry_layer = exit_layer = 0
# Commit path and continue to next net
edge_indices = self._path_to_edges(path)
self.accounting.commit_path(edge_indices)
# NOTE: Do NOT update costs here! PathFinder requires fixed costs per iteration.
self.net_paths[net_id] = path
# CRITICAL: Mark via barrel ownership IMMEDIATELY for next net in same iteration!
self._mark_via_barrel_ownership_for_path(net_id, path)
if entry_layer is not None and exit_layer is not None:
self.net_portal_layers[net_id] = (entry_layer, exit_layer)
self._update_net_edge_tracking(net_id, edge_indices)
routed_this_pass += 1
continue # Skip ROI extraction and CPU routing
else:
logger.info(f"[GPU-SEEDS] No path found, skipping net (will be marked as failed)")
failed_this_pass += 1
continue # Skip CPU fallback, let exclusion handle it
else:
logger.warning(f"[GPU-SEEDS] Empty seed arrays, skipping GPU fast path")
except Exception as e:
logger.warning(f"[GPU-SEEDS] GPU fast path failed: {e}, skipping net (will be marked as failed)")
failed_this_pass += 1
continue # Skip CPU fallback, let exclusion handle it
# If GPU fast path succeeded, we already continued to next net above
# Otherwise, proceed with standard ROI routing below
# ═══════════════════════════════════════════════════════════════
# HYBRID ROI/FULL-GRAPH ROUTING (BACKPLANE-OPTIMIZED)
# ═══════════════════════════════════════════════════════════════
# Strategy:
# - SHORT NETS (<125 steps = 50mm): Use ROI → 10× faster, <50 iterations
# - LONG NETS (≥125 steps): Use full graph → necessary for board-spanning
# Rationale: Backplanes have power/ground/clock spanning entire board
roi_start = time.time()
# Distance threshold: 125 steps @ 0.4mm = 50mm
ROI_THRESHOLD_STEPS = 125
use_roi_extraction = manhattan_dist < ROI_THRESHOLD_STEPS
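# Worked example (illustrative): at the 0.4 mm pitch, a 30 mm net is 75 steps (< 125) and gets
# the focused ROI path with radius max(40, int(75 * 1.5) + 10) = 122 lattice steps, while an
# 80 mm net (200 steps) skips extraction and searches the full graph.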
import numpy as np
if use_roi_extraction:
# SHORT NET: Use focused ROI for fast routing
# Determine src/dst for ROI center
if use_portals and src_portal and dst_portal:
roi_src_node = self.lattice.node_idx(src_portal.x_idx, src_portal.y_idx, src_portal.entry_layer)
roi_dst_node = self.lattice.node_idx(dst_portal.x_idx, dst_portal.y_idx, dst_portal.entry_layer)
else:
roi_src_node = src
roi_dst_node = dst
# Adaptive radius: 150% of distance for detours
adaptive_radius = max(40, int(manhattan_dist * 1.5) + 10)
# Gather portal seeds if available
portal_seeds = None
if use_portals and src_portal and dst_portal:
s_seeds = self._get_portal_seeds(src_portal)
d_seeds = self._get_portal_seeds(dst_portal)
portal_seeds = (s_seeds or []) + (d_seeds or [])
# Extract ROI
roi_nodes, global_to_roi = self.roi_extractor.extract_roi(
roi_src_node, roi_dst_node,
initial_radius=adaptive_radius,
stagnation_bonus=roi_margin_bonus,
portal_seeds=portal_seeds
)
if idx % 100 == 0:
logger.info(f"[HYBRID] Net {net_id}: SHORT ({manhattan_dist} steps) → ROI ({len(roi_nodes):,} nodes)")
else:
# LONG NET: Use full graph (board-spanning)
roi_nodes = np.arange(self.N, dtype=np.int32)
global_to_roi = np.arange(self.N, dtype=np.int32)
if idx % 100 == 0:
logger.info(f"[HYBRID] Net {net_id}: LONG ({manhattan_dist} steps) → FULL GRAPH ({self.N:,} nodes)")
roi_time = time.time() - roi_start
# Ensure portal seeds are in ROI (important for multi-layer routing)
portal_setup_start = time.time()
nodes_to_add = []
# Add portal seed nodes if they're not already in ROI
if use_portals and src_seeds and dst_targets:
for global_node, _ in src_seeds:
if global_to_roi[global_node] < 0:
nodes_to_add.append(global_node)
for global_node, _ in dst_targets:
if global_to_roi[global_node] < 0:
nodes_to_add.append(global_node)
# Add missing portal nodes to ROI
if nodes_to_add:
roi_nodes = np.append(roi_nodes, nodes_to_add)
# Rebuild mapping to include new nodes
global_to_roi = np.full(self.N, -1, dtype=np.int32)  # one slot per lattice node
global_to_roi[roi_nodes] = np.arange(len(roi_nodes), dtype=np.int32)
# OWNER-AWARE FILTERING: Remove nodes owned by OTHER nets
# This prevents routing through via barrels - THE FIX for dangling vias!
roi_nodes = self._filter_roi_by_ownership(roi_nodes, net_id)
# Rebuild mapping after ownership filtering
global_to_roi = np.full(self.N, -1, dtype=np.int32)
global_to_roi[roi_nodes] = np.arange(len(roi_nodes), dtype=np.int32)
# Final sanity check
if global_to_roi[src] < 0:
logger.error(f"BUG: src {src} not in ROI after all additions!")
if global_to_roi[dst] < 0:
logger.error(f"BUG: dst {dst} not in ROI after all additions!")
portal_setup_time = time.time() - portal_setup_start
# Log ROI sizes periodically
if idx % 50 == 0:
logger.debug(f" ROI size={len(roi_nodes)} for net {net_id} (margin_bonus={roi_margin_bonus:.1f}mm)")
# Debug first net
if idx == 0:
logger.info(f"[DEBUG] First net: portal_enabled={cfg.portal_enabled}, net_id={net_id}, use_portals={use_portals}")
if use_portals:
logger.info(f"[DEBUG] src_seeds count={len(src_seeds)}, dst_targets count={len(dst_targets)}")
logger.info(f"[DEBUG] ROI size={len(roi_nodes)}")
path = None
entry_layer = exit_layer = None
pathfinding_start = time.time()
if use_portals:
# Route with multi-source/multi-sink using portal seeds
result = self.solver.find_path_multisource_multisink(
src_seeds, dst_targets, costs, roi_nodes, global_to_roi
)
if result:
path, entry_layer, exit_layer = result
# Fallback to normal routing if portals not available or failed
if not use_portals or not path:
if idx == 0 and use_portals:
logger.info(f"[DEBUG] Portal routing failed, falling back to normal routing")
path = self.solver.find_path_roi(src, dst, costs, roi_nodes, global_to_roi)
pathfinding_time = time.time() - pathfinding_start
# If the focused ROI fails and we haven't exhausted the fallback quota, retry with a larger ROI
# (short nets only - long nets already searched the full graph, and adaptive_radius is only
# defined on the ROI branch)
if (not path or len(path) <= 1) and use_roi_extraction and self.full_graph_fallback_count < 5:
# Fallback: 1.5× adaptive radius (capped at 200)
fallback_radius = min(int(adaptive_radius * 1.5), 200)
logger.debug(f" ROI failed for {net_id}, trying larger ROI radius={fallback_radius} (fallback {self.full_graph_fallback_count+1}/5)")
if use_portals:
# Use portal nodes for larger ROI
src_pad_id, dst_pad_id = self.net_pad_ids[net_id]
src_portal = self.portals.get(src_pad_id)
dst_portal = self.portals.get(dst_pad_id)
if src_portal and dst_portal:
# CRITICAL FIX: Use entry_layer (routing layer) not pad_layer (F.Cu)!
src_portal_node = self.lattice.node_idx(src_portal.x_idx, src_portal.y_idx, src_portal.entry_layer)
dst_portal_node = self.lattice.node_idx(dst_portal.x_idx, dst_portal.y_idx, dst_portal.entry_layer)
roi_nodes, global_to_roi = self.roi_extractor.extract_roi(
src_portal_node, dst_portal_node, initial_radius=fallback_radius, stagnation_bonus=roi_margin_bonus * 2
)
result = self.solver.find_path_multisource_multisink(
src_seeds, dst_targets, costs, roi_nodes, global_to_roi
)
if result:
path, entry_layer, exit_layer = result
else:
# Use pad nodes for larger ROI
roi_nodes, global_to_roi = self.roi_extractor.extract_roi(
src, dst, initial_radius=fallback_radius, stagnation_bonus=roi_margin_bonus * 2
)
path = self.solver.find_path_roi(src, dst, costs, roi_nodes, global_to_roi)
self.full_graph_fallback_count += 1
if path and len(path) > 1:
edge_indices = self._path_to_edges(path)
self.accounting.commit_path(edge_indices) # bumps present for next iteration
# NOTE: Do NOT update costs here! PathFinder requires fixed costs per iteration.
# Costs are updated ONCE per iteration in _pathfinder_negotiation()
self.net_paths[net_id] = path
# CRITICAL: Mark via barrel ownership IMMEDIATELY for next net in same iteration!
self._mark_via_barrel_ownership_for_path(net_id, path)
# Store portal entry/exit layers if using portals
if use_portals and entry_layer is not None and exit_layer is not None:
self.net_portal_layers[net_id] = (entry_layer, exit_layer)
# Update edge-to-nets tracking
self._update_net_edge_tracking(net_id, edge_indices)
routed_this_pass += 1
else:
failed_this_pass += 1
self.net_paths[net_id] = []
# Clear tracking for failed nets
self._clear_net_edge_tracking(net_id)
# Track portal failures and retarget if needed
if cfg.portal_enabled and net_id in self.net_pad_ids:
self.net_portal_failures[net_id] += 1
if self.net_portal_failures[net_id] >= cfg.portal_retarget_patience:
# Retarget portals for this net
self._retarget_portals_for_net(net_id)
self.net_portal_failures[net_id] = 0 # Reset counter
# Log timing for first net and periodically thereafter
net_total_time = time.time() - net_start_time
if idx == 0 or (idx < 10) or (idx % 50 == 0):
logger.info(f"[TIMING] Net {idx+1}/{total} ({net_id}): ROI={roi_time:.3f}s (size={len(roi_nodes)}), Portal={portal_setup_time:.3f}s, Path={pathfinding_time:.3f}s, Total={net_total_time:.3f}s")
# Count total routed/failed across all nets
total_routed = sum(1 for path in self.net_paths.values() if path)
total_failed = len(all_tasks) - total_routed
return total_routed, total_failed
def _compute_layer_bias(self, accountant, graph, num_layers: int, alpha: float = 0.9, max_boost: float = 1.8):
"""
Compute per-layer multiplicative bias (shape [L]) based on overuse distribution.
Hot layers get bias > 1.0, cool layers stay at 1.0.
Args:
accountant: EdgeAccountant with present/capacity arrays
graph: CSRGraph with edge_layer mapping
num_layers: Number of layers
alpha: EMA smoothing factor (0..1, higher = smoother)
max_boost: Maximum bias multiplier (1.0 to max_boost)
Returns:
Layer bias array or None if layer mapping unavailable
"""
xp = accountant.xp
# Check if edge_layer mapping exists
edge_layer = getattr(graph, "edge_layer_gpu", None) if accountant.use_gpu else getattr(graph, "edge_layer", None)
if edge_layer is None:
return None
# Get overuse array - use smoothed present if available
present_for_bias = getattr(accountant, 'present_ema', accountant.present)
over = xp.maximum(0, present_for_bias - accountant.capacity)
# Sum overuse per layer (ONE bincount - very fast)
per_layer_over = xp.bincount(edge_layer, weights=over, minlength=num_layers)
# Normalize to create bias factors
maxv = float(per_layer_over.max().get() if accountant.use_gpu else per_layer_over.max())
if maxv <= 1e-9:
raw_bias = xp.ones(num_layers, dtype=xp.float32)
else:
shortfall = per_layer_over / maxv
# Linear baseline: the hottest layer gets 1.75, layers with no overuse stay at 1.0
raw_bias = 1.0 + 0.75 * shortfall  # Bias range: 1.0 .. 1.75
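# Illustrative numbers (hypothetical 4-layer slice): per-layer overuse [0, 40, 10, 0]
# → shortfall [0, 1.0, 0.25, 0] → raw_bias [1.0, 1.75, 1.1875, 1.0]; the EMA below then
# blends this with the previous iteration's bias before clamping to [1.0, max_boost].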
# EMA smoothing to prevent oscillation
if not hasattr(self, "_layer_bias_ema"):
self._layer_bias_ema = raw_bias.astype(xp.float32)
else:
self._layer_bias_ema = (alpha * self._layer_bias_ema + (1.0 - alpha) * raw_bias).astype(xp.float32)
# Clamp to prevent extreme penalties
self._layer_bias_ema = xp.clip(self._layer_bias_ema, 1.0, max_boost)
return self._layer_bias_ema
def _via_depart_discount(self, z_from: int, z_to: int) -> float:
"""
Compute via cost discount for leaving hot layers.
Encourages routing to move off congested layers toward cooler layers.
AGGRESSIVE discounting to force layer spreading.
Args:
z_from: Source layer
z_to: Target layer
Returns:
Discount multiplier (0.55-1.0), lower = cheaper via
"""
# Only apply discount if we have layer bias data
if not hasattr(self, '_layer_bias_ema') or self._layer_bias_ema is None:
return 1.0
# Get bias values (on CPU for simplicity, these are small arrays)
if hasattr(self._layer_bias_ema, 'get'):
bias_cpu = self._layer_bias_ema.get()
else:
bias_cpu = self._layer_bias_ema
if z_from >= len(bias_cpu) or z_to >= len(bias_cpu):
return 1.0
bf = float(bias_cpu[z_from]) # Source layer bias
bt = float(bias_cpu[z_to]) # Target layer bias
# Check if both layers share the same routing orientation (both H or both V).
# Layers alternate H/V by parity (in this stackup the odd inner layers carry H routing);
# only same-vs-different parity matters for the check below.
same_orientation = (z_from % 2) == (z_to % 2)
# Calculate discount based on bias difference
delta = max(0.0, bf - bt)
if same_orientation and delta > 0.15:
# Moderate: If leaving hot H layer for cool H layer (or V→V), give good discount
# This encourages spreading within same routing direction
discount = 1.0 - min(0.45, 0.8 * delta) # Up to 45% off
elif delta > 0.1:
# Standard: Leaving any hot layer for cooler layer
discount = 1.0 - min(0.35, 0.6 * delta) # Up to 35% off
else:
# Small benefit
discount = 1.0 - min(0.15, 0.4 * delta) # Up to 15% off
return max(0.55, discount) # Never go below 0.55 (45% off maximum)
def _retarget_portals_for_net(self, net_id: str):
"""Retarget portals when a net fails repeatedly"""
if net_id not in self.net_pad_ids:
return
src_pad_id, dst_pad_id = self.net_pad_ids[net_id]
# Retarget source portal
if src_pad_id in self.portals:
portal = self.portals[src_pad_id]
# Try flipping direction first: mirror the portal to the other side of the pad, THEN
# negate the direction (negating first would push the portal further out on the same side)
if portal.retarget_count == 0:
portal.y_idx = portal.y_idx - 2 * portal.direction * portal.delta_steps  # Mirror about the pad
portal.direction = -portal.direction
portal.retarget_count += 1
logger.debug(f"Retargeted portal for {src_pad_id}: flipped direction")
# Then try different delta
elif portal.retarget_count == 1:
# Try delta closer to preferred
new_delta = self.config.portal_delta_pref
if new_delta != portal.delta_steps:
delta_change = new_delta - portal.delta_steps
portal.y_idx += portal.direction * delta_change
portal.delta_steps = new_delta
portal.retarget_count += 1
logger.debug(f"Retargeted portal for {src_pad_id}: adjusted delta to {new_delta}")
# Retarget destination portal (same logic)
if dst_pad_id in self.portals:
portal = self.portals[dst_pad_id]
if portal.retarget_count == 0:
portal.y_idx = portal.y_idx - 2 * portal.direction * portal.delta_steps  # Mirror about the pad
portal.direction = -portal.direction
portal.retarget_count += 1
elif portal.retarget_count == 1:
new_delta = self.config.portal_delta_pref
if new_delta != portal.delta_steps:
delta_change = new_delta - portal.delta_steps
portal.y_idx += portal.direction * delta_change
portal.delta_steps = new_delta
portal.retarget_count += 1
def _rebuild_usage_from_committed_nets(self, keep_net_ids: Set[str]):
"""
Rebuild present usage from scratch based on committed nets.
Prevents ghost usage accumulation.
"""
# Clear accounting
self.accounting.canonical.clear()
self.accounting.present.fill(0)
# Rebuild from nets we're keeping
for net_id in keep_net_ids:
if net_id in self._net_to_edges:
for ei in self._net_to_edges[net_id]:
self.accounting.canonical[ei] = self.accounting.canonical.get(ei, 0) + 1
self.accounting.present[ei] += 1
logger.debug(f"[USAGE] Rebuilt from {len(keep_net_ids)} committed nets")
def _update_net_edge_tracking(self, net_id: str, edge_indices: List[int]):
"""Update edge-to-nets tracking when a net is routed"""
# Clear old tracking for this net
self._clear_net_edge_tracking(net_id)
# Store new edges for this net
self._net_to_edges[net_id] = edge_indices
# Update reverse mapping
for ei in edge_indices:
self._edge_to_nets[ei].add(net_id)
def _clear_net_edge_tracking(self, net_id: str):
"""Clear edge-to-nets tracking for a net"""
if net_id in self._net_to_edges:
# Remove this net from all edge mappings
for ei in self._net_to_edges[net_id]:
self._edge_to_nets[ei].discard(net_id)
del self._net_to_edges[net_id]
def _build_hotset(self, tasks: Dict[str, Tuple[int, int]], ripped: Optional[Set[str]] = None) -> Set[str]:
"""
Build hotset: ONLY nets touching overused edges, with adaptive capping.
Prevents thrashing by limiting hotset size based on actual overuse.
Implements freeze-clean: nets clean for 3+ iterations are excluded from hotset.
"""
if ripped is None:
ripped = set()
present = self.accounting.present.get() if self.accounting.use_gpu else self.accounting.present
cap = self.accounting.capacity.get() if self.accounting.use_gpu else self.accounting.capacity
hist = self.accounting.history.get() if self.accounting.use_gpu else self.accounting.history
# Include history in hotset selection: edges are "hot" if they have present OR historical congestion
over = np.maximum(0, present - cap)
hot_edges = over + 0.1 * hist # Gentle history influence (was 0.5 - too aggressive!)
over_idx = set(map(int, np.where(hot_edges > 0.5)[0])) # Higher threshold to be more selective
# Initialize clean iteration tracking
if not hasattr(self, '_net_clean_iters'):
self._net_clean_iters = {}
freeze_after_clean = int(getattr(self.config, 'freeze_after_clean', 3))
# NO OVERUSE CASE: only route unrouted nets
if len(over_idx) == 0:
unrouted = {nid for nid in tasks.keys() if not self.net_paths.get(nid)}
hotset = unrouted | ripped
logger.info(f"[HOTSET] no-overuse; unrouted={len(unrouted)} ripped={len(ripped)} → hotset={len(hotset)}")
return hotset
# OVERUSE EXISTS: collect nets touching overused edges using fast lookup
offenders = set()
for ei in over_idx:
offenders.update(self._edge_to_nets.get(ei, set()))
# Update clean iteration counters
for net_id in tasks.keys():
if net_id in offenders:
# Net is touching overused edges - reset counter
self._net_clean_iters[net_id] = 0
else:
# Net is clean - increment counter
self._net_clean_iters[net_id] = self._net_clean_iters.get(net_id, 0) + 1
# Filter out frozen nets (clean for freeze_after_clean+ iterations)
frozen_nets = {nid for nid in offenders if self._net_clean_iters.get(nid, 0) >= freeze_after_clean}
offenders -= frozen_nets
if frozen_nets:
logger.debug(f"[FREEZE-CLEAN] Excluded {len(frozen_nets)} nets clean for {freeze_after_clean}+ iterations")
# Add ripped nets
offenders |= ripped
# Add unrouted nets (small priority, will be at end after sorting)
unrouted = {nid for nid in tasks.keys() if not self.net_paths.get(nid)}
# Score offenders by total overuse they contribute
scores = []
for net_id in offenders:
if net_id in self._net_to_edges:
impact = sum(float(over[ei]) for ei in self._net_to_edges[net_id] if ei in over_idx)
scores.append((impact, net_id))
# Add unrouted with low priority
for net_id in unrouted:
if net_id not in offenders:
scores.append((0.0, net_id))
# Sort by impact (highest first)
scores.sort(reverse=True)
# ADAPTIVE CAP: Scale with overuse severity to prevent both thrashing and stagnation
# Formula balances: enough rip-up to make progress, not so much we destabilize
total_overuse = sum(float(over[ei]) for ei in over_idx)
base_target = max(64, min(180, len(over_idx) // 25)) # Scale with # of overused edges
# Track improvement trend for adaptive hotset sizing
if not hasattr(self, '_prev_overuse_for_hotset'):
self._prev_overuse_for_hotset = float('inf')
# DISABLED: Adaptive sizing creates disruption spikes when hotset enlarges
# delta = (self._prev_overuse_for_hotset - total_overuse) / max(1, self._prev_overuse_for_hotset)
# if delta < 0.02: # <2% improvement - enlarge hotset (stuck, need more rip-up)
# base_target = min(240, int(base_target * 1.25))
# logger.debug(f"[HOTSET-TREND] Slow progress ({delta*100:.1f}%) → enlarging hotset to {base_target}")
# elif delta > 0.08: # >8% improvement - shrink hotset (good progress, focus efforts)
# base_target = max(64, int(base_target * 0.80))
# logger.debug(f"[HOTSET-TREND] Good progress ({delta*100:.1f}%) → shrinking hotset to {base_target}")
# Use a FIXED hotset size for stable convergence (supersedes the adaptive base_target above)
base_target = 100  # ~20% of nets; smaller batches give smoother convergence
self._prev_overuse_for_hotset = total_overuse
adaptive_cap = min(self.config.hotset_cap, base_target)
# 60/40 mix: 60% top scorers + 40% random to break phase-locking
import random
rng = random.Random(42 + self.iteration)
K_top = int(0.6 * adaptive_cap)
top_scorers = [nid for _, nid in scores[:K_top]]
rest = [nid for _, nid in scores[K_top:]]
K_random = min(adaptive_cap - K_top, len(rest))
random_sample = rng.sample(rest, K_random) if rest else []
hotset_list = top_scorers + random_sample
# Cooldown: exclude nets rerouted in previous iteration (prevents immediate re-routing)
if not hasattr(self, '_last_reroute_iter'):
self._last_reroute_iter = {}
hotset_with_cooldown = [nid for nid in hotset_list
if self.iteration - self._last_reroute_iter.get(nid, -999) > 1]
# Update last reroute iteration for selected nets
for nid in hotset_with_cooldown:
self._last_reroute_iter[nid] = self.iteration
hotset = set(hotset_with_cooldown)
unique_frac = len(hotset - getattr(self, '_prev_hotset', set())) / max(1, len(hotset))
self._prev_hotset = hotset.copy()
logger.info(f"[HOTSET] overuse_edges={len(over_idx)} total_overuse={int(total_overuse)}, "
f"offenders={len(offenders)}, cap={adaptive_cap} → hotset={len(hotset)}/{len(tasks)} "
f"(60% top + 40% random, unique={unique_frac:.1%})")
return hotset
def _log_top_overused_channels(self, over: np.ndarray, top_k: int = 10):
"""Log top-K overused channels with spatial info"""
# Find top-K overused edges (vectorized - avoids a Python loop over every edge)
over_idx = np.where(over > 0)[0]
if over_idx.size == 0:
return
order = np.argsort(over[over_idx])[::-1][:top_k]
top_edges = [(int(ei), float(over[ei])) for ei in over_idx[order]]
logger.info(f"[INSTRUMENTATION] Top-{len(top_edges)} overused channels:")
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
for rank, (ei, overuse_val) in enumerate(top_edges, 1):
# Find source node for this edge using binary search: O(log N) instead of O(N)
u = int(np.searchsorted(indptr, ei, side='right') - 1)
if 0 <= u < len(indptr) - 1 and indptr[u] <= ei < indptr[u + 1]:
v = int(indices[ei])
ux, uy, uz = self.lattice.idx_to_coord(u)
vx, vy, vz = self.lattice.idx_to_coord(v)
# Convert to mm for spatial context
ux_mm, uy_mm = self.lattice.geom.lattice_to_world(ux, uy)
vx_mm, vy_mm = self.lattice.geom.lattice_to_world(vx, vy)
edge_type = "VIA" if uz != vz else "TRACK"
layer_info = f"L{uz}" if uz == vz else f"L{uz}→L{vz}"
# Count nets using this edge (use cached reverse lookup)
nets_on_edge = len(self._edge_to_nets.get(ei, set()))
logger.info(f" {rank:2d}. {edge_type:5s} {layer_info:6s} "
f"({ux_mm:6.2f},{uy_mm:6.2f})→({vx_mm:6.2f},{vy_mm:6.2f}) "
f"overuse={overuse_val:.1f} nets={nets_on_edge}")
def _log_per_layer_congestion(self, over: np.ndarray):
"""Log overuse breakdown by layer (GPU-accelerated version)"""
import time
start_time = time.time()
# Use edge_layer array if available (pre-computed), otherwise compute from graph
if hasattr(self.graph, 'edge_layer'):
edge_layer = self.graph.edge_layer.get() if hasattr(self.graph.edge_layer, 'get') else self.graph.edge_layer
logger.debug(f"[LAYER-DIAG] Using pre-computed edge_layer array ({len(edge_layer)} edges)")
else:
# Fallback: compute layers from nodes (slower but works)
logger.debug(f"[LAYER-DIAG] Computing edge layers from graph (fallback mode)")
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
plane_size = self.lattice.Nx * self.lattice.Ny
# Vectorized: find source nodes for all edges
edge_indices = np.arange(len(over), dtype=np.int32)
src_nodes = np.searchsorted(indptr, edge_indices, side='right') - 1
dst_nodes = indices[edge_indices]
src_layers = src_nodes // plane_size
dst_layers = dst_nodes // plane_size
edge_layer = src_layers # Use source layer for classification
logger.debug(f"[LAYER-DIAG] Computed layers for {len(edge_layer)} edges in {time.time()-start_time:.2f}s")
# Vectorized aggregation (NumPy on host arrays)
layer_count = self.config.layer_count
overuse_horiz = {}
# Filter to overused edges only
overused_mask = over > 0
num_overused = int(np.sum(overused_mask))
if num_overused == 0:
logger.debug(f"[LAYER-DIAG] No overused edges, skipping")
return {}
logger.debug(f"[LAYER-DIAG] Processing {num_overused}/{len(over)} overused edges")
overused_values = over[overused_mask]
overused_layers = edge_layer[overused_mask]
# Sum by layer (vectorized)
for z in range(1, layer_count + 1):
layer_mask = overused_layers == z
if np.any(layer_mask):
overuse_horiz[z] = float(np.sum(overused_values[layer_mask]))
elapsed = time.time() - start_time
logger.debug(f"[LAYER-DIAG] Completed in {elapsed:.3f}s (vectorized)")
# Log horizontal overuse by layer
total_horiz = sum(overuse_horiz.values())
if total_horiz > 0:
logger.info(f"[LAYER-CONGESTION] Horizontal overuse by layer:")
for z in sorted(overuse_horiz.keys()):
if overuse_horiz[z] > 0:
pct = (overuse_horiz[z] / total_horiz) * 100
logger.info(f" Layer {z:2d}: {overuse_horiz[z]:7.1f} ({pct:5.1f}%)")
return overuse_horiz # Return for layer balancing
def _update_layer_bias(self, overuse_by_layer: dict, layer_bias: dict) -> dict:
"""
Update layer bias based on overuse distribution to encourage load balancing.
Hot layers get positive bias (higher cost), cool layers get negative bias (lower cost).
Uses EMA to smooth changes and prevent whiplash.
"""
if not overuse_by_layer or sum(overuse_by_layer.values()) == 0:
return layer_bias
# Calculate mean overuse (ignoring zero layers to avoid dilution)
nonzero_overuse = [v for v in overuse_by_layer.values() if v > 0]
if not nonzero_overuse:
return layer_bias
mu = sum(nonzero_overuse) / len(nonzero_overuse) + 1e-6
# Calculate normalized error for each layer (positive = hotter than average)
for z in overuse_by_layer.keys():
err = (overuse_by_layer[z] / mu) - 1.0
# EMA update: keep 90% of the old bias, add 10% of a 0.05-scaled error
# (effective step ≈ 0.005·err) - small enough to nudge layers without oscillation
old_bias = layer_bias.get(z, 0.0)
new_bias = 0.9 * old_bias + 0.1 * (0.05 * err)
layer_bias[z] = new_bias
# Log top 5 biases for debugging
top_biases = sorted(layer_bias.items(), key=lambda x: abs(x[1]), reverse=True)[:5]
if any(abs(bias) > 0.01 for _, bias in top_biases):
bias_str = ", ".join([f"L{z}:{bias:+.3f}" for z, bias in top_biases])
logger.info(f"[LAYER-BIAS] Top biases: {bias_str}")
return layer_bias
def _apply_layer_bias_to_costs(self, layer_bias: dict):
"""Apply layer bias to edge costs (only horizontal edges on each layer)"""
if not layer_bias or not hasattr(self, '_layer_edges'):
return
# Get current costs
total_cost = self.accounting.total_cost
if self.accounting.use_gpu:
total_cost_cpu = total_cost.get()
else:
total_cost_cpu = total_cost
# Apply bias to each layer's horizontal edges
for z, bias in layer_bias.items():
if abs(bias) < 0.001: # Skip negligible biases
continue
if z in self._layer_edges:
edge_indices = self._layer_edges[z]
total_cost_cpu[edge_indices] += bias
# Update GPU if needed
if self.accounting.use_gpu:
self.accounting.total_cost[:] = cp.asarray(total_cost_cpu)
def _rip_top_k_offenders(self, k=20) -> Set[str]:
"""
Rip only the worst 16-24 nets to break stagnation (not the world).
Respect locked nets - don't rip unless they touch new overuse.
Returns the set of ripped net IDs.
"""
present = self.accounting.present.get() if self.accounting.use_gpu else self.accounting.present
cap = self.accounting.capacity.get() if self.accounting.use_gpu else self.accounting.capacity
over = np.maximum(0, present - cap)
over_idx = set(map(int, np.where(over > 0)[0]))
# Score nets by impact on worst edges (use fast lookup)
scores = []
for net_id, path in self.net_paths.items():
if not path or net_id in self.locked_nets:
continue
if net_id in self._net_to_edges:
impact = sum(float(over[ei]) for ei in self._net_to_edges[net_id] if ei in over_idx)
if impact > 0:
scores.append((impact, net_id))
scores.sort(reverse=True)
victims = {nid for _, nid in scores[:k]}
for net_id in victims:
if self.net_paths.get(net_id) and net_id in self._net_to_edges:
# Use cached edges for efficiency
self.accounting.clear_path(self._net_to_edges[net_id])
self.net_paths[net_id] = []
# Clear edge tracking for ripped nets
self._clear_net_edge_tracking(net_id)
# Reset clean streak so they can't immediately lock again
self.net_clean_streak[net_id] = 0
logger.info(f"[STAGNATION] Ripped {len(victims)} nets (locked={len(self.locked_nets)} preserved)")
return victims
def _apply_portal_discount(self):
"""Apply portal discount to span-1 vias adjacent to terminals"""
if self.config.portal_discount >= 1.0:
return # No discount
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
base_costs = self.graph.base_costs.get() if hasattr(self.graph.base_costs, 'get') else self.graph.base_costs
# Get terminal nodes
terminal_nodes = set(self.pad_to_node.values())
plane_size = self.lattice.x_steps * self.lattice.y_steps
discount_count = 0
for terminal in terminal_nodes:
tz = terminal // plane_size
# Find via edges from this terminal
for ei in range(int(indptr[terminal]), int(indptr[terminal+1])):
v = int(indices[ei])
vz = v // plane_size
span = abs(vz - tz)
# Apply discount only to span-1 vias (adjacent layers)
if span == 1 and self._via_edges[ei]:
base_costs[ei] *= self.config.portal_discount
discount_count += 1
logger.info(f"Applied portal discount ({self.config.portal_discount}x) to {discount_count} escape vias")
def _identify_via_edges(self):
"""Mark which edges are vias (vertical transitions between layers)"""
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
# Use numpy boolean array instead of Python set for memory efficiency
# With 27M edges, this uses ~30MB instead of ~750MB
num_edges = int(indptr[-1])
self._via_edges = np.zeros(num_edges, dtype=bool)
# Use arithmetic instead of idx_to_coord for speed
plane_size = self.lattice.x_steps * self.lattice.y_steps
for u in range(len(indptr) - 1):
uz = u // plane_size # Fast arithmetic instead of idx_to_coord
for ei in range(int(indptr[u]), int(indptr[u+1])):
v = int(indices[ei])
vz = v // plane_size
# Via edge: different layer (same x,y is implicit in Manhattan CSR construction)
self._via_edges[ei] = (uz != vz)
logger.info(f"Identified {int(self._via_edges.sum())} via edges")
def _build_via_edge_metadata(self):
"""
Precompute via edge metadata for vectorized penalty application.
Keeps metadata on GPU if available for zero-copy kernel execution.
"""
import time
t0 = time.perf_counter()
# Get via edge indices
via_edge_indices = np.where(self._via_edges)[0]
num_via_edges = len(via_edge_indices)
if num_via_edges == 0:
self._via_edge_metadata = None
return
# Get graph data
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
# Precompute u (source node) for each via edge using searchsorted
u_indices = np.searchsorted(indptr, via_edge_indices, side='right') - 1
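# CSR reminder (toy example): with indptr = [0, 3, 5, 9], edge index 4 satisfies
# indptr[1] <= 4 < indptr[2], and searchsorted([0, 3, 5, 9], 4, side='right') - 1 = 1,
# so that edge's source node is u = 1.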
# Get v (destination node) for each via edge
v_indices = indices[via_edge_indices]
# Convert to coordinates (vectorized)
plane_size = self.lattice.x_steps * self.lattice.y_steps
# u coordinates
xu = (u_indices % plane_size) % self.lattice.x_steps
yu = (u_indices % plane_size) // self.lattice.x_steps
zu = u_indices // plane_size
# v coordinates
xv = (v_indices % plane_size) % self.lattice.x_steps
yv = (v_indices % plane_size) // self.lattice.x_steps
zv = v_indices // plane_size
# For via edges, x,y should be same (sanity check in debug mode)
# Store just one (x,y) coordinate per via edge
via_xy_coords = np.stack([xu, yu], axis=1).astype(np.int32)
# Store z ranges (lo, hi) for each via edge
z_lo = np.minimum(zu, zv)
z_hi = np.maximum(zu, zv)
# Clamp z values to valid routing layers (1..Nz-2)
z_lo = np.clip(z_lo, 1, self.lattice.layers - 2)
z_hi = np.clip(z_hi, 1, self.lattice.layers - 2)
# Store metadata - on GPU if kernels available, CPU otherwise
use_via_gpu = self.config.use_gpu and GPU_AVAILABLE
if use_via_gpu:
# Keep on GPU for zero-copy kernel execution
self._via_edge_metadata = {
'indices': cp.asarray(via_edge_indices.astype(np.int32)),
'xy_coords': cp.asarray(via_xy_coords),
'z_lo': cp.asarray(z_lo.astype(np.int32)),
'z_hi': cp.asarray(z_hi.astype(np.int32)),
}
logger.info(f"[VIA-METADATA] Built metadata for {num_via_edges} via edges on GPU in {time.perf_counter() - t0:.3f}s")
else:
# Keep on CPU
self._via_edge_metadata = {
'indices': via_edge_indices.astype(np.int32),
'xy_coords': via_xy_coords,
'z_lo': z_lo.astype(np.int32),
'z_hi': z_hi.astype(np.int32),
}
logger.info(f"[VIA-METADATA] Built metadata for {num_via_edges} via edges on CPU in {time.perf_counter() - t0:.3f}s")
def _path_to_edges(self, node_path: List[int]) -> List[int]:
"""Nodes → edge indices via on-the-fly CSR scan (no dict needed)"""
out = []
indptr = self.graph.indptr.get() if hasattr(self.graph.indptr, 'get') else self.graph.indptr
indices = self.graph.indices.get() if hasattr(self.graph.indices, 'get') else self.graph.indices
for u, v in zip(node_path, node_path[1:]):
s, e = int(indptr[u]), int(indptr[u+1])
# Linear scan over neighbors (small degree in Manhattan lattice, so fast)
for ei in range(s, e):
if int(indices[ei]) == v:
out.append(ei)
break
return out
def _detect_barrel_conflicts(self) -> Tuple[np.ndarray, int]:
"""
Detect via barrel conflicts across all committed paths (GPU-accelerated).
This is THE critical fix for shorting_items violations!
Detects when committed edges touch via barrel nodes owned by other nets.
Returns:
(conflict_edge_indices, conflict_count)
"""
import numpy as np
logger.debug("[BARREL-CONFLICT] Checking for via barrel conflicts...")
# Bail out if node_owner not initialized
if not hasattr(self, 'node_owner') or self.node_owner is None:
logger.info("[BARREL-CONFLICT] Skipping: node_owner not initialized")
return np.array([], dtype=np.int32), 0
# Ensure edge_src mapping exists
self._ensure_edge_src_map()
# Use _net_paths (with underscore) - this is what the negotiation loop uses!
paths_dict = getattr(self, '_net_paths', {})
if not paths_dict:
# Fallback to net_paths if _net_paths doesn't exist
paths_dict = getattr(self, 'net_paths', {})
if not paths_dict:
logger.info("[BARREL-CONFLICT] Skipping: no committed paths found")
return np.array([], dtype=np.int32), 0
logger.info(f"[BARREL-CONFLICT] Found {len(paths_dict)} committed paths")
# Collect all committed edges with their net IDs
all_edge_indices = []
all_net_ids = []
for net_name, path in paths_dict.items():
if not path or len(path) < 2:
continue
net_id = self._get_net_id(net_name)
edge_indices = self._path_to_edges(path)
all_edge_indices.extend(edge_indices)
all_net_ids.extend([net_id] * len(edge_indices))
if len(all_edge_indices) == 0:
logger.info("[BARREL-CONFLICT] No edges found in paths")
return np.array([], dtype=np.int32), 0
logger.info(f"[BARREL-CONFLICT] Checking {len(all_edge_indices)} edges across {len(paths_dict)} nets")
# Convert to arrays
edge_indices = np.array(all_edge_indices, dtype=np.int32)
edge_net_ids = np.array(all_net_ids, dtype=np.int32)
# Get graph data (handle CPU/GPU)
graph_indices = self.graph.indices
if hasattr(graph_indices, 'get'):
graph_indices_cpu = graph_indices.get()
else:
graph_indices_cpu = graph_indices
# VECTORIZED CONFLICT DETECTION (GPU-accelerated!)
# Get source and destination nodes for all edges at once
src_nodes = self._edge_src[edge_indices] # Vectorized lookup
dst_nodes = graph_indices_cpu[edge_indices] # Vectorized lookup
# Get ownership for all nodes at once
src_owners = self.node_owner[src_nodes] # Vectorized lookup
dst_owners = self.node_owner[dst_nodes] # Vectorized lookup
# Vectorized conflict check:
# Conflict if src owned by different net OR dst owned by different net
src_conflict = (src_owners != -1) & (src_owners != edge_net_ids)
dst_conflict = (dst_owners != -1) & (dst_owners != edge_net_ids)
conflict_mask = src_conflict | dst_conflict # Element-wise OR
logger.info(f"[BARREL-CONFLICT] Vectorized check of {len(edge_indices)} edges completed")
# Get the actual edge indices that have conflicts
conflict_edge_indices = edge_indices[conflict_mask]
conflict_count = len(conflict_edge_indices)
if conflict_count > 0:
logger.info(f"[BARREL-CONFLICT] Detected {conflict_count} conflicts (checked {len(edge_indices)} edges)")
else:
logger.info(f"[BARREL-CONFLICT] No conflicts found (checked {len(edge_indices)} edges)")
return conflict_edge_indices, conflict_count
def _path_is_manhattan(self, path: List[int]) -> bool:
"""Validate that path obeys Manhattan routing discipline"""
for a, b in zip(path, path[1:]):
x0, y0, z0 = self.lattice.idx_to_coord(a)
x1, y1, z1 = self.lattice.idx_to_coord(b)
if z0 == z1:
# Planar move: must be adjacent (Manhattan distance = 1)
if (abs(x1 - x0) + abs(y1 - y0)) != 1:
logger.error(f"[PATH-INVALID-DETAIL] Planar non-adjacent: ({x0},{y0},{z0}) -> ({x1},{y1},{z1}), dist={abs(x1-x0)+abs(y1-y0)}")
return False
else:
# Via move: same X,Y, any Z distance (allow multi-layer vias for portals)
if not ((x1 == x0) and (y1 == y0)):
logger.error(f"[PATH-INVALID-DETAIL] Via with X/Y change: ({x0},{y0},{z0}) -> ({x1},{y1},{z1})")
return False
return True
def map_all_pads(self, board: Board) -> None:
"""Legacy API: pad mapping (already done in initialize_graph)"""
logger.info(f"map_all_pads: Already mapped {len(self.pad_to_node)} pads")
def prepare_routing_runtime(self):
"""Legacy API: prepare for routing (no-op, already ready)"""
logger.info("prepare_routing_runtime: Ready")
def _segment_world(self, a_idx: int, b_idx: int, layer: int, net: str):
ax, ay, _ = self.lattice.idx_to_coord(a_idx)
bx, by, _ = self.lattice.idx_to_coord(b_idx)
(ax_mm, ay_mm) = self.lattice.geom.lattice_to_world(ax, ay)
(bx_mm, by_mm) = self.lattice.geom.lattice_to_world(bx, by)
# QUANTIZE: Round to grid to prevent float drift
pitch = self.lattice.geom.pitch
origin_x = self.lattice.geom.grid_min_x
origin_y = self.lattice.geom.grid_min_y
ax_mm = origin_x + round((ax_mm - origin_x) / pitch) * pitch
ay_mm = origin_y + round((ay_mm - origin_y) / pitch) * pitch
bx_mm = origin_x + round((bx_mm - origin_x) / pitch) * pitch
by_mm = origin_y + round((by_mm - origin_y) / pitch) * pitch
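# Quantization example (illustrative): pitch = 0.4, origin_x = 10.0, ax_mm = 12.3999997
# → round((12.3999997 - 10.0) / 0.4) = 6 → 10.0 + 6 * 0.4 = 12.4 exactly, so track
# endpoints and via centers land on identical grid coordinates.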
return {
'net': net,
'layer': self.config.layer_names[layer] if layer < len(self.config.layer_names) else f"L{layer}",
'x1': ax_mm, 'y1': ay_mm, 'x2': bx_mm, 'y2': by_mm,
'width': self.config.grid_pitch * 0.6,
}
def precompute_all_pad_escapes(self, board: Board, nets_to_route: List = None) -> Tuple[List, List]:
"""
Delegate to PadEscapePlanner for precomputing all pad escapes.
NEW: Pre-registers portal locations as via keepouts for iteration 1.
This prevents tracks from routing through portal columns before escape vias are created.
Returns (tracks, vias) for visualization.
"""
if not self.escape_planner:
logger.error("Escape planner not initialized! Call initialize_graph first.")
return ([], [])
result = self.escape_planner.precompute_all_pad_escapes(board, nets_to_route)
# CRITICAL: Copy portals from escape_planner to self.portals
# The pathfinder needs these to route from portal positions, not pad positions
self.portals = self.escape_planner.portals.copy()
logger.info(f"Copied {len(self.portals)} portals from escape planner to pathfinder")
# Cache escape geometry for merging into final payload
self._escape_tracks, self._escape_vias = result
logger.info(f"Cached {len(self._escape_tracks)} escape tracks and {len(self._escape_vias)} escape vias")
# Track escape vias in via spatial arrays to prevent routing collisions
self._track_escape_vias_in_via_usage()
# NOTE: Portal via keepout pre-registration removed - too slow for full-graph
# Via barrel conflicts exist but owner-aware blocking doesn't scale
# TODO: Investigate actual root cause of dangling vias
return result
def _via_world(self, at_idx: int, net: str, from_layer: int, to_layer: int):
x, y, _ = self.lattice.idx_to_coord(at_idx)
(x_mm, y_mm) = self.lattice.geom.lattice_to_world(x, y)
# CRITICAL FIX: Quantize via coordinates to grid (same as _segment_world)
# This ensures via centers EXACTLY match track endpoints (no epsilon mismatch!)
pitch = self.lattice.geom.pitch
origin_x = self.lattice.geom.grid_min_x
origin_y = self.lattice.geom.grid_min_y
x_mm = origin_x + round((x_mm - origin_x) / pitch) * pitch
y_mm = origin_y + round((y_mm - origin_y) / pitch) * pitch
# Normalize layer order (consistent output, KiCad accepts either way)
if from_layer > to_layer:
from_layer, to_layer = to_layer, from_layer
return {
'net': net,
'x': x_mm, 'y': y_mm,
'from_layer': self.config.layer_names[from_layer] if from_layer < len(self.config.layer_names) else f"L{from_layer}",
'to_layer': self.config.layer_names[to_layer] if to_layer < len(self.config.layer_names) else f"L{to_layer}",
'diameter': 0.25, # hole (0.15) + 2×annular (0.05) = 0.25mm
'drill': 0.15, # hole diameter
}
def emit_geometry(self, board: Board) -> Tuple[int, int]:
"""
Convert routed node paths into drawable segments and vias.
- Clean geometry (for KiCad export): only if overuse == 0
- Provisional geometry (for GUI feedback): always generated
CRITICAL: Escape geometry is ALWAYS merged, even with overuse.
Escapes are the connection from pads to the routing grid and must be exported.
"""
# Generate provisional geometry from routing paths
provisional_tracks, provisional_vias = self._generate_geometry_from_paths()
# ALWAYS merge escape geometry with routed geometry
# Deduplicate helper
def _dedupe(items, key_fn):
seen, out = set(), []
for it in items:
k = key_fn(it)
if k in seen:
continue
seen.add(k)
out.append(it)
return out
final_tracks = provisional_tracks
final_vias = provisional_vias
if hasattr(self, '_escape_tracks') and self._escape_tracks:
# Merge escapes first (so they're visually "underneath")
combined_tracks = self._escape_tracks + provisional_tracks
combined_vias = self._escape_vias + provisional_vias
# Deduplicate by geometric signature
final_tracks = _dedupe(
combined_tracks,
lambda t: (t["net"], t["layer"],
round(t["x1"], 3), round(t["y1"], 3),
round(t["x2"], 3), round(t["y2"], 3),
round(t["width"], 3))
)
final_vias = _dedupe(
combined_vias,
lambda v: (v["net"], round(v["x"], 3), round(v["y"], 3),
v.get("from_layer"), v.get("to_layer"),
round(v.get("drill", 0), 3),
round(v.get("diameter", 0), 3))
)
logger.info(f"[ESCAPE-MERGE] escapes={len(self._escape_tracks)} + "
f"routed={len(provisional_tracks)}"
f"total={len(final_tracks)} tracks after dedup")
logger.info(f"[ESCAPE-MERGE] escape_vias={len(self._escape_vias)} + "
f"routed_vias={len(provisional_vias)}"
f"total={len(final_vias)} vias after dedup")
# Store merged geometry as provisional (for GUI display)
self._provisional_geometry = GeometryPayload(final_tracks, final_vias)
# Check for overuse (include via spatial violations)
over_sum, over_cnt = self.accounting.compute_overuse(router_instance=self)
if over_sum > 0:
logger.warning(f"[EMIT] Overuse={over_sum}: showing merged geometry in GUI but not exporting to KiCad")
self._geometry_payload = GeometryPayload([], []) # No clean geometry for export
# Return merged counts so GUI shows escapes + routes
return (len(final_tracks), len(final_vias))
# No overuse: emit clean geometry for KiCad export
logger.info("[EMIT] Routing converged! Exporting clean geometry with escapes")
self._geometry_payload = GeometryPayload(final_tracks, final_vias)
return (len(final_tracks), len(final_vias))
def _generate_geometry_from_paths(self) -> Tuple[List, List]:
"""Generate tracks and vias from net_paths"""
tracks, vias = [], []
for net_id, path in self.net_paths.items():
if not path:
continue
# NOTE: Escape geometry is pre-computed by PadEscapePlanner and cached.
# It will be merged with routed geometry in emit_geometry().
# Generate tracks/vias from main path
run_start = path[0]
prev = path[0]
prev_dir = None
prev_layer = self.lattice.idx_to_coord(prev)[2]
for node in path[1:]:
x0, y0, z0 = self.lattice.idx_to_coord(prev)
x1, y1, z1 = self.lattice.idx_to_coord(node)
# Drop any planar segment on outer layers (shouldn't happen once graph/ROI are fixed)
if z0 == z1 and (z0 == 0 or z0 == self.lattice.layers - 1):
logger.error(f"[EMIT-GUARD] refusing planar segment on outer layer {z0} for net {net_id}")
prev = node
prev_layer = z1
run_start = node
continue
# VALIDATION: Check if nodes are adjacent (Manhattan distance should be 1)
dx = abs(x1 - x0)
dy = abs(y1 - y0)
dz = abs(z1 - z0)
if dz == 0: # Same layer - enforce H/V discipline
# Must be adjacent
if (dx + dy) != 1:
logger.error(f"[GEOMETRY-BUG] Non-adjacent nodes in path for net {net_id}: "
f"({x0},{y0},{z0}) → ({x1},{y1},{z1}), Manhattan dist = {dx+dy}")
logger.error(f"[GEOMETRY-BUG] Path indices: prev={prev}, node={node}")
logger.error(f"[GEOMETRY-BUG] This creates diagonal segment! GPU parent pointers are CORRUPT!")
continue # Skip illegal segment
# Check layer direction discipline
layer_axis = self.lattice.get_legal_axis(z0)
if layer_axis == 'h':
# H layer: y must be constant (horizontal movement)
if dy != 0:
logger.error(f"[LAYER-VIOLATION] H-layer {z0} has vertical move: "
f"({x0},{y0})→({x1},{y1}), dy={dy}")
continue
else: # 'v'
# V layer: x must be constant (vertical movement)
if dx != 0:
logger.error(f"[LAYER-VIOLATION] V-layer {z0} has horizontal move: "
f"({x0},{y0})→({x1},{y1}), dx={dx}")
continue
if z1 != z0:
# flush any pending straight run before via
if prev != run_start:
tracks.append(self._segment_world(run_start, prev, prev_layer, net_id))
vias.append(self._via_world(prev, net_id, z0, z1))
run_start = node
prev_dir = None
else:
dir_vec = (np.sign(x1 - x0), np.sign(y1 - y0))
if prev_dir is None or dir_vec == prev_dir:
# keep extending run
pass
else:
# direction changed: flush previous run
tracks.append(self._segment_world(run_start, prev, prev_layer, net_id))
run_start = prev
prev_dir = dir_vec
prev = node
prev_layer = z1
# flush final run
if prev != run_start:
tracks.append(self._segment_world(run_start, prev, prev_layer, net_id))
# FINAL VALIDATION: Check all tracks are axis-aligned
violations = []
for i, track in enumerate(tracks):
x1, y1 = track['x1'], track['y1']
x2, y2 = track['x2'], track['y2']
# Must be axis-aligned (one coordinate must be constant)
dx = abs(x1 - x2)
dy = abs(y1 - y2)
if dx > 0.001 and dy > 0.001:
violations.append((i, track, dx, dy))
if violations:
logger.error(f"[EMIT-VALIDATION] Found {len(violations)} diagonal segments!")
for i, track, dx, dy in violations[:5]: # Show first 5
logger.error(f" Track {i}: ({track['x1']:.2f},{track['y1']:.2f})->({track['x2']:.2f},{track['y2']:.2f}), "
f"Delta=({dx:.2f},{dy:.2f}) on {track['layer']}")
# In debug mode, raise error
if __debug__:
raise RuntimeError(f"{len(violations)} diagonal segments detected at emission")
else:
logger.info(f"[EMIT-VALIDATION] All {len(tracks)} tracks are axis-aligned ✓")
# Count tracks by layer and direction
layer_stats = {}
for track in tracks:
layer = track['layer']
x1, y1 = track['x1'], track['y1']
x2, y2 = track['x2'], track['y2']
is_horizontal = (abs(y1 - y2) < 0.001)
is_vertical = (abs(x1 - x2) < 0.001)
if layer not in layer_stats:
layer_stats[layer] = {'h': 0, 'v': 0}
if is_horizontal:
layer_stats[layer]['h'] += 1
elif is_vertical:
layer_stats[layer]['v'] += 1
# Log per-layer statistics and check direction discipline
for layer in sorted(layer_stats.keys()):
h_count = layer_stats[layer]['h']
v_count = layer_stats[layer]['v']
logger.info(f"[LAYER-STATS] {layer}: {h_count} horizontal, {v_count} vertical")
# Check if layer has wrong direction (extract layer index from name)
# Assuming layer names like "In1.Cu", "In2.Cu", etc.
if 'In' in layer:
try:
layer_str = layer.replace('In', '').replace('.Cu', '')
layer_num = int(layer_str)
# Odd layers (In1, In3) = H, Even layers (In2, In4) = V
expected_dir = 'h' if layer_num % 2 == 1 else 'v'
if expected_dir == 'h' and v_count > h_count:
logger.warning(f"[LAYER-DIRECTION] {layer} should be H-dominant but has more V traces!")
elif expected_dir == 'v' and h_count > v_count:
logger.warning(f"[LAYER-DIRECTION] {layer} should be V-dominant but has more H traces!")
except (ValueError, IndexError):
pass # Skip if layer name doesn't match expected pattern
return (tracks, vias)
def get_geometry_payload(self):
"""
Get geometry payload for GUI/export.
Returns clean geometry if available (no overuse),
otherwise returns provisional geometry so GUI can still display/export.
"""
# If clean geometry is empty but provisional exists, return provisional
if (not self._geometry_payload.tracks and not self._geometry_payload.vias
and hasattr(self, '_provisional_geometry')
and (self._provisional_geometry.tracks or self._provisional_geometry.vias)):
return self._provisional_geometry
return self._geometry_payload
def get_provisional_geometry(self):
"""Get provisional geometry for GUI feedback (always available)"""
return self._provisional_geometry
# ═══════════════════════════════════════════════════════════════════════════════
# LEGACY API
# ═══════════════════════════════════════════════════════════════════════════════
UnifiedPathFinder = PathFinderRouter
logger.info(f"PathFinder loaded (GPU={'YES' if GPU_AVAILABLE else 'NO'})")