6
mirror of https://github.com/AllSpiceIO/cofactr-cogs.git synced 2025-04-21 16:13:21 +00:00
cofactr-cogs/entrypoint.py
2024-05-13 19:02:10 -04:00

246 lines
8.0 KiB
Python
Executable File

#! /usr/bin/env python3
# Compute the Cost of Goods Sold for a PrjPcb file.
# This example doesn't depend on py-allspice, but requires a BOM CSV file to
# run. You can use the `generate_bom.py` script in this directory to generate
# a BOM CSV.
from argparse import ArgumentParser
from contextlib import ExitStack
import csv
import os
import sys
import requests
def fetch_price_for_part(part_number: str) -> dict[int, float]:
"""
Get the price of a component per n units.
The return value of this function is a mapping of number of units to the
price in dollars per unit if you purchase that many units. For example::
{
1: 0.5,
10: 0.45,
500: 0.4,
100: 0.35,
}
In this case, the price per unit is 0.5, the price per unit if you buy 10
or more is 0.45, the price per unit if you buy 50 or more is 0.4, and so on.
Your breakpoints can be any positive integer.
The implementation of this function depends on the API you are using to get
pricing data. This is an example implementation that uses the cofactr API,
and will not work unless you have a cofactr API key. You will need to
replace this function with your own implementation if you use some other
API, such as Octopart or TrustedParts. You have access to the `requests`
python library to perform HTTP requests.
:param part_number: A part number by which to search for the component.
:returns: A mapping of price breakpoints to the price at that breakpoint.
"""
if part_number.startswith("NOTAPART"):
return {}
api_key = os.environ.get("COFACTR_API_KEY")
client_id = os.environ.get("COFACTR_CLIENT_ID")
if api_key is None or client_id is None:
raise ValueError(
"Please set the COFACTR_API_KEY and COFACTR_CLIENT_ID environment variables"
)
search_response = requests.get(
"https://graph.cofactr.com/products/",
headers={
"X-API-KEY": api_key,
"X-CLIENT-ID": client_id,
},
params=(
("q", part_number),
("schema", "product-offers-v0"),
("external", True),
("limit", 1),
),
)
if search_response.status_code != 200:
print(
f"Warning: Received status code {search_response.status_code} for {part_number}",
file=sys.stderr,
)
return {}
search_results = search_response.json()
try:
reference_prices = search_results.get("data", [])[0].get("reference_prices")
except IndexError:
print(
f"Warning: No results found for {part_number}",
file=sys.stderr,
)
return {}
prices = {int(price["quantity"]): float(price["price"]) for price in reference_prices}
return prices
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"bom_file",
help="The path to the BOM file.",
)
parser.add_argument(
"--quantities",
help=(
"A comma-separated list of quantities of PCBs to compute the COGS "
+ "for. Defaults to '%(default)s'."
),
default="1,10,100,1000",
)
parser.add_argument(
"--bom-part-number-column",
help="The name of the part number column in the BOM file. Defaults to '%(default)s'.",
default="Part Number",
)
parser.add_argument(
"--bom-quantity-column",
help="The name of the quantity column in the BOM file. Defaults to '%(default)s'.",
default="Quantity",
)
parser.add_argument(
"--output-file",
help="The path to the output file. Defaults to stdout, i.e. printing to the console.",
)
args = parser.parse_args()
quantities = [int(quantity) for quantity in args.quantities.split(",")]
part_number_columns = (
args.bom_part_number_column.split(",") if args.bom_part_number_column else []
)
if not part_number_columns:
print("No part number columns given", file=sys.stderr)
sys.exit(1)
quantity_column = args.bom_quantity_column
with open(args.bom_file, "r") as bom_file:
bom_csv = csv.DictReader(bom_file)
parts = [
part
for part in bom_csv
if any(
part[part_number_column] and part[part_number_column] != ""
for part_number_column in part_number_columns
)
]
print(f"Computing COGS for {len(parts)} parts", file=sys.stderr)
print(f"Fetching prices for {len(parts)} parts", file=sys.stderr)
prices_for_parts = {}
for part in parts:
# Try each part number.
for part_number_column in part_number_columns:
part_number = part[part_number_column]
# Fetch this part number's prices from the API.
fetched_prices = fetch_price_for_part(part_number)
if fetched_prices and len(fetched_prices) > 0:
prices_for_parts[part_number] = fetched_prices
# A price was found for this part number. We can stop trying
# other part numbers.
break
print(f"Found prices for {len(prices_for_parts)} parts", file=sys.stderr)
if len(prices_for_parts) == 0:
print("No prices found for any parts", file=sys.stderr)
sys.exit(1)
headers = [
"Part Number",
"Quantity",
]
for quantity in quantities:
headers.append(f"Per Unit at {quantity}")
headers.append(f"Total at {quantity}")
rows = []
totals: dict[int, float] = {quantity: 0 for quantity in quantities}
for part in parts:
part_quantity = int(part[quantity_column])
first_part_number = None
prices_found = False
for part_number_column in part_number_columns:
part_number = part[part_number_column]
if first_part_number is None:
first_part_number = part_number
prices = prices_for_parts.get(part_number)
if prices is None:
# Prices not found. Try the next part number.
continue
current_row = [part_number, part_quantity]
for quantity in quantities:
try:
largest_breakpoint_less_than_qty = max(
[breakpoint for breakpoint in prices.keys() if breakpoint <= quantity]
)
price_at_breakpoint = prices[largest_breakpoint_less_than_qty]
current_row.append(price_at_breakpoint)
total_for_part_at_quantity = price_at_breakpoint * part_quantity
current_row.append(total_for_part_at_quantity)
totals[quantity] += total_for_part_at_quantity
except (ValueError, KeyError):
current_row.append(None)
current_row.append(None)
# We found prices for this part number. We can stop trying other part numbers.
prices_found = True
rows.append(current_row)
break
if not prices_found:
# No prices found for any part number. Add a row with the first part number and no
# prices.
assert first_part_number is not None
current_row = [first_part_number, part_quantity]
for quantity in quantities:
current_row.append(None)
current_row.append(None)
rows.append(current_row)
with ExitStack() as stack:
if args.output_file:
file = stack.enter_context(open(args.output_file, "w"))
writer = csv.writer(file)
else:
writer = csv.writer(sys.stdout)
writer.writerow(headers)
writer.writerows(rows)
totals_row = ["Totals", None]
for quantity in quantities:
totals_row.append(None)
totals_row.append(str(totals[quantity]))
writer.writerow(totals_row)
print("Computed COGS", file=sys.stderr)