llm-review/lib.py
Shrikanth Upadhayaya 6b64bec258
Remove think tool and view bounds from JSON
The guided thinking within the tags works better and doesn't run the
risk of hitting request rate limits; the DR comment poster needs to use
SVG coordinates, not JSON.
2025-04-02 12:03:17 -04:00

212 lines
5.7 KiB
Python

# cSpell:words polylines beziers pydai
import asyncio
import logging
from typing import Any, Optional, Sequence
from xml.etree import ElementTree as ET
from playwright.async_api import async_playwright
from pydantic_ai import Agent
from pydantic_ai import exceptions as pydai_exceptions
from pydantic_ai.agent import AgentRunResult
from pydantic_ai.messages import UserContent
def filter_component_details(json: dict) -> dict:
"""
To reduce the number of tokens a single component lookup uses, this function
filters out graphical details from the JSON response, as the agent already
has access to an image of the schematic.
"""
if not json:
return {}
filtered = json.copy()
for key in [
"polylines",
"polygons",
"rectangles",
"arcs",
"ellipses",
"beziers",
"bitmaps",
"texts",
"position",
]:
if key in filtered:
del filtered[key]
if "attributes" in json:
filtered["attributes"] = {}
for key, attr in json["attributes"].items():
filtered["attributes"][attr.get("name")] = attr.get("value")
if "pins" in json:
filtered["pins"] = {}
for pin_id, pin in json["pins"].items():
filtered["pins"][pin_id] = {
"designator": pin.get("designator"),
"electrical_type": pin.get("electrical_type"),
}
return filtered
def filter_schematic_page(page_json: dict[str, Any]) -> dict[str, Any]:
page = page_json.copy()
for key in [
"title_blocks",
"graphics",
"fonts",
"border",
"ui_group_id",
"ui_group_name",
"wires",
"no_connects",
"junctions",
]:
if key in page:
del page[key]
if "components" in page:
filtered_components = {}
for comp_id, component in page["components"].items():
filtered_components[comp_id] = filter_component_details(component)
page["components"] = filtered_components
if "ports" in page:
filtered_components = {}
for comp_id, component in page["ports"].items():
filtered_components[comp_id] = filter_component_details(component)
page["ports"] = filtered_components
return page
def filter_schematic_json(schematic_json: dict[str, Any]) -> dict[str, Any]:
filtered_json = schematic_json.copy()
new_pages = []
for page in filtered_json.get("pages", []):
filtered_page = filter_schematic_page(page)
new_pages.append(filtered_page)
filtered_json["pages"] = new_pages
return filtered_json
def split_multipage_svg(svg_text: str) -> list[str]:
"""
Split a multi-page SVG into individual SVG files, one for each page.
Uses ElementTree for proper XML parsing.
Args:
svg_text (str): The content of the multi-page SVG file.
Returns:
list: List of the SVG contents for each page.
"""
ET.register_namespace("", "http://www.w3.org/2000/svg")
parser = ET.XMLParser(encoding="utf-8")
root = ET.fromstring(svg_text, parser=parser)
children = list(root)
# Each pair of <style> and <g> is one page.
page_pairs = []
for i in range(len(children) - 1):
current = children[i]
next_elem = children[i + 1]
if current.tag.endswith("}style") and next_elem.tag.endswith("}g"):
page_pairs.append((current, next_elem))
output_files = []
for i, (style_elem, g_elem) in enumerate(page_pairs):
new_svg = ET.Element("svg")
for attr, value in root.attrib.items():
new_svg.set(attr, value)
original_id = root.get("id", "")
new_svg.set("id", f"{original_id}" if original_id else f"page-{i + 1}")
width = g_elem.get("data-width")
height = g_elem.get("data-height")
view_box: str = g_elem.get("data-view-box")
if width:
new_svg.set("width", width)
if height:
new_svg.set("height", height)
new_svg.set("viewBox", view_box)
new_svg.append(style_elem)
del g_elem.attrib["transform"]
new_svg.append(g_elem)
svg_str = ET.tostring(new_svg, encoding="unicode")
output_files.append(svg_str)
return output_files
async def render_svg(svg_path: str, output_path: str) -> None:
"""
Render an SVG file to a PNG image using Playwright.
"""
async with async_playwright() as p:
browser = await p.firefox.launch()
context = await browser.new_context(viewport={"width": 1920, "height": 1080})
page = await context.new_page()
await page.goto(f"file://{svg_path}")
await page.screenshot(path=output_path, full_page=False)
await context.close()
await browser.close()
async def run_agent_with_retries[DT, RT](
agent: Agent[DT, RT],
inputs: Sequence[UserContent],
deps: DT,
max_attempts: int,
retry_delay: int,
logger: logging.Logger,
) -> Optional[AgentRunResult[RT]]:
attempts = 0
result = None
while attempts < max_attempts:
try:
result = await agent.run(inputs, deps=deps)
break
except pydai_exceptions.ModelHTTPError as e:
if e.status_code in [429, 529]:
attempts += 1
cause = "Rate limited" if e.status_code == 429 else "Overloaded"
logger.warning(
f"{cause}; sleeping {retry_delay}s before "
f"retrying attempt {attempts}"
)
logger.debug(f"Error: {e}")
await asyncio.sleep(retry_delay)
continue
else:
raise e
return result