0
mirror of https://gitlab.com/hyperglitch/jellyfish.git synced 2025-11-12 22:06:08 +00:00

211 lines
6.1 KiB
Python

#!/usr/bin/env python
# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
import multiprocessing
import queue
import signal
import struct
import time
import usb.core
import usb.util
from .constants import VENDOR_ID, PRODUCT_ID, IN_EP, FRAME_HEADER, FRAME_SIZE
logger = logging.getLogger(__name__)
_time_to_die = None
_last_sigint = 0
def sigint_handler(signal, frame):
global _time_to_die
_time_to_die.set()
print("Stopping background process")
def jf_fetch(data_queue, time_to_die, calibration_table=None):
# handle Ctrl+C
global _time_to_die
_time_to_die = time_to_die
signal.signal(signal.SIGINT, sigint_handler)
# find the device
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if dev is None:
logger.error("Device not found")
time_to_die.set()
return
assert isinstance(dev, usb.core.Device)
# --- only detach the vendor interface (interface number 2) ---
RAW_IFACE = 2
if dev.is_kernel_driver_active(RAW_IFACE):
logger.info(f"Detaching kernel driver from interface {RAW_IFACE}")
dev.detach_kernel_driver(RAW_IFACE)
# now claim just the raw interface
usb.util.claim_interface(dev, RAW_IFACE)
logger.info(f"Claimed interface {RAW_IFACE}")
pkgcount = 0
st = time.time()
local_buffer = []
buffer_usage = 0.0
errcnt = 0
frame_struct = struct.Struct('>hhhhBB')
calib = {
'voltage': (1, 0),
'isense2': (1, 0),
'isense1': (
(0, 0),
(1, 0),
(1, 0),
(1, 0),
(1, 0),
(1, 0),
),
'ain': (1, 0)
}
if calibration_table is not None:
calib = calibration_table
range_last = 1
while not time_to_die.is_set():
try:
data = dev.read(IN_EP, 512, timeout=500)
errcnt = 0
except usb.core.USBError as e:
errcnt += 1
if e.errno == 110:
logger.error("Timeout waiting for data")
elif e.errno == 19:
logger.error("Device disconnected")
time_to_die.set()
return # no need to release the interface
elif e.errno == 5:
logger.error("Read error: I/O error (no data from the device)")
else:
logger.error("Read error: %s", e)
data = []
if errcnt > 10:
logger.error("Too many read errors, exiting")
time_to_die.set()
return
try:
buf = data.tobytes()
except AttributeError:
logger.error("Device error: device disconnected")
return
mv = memoryview(buf)
pos = 0
last = None
frame_packets = 0
while True:
idx = buf.find(FRAME_HEADER, pos)
if idx < 0:
break
data_start_idx = None
if last is not None and idx - last == FRAME_SIZE:
# unpack directly from the memoryview—zero copy!
data_start_idx = last + 2
elif last is None:
# handle the first frame (might not have a full header)
if idx==11 and buf[0] == FRAME_HEADER[1]:
data_start_idx = 1
elif idx==12 and buf[0] == FRAME_HEADER[0] and buf[1] == FRAME_HEADER[1]:
data_start_idx = 2
if data_start_idx is not None:
d0, d1, d2, d3, _, d5 = frame_struct.unpack_from(mv, data_start_idx)
curr_isense2 = d1 * calib['isense2'][0] + calib['isense2'][1]
rng = d5 >> 5
f = {
'adc00': curr_isense2 if range_last==0 else calib['isense1'][range_last][0]*d0 + calib['isense1'][range_last][1],
'adc01': curr_isense2,
'adc10': calib['voltage'][0]*d2 + calib['voltage'][1],
'adc11': d3,
'gpios': d5 & 0x1f,
'range': rng
}
local_buffer.append(f)
range_last = rng
frame_packets += 1
pkgcount += 1
if pkgcount >= 100000:
now = time.time()
logger.debug(f"Packages: {pkgcount/(now-st):.3f} frames/sec, buffer usage: {buffer_usage:.3f}")
st = now
pkgcount = 0
last = idx
pos = idx + 1
f_usage = (frame_packets*FRAME_SIZE)/float(len(data))
buffer_usage = 0.9*buffer_usage + 0.1*f_usage
if len(local_buffer)>128:
try:
data_queue.put(local_buffer, block=False)
except queue.Full:
pass
local_buffer = []
# TODO: check for partial frame at the end of the buffer
usb.util.release_interface(dev, RAW_IFACE)
try:
dev.attach_kernel_driver(RAW_IFACE)
except Exception:
pass
logger.info("Released interface and re-attached kernel driver.")
class JfUsbProcess:
def __init__(self, calibration_table=None, queue_size=5000):
self.data_queue = multiprocessing.Queue(maxsize=queue_size)
self.time_to_die = multiprocessing.Event()
self.calibration_table = calibration_table
self.process = None
logger.setLevel(logging.DEBUG)
def log_level(self, level):
logger.setLevel(level)
def get_data_queue(self):
return self.data_queue
def get_time_to_die(self):
return self.time_to_die
def start(self):
self.process = multiprocessing.Process(target=jf_fetch, args=(self.data_queue, self.time_to_die, self.calibration_table))
self.process.start()
def stop(self):
if self.process is None:
return
self.time_to_die.set()
time.sleep(0.5) # wait for the process to exit
logger.info("Process done")
self.process.terminate()