0
mirror of https://gitlab.com/hyperglitch/jellyfish.git synced 2026-01-12 18:11:12 +00:00
jellyfish-powersupply/sw/jf_calibrate.py

414 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
import argparse
import json
import queue
import os
import time
import serial
import numpy as np
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
from datetime import datetime
from libjellyfishopp import JfUsbProcess
def linear_fit(calibration, plot=True):
# calibration is a list of tuples (set_code, mean_voltage, stdev_voltage)
x_vals, y_means, y_stds = zip(*calibration)
x_vals = np.array(x_vals)
y_means = np.array(y_means)
y_stds = np.array(y_stds)
def linear_model(x, m, b):
return m * x + b
# Weighted fit: pass standard deviations as sigma
if np.any(y_stds)>0:
popt, pcov = curve_fit(
linear_model, x_vals, y_means,
sigma=y_stds, absolute_sigma=True # absolute_sigma=True makes stdev interpreted as real stddev
)
else:
# fit without uncertainty
popt, pcov = curve_fit(linear_model, x_vals, y_means)
slope, intercept = popt
slope_err, intercept_err = np.sqrt(np.diag(pcov))
#print(f"Fit: y = {slope:.6f}x + {intercept:.6f}")
print(f"Slope uncertainty: ±{slope_err:.6f}")
print(f"Intercept uncertainty: ±{intercept_err:.6f}")
if plot:
# Optional: plot it
x_fit = np.linspace(min(x_vals), max(x_vals), 100)
y_fit = linear_model(x_fit, *popt)
plt.errorbar(x_vals, y_means, yerr=y_stds, fmt='o', label='Mean ± Std Dev')
plt.plot(x_fit, y_fit, 'r-', label='Weighted Fit')
plt.xlabel("X")
plt.ylabel("Y")
plt.legend()
plt.title("Linear Fit with Measurement Uncertainty")
plt.show()
return slope, intercept
def queue_flush(q):
while True:
try:
q.get_nowait()
except queue.Empty:
break
def calibrate_voltage(scpi, data_queue):
print("Calibrating voltage...")
print("JellyfishOPP power supply must be at least 12V")
print("Connect the volmeter to the JellyfishOPP output and press enter")
input()
vout_idx = 'adc10'
in_calib = [] # relation between DAC code and voltage
out_calib = [] # relation between ADC code and voltage
for set_code in (2000, 5000, 10000, 20000):
print(f"Setting voltage to DAC code {set_code}")
scpi.write(b"SOURCE:DAC %d\n"%(set_code,))
# flush the queue
queue_flush(data_queue)
time.sleep(0.5)
queue_flush(data_queue)
while True:
inp = input("Type the measured voltage in volts: ")
try:
voltage = float(inp) * 1000 # convert to mV
break
except ValueError:
print("Invalid input")
continue
# get the voltage from the queue
adc_voltage = []
for _ in range(5):
try:
q = data_queue.get_nowait()
time.sleep(0.2)
except queue.Empty:
print("ERROR: No data in the queue")
return
for rec in q:
adc_voltage.append(float(rec[vout_idx]))
mean_voltage = float(np.mean(adc_voltage))
stdev_voltage = float(np.std(adc_voltage))
print(f"Avg ADC voltage: {mean_voltage:.3f}, stdev {stdev_voltage} ({len(adc_voltage)})")
in_calib.append((set_code, voltage, 0))
out_calib.append((voltage, mean_voltage, stdev_voltage))
print("Calibration done")
in_m, in_b = linear_fit(in_calib)
out_m, out_b = linear_fit(out_calib)
# we need to get the inverse coefficients for our use case
in_m_ = float(1.0/in_m)
in_b_ = float(-in_b/in_m)
out_m_ = float(1.0/out_m)
out_b_ = float(-out_b/out_m)
print("Calibration results:")
print(f"Voltage calibration: y = {in_m_:.6f}x + {in_b_:.6f}")
print(f"ADC voltage calibration: y = {out_m_:.6f}x + {out_b_:.6f}")
return {
'dac': [in_m_, in_b_],
'voltage': [out_m_, out_b_]
}
def calibrate_current(scpi, data_queue):
# Calibration results:
# R4: y = -0.000012x + 0.003723
# R3: y = -0.000125x -0.013356
# R2: y = -0.001238x +0.015826
# R1: y = -0.011676x -0.029388
# R0: y = -0.124461x -2.797160
print("Calibrating current...")
# set the highest range and lowest voltage
scpi.write(b"SOURCE:DAC 0\n")
scpi.write(b"MEASURE:RANGE R4\n")
print("Connect the current meter directly between the JellyfishOPP output terminals and press enter")
range_desc = {
'R5': '30uA',
'R4': '300uA',
'R3': '3mA',
'R2': '30mA',
'R1': '300mA',
'R0': '3A'
}
calibration_table = {
'isense1': [
[0, 0], # R0
None, # R1
None, # R2
None, # R3
None, # R4
None, # R5
[0, 0], # R6 (not used, auto range)
[0, 0], # R7 (not used, all off)
]
}
isense_calib = []
for current_range in ('R5', 'R4', 'R3', 'R2', 'R1'):
print()
print(f"Setting current range to {current_range}: {range_desc[current_range]}")
print(f"Set the current meter for the {range_desc[current_range]} range and press enter")
input()
i1_idx = 'adc00'
i2_idx = 'adc01'
adc_code_min = -1000
adc_code_max = -29000
limits = [-1, -1]
dac_code = 0
inc = 10 if current_range in ('R5', 'R4', 'R3', 'R2') else 100
print("Searching for limits...")
scpi.write(b"MEASURE:RANGE %s\n"%(current_range.encode(),))
while True:
scpi.write(b"SOURCE:DAC %d\n"%(dac_code,))
# flush the queue
time.sleep(0.1)
queue_flush(data_queue)
time.sleep(0.1)
meas1 = []
meas2 = []
try:
q = data_queue.get_nowait()
except queue.Empty:
print("ERROR: No data in the queue")
return
for rec in q:
meas1.append(float(rec[i1_idx]))
meas2.append(float(rec[i2_idx]))
meas1_avg = float(np.mean(meas1))
meas1_std = float(np.std(meas1))
meas2_avg = float(np.mean(meas2))
meas2_std = float(np.std(meas2))
print(dac_code, meas1_avg, meas1_std, meas2_avg, meas2_std)
if meas1_avg < adc_code_min and limits[0]==-1:
limits[0] = dac_code
print(f" Found limit low: {dac_code} -> {meas1_avg}")
elif meas1_avg < adc_code_max and limits[1]==-1:
limits[1] = dac_code
print(f" Found limit high: {dac_code} -> {meas1_avg}")
break
dac_code += inc
# reset the DAC
scpi.write(b"SOURCE:DAC 0\n")
time.sleep(0.2)
center_point = (limits[0]+limits[1])/2
points = [(limits[0]+center_point)/2, center_point, (limits[1]+center_point)/2]
print('points', points)
calib = [] # relation between DAC code and voltage
for pt in points:
scpi.write(b"SOURCE:DAC %d\n"%(pt,))
# flush the queue
time.sleep(0.1)
queue_flush(data_queue)
time.sleep(0.1)
while True:
inp = input("Type the measured current in milliamps: ")
try:
curr = float(inp)
break
except ValueError:
print("Invalid input")
continue
# get the current from the queue
adc_current = []
isense_current = []
for _ in range(5):
try:
q = data_queue.get_nowait()
time.sleep(0.1)
except queue.Empty:
print("ERROR: No data in the queue")
return
for rec in q:
adc_current.append(float(rec[i1_idx]))
isense_current.append(float(rec[i2_idx]))
mean_current = float(np.mean(adc_current))
stdev_current = float(np.std(adc_current))
print(f"Avg ADC current: {mean_current:.3f}, stdev {stdev_current} ({len(adc_current)})")
calib.append((curr, mean_current, stdev_current))
isense_mean = float(np.mean(isense_current))
isense_std = float(np.std(isense_current))
isense_calib.append((curr, isense_mean, isense_std))
# reset the DAC
scpi.write(b"SOURCE:DAC 0\n")
time.sleep(0.2)
print("Calibration done")
print(calib)
in_m, in_b = linear_fit(calib)
# we need to get the inverse coefficients for our use case
in_m_ = float(1.0/in_m)
in_b_ = float(-in_b/in_m)
print("Calibration results:")
print(f"y = {in_m_:.6f}x + {in_b_:.6f}")
calibration_table['isense1'][int(current_range[1])] = [in_m_, in_b_]
# fit the main isense
in_m, in_b = linear_fit(isense_calib)
# we need to get the inverse coefficients for our use case
in_m_ = float(1.0/in_m)
in_b_ = float(-in_b/in_m)
print("Isense Calibration results:")
print(f"y = {in_m_:.6f}x + {in_b_:.6f}")
calibration_table['isense2'] = [in_m_, in_b_]
return calibration_table
def print_calibration_header():
if os.path.exists('calibration.json'):
with open('calibration.json') as f:
try:
calibration_table_full = json.load(f)
except ValueError:
print("ERROR: Could not parse calibration table")
return
else:
print("No calibration table found, plotting raw values")
return
print("/*")
print(" * SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>")
print(" *")
print(" * SPDX-License-Identifier: GPL-3.0-or-later")
print(" */")
print("")
print("// File generated by jf_calibrate.py on %s"%(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
print("")
print("#include \"jf_calibration.h\"")
print("")
print("const jf_calibration_t JF_CALIBRATION_EMPTY = {")
print(" { 0, 0, 0},")
print(" \"Empty\",")
print(" { 1, 0 },")
print(" { 1, 0 },")
print(" { 1, 0 },")
print(" { 1, 0 },")
print(" {")
for i in range(8):
print(" 1, 0,")
print(" }")
print("};")
print("")
isense_fields = ['R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'auto', 'all off']
print("const jf_calibration_t JF_CALIBRATION_DATA[] = {")
for devid, calibration_table in calibration_table_full.items():
print(" {")
print(f" {{ 0x{devid[:8]}, 0x{devid[8:16]}, 0x{devid[16:24]} }},\t\t// UID")
print(f" \"{calibration_table['comment']}\",\t\t\t\t// Description")
print(f" {{ {calibration_table['voltage'][0]:.8f}, {calibration_table['voltage'][1]:.8f} }},\t\t\t// Voltage")
print(f" {{ {calibration_table['dac'][0]:.8f}, {calibration_table['dac'][1]:.8f} }}, \t\t\t// DAC")
print(f" {{ {calibration_table['ain'][0]:.8f}, {calibration_table['ain'][1]:.8f} }}, \t\t\t// AIN")
print(f" {{ {calibration_table['isense2'][0]:.8f}, {calibration_table['isense2'][1]:.8f} }},\t\t\t// Isense2")
print(" {")
for i in range(len(calibration_table['isense1'])):
print(f" {calibration_table['isense1'][i][0]:.8f}, {calibration_table['isense1'][i][1]:.8f},\t\t\t// {isense_fields[i]}")
print(" }")
print(" },")
print(" { { 0, 0, 0 }, }")
print("};")
def main():
parser = argparse.ArgumentParser(description='Calibrate JellyfishOPP')
parser.add_argument('value', type=str, choices=['voltage', 'current', 'ain', 'code'], help='Calibrate voltage or current')
args = parser.parse_args()
if args.value == 'code':
print_calibration_header()
return
scpi_interface = '/dev/ttyUSB0'
scpi_baud = 230400
try:
scpi = serial.Serial(scpi_interface, scpi_baud, timeout=0.1)
except serial.SerialException:
print("ERROR: Could not open serial port")
return
scpi.write(b"SYSTEM:ID?\n")
try:
devid = scpi.readlines()[-1].strip().decode()
except (TypeError, IndexError, AttributeError):
print("ERROR: Could not read device ID")
return
proc = JfUsbProcess(calibration_table=None, queue_size=1)
proc.start()
calibration_table = {}
if args.value == 'voltage':
calibration_table = calibrate_voltage(scpi, proc.get_data_queue())
elif args.value == 'current':
calibration_table = calibrate_current(scpi, proc.get_data_queue())
print("--------------")
print("Result:")
print(json.dumps({devid: calibration_table}, indent=2, sort_keys=True))
proc.stop()
if __name__ == "__main__":
main()