mirror of
https://gitlab.com/hyperglitch/jellyfish.git
synced 2026-01-16 10:26:52 +00:00
314 lines
11 KiB
Python
Executable File
314 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
||
|
||
# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
|
||
#
|
||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import serial
|
||
import sys
|
||
import time
|
||
|
||
from collections import deque
|
||
|
||
from PyQt5 import QtWidgets, QtCore
|
||
from PyQt5.QtWidgets import QLabel
|
||
from PyQt5.QtGui import QFont, QPalette
|
||
from PyQt5.QtCore import Qt
|
||
import pyqtgraph as pg
|
||
|
||
from libjellyfishopp import JfUsbProcess
|
||
|
||
MAX_BUFFER_SIZE = 20_000
|
||
|
||
|
||
# --- Plotter GUI ---
|
||
class Plotter(QtWidgets.QMainWindow):
|
||
def __init__(self, data_queue, time_to_die):
|
||
super().__init__()
|
||
self.setWindowTitle("JellyfishOPP Plotter")
|
||
self.resize(1000, 800)
|
||
|
||
# State flags
|
||
self.paused = False
|
||
self.sticky = True
|
||
|
||
# Data & buffers
|
||
self.data_queue = data_queue
|
||
self.time_to_die = time_to_die
|
||
self.buffers = { k: deque(maxlen=MAX_BUFFER_SIZE)
|
||
for k in ['adc00','adc01','adc10','adc11','gpios','range'] }
|
||
self.min_vals = {k: None for k in self.buffers}
|
||
self.max_vals = {k: None for k in self.buffers}
|
||
|
||
# Central widget + layout
|
||
cw = QtWidgets.QWidget()
|
||
self.main_layout = QtWidgets.QVBoxLayout(cw)
|
||
self.setCentralWidget(cw)
|
||
|
||
# ─── Toolbar ─────────────────────────────────────
|
||
tb = QtWidgets.QToolBar()
|
||
self.addToolBar(tb)
|
||
|
||
self.vlabel = None
|
||
self.vlabel_value = 0
|
||
self.ilabel = None
|
||
self.ilabel_value = 0
|
||
self.label_last_update = time.time()
|
||
|
||
# Pause/Resume
|
||
self.action_pause = QtWidgets.QAction("Pause", self)
|
||
self.action_pause.triggered.connect(self.toggle_pause)
|
||
tb.addAction(self.action_pause)
|
||
# Sticky Y toggle
|
||
self.action_sticky = QtWidgets.QAction("Sticky Y", self)
|
||
self.action_sticky.triggered.connect(self.toggle_sticky)
|
||
tb.addAction(self.action_sticky)
|
||
|
||
# ─── Plot area ───────────────────────────────────
|
||
self.plot_container = QtWidgets.QWidget()
|
||
self.main_layout.addWidget(self.plot_container)
|
||
self.build_plots()
|
||
|
||
# ─── Status bar ──────────────────────────────────
|
||
sb = QtWidgets.QStatusBar()
|
||
self.setStatusBar(sb)
|
||
self.lbl_rate = QtWidgets.QLabel()
|
||
self.lbl_pause = QtWidgets.QLabel()
|
||
self.lbl_sticky = QtWidgets.QLabel()
|
||
sb.addWidget(self.lbl_rate)
|
||
sb.addWidget(self.lbl_pause)
|
||
sb.addWidget(self.lbl_sticky)
|
||
|
||
# ─── Timer for updates ───────────────────────────
|
||
self.timer = QtCore.QTimer()
|
||
self.timer.timeout.connect(self.update_plots)
|
||
self.timer.start(33)
|
||
|
||
def update_labels(self, voltage, current):
|
||
alpha = 0.8
|
||
self.vlabel_value = self.vlabel_value*alpha + voltage*(1-alpha)
|
||
self.ilabel_value = self.ilabel_value*alpha + current*(1-alpha)
|
||
|
||
if self.vlabel is None or self.ilabel is None:
|
||
return
|
||
|
||
if time.time() - self.label_last_update > 0.3:
|
||
self.label_last_update = time.time()
|
||
|
||
# format voltage
|
||
if self.vlabel_value > 1000:
|
||
self.vlabel.setText(f"{self.vlabel_value/1000:.3f}V")
|
||
else:
|
||
self.vlabel.setText(f"{self.vlabel_value:.3f}mV")
|
||
|
||
# format current
|
||
if abs(self.ilabel_value) > 1000:
|
||
self.ilabel.setText(f"{self.ilabel_value/1000:.3f}A")
|
||
elif abs(self.ilabel_value) > 1:
|
||
self.ilabel.setText(f"{self.ilabel_value:.3f}mA")
|
||
else:
|
||
self.ilabel.setText(f"{self.ilabel_value*1000:.3f}µA")
|
||
|
||
def build_plots(self):
|
||
# Get existing layout (if any)
|
||
grid = self.plot_container.layout()
|
||
if grid is None:
|
||
# first time: install a new QGridLayout
|
||
grid = QtWidgets.QGridLayout()
|
||
self.plot_container.setLayout(grid)
|
||
else:
|
||
# clear out old items & widgets
|
||
while grid.count():
|
||
item = grid.takeAt(0)
|
||
w = item.widget()
|
||
if w:
|
||
w.setParent(None)
|
||
|
||
# Now repopulate
|
||
self.plots = []
|
||
self.curves = []
|
||
signals = ['adc00', 'range', 'adc01','adc10','adc11','gpios']
|
||
|
||
playout = {
|
||
'adc00': (1, 0, 1, 2),
|
||
#'adc01': (2, 0, 1, 2),
|
||
'range': (2, 0, 1, 2),
|
||
'adc10': (3, 0, 1, 2),
|
||
'adc11': (4, 0, 1, 2),
|
||
'gpios': (5, 0, 1, 1),
|
||
#'range': (5, 1, 1, 1),
|
||
'adc01': (5, 1, 1, 1),
|
||
}
|
||
titles = ['Isense1', 'Range', 'Isense2', 'Vout', 'Ain', 'GPIOS']
|
||
colors = ['y','c','m','g','r','b']
|
||
for idx, key in enumerate(signals):
|
||
pw = pg.PlotWidget(title=titles[idx])
|
||
pw.showGrid(x=True, y=True)
|
||
curve = pw.plot(pen=colors[idx])
|
||
self.plots.append(pw)
|
||
self.curves.append((key, curve))
|
||
grid.addWidget(pw, playout[key][0], playout[key][1], playout[key][2], playout[key][3])
|
||
|
||
vlabel = QLabel("123.4mV")
|
||
vlabel.setFont(QFont("Arial", 28, QFont.Bold))
|
||
vlabel.setAlignment(Qt.AlignCenter)
|
||
grid.addWidget(vlabel, 0, 0, 1, 1)
|
||
|
||
ilabel = QLabel("123.4mA")
|
||
ilabel.setFont(QFont("Arial", 28, QFont.Bold))
|
||
ilabel.setAlignment(Qt.AlignCenter)
|
||
grid.addWidget(ilabel, 0, 1, 1, 1)
|
||
|
||
self.vlabel = vlabel
|
||
self.ilabel = ilabel
|
||
|
||
for pw in self.plots[1:]:
|
||
pw.setXLink(self.plots[0])
|
||
|
||
|
||
def toggle_pause(self):
|
||
self.paused = not self.paused
|
||
self.action_pause.setText("Resume" if self.paused else "Pause")
|
||
|
||
def toggle_sticky(self):
|
||
self.sticky = not self.sticky
|
||
self.action_sticky.setText("Free Y" if not self.sticky else "Sticky Y")
|
||
|
||
def update_plots(self):
|
||
if self.time_to_die.is_set():
|
||
QtWidgets.QApplication.quit()
|
||
return
|
||
|
||
if self.paused:
|
||
# discard the queue
|
||
while not self.data_queue.empty():
|
||
buff = self.data_queue.get()
|
||
return
|
||
|
||
# Ingest data
|
||
while not self.data_queue.empty():
|
||
buff = self.data_queue.get()
|
||
for f in buff:
|
||
for k in self.buffers:
|
||
self.buffers[k].append(f[k])
|
||
|
||
self.update_labels(self.buffers['adc10'][-1], self.buffers['adc00'][-1])
|
||
|
||
# Draw each plot
|
||
for idx, (key, curve) in enumerate(self.curves):
|
||
data = list(self.buffers[key])[-MAX_BUFFER_SIZE:]
|
||
if not data:
|
||
continue
|
||
curve.setData(data)
|
||
|
||
if self.sticky:
|
||
lo, hi = min(data), max(data)
|
||
if self.min_vals[key] is None or lo < self.min_vals[key]:
|
||
self.min_vals[key] = lo
|
||
if self.max_vals[key] is None or hi > self.max_vals[key]:
|
||
self.max_vals[key] = hi
|
||
self.plots[idx].setYRange(self.min_vals[key], self.max_vals[key])
|
||
#else:
|
||
# self.plots[idx].enableAutoRange('y', True)
|
||
#self.plots[idx].enableAutoRange('y', not self.sticky)
|
||
|
||
# Update status bar
|
||
# Approximate rate = samples in buffer / elapsed time
|
||
rate = 0 #len(self.buffers['adc00']) / (self.time_to_die or 1)
|
||
self.lbl_rate.setText(f"Rate: {rate:.1f} Hz")
|
||
self.lbl_pause.setText("Paused" if self.paused else "Running")
|
||
self.lbl_sticky.setText("Sticky Y" if self.sticky else "Free Y")
|
||
|
||
def keyPressEvent(self, ev):
|
||
k = ev.key()
|
||
if k == QtCore.Qt.Key_A:
|
||
self.min_vals = {x: None for x in self.buffers}
|
||
self.max_vals = {x: None for x in self.buffers}
|
||
print("Y‑axis reset")
|
||
elif k == QtCore.Qt.Key_Space:
|
||
self.paused = not self.paused
|
||
self.action_pause.setText("Resume" if self.paused else "Pause")
|
||
else:
|
||
super().keyPressEvent(ev)
|
||
def closeEvent(self, ev):
|
||
super().closeEvent(ev)
|
||
|
||
|
||
def main():
|
||
# Example of the calibration file:
|
||
# "devid": {
|
||
# "voltage": [-0.99938, 10223.210015], # m and b coefficients for voltage
|
||
# "isense2": [-0.124461, -2.797160], # m and b coefficients for isense2 (main shunt)
|
||
# "isense1": [
|
||
# [0, 0], # range 0 (not used, main shunt is used)
|
||
# [-0.011676, -0.029388], # range 1
|
||
# [-0.001238, 0.015826], # range 2
|
||
# [-0.000125, -0.013356], # range 3
|
||
# [-0.000012, 0.003723], # range 4
|
||
# [0, 0], # range 5
|
||
# [0, 0], # not used, code for auto-range
|
||
# [0, 0] # not used, all switches off
|
||
# ],
|
||
# "ain": [1, 0]
|
||
# }
|
||
|
||
logging.basicConfig(level=logging.DEBUG)
|
||
|
||
calibration_table_full = {}
|
||
if os.path.exists('calibration.json'):
|
||
print("Loading calibration table...")
|
||
with open('calibration.json') as f:
|
||
calibration_table_full = json.load(f)
|
||
|
||
else:
|
||
print("No calibration table found, plotting raw values")
|
||
|
||
scpi_interface = '/dev/ttyUSB0'
|
||
scpi_baud = 230400
|
||
|
||
try:
|
||
scpi = serial.Serial(scpi_interface, scpi_baud, timeout=0.1)
|
||
except serial.SerialException:
|
||
print("ERROR: Could not open serial port")
|
||
return
|
||
|
||
scpi.write(b"SYSTEM:ID?\n")
|
||
try:
|
||
devid = scpi.readlines()[-1].strip().decode()
|
||
except (TypeError, IndexError, AttributeError):
|
||
print("ERROR: Could not read device ID")
|
||
return
|
||
print(f"Device ID: {devid}")
|
||
|
||
try:
|
||
calibration_table = calibration_table_full[devid]
|
||
except KeyError:
|
||
print("ERROR: Could not find calibration table for this device")
|
||
return
|
||
|
||
if not all(k in calibration_table for k in ('voltage', 'isense1', 'isense2', 'comment')):
|
||
print("ERROR: Calibration table is incomplete")
|
||
return
|
||
|
||
print(f"Found device: {calibration_table['comment']}")
|
||
|
||
proc = JfUsbProcess(calibration_table=calibration_table)
|
||
data_queue = proc.get_data_queue()
|
||
time_to_die = proc.get_time_to_die()
|
||
proc.start()
|
||
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
win = Plotter(data_queue, time_to_die)
|
||
win.show()
|
||
app.exec_()
|
||
|
||
time_to_die.set()
|
||
proc.stop()
|
||
print("Exiting...")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|