bom_diff/entrypoint.py

156 lines
5.6 KiB
Python

import csv
import argparse
import re
import yaml
import logging
from collections import defaultdict
# Setup logging
import sys
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
stream=sys.stdout # Force logs to stdout
)
def load_columns_config(column_file_path):
logging.debug(f"Loading columns configuration from: {column_file_path}")
with open(column_file_path, "r") as f:
config = yaml.safe_load(f)
def resolve_column(name):
for col in config.get("columns", []):
if col["name"].lower() == name.lower():
logging.debug(f"Resolved column: {name} -> {col}")
return col
logging.warning(f"Column '{name}' not found in configuration.")
return {}
return {
"manufacturer": resolve_column("Manufacturer"),
"part_number": resolve_column("Part Number"),
"designator": resolve_column("Designator"),
}
def get_first_matching(row, keys):
if isinstance(keys, str):
keys = [keys]
for key in keys:
if key in row:
return row[key]
return ""
def matches_exclude_pattern(row, part_number_column, pattern):
val = get_first_matching(row, part_number_column)
result = bool(re.search(pattern, val)) if val and pattern else False
logging.debug(f"Checking exclude pattern: value='{val}', pattern='{pattern}' -> {result}")
return result
def load_bom(file_path, columns_cfg):
logging.info(f"Loading BOM from: {file_path}")
bom = defaultdict(lambda: {"Quantity": 0, "refs": set(), "manufacturer": ""})
with open(file_path, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if matches_exclude_pattern(row, columns_cfg["part_number"]["part_attributes"],
columns_cfg["part_number"].get("remove_rows_matching")):
logging.debug(f"Excluding row: {row}")
continue
part = get_first_matching(row, columns_cfg["part_number"]["part_attributes"])
manufacturer = get_first_matching(row, columns_cfg["manufacturer"]["part_attributes"])
ref_str = get_first_matching(row, columns_cfg["designator"]["part_attributes"])
if not part:
logging.warning(f"Missing part number in row: {row}")
continue
refs = [r.strip() for r in ref_str.split(',') if r.strip()]
if columns_cfg["designator"].get("grouped_values_sort") == "asc":
refs.sort()
bom[part]["manufacturer"] = manufacturer
bom[part]["Quantity"] += len(refs)
bom[part]["refs"].update(refs)
logging.debug(f"Processed part: {part}, manufacturer: {manufacturer}, refs: {refs}")
return bom
def generate_diff(bom_old, bom_new):
print("Generating BOM diff")
all_parts = set(bom_old.keys()) | set(bom_new.keys())
diff = []
for part in all_parts:
old = bom_old.get(part, {"Quantity": 0, "refs": set(), "manufacturer": bom_new.get(part, {}).get("manufacturer", "")})
new = bom_new.get(part, {"Quantity": 0, "refs": set(), "manufacturer": old.get("manufacturer", "")})
added_refs = new["refs"] - old["refs"]
removed_refs = old["refs"] - new["refs"]
adjusted_quantity = len(added_refs) - len(removed_refs)
diff_entry = {
"Part Number": part,
"Manufacturer": new["manufacturer"] or old["manufacturer"],
"old_quantity": len(old["refs"]),
"old_reference_designators": ", ".join(sorted(old["refs"])),
"adjusted_quantity": adjusted_quantity,
"removed_reference_designators": ", ".join(sorted(removed_refs)),
"new_quantity": len(new["refs"]),
"added_reference_designators": ", ".join(sorted(added_refs)),
"new_reference_designators": ", ".join(sorted(new["refs"])),
}
diff.append(diff_entry)
logging.debug(f"Diff entry: {diff_entry}")
return diff
def write_csv(diff, output_path):
logging.info(f"Writing diff to: {output_path}")
fieldnames = [
"Part Number",
"Manufacturer",
"old_quantity",
"old_reference_designators",
"adjusted_quantity",
"removed_reference_designators",
"new_quantity",
"added_reference_designators",
"new_reference_designators",
]
with open(output_path, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in diff:
writer.writerow(row)
logging.debug(f"Wrote row to CSV: {row}")
def sort_key(item):
adj = item["adjusted_quantity"]
priority = 2 if adj == 0 else (0 if adj > 0 else 1)
return (priority, -abs(adj), item["Part Number"])
def main():
logging.info("Parsing args")
parser = argparse.ArgumentParser(description="Generate BOM diff CSV")
parser.add_argument("--bom_old", required=True, help="Path to original BOM CSV")
parser.add_argument("--bom_new", required=True, help="Path to updated BOM CSV")
parser.add_argument("--column_file", required=True, help="Path to YAML column mapping")
parser.add_argument("--out_file", required=True, help="Output diff CSV file")
args = parser.parse_args()
columns_cfg = load_columns_config(args.column_file)
bom_old = load_bom(args.bom_old, columns_cfg)
bom_new = load_bom(args.bom_new, columns_cfg)
diff = generate_diff(bom_old, bom_new)
diff.sort(key=sort_key)
write_csv(diff, args.out_file)
logging.info("Diff generation complete.")
if __name__ == "__main__":
main()