mirror of
https://gitlab.com/hyperglitch/jellyfish.git
synced 2026-01-12 18:11:12 +00:00
414 lines
13 KiB
Python
Executable File
414 lines
13 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import argparse
|
|
import json
|
|
import queue
|
|
import os
|
|
import time
|
|
import serial
|
|
|
|
import numpy as np
|
|
from scipy.optimize import curve_fit
|
|
import matplotlib.pyplot as plt
|
|
|
|
from datetime import datetime
|
|
from libjellyfishopp import JfUsbProcess
|
|
|
|
|
|
def linear_fit(calibration, plot=True):
|
|
# calibration is a list of tuples (set_code, mean_voltage, stdev_voltage)
|
|
x_vals, y_means, y_stds = zip(*calibration)
|
|
x_vals = np.array(x_vals)
|
|
y_means = np.array(y_means)
|
|
y_stds = np.array(y_stds)
|
|
|
|
|
|
def linear_model(x, m, b):
|
|
return m * x + b
|
|
|
|
# Weighted fit: pass standard deviations as sigma
|
|
if np.any(y_stds)>0:
|
|
popt, pcov = curve_fit(
|
|
linear_model, x_vals, y_means,
|
|
sigma=y_stds, absolute_sigma=True # absolute_sigma=True makes stdev interpreted as real stddev
|
|
)
|
|
else:
|
|
# fit without uncertainty
|
|
popt, pcov = curve_fit(linear_model, x_vals, y_means)
|
|
|
|
slope, intercept = popt
|
|
slope_err, intercept_err = np.sqrt(np.diag(pcov))
|
|
|
|
#print(f"Fit: y = {slope:.6f}x + {intercept:.6f}")
|
|
print(f"Slope uncertainty: ±{slope_err:.6f}")
|
|
print(f"Intercept uncertainty: ±{intercept_err:.6f}")
|
|
|
|
if plot:
|
|
# Optional: plot it
|
|
x_fit = np.linspace(min(x_vals), max(x_vals), 100)
|
|
y_fit = linear_model(x_fit, *popt)
|
|
|
|
plt.errorbar(x_vals, y_means, yerr=y_stds, fmt='o', label='Mean ± Std Dev')
|
|
plt.plot(x_fit, y_fit, 'r-', label='Weighted Fit')
|
|
plt.xlabel("X")
|
|
plt.ylabel("Y")
|
|
plt.legend()
|
|
plt.title("Linear Fit with Measurement Uncertainty")
|
|
plt.show()
|
|
|
|
return slope, intercept
|
|
|
|
|
|
def queue_flush(q):
|
|
while True:
|
|
try:
|
|
q.get_nowait()
|
|
except queue.Empty:
|
|
break
|
|
|
|
|
|
def calibrate_voltage(scpi, data_queue):
|
|
print("Calibrating voltage...")
|
|
print("JellyfishOPP power supply must be at least 12V")
|
|
print("Connect the volmeter to the JellyfishOPP output and press enter")
|
|
input()
|
|
|
|
vout_idx = 'adc10'
|
|
|
|
in_calib = [] # relation between DAC code and voltage
|
|
out_calib = [] # relation between ADC code and voltage
|
|
for set_code in (2000, 5000, 10000, 20000):
|
|
print(f"Setting voltage to DAC code {set_code}")
|
|
scpi.write(b"SOURCE:DAC %d\n"%(set_code,))
|
|
|
|
# flush the queue
|
|
queue_flush(data_queue)
|
|
time.sleep(0.5)
|
|
queue_flush(data_queue)
|
|
|
|
while True:
|
|
inp = input("Type the measured voltage in volts: ")
|
|
try:
|
|
voltage = float(inp) * 1000 # convert to mV
|
|
break
|
|
except ValueError:
|
|
print("Invalid input")
|
|
continue
|
|
|
|
# get the voltage from the queue
|
|
adc_voltage = []
|
|
for _ in range(5):
|
|
try:
|
|
q = data_queue.get_nowait()
|
|
time.sleep(0.2)
|
|
except queue.Empty:
|
|
print("ERROR: No data in the queue")
|
|
return
|
|
for rec in q:
|
|
adc_voltage.append(float(rec[vout_idx]))
|
|
|
|
mean_voltage = float(np.mean(adc_voltage))
|
|
stdev_voltage = float(np.std(adc_voltage))
|
|
print(f"Avg ADC voltage: {mean_voltage:.3f}, stdev {stdev_voltage} ({len(adc_voltage)})")
|
|
|
|
in_calib.append((set_code, voltage, 0))
|
|
out_calib.append((voltage, mean_voltage, stdev_voltage))
|
|
|
|
print("Calibration done")
|
|
|
|
in_m, in_b = linear_fit(in_calib)
|
|
out_m, out_b = linear_fit(out_calib)
|
|
|
|
# we need to get the inverse coefficients for our use case
|
|
|
|
in_m_ = float(1.0/in_m)
|
|
in_b_ = float(-in_b/in_m)
|
|
|
|
out_m_ = float(1.0/out_m)
|
|
out_b_ = float(-out_b/out_m)
|
|
|
|
print("Calibration results:")
|
|
print(f"Voltage calibration: y = {in_m_:.6f}x + {in_b_:.6f}")
|
|
print(f"ADC voltage calibration: y = {out_m_:.6f}x + {out_b_:.6f}")
|
|
|
|
return {
|
|
'dac': [in_m_, in_b_],
|
|
'voltage': [out_m_, out_b_]
|
|
}
|
|
|
|
|
|
|
|
def calibrate_current(scpi, data_queue):
|
|
# Calibration results:
|
|
# R4: y = -0.000012x + 0.003723
|
|
# R3: y = -0.000125x -0.013356
|
|
# R2: y = -0.001238x +0.015826
|
|
# R1: y = -0.011676x -0.029388
|
|
# R0: y = -0.124461x -2.797160
|
|
print("Calibrating current...")
|
|
|
|
# set the highest range and lowest voltage
|
|
scpi.write(b"SOURCE:DAC 0\n")
|
|
scpi.write(b"MEASURE:RANGE R4\n")
|
|
|
|
print("Connect the current meter directly between the JellyfishOPP output terminals and press enter")
|
|
|
|
range_desc = {
|
|
'R5': '30uA',
|
|
'R4': '300uA',
|
|
'R3': '3mA',
|
|
'R2': '30mA',
|
|
'R1': '300mA',
|
|
'R0': '3A'
|
|
}
|
|
|
|
calibration_table = {
|
|
'isense1': [
|
|
[0, 0], # R0
|
|
None, # R1
|
|
None, # R2
|
|
None, # R3
|
|
None, # R4
|
|
None, # R5
|
|
[0, 0], # R6 (not used, auto range)
|
|
[0, 0], # R7 (not used, all off)
|
|
]
|
|
}
|
|
|
|
isense_calib = []
|
|
for current_range in ('R5', 'R4', 'R3', 'R2', 'R1'):
|
|
print()
|
|
print(f"Setting current range to {current_range}: {range_desc[current_range]}")
|
|
print(f"Set the current meter for the {range_desc[current_range]} range and press enter")
|
|
input()
|
|
|
|
i1_idx = 'adc00'
|
|
i2_idx = 'adc01'
|
|
|
|
adc_code_min = -1000
|
|
adc_code_max = -29000
|
|
|
|
limits = [-1, -1]
|
|
dac_code = 0
|
|
|
|
inc = 10 if current_range in ('R5', 'R4', 'R3', 'R2') else 100
|
|
|
|
print("Searching for limits...")
|
|
scpi.write(b"MEASURE:RANGE %s\n"%(current_range.encode(),))
|
|
while True:
|
|
scpi.write(b"SOURCE:DAC %d\n"%(dac_code,))
|
|
# flush the queue
|
|
time.sleep(0.1)
|
|
queue_flush(data_queue)
|
|
time.sleep(0.1)
|
|
|
|
meas1 = []
|
|
meas2 = []
|
|
try:
|
|
q = data_queue.get_nowait()
|
|
except queue.Empty:
|
|
print("ERROR: No data in the queue")
|
|
return
|
|
for rec in q:
|
|
meas1.append(float(rec[i1_idx]))
|
|
meas2.append(float(rec[i2_idx]))
|
|
|
|
meas1_avg = float(np.mean(meas1))
|
|
meas1_std = float(np.std(meas1))
|
|
meas2_avg = float(np.mean(meas2))
|
|
meas2_std = float(np.std(meas2))
|
|
print(dac_code, meas1_avg, meas1_std, meas2_avg, meas2_std)
|
|
|
|
if meas1_avg < adc_code_min and limits[0]==-1:
|
|
limits[0] = dac_code
|
|
print(f" Found limit low: {dac_code} -> {meas1_avg}")
|
|
elif meas1_avg < adc_code_max and limits[1]==-1:
|
|
limits[1] = dac_code
|
|
print(f" Found limit high: {dac_code} -> {meas1_avg}")
|
|
break
|
|
|
|
dac_code += inc
|
|
# reset the DAC
|
|
scpi.write(b"SOURCE:DAC 0\n")
|
|
time.sleep(0.2)
|
|
|
|
center_point = (limits[0]+limits[1])/2
|
|
points = [(limits[0]+center_point)/2, center_point, (limits[1]+center_point)/2]
|
|
print('points', points)
|
|
|
|
calib = [] # relation between DAC code and voltage
|
|
for pt in points:
|
|
scpi.write(b"SOURCE:DAC %d\n"%(pt,))
|
|
# flush the queue
|
|
time.sleep(0.1)
|
|
queue_flush(data_queue)
|
|
time.sleep(0.1)
|
|
|
|
while True:
|
|
inp = input("Type the measured current in milliamps: ")
|
|
try:
|
|
curr = float(inp)
|
|
break
|
|
except ValueError:
|
|
print("Invalid input")
|
|
continue
|
|
|
|
# get the current from the queue
|
|
adc_current = []
|
|
isense_current = []
|
|
for _ in range(5):
|
|
try:
|
|
q = data_queue.get_nowait()
|
|
time.sleep(0.1)
|
|
except queue.Empty:
|
|
print("ERROR: No data in the queue")
|
|
return
|
|
for rec in q:
|
|
adc_current.append(float(rec[i1_idx]))
|
|
isense_current.append(float(rec[i2_idx]))
|
|
|
|
mean_current = float(np.mean(adc_current))
|
|
stdev_current = float(np.std(adc_current))
|
|
print(f"Avg ADC current: {mean_current:.3f}, stdev {stdev_current} ({len(adc_current)})")
|
|
|
|
calib.append((curr, mean_current, stdev_current))
|
|
|
|
isense_mean = float(np.mean(isense_current))
|
|
isense_std = float(np.std(isense_current))
|
|
isense_calib.append((curr, isense_mean, isense_std))
|
|
|
|
# reset the DAC
|
|
scpi.write(b"SOURCE:DAC 0\n")
|
|
time.sleep(0.2)
|
|
print("Calibration done")
|
|
print(calib)
|
|
|
|
in_m, in_b = linear_fit(calib)
|
|
|
|
# we need to get the inverse coefficients for our use case
|
|
in_m_ = float(1.0/in_m)
|
|
in_b_ = float(-in_b/in_m)
|
|
|
|
print("Calibration results:")
|
|
print(f"y = {in_m_:.6f}x + {in_b_:.6f}")
|
|
|
|
calibration_table['isense1'][int(current_range[1])] = [in_m_, in_b_]
|
|
|
|
# fit the main isense
|
|
in_m, in_b = linear_fit(isense_calib)
|
|
|
|
# we need to get the inverse coefficients for our use case
|
|
in_m_ = float(1.0/in_m)
|
|
in_b_ = float(-in_b/in_m)
|
|
|
|
print("Isense Calibration results:")
|
|
print(f"y = {in_m_:.6f}x + {in_b_:.6f}")
|
|
calibration_table['isense2'] = [in_m_, in_b_]
|
|
return calibration_table
|
|
|
|
|
|
|
|
def print_calibration_header():
|
|
|
|
if os.path.exists('calibration.json'):
|
|
with open('calibration.json') as f:
|
|
try:
|
|
calibration_table_full = json.load(f)
|
|
except ValueError:
|
|
print("ERROR: Could not parse calibration table")
|
|
return
|
|
else:
|
|
print("No calibration table found, plotting raw values")
|
|
return
|
|
|
|
print("/*")
|
|
print(" * SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>")
|
|
print(" *")
|
|
print(" * SPDX-License-Identifier: GPL-3.0-or-later")
|
|
print(" */")
|
|
print("")
|
|
print("// File generated by jf_calibrate.py on %s"%(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
|
|
print("")
|
|
print("#include \"jf_calibration.h\"")
|
|
print("")
|
|
print("const jf_calibration_t JF_CALIBRATION_EMPTY = {")
|
|
print(" { 0, 0, 0},")
|
|
print(" \"Empty\",")
|
|
print(" { 1, 0 },")
|
|
print(" { 1, 0 },")
|
|
print(" { 1, 0 },")
|
|
print(" { 1, 0 },")
|
|
print(" {")
|
|
for i in range(8):
|
|
print(" 1, 0,")
|
|
print(" }")
|
|
print("};")
|
|
print("")
|
|
|
|
isense_fields = ['R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'auto', 'all off']
|
|
print("const jf_calibration_t JF_CALIBRATION_DATA[] = {")
|
|
for devid, calibration_table in calibration_table_full.items():
|
|
print(" {")
|
|
print(f" {{ 0x{devid[:8]}, 0x{devid[8:16]}, 0x{devid[16:24]} }},\t\t// UID")
|
|
print(f" \"{calibration_table['comment']}\",\t\t\t\t// Description")
|
|
print(f" {{ {calibration_table['voltage'][0]:.8f}, {calibration_table['voltage'][1]:.8f} }},\t\t\t// Voltage")
|
|
print(f" {{ {calibration_table['dac'][0]:.8f}, {calibration_table['dac'][1]:.8f} }}, \t\t\t// DAC")
|
|
print(f" {{ {calibration_table['ain'][0]:.8f}, {calibration_table['ain'][1]:.8f} }}, \t\t\t// AIN")
|
|
print(f" {{ {calibration_table['isense2'][0]:.8f}, {calibration_table['isense2'][1]:.8f} }},\t\t\t// Isense2")
|
|
print(" {")
|
|
for i in range(len(calibration_table['isense1'])):
|
|
print(f" {calibration_table['isense1'][i][0]:.8f}, {calibration_table['isense1'][i][1]:.8f},\t\t\t// {isense_fields[i]}")
|
|
print(" }")
|
|
print(" },")
|
|
|
|
print(" { { 0, 0, 0 }, }")
|
|
print("};")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Calibrate JellyfishOPP')
|
|
parser.add_argument('value', type=str, choices=['voltage', 'current', 'ain', 'code'], help='Calibrate voltage or current')
|
|
args = parser.parse_args()
|
|
|
|
if args.value == 'code':
|
|
print_calibration_header()
|
|
return
|
|
|
|
scpi_interface = '/dev/ttyUSB0'
|
|
scpi_baud = 230400
|
|
|
|
try:
|
|
scpi = serial.Serial(scpi_interface, scpi_baud, timeout=0.1)
|
|
except serial.SerialException:
|
|
print("ERROR: Could not open serial port")
|
|
return
|
|
|
|
scpi.write(b"SYSTEM:ID?\n")
|
|
try:
|
|
devid = scpi.readlines()[-1].strip().decode()
|
|
except (TypeError, IndexError, AttributeError):
|
|
print("ERROR: Could not read device ID")
|
|
return
|
|
|
|
proc = JfUsbProcess(calibration_table=None, queue_size=1)
|
|
proc.start()
|
|
|
|
calibration_table = {}
|
|
|
|
if args.value == 'voltage':
|
|
calibration_table = calibrate_voltage(scpi, proc.get_data_queue())
|
|
elif args.value == 'current':
|
|
calibration_table = calibrate_current(scpi, proc.get_data_queue())
|
|
print("--------------")
|
|
print("Result:")
|
|
print(json.dumps({devid: calibration_table}, indent=2, sort_keys=True))
|
|
|
|
proc.stop()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|