0
mirror of https://gitlab.com/hyperglitch/jellyfish.git synced 2026-01-17 18:35:47 +00:00
jellyfish-powersupply/sw/usb_hs_test.py

141 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later
import usb.core
import usb.util
import time
import struct
import sys
from collections import deque
# your VID/PID
VENDOR_ID = 0x1209
PRODUCT_ID = 0x4A46
# find the device
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if dev is None:
print("Device not found")
sys.exit(1)
# select configuration (most devices only have one)
cfg = dev.get_active_configuration()
# --- only detach the vendor interface (interface number 2) ---
RAW_IFACE = 2
if dev.is_kernel_driver_active(RAW_IFACE):
print(f"Detaching kernel driver from interface {RAW_IFACE}")
dev.detach_kernel_driver(RAW_IFACE)
# now claim just the raw interface
usb.util.claim_interface(dev, RAW_IFACE)
print(f"Claimed interface {RAW_IFACE}")
IN_EP = 0x83 # EP3 IN
OUT_EP = 0x03 # EP3 OUT
FRAME_HEADER = b'\x4a\x00'
FRAME_SIZE = 12
PAYLOAD_SIZE = FRAME_SIZE - 2
def read_data():
try:
data = dev.read(IN_EP, 512, timeout=1000)
#print("Bulk IN:", data)
return data
except usb.core.USBError as e:
if e.errno == 110:
print("Timeout waiting for data")
else:
print("Read error:", e)
return []
def write_data(buf):
try:
n = dev.write(OUT_EP, buf, timeout=1000)
print(f"Bulk OUT: sent {n} bytes")
except usb.core.USBError as e:
print("Write error:", e)
def process_unpacked(d0, d1, d2, d3, _, d5):
f = {
'adc00': d0,
'adc01': d1,
'adc10': d2,
'adc11': d3,
'gpios': d5 & 0x1f,
'range': d5 >> 5
}
f['adc10'] = 10174-(f['adc10']*989)/1000
#if f['range'] !=0:
# print(f)
#print(f)
def main():
frame_struct = struct.Struct('>hhhhBB')
try:
st = time.time()
pkgcount = 0
while True:
#write_data(msg)
#time.sleep(0.5)
d = read_data()
buf = d.tobytes()
mv = memoryview(buf)
pos = 0
last = None
while True:
idx = buf.find(FRAME_HEADER, pos)
if idx < 0:
break
data_start_idx = None
if last is not None and idx - last == FRAME_SIZE:
# unpack directly from the memoryview—zero copy!
data_start_idx = last + 2
elif last is None:
# handle the first frame (might not have a full header)
if idx==11 and buf[0] == FRAME_HEADER[1]:
data_start_idx = 1
elif idx==12 and buf[0] == FRAME_HEADER[0] and buf[1] == FRAME_HEADER[1]:
data_start_idx = 2
if data_start_idx is not None:
d0, d1, d2, d3, _, d5 = frame_struct.unpack_from(mv, data_start_idx)
process_unpacked(d0, d1, d2, d3, _, d5)
pkgcount += 1
if pkgcount % 10000 == 0:
now = time.time()
print(f"Packages: {pkgcount/(now-st):.3f} frames/sec")
st = now
pkgcount = 0
last = idx
pos = idx + 1
# TODO: check for partial frame at the end of the buffer
finally:
# clean up: release and reattach the kernel driver
usb.util.release_interface(dev, RAW_IFACE)
try:
dev.attach_kernel_driver(RAW_IFACE)
except Exception:
pass
print("Released interface and re-attached kernel driver.")
if __name__ == "__main__":
main()