423 lines
17 KiB
Python
Executable File
423 lines
17 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
import csv
import json
import os
import shutil
import zipfile
from argparse import ArgumentParser
from dataclasses import dataclass, field
from typing import Optional

import requests
from jinja2 import Environment, FileSystemLoader
|
|
|
|
DIGIKEY_API_URL_BASE = "https://api.digikey.com"
|
|
DIGIKEY_API_AUTH_ENDPOINT = DIGIKEY_API_URL_BASE + "/v1/oauth2/token"
|
|
DIGIKEY_API_V4_KEYWORD_SEARCH_ENDPOINT = (
|
|
DIGIKEY_API_URL_BASE + "/products/v4/search/keyword"
|
|
)
|
|
|
|
|
|
################################################################################
|
|
@dataclass
class ComponentData:
    """DigiKey data collected for a single BOM line item.

    Instances start out empty and are filled in from the first exact match of
    a DigiKey keyword search. Scalar fields stay ``None`` when the search
    returned no exact match or the response did not carry the value.
    """

    # Designator cell of the BOM row; split on "," to count parts per PCB.
    associated_refdes: str = ""

    # Identification and links
    part_description: Optional[str] = None
    mfr_name: Optional[str] = None
    mfr_part_number: Optional[str] = None
    photo_url: Optional[str] = None
    datasheet_url: Optional[str] = None
    product_url: Optional[str] = None

    # Availability and lifecycle
    # NOTE(review): int/bool types below follow the DigiKey v4 product schema
    # ("QuantityAvailable", "EndOfLife", "Discontinued") -- confirm.
    qty_available: Optional[int] = None
    lifecycle_status: Optional[str] = None
    eol_status: Optional[bool] = None
    discontinued_status: Optional[bool] = None

    # Product variations (one dict per package type), each carrying a
    # "PackageType" dict and a "StandardPricing" list of price breaks.
    pricing: Optional[list] = None

    # Values taken from the product's "Parameters" list
    package_case: Optional[str] = None
    supplier_device_package: Optional[str] = None
    operating_temp: Optional[str] = None
    xy_size: Optional[str] = None
    height: Optional[str] = None
    thickness: Optional[str] = None

    # Values taken from the product's "Classifications" object
    rohs_status: Optional[str] = None
    moisture_sensitivity_level: Optional[str] = None
    reach_status: Optional[str] = None
    eccn: Optional[str] = None
    htsus: Optional[str] = None

    # Category names from the top-level category down the first-child chain.
    categories: list = field(default_factory=list)
    # Per package type COGS data as built by get_prices_for_target_qtys().
    cogs_breakdown: list = field(default_factory=list)
|
|
|
|
|
|
################################################################################
|
|
def get_access_token(url, client_id, client_secret, timeout=30):
    """Request an OAuth2 access token using the client-credentials grant.

    Args:
        url: Token endpoint to POST to.
        client_id: Client ID, sent as the HTTP basic-auth user name.
        client_secret: Client secret, sent as the HTTP basic-auth password.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. Without a timeout a stalled connection would block
            the script indefinitely.

    Returns:
        A ``(status_code, access_token)`` tuple. ``access_token`` is the
        ``"access_token"`` value of the JSON response when the status code is
        200, otherwise ``None``.
    """
    response = requests.post(
        url,
        data={"grant_type": "client_credentials"},
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        auth=(client_id, client_secret),
        timeout=timeout,
    )

    # Only a 200 response carries a token; anything else is reported through
    # the status code alone.
    if response.status_code != 200:
        return (response.status_code, None)
    return (response.status_code, response.json()["access_token"])
|
|
|
|
|
|
################################################################################
|
|
def query_digikey_v4_API_keyword_search(
    url,
    client_id,
    access_token,
    locale_site,
    locale_language,
    locale_currency,
    customer_id,
    keyword,
    timeout=30,
):
    """POST a keyword search to the DigiKey v4 product search endpoint.

    Args:
        url: Keyword search endpoint to POST to.
        client_id: Value for the ``X-DIGIKEY-Client-Id`` header.
        access_token: Bearer token for the ``Authorization`` header.
        locale_site: Value for the ``X-DIGIKEY-Locale-Site`` header.
        locale_language: Value for the ``X-DIGIKEY-Locale-Language`` header.
        locale_currency: Value for the ``X-DIGIKEY-Locale-Currency`` header.
        customer_id: Value for the ``X-DIGIKEY-Customer-Id`` header.
        keyword: Search keyword, sent as ``"Keywords"`` in the JSON body.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error, so a stalled connection cannot hang the script.

    Returns:
        A ``(status_code, result)`` tuple. ``result`` is the decoded JSON body
        when the status code is 200, otherwise the raw response text.
    """
    headers = {
        "charset": "utf-8",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": "Bearer " + access_token,
        "X-DIGIKEY-Client-Id": client_id,
        "X-DIGIKEY-Locale-Site": locale_site,
        "X-DIGIKEY-Locale-Language": locale_language,
        "X-DIGIKEY-Locale-Currency": locale_currency,
        "X-DIGIKEY-Customer-Id": customer_id,
    }

    # json.dumps() already returns a str, so no extra str() wrapper is needed.
    response = requests.post(
        url,
        data=json.dumps({"Keywords": keyword}),
        headers=headers,
        timeout=timeout,
    )

    # Hand back parsed JSON on success and the raw text otherwise so the
    # caller can print the server's error message.
    if response.status_code == 200:
        return (response.status_code, response.json())
    return (response.status_code, response.text)
|
|
|
|
|
|
################################################################################
|
|
def extract_data_from_digikey_search_response(keyword_search_json):
    """Build a ComponentData from the first exact match of a keyword search.

    Args:
        keyword_search_json: Decoded JSON body of a keyword search response.
            Only its ``"ExactMatches"`` list is read. The response is not
            modified.

    Returns:
        A ComponentData populated from the first exact match, or an empty
        ComponentData when there is no exact match.

    Raises:
        KeyError: If the matched product lacks one of the mandatory keys read
            below (for example ``"Description"`` or ``"Manufacturer"``).
    """
    part_data = ComponentData()

    # Nothing to extract without an exact match
    exact_matches = keyword_search_json.get("ExactMatches")
    if not exact_matches:
        return part_data
    product_data = exact_matches[0]
    if not product_data:
        return part_data

    # Identification, links, availability and lifecycle
    part_data.part_description = product_data["Description"]["ProductDescription"]
    part_data.mfr_name = product_data["Manufacturer"]["Name"]
    part_data.mfr_part_number = product_data["ManufacturerProductNumber"]
    part_data.photo_url = product_data["PhotoUrl"]
    part_data.datasheet_url = product_data["DatasheetUrl"]
    part_data.product_url = product_data["ProductUrl"]
    part_data.qty_available = product_data["QuantityAvailable"]
    part_data.lifecycle_status = product_data["ProductStatus"]["Status"]
    part_data.eol_status = product_data["EndOfLife"]
    part_data.discontinued_status = product_data["Discontinued"]

    # Pricing: work on a copy of the list so the caller's response is left
    # untouched by the merge below.
    variations = list(product_data["ProductVariations"] or [])
    package_names = [
        variation["PackageType"]["Name"] for variation in variations
    ]
    cut_tape_idx = next(
        (i for i, name in enumerate(package_names) if "Cut Tape" in name), None
    )
    digi_reel_idx = next(
        (i for i, name in enumerate(package_names) if "Digi-Reel" in name), None
    )
    # Remove Digi-Reel and rename Cut Tape as CT/DR if both exist. The index
    # comparison guards against a single entry whose name matches both
    # patterns, which would otherwise be renamed and then deleted.
    if (
        cut_tape_idx is not None
        and digi_reel_idx is not None
        and cut_tape_idx != digi_reel_idx
    ):
        merged = dict(variations[cut_tape_idx])
        merged["PackageType"] = dict(
            merged["PackageType"], Name="Cut Tape (CT) & Digi-Reel®"
        )
        variations[cut_tape_idx] = merged
        del variations[digi_reel_idx]
    part_data.pricing = variations

    # Product parameters. The checks are independent on purpose: the last
    # matching parameter wins for each field, as before.
    for parameter in product_data["Parameters"]:
        name = parameter.get("ParameterText")
        if name is None:
            continue
        value = parameter.get("ValueText")
        if name == "Package / Case":
            part_data.package_case = value
        if name == "Supplier Device Package":
            part_data.supplier_device_package = value
        if name == "Operating Temperature":
            part_data.operating_temp = value
        if "Size" in name:
            part_data.xy_size = value
        if "Height" in name:
            part_data.height = value
        if "Thickness" in name:
            part_data.thickness = value

    # Environmental and classification data. Each key is looked up on its own
    # so one missing entry no longer discards the ones that follow it.
    classifications = product_data.get("Classifications") or {}
    part_data.rohs_status = classifications.get("RohsStatus")
    part_data.moisture_sensitivity_level = classifications.get(
        "MoistureSensitivityLevel"
    )
    part_data.reach_status = classifications.get("ReachStatus")
    part_data.eccn = classifications.get("ExportControlClassNumber")
    part_data.htsus = classifications.get("HtsusCode")

    # Category chain: top-level category, then the first child at each level
    category = product_data.get("Category")
    while category:
        part_data.categories.append(category["Name"])
        children = category.get("ChildCategories")
        category = children[0] if children else None

    return part_data
|
|
|
|
|
|
################################################################################
|
|
def get_prices_for_target_qtys(part_data, single_pcb_part_qty, pcb_quantities):
    """Compute the cost of a part at several PCB build quantities.

    Args:
        part_data: Object whose ``pricing`` attribute is a list of package
            type dicts, each with a ``"PackageType"`` entry and a
            ``"StandardPricing"`` list of ``{"BreakQuantity", "UnitPrice"}``
            price breaks. ``None`` or an empty list yields an empty result.
        single_pcb_part_qty: Number of this part used on one PCB.
        pcb_quantities: Iterable of PCB build quantities to price.

    Returns:
        A list with one dict per package type that has standard pricing:
        ``{"package_type": <PackageType>, "cogs": [...]}``. Each ``cogs``
        entry holds ``pcb_qty``, ``total_part_qty``, the selected
        ``break_qty`` and ``price_per_unit``, and ``total_price`` (unit price
        multiplied by the total part quantity).
    """
    pcb_qty_prices_by_part_type = []

    for pricing_type in part_data.pricing or []:
        std_pricing = pricing_type["StandardPricing"]
        # Skip package types without any price breaks
        if not std_pricing:
            continue

        # Sort the price breaks instead of assuming the API returned them in
        # ascending order; the selection below relies on that ordering.
        tiers = sorted(std_pricing, key=lambda tier: int(tier["BreakQuantity"]))

        cogs = []
        for pcb_qty in pcb_quantities:
            part_qty = single_pcb_part_qty * pcb_qty

            # Use the largest break quantity not exceeding the part quantity.
            # Quantities below the smallest break fall back to that smallest
            # break's unit price.
            selected = tiers[0]
            for tier in tiers[1:]:
                if part_qty < int(tier["BreakQuantity"]):
                    break
                selected = tier

            cogs.append(
                {
                    "pcb_qty": pcb_qty,
                    "total_part_qty": part_qty,
                    "break_qty": selected["BreakQuantity"],
                    "price_per_unit": selected["UnitPrice"],
                    "total_price": selected["UnitPrice"] * part_qty,
                }
            )

        pcb_qty_prices_by_part_type.append(
            {"package_type": pricing_type["PackageType"], "cogs": cogs}
        )

    return pcb_qty_prices_by_part_type
|
|
|
|
|
|
################################################################################
|
|
if __name__ == "__main__":
    # Script entry point: read the BOM, query DigiKey for every line item,
    # render the HTML report, zip it into the output directory and dump the
    # collected data as JSON.

    # Initialize argument parser
    parser = ArgumentParser()
    parser.add_argument("bom_file", help="Path to the BOM file")
    parser.add_argument(
        "--output_path",
        help=(
            "Path to the directory to output report to. Defaults to the "
            + "current working directory."
        ),
        # A default is required: the path is joined with the zip file name
        # below, which fails on None.
        default=".",
    )
    parser.add_argument(
        "--pcb_quantities",
        help=(
            "A comma-separated list of quantities of PCBs to compute the COGS "
            + "for. Defaults to '%(default)s'."
        ),
        default="1,10,100,500,1000",
    )
    parser.add_argument(
        "--report_type",
        choices=("html", "md"),
        help=(
            "Report type as html or md. html reports are uploaded as artifacts"
            + " to the Action run whereas md reports are added to the"
            + " repository's Wiki. Defaults to html."
        ),
        default="html",
        const="html",
        nargs="?",
    )
    args = parser.parse_args()

    # Read the BOM file (comma delimited, " as quote character) into a list
    with open(args.bom_file, newline="") as bomfile:
        bom_line_items = list(csv.reader(bomfile, delimiter=",", quotechar='"'))

    # The header row is needed to locate the designator column
    if not bom_line_items or "Designator" not in bom_line_items[0]:
        print("BOM file has no header row with a 'Designator' column.")
        print("Exiting...")
        raise SystemExit(1)
    refdes_col_idx = bom_line_items[0].index("Designator")
    # Skip the header and any blank rows (csv yields [] for empty lines,
    # which would break the column lookups below)
    bom_line_items = [row for row in bom_line_items[1:] if row]

    # Get the PCB quantities, if specified
    pcb_quantities = []
    if args.pcb_quantities:
        try:
            pcb_quantities = [
                int(quantity) for quantity in args.pcb_quantities.split(",")
            ]
        except ValueError:
            # Keep going without COGS data, but say why it is missing
            print(
                "Warning: could not parse --pcb_quantities '"
                + args.pcb_quantities
                + "' as comma-separated integers; skipping COGS quantities."
            )

    # Authenticate with DigiKey
    digikey_client_id = os.environ.get("DIGIKEY_CLIENT_ID")
    digikey_client_secret = os.environ.get("DIGIKEY_CLIENT_SECRET")
    if not digikey_client_id or not digikey_client_secret:
        print("DIGIKEY_CLIENT_ID and DIGIKEY_CLIENT_SECRET must be set.")
        print("Exiting...")
        raise SystemExit(1)
    (response_code, access_token) = get_access_token(
        DIGIKEY_API_AUTH_ENDPOINT, digikey_client_id, digikey_client_secret
    )

    # Cannot proceed with search API queries if authentication failed, so
    # actually stop here instead of failing later on a missing token.
    if response_code != 200:
        print("Authentication failed. Response from server:")
        print(response_code)
        print("Exiting...")
        raise SystemExit(1)

    # Initialize list of BOM item part data
    bom_items_digikey_data = []

    # Fetch information for all parts in the BOM
    for line_item in bom_line_items:
        print("- Fetching info for " + line_item[0] + "... ", end="")
        # Search for parts in DigiKey by Manufacturer Part Number as keyword
        (response_code, keyword_search_json) = query_digikey_v4_API_keyword_search(
            DIGIKEY_API_V4_KEYWORD_SEARCH_ENDPOINT,
            digikey_client_id,
            access_token,
            "US",
            "en",
            "USD",
            "0",
            line_item[0],
        )

        # Print out the details of an unsuccessful response
        if response_code != 200:
            print("✘", flush=True)
            print("DigiKey API search unsuccesful:")
            print(response_code, keyword_search_json)
            continue

        # Only report success once the response code is known
        print("✔", flush=True)
        # Extract the part data from the keyword search response
        part_data = extract_data_from_digikey_search_response(keyword_search_json)
        # Add the associated reference designators
        part_data.associated_refdes = line_item[refdes_col_idx]
        # Get the COGS pricing if PCB quantities specified
        if args.pcb_quantities:
            # One reference designator per part placed on a single PCB
            part_qty = len(part_data.associated_refdes.split(","))
            part_data.cogs_breakdown = get_prices_for_target_qtys(
                part_data, part_qty, pcb_quantities
            )
        # Add the extracted data to the list of BOM items part data
        bom_items_digikey_data.append(part_data)

    # Load Jinja with output HTML template
    template_env = Environment(loader=FileSystemLoader("/report_template/"))
    template = template_env.get_template("index.html")
    # Populate the context data
    context = {"bom": bom_items_digikey_data}
    # Create report output folder if it doesn't exist
    os.makedirs("/component_report", exist_ok=True)
    # Unzip the JS/CSS assets
    shutil.unpack_archive("/report_template/assets.zip", "/component_report")
    # Write HTML output file
    with open(
        "/component_report/index.html", mode="w", encoding="utf-8"
    ) as report_file:
        print("- Outputting report")
        report_file.write(template.render(context))

    # Zip the component report as a git workspace artifact
    os.makedirs(args.output_path, exist_ok=True)
    with zipfile.ZipFile(
        os.path.join(args.output_path, "component_report.zip"),
        "w",
        zipfile.ZIP_DEFLATED,
    ) as zipper:
        for root, dirs, files in os.walk("/component_report"):
            for file in files:
                zipper.write(os.path.join(root, file))

    # Convert all ComponentData objects to dictionaries in preparation for
    # JSON output
    bom_items_as_dicts = [vars(item) for item in bom_items_digikey_data]
    # Output the BOM items DigiKey data as a json file
    with open(
        "digikey_data_from_bom.json", mode="w", encoding="utf-8"
    ) as json_output_file:
        json_output_file.write(json.dumps(bom_items_as_dicts, indent=2))
|