mirror of
https://gitlab.com/hyperglitch/jellyfish.git
synced 2026-01-17 18:35:47 +00:00
141 lines
3.7 KiB
Python
141 lines
3.7 KiB
Python
# SPDX-FileCopyrightText: 2025 Igor Brkic <igor@hyperglitch.com>
|
||
#
|
||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||
|
||
import usb.core
|
||
import usb.util
|
||
import time
|
||
import struct
|
||
import sys
|
||
|
||
from collections import deque
|
||
|
||
# your VID/PID
|
||
VENDOR_ID = 0x1209
|
||
PRODUCT_ID = 0x4A46
|
||
|
||
# find the device
|
||
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
|
||
if dev is None:
|
||
print("Device not found")
|
||
sys.exit(1)
|
||
|
||
# select configuration (most devices only have one)
|
||
cfg = dev.get_active_configuration()
|
||
|
||
# --- only detach the vendor interface (interface number 2) ---
|
||
RAW_IFACE = 2
|
||
if dev.is_kernel_driver_active(RAW_IFACE):
|
||
print(f"Detaching kernel driver from interface {RAW_IFACE}")
|
||
dev.detach_kernel_driver(RAW_IFACE)
|
||
|
||
# now claim just the raw interface
|
||
usb.util.claim_interface(dev, RAW_IFACE)
|
||
print(f"Claimed interface {RAW_IFACE}")
|
||
|
||
IN_EP = 0x83 # EP3 IN
|
||
OUT_EP = 0x03 # EP3 OUT
|
||
|
||
FRAME_HEADER = b'\x4a\x00'
|
||
FRAME_SIZE = 12
|
||
PAYLOAD_SIZE = FRAME_SIZE - 2
|
||
|
||
|
||
def read_data():
|
||
try:
|
||
data = dev.read(IN_EP, 512, timeout=1000)
|
||
#print("Bulk IN:", data)
|
||
return data
|
||
except usb.core.USBError as e:
|
||
if e.errno == 110:
|
||
print("Timeout waiting for data")
|
||
else:
|
||
print("Read error:", e)
|
||
return []
|
||
|
||
|
||
def write_data(buf):
|
||
try:
|
||
n = dev.write(OUT_EP, buf, timeout=1000)
|
||
print(f"Bulk OUT: sent {n} bytes")
|
||
except usb.core.USBError as e:
|
||
print("Write error:", e)
|
||
|
||
|
||
def process_unpacked(d0, d1, d2, d3, _, d5):
|
||
f = {
|
||
'adc00': d0,
|
||
'adc01': d1,
|
||
'adc10': d2,
|
||
'adc11': d3,
|
||
'gpios': d5 & 0x1f,
|
||
'range': d5 >> 5
|
||
}
|
||
f['adc10'] = 10174-(f['adc10']*989)/1000
|
||
#if f['range'] !=0:
|
||
# print(f)
|
||
#print(f)
|
||
|
||
|
||
def main():
|
||
frame_struct = struct.Struct('>hhhhBB')
|
||
try:
|
||
st = time.time()
|
||
pkgcount = 0
|
||
|
||
while True:
|
||
#write_data(msg)
|
||
#time.sleep(0.5)
|
||
|
||
d = read_data()
|
||
|
||
buf = d.tobytes()
|
||
mv = memoryview(buf)
|
||
pos = 0
|
||
last = None
|
||
|
||
while True:
|
||
idx = buf.find(FRAME_HEADER, pos)
|
||
if idx < 0:
|
||
break
|
||
|
||
data_start_idx = None
|
||
|
||
if last is not None and idx - last == FRAME_SIZE:
|
||
# unpack directly from the memoryview—zero copy!
|
||
data_start_idx = last + 2
|
||
elif last is None:
|
||
# handle the first frame (might not have a full header)
|
||
if idx==11 and buf[0] == FRAME_HEADER[1]:
|
||
data_start_idx = 1
|
||
elif idx==12 and buf[0] == FRAME_HEADER[0] and buf[1] == FRAME_HEADER[1]:
|
||
data_start_idx = 2
|
||
|
||
if data_start_idx is not None:
|
||
d0, d1, d2, d3, _, d5 = frame_struct.unpack_from(mv, data_start_idx)
|
||
process_unpacked(d0, d1, d2, d3, _, d5)
|
||
pkgcount += 1
|
||
|
||
if pkgcount % 10000 == 0:
|
||
now = time.time()
|
||
print(f"Packages: {pkgcount/(now-st):.3f} frames/sec")
|
||
st = now
|
||
pkgcount = 0
|
||
|
||
last = idx
|
||
pos = idx + 1
|
||
# TODO: check for partial frame at the end of the buffer
|
||
|
||
|
||
finally:
|
||
# clean up: release and re‐attach the kernel driver
|
||
usb.util.release_interface(dev, RAW_IFACE)
|
||
try:
|
||
dev.attach_kernel_driver(RAW_IFACE)
|
||
except Exception:
|
||
pass
|
||
print("Released interface and re-attached kernel driver.")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|