0
mirror of https://github.com/TheusHen/EEGFrontier.git synced 2026-04-19 07:14:12 +00:00
Files
EEGFrontier/Pendulum/pendulum_eeg/simulator.py
TheusHen 52c98dcdfa Add EEG processing and visualization components
- Implemented mne_tools.py for converting sample records to MNE Raw format.
- Created models.py to define data structures for sample, event, and error packets.
- Developed pyqt_focus.py for a PyQt-based focus monitoring GUI with real-time EEG data visualization.
- Added 
eflex_bridge.py to manage the EEG engine instance.
- Introduced simulator.py to simulate EEG data packets for testing purposes.
- Established a web dashboard in pendulum_web.py for live EEG data display and diagnostics.
- Configured project dependencies in pyproject.toml and 
equirements.txt.
- Added batch and PowerShell scripts for running and stopping the Reflex application.
- Created 
xconfig.py for Reflex application configuration.
2026-02-28 17:56:04 -03:00

68 lines
2.2 KiB
Python

from __future__ import annotations
import math
import random
import time
from dataclasses import dataclass, field
from .firmware_protocol import ADS_STATUS_HEADER_OK, counts_to_microvolts
from .models import SamplePacket
def microvolts_to_counts(microvolts: float, vref_uv: int = 4_500_000, gain: int = 24) -> int:
scale = (gain * 8_388_607.0) / float(vref_uv)
return int(microvolts * scale)
@dataclass(slots=True)
class EEGSimulator:
sample_rate_hz: int = 250
amplitude_uv: float = 35.0
start_monotonic: float = field(default_factory=time.monotonic)
sample_index: int = 0
def next_packet(self) -> SamplePacket:
t = self.sample_index / float(self.sample_rate_hz)
drift = 1.0 + 0.2 * math.sin(2.0 * math.pi * 0.03 * t)
# Mistura simples de bandas para parecer EEG.
alpha = math.sin(2.0 * math.pi * 10.0 * t)
beta = math.sin(2.0 * math.pi * 19.0 * t + 0.5)
theta = math.sin(2.0 * math.pi * 6.0 * t + 1.2)
delta = math.sin(2.0 * math.pi * 2.0 * t + 2.4)
gamma = math.sin(2.0 * math.pi * 35.0 * t + 0.7)
noise = random.gauss(0.0, 0.15)
base_uv = self.amplitude_uv * drift * (
0.35 * alpha + 0.45 * beta + 0.20 * theta + 0.12 * delta + 0.06 * gamma + noise
)
ch_values_uv = [
base_uv + random.gauss(0.0, 2.0),
base_uv * 0.95 + random.gauss(0.0, 2.0),
base_uv * 1.05 + random.gauss(0.0, 2.2),
base_uv * 1.02 + random.gauss(0.0, 1.8),
]
ch_counts = [microvolts_to_counts(v) for v in ch_values_uv]
timestamp_us = int((time.monotonic() - self.start_monotonic) * 1_000_000.0)
packet = SamplePacket(
version=1,
sample_index=self.sample_index,
t_us=timestamp_us,
status24=ADS_STATUS_HEADER_OK,
ch1=ch_counts[0],
ch2=ch_counts[1],
ch3=ch_counts[2],
ch4=ch_counts[3],
flags=1, # FLAG_STREAMING
missed_drdy_frame=0,
recoveries_total=0,
)
self.sample_index += 1
return packet
def to_microvolts(self, counts: int) -> float:
return counts_to_microvolts(counts)