"""BOM diff generator: compares two BOM CSV exports and writes a per-part diff CSV."""
import csv
|
|
import argparse
|
|
import re
|
|
import yaml
|
|
import logging
|
|
from collections import defaultdict
|
|
|
|
# Setup logging
|
|
import sys
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
stream=sys.stdout # Force logs to stdout
|
|
)
|
|
|
|
def load_columns_config(column_file_path):
|
|
logging.debug(f"Loading columns configuration from: {column_file_path}")
|
|
with open(column_file_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
def resolve_column(name):
|
|
for col in config.get("columns", []):
|
|
if col["name"].lower() == name.lower():
|
|
logging.debug(f"Resolved column: {name} -> {col}")
|
|
return col
|
|
logging.warning(f"Column '{name}' not found in configuration.")
|
|
return {}
|
|
|
|
return {
|
|
"manufacturer": resolve_column("Manufacturer"),
|
|
"part_number": resolve_column("Part Number"),
|
|
"designator": resolve_column("Designator"),
|
|
}
|
|
|
|
def get_first_matching(row, keys):
|
|
if isinstance(keys, str):
|
|
keys = [keys]
|
|
for key in keys:
|
|
if key in row:
|
|
return row[key]
|
|
return ""
|
|
|
|
def matches_exclude_pattern(row, part_number_column, pattern):
|
|
val = get_first_matching(row, part_number_column)
|
|
result = bool(re.search(pattern, val)) if val and pattern else False
|
|
logging.debug(f"Checking exclude pattern: value='{val}', pattern='{pattern}' -> {result}")
|
|
return result
|
|
|
|
def load_bom(file_path, columns_cfg):
|
|
logging.info(f"Loading BOM from: {file_path}")
|
|
bom = defaultdict(lambda: {"Quantity": 0, "refs": set(), "manufacturer": ""})
|
|
with open(file_path, newline='') as csvfile:
|
|
reader = csv.DictReader(csvfile)
|
|
for row in reader:
|
|
if matches_exclude_pattern(row, columns_cfg["part_number"]["part_attributes"],
|
|
columns_cfg["part_number"].get("remove_rows_matching")):
|
|
logging.debug(f"Excluding row: {row}")
|
|
continue
|
|
|
|
part = get_first_matching(row, columns_cfg["part_number"]["part_attributes"])
|
|
manufacturer = get_first_matching(row, columns_cfg["manufacturer"]["part_attributes"])
|
|
ref_str = get_first_matching(row, columns_cfg["designator"]["part_attributes"])
|
|
|
|
if not part:
|
|
logging.warning(f"Missing part number in row: {row}")
|
|
continue
|
|
|
|
refs = [r.strip() for r in ref_str.split(',') if r.strip()]
|
|
if columns_cfg["designator"].get("grouped_values_sort") == "asc":
|
|
refs.sort()
|
|
|
|
bom[part]["manufacturer"] = manufacturer
|
|
bom[part]["Quantity"] += len(refs)
|
|
bom[part]["refs"].update(refs)
|
|
|
|
logging.debug(f"Processed part: {part}, manufacturer: {manufacturer}, refs: {refs}")
|
|
|
|
return bom
|
|
|
|
def generate_diff(bom_old, bom_new):
|
|
print("Generating BOM diff")
|
|
all_parts = set(bom_old.keys()) | set(bom_new.keys())
|
|
diff = []
|
|
|
|
for part in all_parts:
|
|
old = bom_old.get(part, {"Quantity": 0, "refs": set(), "manufacturer": bom_new.get(part, {}).get("manufacturer", "")})
|
|
new = bom_new.get(part, {"Quantity": 0, "refs": set(), "manufacturer": old.get("manufacturer", "")})
|
|
|
|
added_refs = new["refs"] - old["refs"]
|
|
removed_refs = old["refs"] - new["refs"]
|
|
adjusted_quantity = len(added_refs) - len(removed_refs)
|
|
|
|
diff_entry = {
|
|
"Part Number": part,
|
|
"Manufacturer": new["manufacturer"] or old["manufacturer"],
|
|
"old_quantity": len(old["refs"]),
|
|
"old_reference_designators": ", ".join(sorted(old["refs"])),
|
|
"adjusted_quantity": adjusted_quantity,
|
|
"removed_reference_designators": ", ".join(sorted(removed_refs)),
|
|
"new_quantity": len(new["refs"]),
|
|
"added_reference_designators": ", ".join(sorted(added_refs)),
|
|
"new_reference_designators": ", ".join(sorted(new["refs"])),
|
|
}
|
|
diff.append(diff_entry)
|
|
logging.debug(f"Diff entry: {diff_entry}")
|
|
|
|
return diff
|
|
|
|
def write_csv(diff, output_path):
|
|
logging.info(f"Writing diff to: {output_path}")
|
|
fieldnames = [
|
|
"Part Number",
|
|
"Manufacturer",
|
|
"old_quantity",
|
|
"old_reference_designators",
|
|
"adjusted_quantity",
|
|
"removed_reference_designators",
|
|
"new_quantity",
|
|
"added_reference_designators",
|
|
"new_reference_designators",
|
|
]
|
|
with open(output_path, 'w', newline='') as csvfile:
|
|
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
for row in diff:
|
|
writer.writerow(row)
|
|
logging.debug(f"Wrote row to CSV: {row}")
|
|
|
|
def sort_key(item):
|
|
adj = item["adjusted_quantity"]
|
|
priority = 2 if adj == 0 else (0 if adj > 0 else 1)
|
|
return (priority, -abs(adj), item["Part Number"])
|
|
|
|
def main():
|
|
logging.info("Parsing args")
|
|
parser = argparse.ArgumentParser(description="Generate BOM diff CSV")
|
|
parser.add_argument("--bom_old", required=True, help="Path to original BOM CSV")
|
|
parser.add_argument("--bom_new", required=True, help="Path to updated BOM CSV")
|
|
parser.add_argument("--column_file", required=True, help="Path to YAML column mapping")
|
|
parser.add_argument("--out_file", required=True, help="Output diff CSV file")
|
|
|
|
args = parser.parse_args()
|
|
|
|
columns_cfg = load_columns_config(args.column_file)
|
|
bom_old = load_bom(args.bom_old, columns_cfg)
|
|
bom_new = load_bom(args.bom_new, columns_cfg)
|
|
diff = generate_diff(bom_old, bom_new)
|
|
diff.sort(key=sort_key)
|
|
|
|
write_csv(diff, args.out_file)
|
|
logging.info("Diff generation complete.")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|