Files
New Design Review ReleaseNotifiableRepo/.allspice/utils/create-release-review-issues-api.py
Daniel Lindmark ac6e373824 Initial commit
2026-06-04 17:04:11 -05:00

642 lines
23 KiB
Python

import http.client
import io
import json
import os
import sys
import zipfile
from argparse import ArgumentParser
from dataclasses import dataclass
from typing import Any
from urllib.parse import parse_qsl, quote, urlencode, urljoin, urlparse, urlunparse
ISSUE_TITLE = "Release file review"
ISSUE_METHOD = "raw API (http.client)"
DEFAULT_HUB_URL = "https://hub.allspice.io"
LOG_PREFIX = "[release-review-api]"
ERROR_BODY_SNIPPET_LIMIT = 1200
FILE_PREVIEW_LIMIT = 20
def log(message: str) -> None:
print(f"{LOG_PREFIX} {message}")
def log_section(title: str) -> None:
log(f"--- {title} ---")
def redact_url(url: str) -> str:
parsed_url = urlparse(url)
safe_query = []
for key, value in parse_qsl(parsed_url.query, keep_blank_values=True):
if "token" in key.lower() or "secret" in key.lower():
safe_query.append((key, "<redacted>"))
else:
safe_query.append((key, value))
return urlunparse(
(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
urlencode(safe_query),
"",
)
)
def body_snippet(body: bytes, limit: int = ERROR_BODY_SNIPPET_LIMIT) -> str:
text = body.decode("utf-8", errors="replace").replace("\n", "\\n")
if len(text) > limit:
return f"{text[:limit]}... <truncated>"
return text
def summarize_payload(payload: dict[str, Any] | None) -> str:
if payload is None:
return "payload=<none>"
summary = []
for key, value in payload.items():
if key == "body" and isinstance(value, str):
summary.append(f"body_chars={len(value)}")
elif key == "labels" and isinstance(value, list):
summary.append(f"labels={len(value)}")
elif key == "title" and isinstance(value, str):
summary.append(f"title={value!r}")
else:
summary.append(f"{key}={value!r}")
return ", ".join(summary)
class AllSpiceApiError(Exception):
def __init__(self, method: str, url: str, status: int, reason: str, body: bytes):
self.method = method
self.url = url
self.status = status
self.reason = reason
self.body = body
message = body_snippet(body)
super().__init__(f"{method} {redact_url(url)} returned {status} {reason}: {message}")
@dataclass
class HttpResponse:
status: int
reason: str
headers: dict[str, str]
body: bytes
class AllSpiceHttpClient:
def __init__(self, hub_url: str, token: str):
self.hub_url = hub_url.rstrip("/")
self.token = token
self.request_count = 0
def get_json(
self,
endpoint: str,
params: dict[str, Any] | None = None,
allowed_statuses: tuple[int, ...] = (200, 201),
) -> Any:
return self.request_json("GET", endpoint, params=params, allowed_statuses=allowed_statuses)
def post_json(
self,
endpoint: str,
payload: dict[str, Any],
allowed_statuses: tuple[int, ...] = (200, 201, 202),
) -> Any:
return self.request_json(
"POST", endpoint, payload=payload, allowed_statuses=allowed_statuses
)
def request_json(
self,
method: str,
endpoint: str,
payload: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
allowed_statuses: tuple[int, ...] = (200, 201),
) -> Any:
url = self._api_url(endpoint, params)
response = self._request(method, url, payload=payload)
if response.status not in allowed_statuses:
raise AllSpiceApiError(method, url, response.status, response.reason, response.body)
if not response.body:
return {}
try:
return json.loads(response.body.decode("utf-8"))
except json.JSONDecodeError as exc:
log(f"ERROR: JSON decode failed for {method} {redact_url(url)}: {exc}")
log(f"ERROR: response body snippet: {body_snippet(response.body)}")
raise
def download(self, url_or_endpoint: str) -> bytes:
url = self._resolve_download_url(url_or_endpoint)
response = self._request("GET", url, accept="application/octet-stream")
if response.status != 200:
raise AllSpiceApiError("GET", url, response.status, response.reason, response.body)
return response.body
def _api_url(self, endpoint: str, params: dict[str, Any] | None = None) -> str:
path = endpoint if endpoint.startswith("/") else f"/{endpoint}"
url = f"{self.hub_url}/api/v1{path}"
if params:
separator = "&" if "?" in url else "?"
url = f"{url}{separator}{urlencode(params)}"
return url
def _resolve_download_url(self, url_or_endpoint: str) -> str:
if url_or_endpoint.startswith("http://") or url_or_endpoint.startswith("https://"):
return url_or_endpoint
if url_or_endpoint.startswith("/api/v1/"):
return f"{self.hub_url}{url_or_endpoint}"
if url_or_endpoint.startswith("/repos/"):
return f"{self.hub_url}/api/v1{url_or_endpoint}"
if url_or_endpoint.startswith("/"):
return f"{self.hub_url}{url_or_endpoint}"
return f"{self.hub_url}/{url_or_endpoint}"
def _request(
self,
method: str,
url: str,
payload: dict[str, Any] | None = None,
accept: str = "application/json",
redirect_count: int = 0,
) -> HttpResponse:
if redirect_count > 5:
raise RuntimeError(f"Too many redirects while requesting {url}")
parsed_url = urlparse(url)
if parsed_url.scheme not in {"http", "https"}:
raise ValueError(f"Unsupported URL scheme in {url}")
self.request_count += 1
request_id = self.request_count
body = None
headers = {
"Accept": accept,
"Authorization": f"token {self.token}",
"User-Agent": "allspice-release-review-action",
}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
headers["Content-Length"] = str(len(body))
connection_class = (
http.client.HTTPSConnection
if parsed_url.scheme == "https"
else http.client.HTTPConnection
)
connection = connection_class(
parsed_url.hostname,
parsed_url.port,
timeout=60,
)
request_path = urlunparse(
("", "", parsed_url.path or "/", parsed_url.params, parsed_url.query, "")
)
log(
f"HTTP {request_id}: {method} {request_path} "
f"host={parsed_url.netloc} accept={accept}"
)
if payload is not None:
log(f"HTTP {request_id}: payload {summarize_payload(payload)}")
try:
connection.request(method, request_path, body=body, headers=headers)
response = connection.getresponse()
response_body = response.read()
response_reason = response.reason
response_headers = {key.lower(): value for key, value in response.getheaders()}
finally:
connection.close()
content_type = response_headers.get("content-type", "<none>")
log(
f"HTTP {request_id}: response {response.status} {response_reason} "
f"content-type={content_type} bytes={len(response_body)}"
)
if response.status >= 400 and response_body:
log(f"HTTP {request_id}: error body snippet: {body_snippet(response_body)}")
if response.status in {301, 302, 303, 307, 308} and "location" in response_headers:
redirect_url = urljoin(url, response_headers["location"])
redirect_method = "GET" if response.status == 303 else method
redirect_payload = None if redirect_method == "GET" else payload
log(f"HTTP {request_id}: redirecting to {redact_url(redirect_url)}")
return self._request(
redirect_method,
redirect_url,
payload=redirect_payload,
accept=accept,
redirect_count=redirect_count + 1,
)
return HttpResponse(response.status, response_reason, response_headers, response_body)
def quoted(value: str) -> str:
return quote(value, safe="")
def parse_repository_reference(repo_reference: str) -> tuple[str, str]:
repo_reference = repo_reference.strip().removesuffix(".git")
if not repo_reference:
raise ValueError("Repository reference is empty")
if repo_reference.startswith("git@") and ":" in repo_reference:
repo_path = repo_reference.split(":", 1)[1]
elif "://" in repo_reference:
repo_path = urlparse(repo_reference).path.strip("/")
else:
repo_path = repo_reference.strip("/")
repo_path = repo_path.removesuffix(".git")
path_parts = [part for part in repo_path.split("/") if part]
if len(path_parts) < 2:
raise ValueError(f"Could not parse repository owner/name from {repo_reference}")
return path_parts[-2], path_parts[-1]
def read_notification_repositories(path: str) -> list[tuple[str, str]]:
repositories = []
with open(path, "r", encoding="utf-8") as file:
for line in file:
stripped_line = line.strip()
if not stripped_line or stripped_line.startswith("#"):
continue
repositories.append(parse_repository_reference(stripped_line))
return repositories
def get_release(
client: AllSpiceHttpClient,
owner: str,
repo: str,
tag: str,
) -> dict[str, Any]:
endpoint = f"/repos/{quoted(owner)}/{quoted(repo)}/releases/tags/{quoted(tag)}"
return client.get_json(endpoint)
def release_zip_sources(
release: dict[str, Any],
owner: str,
repo: str,
tag: str,
) -> list[tuple[str, str]]:
sources = []
for asset in release.get("assets", []) or []:
name = asset.get("name") or asset.get("browser_download_url") or "release-asset.zip"
content_type = asset.get("content_type") or ""
download_url = asset.get("browser_download_url") or asset.get("download_url")
if not download_url:
continue
if name.lower().endswith(".zip") or content_type in {
"application/zip",
"application/x-zip-compressed",
}:
sources.append((name, download_url))
if sources:
return sources
zipball_url = release.get("zipball_url")
if zipball_url:
return [(f"{tag}-source.zip", zipball_url)]
archive_endpoint = (
f"/repos/{quoted(owner)}/{quoted(repo)}/archive/{quoted(tag)}.zip"
)
return [(f"{tag}-source.zip", archive_endpoint)]
def list_zip_files(archive_bytes: bytes) -> list[str]:
with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
return [entry.filename for entry in archive.infolist() if not entry.is_dir()]
def build_file_manifest(
client: AllSpiceHttpClient,
release: dict[str, Any],
owner: str,
repo: str,
tag: str,
) -> list[dict[str, Any]]:
manifest = []
archive_sources = release_zip_sources(release, owner, repo, tag)
log(f"Archive sources to inspect: {len(archive_sources)}")
for archive_name, download_reference in archive_sources:
log(f"Downloading archive {archive_name!r} from {redact_url(download_reference)}")
try:
archive_bytes = client.download(download_reference)
files = list_zip_files(archive_bytes)
log(
f"Archive {archive_name!r}: downloaded {len(archive_bytes)} bytes "
f"and found {len(files)} files"
)
for file_path in files[:FILE_PREVIEW_LIMIT]:
log(f"Archive {archive_name!r}: file {file_path}")
if len(files) > FILE_PREVIEW_LIMIT:
log(
f"Archive {archive_name!r}: "
f"{len(files) - FILE_PREVIEW_LIMIT} more files omitted from log preview"
)
manifest.append({"archive": archive_name, "files": files})
except Exception as exc:
log(f"ERROR: archive {archive_name!r} could not be inspected: {exc}")
manifest.append({"archive": archive_name, "files": [], "error": str(exc)})
return manifest
def format_issue_body(
release_title: str,
tag: str,
release_body: str,
file_manifest: list[dict[str, Any]],
) -> str:
lines = [
f"- Release title: {release_title or '(untitled release)'}",
f"- Release tag: {tag}",
f"- Created by: {ISSUE_METHOD}",
"- Release body:",
]
if release_body:
lines.extend(f" {line}" if line else " " for line in release_body.splitlines())
else:
lines.append(" (empty)")
lines.append("- File manifest:")
if not file_manifest:
lines.append(" - (no release zip files found)")
for archive in file_manifest:
lines.append(f" - {archive['archive']}:")
if archive.get("error"):
lines.append(f" - Error: {archive['error']}")
continue
if not archive["files"]:
lines.append(" - (zip file contains no files)")
continue
lines.extend(f" - {file_path}" for file_path in archive["files"])
return "\n".join(lines) + "\n"
def get_repository_labels(
client: AllSpiceHttpClient,
owner: str,
repo: str,
) -> list[dict[str, Any]]:
labels = []
endpoint = f"/repos/{quoted(owner)}/{quoted(repo)}/labels"
for page in range(1, 101):
log(f"Loading labels for {owner}/{repo}, page {page}")
page_labels = client.get_json(endpoint, params={"page": page, "limit": 50})
if not page_labels:
log(f"Labels for {owner}/{repo}, page {page}: empty response")
break
if isinstance(page_labels, dict) and "data" in page_labels:
page_labels = page_labels["data"]
elif isinstance(page_labels, dict) and "labels" in page_labels:
page_labels = page_labels["labels"]
elif isinstance(page_labels, dict):
log(
f"Labels for {owner}/{repo}, page {page}: unexpected keys "
f"{sorted(page_labels.keys())}; skipping labels"
)
break
if not isinstance(page_labels, list):
log(
f"Labels for {owner}/{repo}, page {page}: unexpected response "
f"type {type(page_labels).__name__}; skipping labels"
)
break
log(f"Labels for {owner}/{repo}, page {page}: {len(page_labels)} labels")
labels.extend(page_labels)
if len(page_labels) < 50:
break
return labels
def resolve_label_ids(
client: AllSpiceHttpClient,
owner: str,
repo: str,
label_names: list[str],
) -> list[int]:
if not label_names:
log(f"No issue labels requested for {owner}/{repo}")
return []
try:
labels = get_repository_labels(client, owner, repo)
except Exception as exc:
print(f"Warning: could not load labels for {owner}/{repo}: {exc}", file=sys.stderr)
return []
labels_by_name = {
label.get("name"): label for label in labels if isinstance(label, dict)
}
log(f"Loaded {len(labels_by_name)} labels for {owner}/{repo}")
label_ids = []
for label_name in label_names:
label = labels_by_name.get(label_name)
if label is None:
print(
f"Warning: label {label_name!r} was not found in {owner}/{repo}; skipping it",
file=sys.stderr,
)
continue
label_ids.append(label["id"])
log(f"Resolved label {label_name!r} to id {label['id']} in {owner}/{repo}")
return label_ids
def create_issue(
client: AllSpiceHttpClient,
owner: str,
repo: str,
body: str,
label_names: list[str],
) -> dict[str, Any]:
payload: dict[str, Any] = {
"title": ISSUE_TITLE,
"body": body,
"closed": False,
}
log(
f"Preparing issue for {owner}/{repo}: title={ISSUE_TITLE!r}, "
f"body_chars={len(body)}, body_lines={len(body.splitlines())}, "
f"requested_labels={label_names}"
)
label_ids = resolve_label_ids(client, owner, repo, label_names)
if label_ids:
payload["labels"] = label_ids
log(f"Issue for {owner}/{repo}: sending label ids {label_ids}")
else:
log(f"Issue for {owner}/{repo}: sending no labels")
endpoint = f"/repos/{quoted(owner)}/{quoted(repo)}/issues"
return client.post_json(endpoint, payload)
def parse_args() -> Any:
parser = ArgumentParser()
parser.add_argument("repository", help="Repository that triggered the release, as owner/repo")
parser.add_argument("release_type", help="Release event type, such as published or edited")
parser.add_argument("release_tag", help="Release tag that triggered the action")
parser.add_argument("release_name", help="Release title/name that triggered the action")
parser.add_argument("release_url", help="Release URL that triggered the action")
parser.add_argument("release_body", help="Release body text that triggered the action")
parser.add_argument("notification_repo_file", help="File containing repos to notify")
parser.add_argument(
"--allspice_hub_url",
default=DEFAULT_HUB_URL,
help=f"AllSpice Hub URL. Defaults to {DEFAULT_HUB_URL}.",
)
parser.add_argument(
"--issue_label",
action="append",
default=[],
help="Issue label name to add if it exists in the target repository.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
log_section("startup")
log(f"Python: {sys.version.split()[0]} executable={sys.executable}")
log(f"Hub URL: {args.allspice_hub_url}")
log(f"Release repository argument: {args.repository}")
log(f"Release event action: {args.release_type}")
log(f"Release tag: {args.release_tag}")
log(f"Release name chars: {len(args.release_name)}")
log(f"Release URL: {args.release_url}")
log(f"Release body chars from event: {len(args.release_body)}")
log(f"Notification repo file: {args.notification_repo_file}")
log(f"Requested issue labels: {args.issue_label if args.issue_label else '<none>'}")
auth_token = os.environ.get("ALLSPICE_AUTH_TOKEN")
if not auth_token:
print("Please set the environment variable ALLSPICE_AUTH_TOKEN", file=sys.stderr)
return 1
log(f"ALLSPICE_AUTH_TOKEN: present length={len(auth_token)}")
try:
source_owner, source_repo = parse_repository_reference(args.repository)
except Exception as exc:
log(f"ERROR: could not parse source repository {args.repository!r}: {exc}")
return 1
log(f"Parsed source repository: owner={source_owner} repo={source_repo}")
client = AllSpiceHttpClient(args.allspice_hub_url, auth_token)
log_section("notification targets")
try:
repositories_to_notify = read_notification_repositories(args.notification_repo_file)
except FileNotFoundError:
log(f"ERROR: notification repos file not found: {args.notification_repo_file}")
return 1
except Exception as exc:
log(f"ERROR: could not read notification repos file: {exc}")
return 1
if not repositories_to_notify:
log("ERROR: notification repos file did not contain any repositories")
return 1
log(f"Notification target count: {len(repositories_to_notify)}")
for index, (target_owner, target_repo) in enumerate(repositories_to_notify, start=1):
log(f"Notification target {index}: {target_owner}/{target_repo}")
log_section("release metadata")
try:
log(f"Loading release by tag {args.release_tag!r} from {source_owner}/{source_repo}")
release = get_release(client, source_owner, source_repo, args.release_tag)
except Exception as exc:
log(f"WARNING: could not load release metadata from the API: {exc}")
release = {}
if release:
assets = release.get("assets", []) or []
log(f"Release metadata keys: {sorted(release.keys())}")
log(f"Release metadata name: {release.get('name')!r}")
log(f"Release metadata tag_name: {release.get('tag_name')!r}")
log(f"Release metadata html_url: {release.get('html_url') or '<none>'}")
log(f"Release metadata zipball_url present: {bool(release.get('zipball_url'))}")
log(f"Release metadata asset count: {len(assets)}")
for index, asset in enumerate(assets, start=1):
log(
f"Release asset {index}: "
f"name={asset.get('name')!r} content_type={asset.get('content_type')!r} "
f"size={asset.get('size')!r}"
)
else:
log("Release metadata unavailable; archive download will fall back to tag source zip")
release_title = release.get("name") or args.release_name
release_body = release.get("body") or args.release_body
log_section("file manifest")
file_manifest = build_file_manifest(
client,
release,
source_owner,
source_repo,
args.release_tag,
)
issue_body = format_issue_body(release_title, args.release_tag, release_body, file_manifest)
log(
f"Generated issue body: chars={len(issue_body)} lines={len(issue_body.splitlines())} "
f"archives={len(file_manifest)}"
)
for archive in file_manifest:
log(
f"Manifest archive {archive['archive']!r}: "
f"files={len(archive.get('files', []))} error={archive.get('error') or '<none>'}"
)
log_section("issue creation")
failures = []
for index, (target_owner, target_repo) in enumerate(repositories_to_notify, start=1):
log(f"Target {index}/{len(repositories_to_notify)}: creating issue in {target_owner}/{target_repo}")
try:
issue = create_issue(
client,
target_owner,
target_repo,
issue_body,
args.issue_label,
)
except Exception as exc:
failures.append(f"{target_owner}/{target_repo}: {exc}")
log(f"ERROR: failed to create issue in {target_owner}/{target_repo}: {exc}")
continue
issue_url = issue.get("html_url") or issue.get("url") or f"#{issue.get('number')}"
log(
f"Created issue in {target_owner}/{target_repo}: "
f"number={issue.get('number')!r} id={issue.get('id')!r} url={issue_url}"
)
if failures:
log_section("failure summary")
for failure in failures:
log(f"FAILED: {failure}")
return 1
log("Completed release review issue creation successfully")
return 0
if __name__ == "__main__":
sys.exit(main())