Files
New Design Review ReleaseNotifiableRepo/.allspice/utils/create-release-review-issues-py-allspice.py
Daniel Lindmark ac6e373824 Initial commit
2026-06-04 17:04:11 -05:00

296 lines
11 KiB
Python

import io
import os
import sys
import zipfile
from argparse import ArgumentParser
from typing import Any
from urllib.parse import urlparse
ISSUE_TITLE = "Release file review"
ISSUE_METHOD = "py-allspice wrapper"
DEFAULT_HUB_URL = "https://hub.allspice.io"
LOG_PREFIX = "[release-review-pyallspice]"
def log(message: str) -> None:
print(f"{LOG_PREFIX} {message}")
def log_section(title: str) -> None:
log(f"--- {title} ---")
def quoted(value: str) -> str:
return quote(value, safe="")
def parse_repository_reference(repo_reference: str) -> tuple[str, str]:
repo_reference = repo_reference.strip().removesuffix(".git")
if not repo_reference:
raise ValueError("Repository reference is empty")
if repo_reference.startswith("git@") and ":" in repo_reference:
repo_path = repo_reference.split(":", 1)[1]
elif "://" in repo_reference:
repo_path = urlparse(repo_reference).path.strip("/")
else:
repo_path = repo_reference.strip("/")
repo_path = repo_path.removesuffix(".git")
path_parts = [part for part in repo_path.split("/") if part]
if len(path_parts) < 2:
raise ValueError(f"Could not parse repository owner/name from {repo_reference}")
return path_parts[-2], path_parts[-1]
def read_notification_repositories(path: str) -> list[tuple[str, str]]:
repositories = []
with open(path, "r", encoding="utf-8") as file:
for line in file:
stripped_line = line.strip()
if not stripped_line or stripped_line.startswith("#"):
continue
repositories.append(parse_repository_reference(stripped_line))
return repositories
def list_zip_files(archive_bytes: bytes) -> list[str]:
with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
return [entry.filename for entry in archive.infolist() if not entry.is_dir()]
def build_file_manifest(
repository: Any,
repository_class: Any,
tag: str,
) -> list[dict[str, Any]]:
archive_name = f"{tag}-source.zip"
log(f"Downloading archive {archive_name!r} via py-allspice get_archive")
try:
archive_bytes = repository.get_archive(tag, repository_class.ArchiveFormat.ZIP)
log(f"Archive {archive_name!r}: downloaded {len(archive_bytes)} bytes")
files = list_zip_files(archive_bytes)
log(f"Archive {archive_name!r}: found {len(files)} files")
for file_path in files[:20]:
log(f"Archive {archive_name!r}: file {file_path}")
if len(files) > 20:
log(f"Archive {archive_name!r}: {len(files) - 20} more files omitted from log preview")
return [{"archive": archive_name, "files": files}]
except Exception as exc:
log(f"ERROR: archive {archive_name!r} could not be inspected: {exc}")
return [{"archive": archive_name, "files": [], "error": str(exc)}]
def format_issue_body(
release_title: str,
tag: str,
release_body: str,
file_manifest: list[dict[str, Any]],
) -> str:
lines = [
f"- Release title: {release_title or '(untitled release)'}",
f"- Release tag: {tag}",
f"- Created by: {ISSUE_METHOD}",
"- Release body:",
]
if release_body:
lines.extend(f" {line}" if line else " " for line in release_body.splitlines())
else:
lines.append(" (empty)")
lines.append("- File manifest:")
if not file_manifest:
lines.append(" - (no release zip files found)")
for archive in file_manifest:
lines.append(f" - {archive['archive']}:")
if archive.get("error"):
lines.append(f" - Error: {archive['error']}")
continue
if not archive["files"]:
lines.append(" - (zip file contains no files)")
continue
lines.extend(f" - {file_path}" for file_path in archive["files"])
return "\n".join(lines) + "\n"
def create_issue(
allspice_client: Any,
target_repository: Any,
body: str,
) -> Any:
owner = target_repository.owner.username
repo = target_repository.name
log(
f"Preparing issue for {owner}/{repo}: title={ISSUE_TITLE!r}, "
f"body_chars={len(body)}, body_lines={len(body.splitlines())}"
)
log(f"Note: label assignment is not supported via the py-allspice high-level API")
from allspice import Issue
issue = Issue.create_issue(allspice_client, target_repository, ISSUE_TITLE, body)
log(f"Issue.create_issue returned: type={type(issue).__name__}")
return issue
def parse_args() -> Any:
parser = ArgumentParser()
parser.add_argument("repository", help="Repository that triggered the release, as owner/repo")
parser.add_argument("release_type", help="Release event type, such as published or edited")
parser.add_argument("release_tag", help="Release tag that triggered the action")
parser.add_argument("release_name", help="Release title/name that triggered the action")
parser.add_argument("release_url", help="Release URL that triggered the action")
parser.add_argument("release_body", help="Release body text that triggered the action")
parser.add_argument("notification_repo_file", help="File containing repos to notify")
parser.add_argument(
"--allspice_hub_url",
default=DEFAULT_HUB_URL,
help=f"AllSpice Hub URL. Defaults to {DEFAULT_HUB_URL}.",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
log_section("startup")
log(f"Python: {sys.version.split()[0]} executable={sys.executable}")
log(f"Hub URL: {args.allspice_hub_url}")
log(f"Release repository argument: {args.repository}")
log(f"Release event action: {args.release_type}")
log(f"Release tag: {args.release_tag}")
log(f"Release name chars: {len(args.release_name)}")
log(f"Release URL: {args.release_url}")
log(f"Release body chars from event: {len(args.release_body)}")
log(f"Notification repo file: {args.notification_repo_file}")
log(f"Note: label assignment not supported via py-allspice high-level API")
auth_token = os.environ.get("ALLSPICE_AUTH_TOKEN")
if not auth_token:
print("Please set the environment variable ALLSPICE_AUTH_TOKEN", file=sys.stderr)
return 1
log(f"ALLSPICE_AUTH_TOKEN: present length={len(auth_token)}")
try:
from allspice import AllSpice, Repository
except ModuleNotFoundError:
print(
"The py-allspice wrapper is not installed. "
"Run `pip install -r .allspice/utils/requirements.txt` first.",
file=sys.stderr,
)
return 1
log(f"py-allspice imported successfully")
try:
source_owner, source_repo = parse_repository_reference(args.repository)
except Exception as exc:
log(f"ERROR: could not parse source repository {args.repository!r}: {exc}")
return 1
log(f"Parsed source repository: owner={source_owner} repo={source_repo}")
log_section("allspice client")
try:
allspice = AllSpice(
token_text=auth_token,
allspice_hub_url=args.allspice_hub_url,
)
log(f"AllSpice client initialized")
version = allspice.get_version()
log(f"Hub version: {version}")
user = allspice.get_user()
log(f"Authenticated as: {user.username}")
except Exception as exc:
log(f"ERROR: failed to initialize AllSpice client: {type(exc).__name__}: {exc}")
return 1
log_section("notification targets")
try:
repositories_to_notify = read_notification_repositories(args.notification_repo_file)
except FileNotFoundError:
log(f"ERROR: notification repos file not found: {args.notification_repo_file}")
return 1
except Exception as exc:
log(f"ERROR: could not read notification repos file: {exc}")
return 1
if not repositories_to_notify:
log("ERROR: notification repos file did not contain any repositories")
return 1
log(f"Notification target count: {len(repositories_to_notify)}")
for index, (target_owner, target_repo) in enumerate(repositories_to_notify, start=1):
log(f"Notification target {index}: {target_owner}/{target_repo}")
log_section("release metadata")
try:
log(f"Loading source repository {source_owner}/{source_repo}")
source_repository = allspice.get_repository(source_owner, source_repo)
log(f"Source repository loaded: {source_repository.full_name}")
except Exception as exc:
log(f"ERROR: could not load source repository: {type(exc).__name__}: {exc}")
return 1
release = None
try:
log(f"Loading release by tag {args.release_tag!r}")
release = source_repository.get_release_by_tag(args.release_tag)
log(f"Release loaded: name={release.name!r} tag={release.tag_name!r}")
log(f"Release body chars from API: {len(release.body or '')}")
except Exception as exc:
log(f"WARNING: could not load release metadata: {type(exc).__name__}: {exc}")
log("Falling back to event-provided release name and body")
release_title = getattr(release, "name", None) or args.release_name
release_body = getattr(release, "body", None) or args.release_body
log(f"Using release title: {release_title!r}")
log(f"Using release body chars: {len(release_body)}")
log_section("file manifest")
file_manifest = build_file_manifest(source_repository, Repository, args.release_tag)
issue_body = format_issue_body(release_title, args.release_tag, release_body, file_manifest)
log(
f"Generated issue body: chars={len(issue_body)} lines={len(issue_body.splitlines())} "
f"archives={len(file_manifest)}"
)
for archive in file_manifest:
log(
f"Manifest archive {archive['archive']!r}: "
f"files={len(archive.get('files', []))} error={archive.get('error') or '<none>'}"
)
log_section("issue creation")
failures = []
for index, (target_owner, target_repo) in enumerate(repositories_to_notify, start=1):
log(f"Target {index}/{len(repositories_to_notify)}: creating issue in {target_owner}/{target_repo}")
try:
log(f"Loading target repository object for {target_owner}/{target_repo}")
target_repository = allspice.get_repository(target_owner, target_repo)
log(f"Target repository loaded: {target_repository.full_name}")
issue = create_issue(allspice, target_repository, issue_body)
except Exception as exc:
failures.append(f"{target_owner}/{target_repo}: {exc}")
log(f"ERROR: failed to create issue in {target_owner}/{target_repo}: {exc}")
continue
issue_url = getattr(issue, "html_url", None) or f"#{getattr(issue, 'number', '?')}"
log(
f"Created issue in {target_owner}/{target_repo}: "
f"number={getattr(issue, 'number', None)!r} id={getattr(issue, 'id', None)!r} url={issue_url}"
)
if failures:
log_section("failure summary")
for failure in failures:
log(f"FAILED: {failure}")
return 1
log("Completed release review issue creation successfully")
return 0
if __name__ == "__main__":
sys.exit(main())