mirror of
https://github.com/blackboxprogramming/lucidia.git
synced 2026-03-17 07:57:19 -05:00
220 lines
8.1 KiB
Python
220 lines
8.1 KiB
Python
"""
|
|
Breath Keeper A/B analysis and persistence metrics.
|
|
|
|
This module provides utilities for analyzing oscillator signals, including
|
|
calculation of unbiased autocorrelation, coherence half-life, beat period,
|
|
phase-slip, and energy drift. It also implements a breath keeper (phase-locked
|
|
loop with node snapping, amplitude control, and symplectic oscillator) to
|
|
maintain phase coherence and conserve energy. A command-line interface is
|
|
provided for running baseline vs keeper-enabled analysis on a CSV file of
|
|
time series data.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
from typing import Tuple, Optional, Dict
|
|
|
|
try:
|
|
from scipy.signal import hilbert, find_peaks
|
|
except Exception as e:
|
|
raise SystemExit("Please install scipy via 'pip install scipy' to use breath_keeper.") from e
|
|
|
|
# Utility functions
|
|
def unbiased_autocorr(x: np.ndarray) -> np.ndarray:
|
|
x = x.astype(np.float64)
|
|
x = x - np.mean(x)
|
|
N = len(x)
|
|
ac = np.correlate(x, x, mode='full')
|
|
ac = ac[N-1:]
|
|
norm = np.arange(N, 0, -1, dtype=np.float64)
|
|
ac_unbiased = ac / norm
|
|
return ac_unbiased / ac_unbiased[0]
|
|
|
|
def fit_coherence_half_life(ac: np.ndarray, Fs: float, min_lag_s: float = 2.0, max_lag_frac: float = 0.5) -> Tuple[float, Tuple[float, float]]:
|
|
N = len(ac)
|
|
t = np.arange(N)/Fs
|
|
lo = int(min_lag_s*Fs)
|
|
hi = int(min(N-1, max_lag_frac*N))
|
|
if hi <= lo+5:
|
|
return float('nan'), (float('nan'), float('nan'))
|
|
y = np.clip(ac[lo:hi], 1e-12, 1.0)
|
|
tt = t[lo:hi]
|
|
A = np.vstack([tt, np.ones_like(tt)]).T
|
|
coeff, _, _, _ = np.linalg.lstsq(A, np.log(y), rcond=None)
|
|
slope, intercept = coeff
|
|
tau = -1.0 / slope if slope < 0 else float('nan')
|
|
residuals = np.log(y) - (A @ coeff)
|
|
sigma = np.std(residuals)
|
|
denom = np.sum((tt - np.mean(tt))**2)
|
|
if denom <= 0:
|
|
return tau, (float('nan'), float('nan'))
|
|
var_slope = sigma**2 / denom
|
|
se_slope = np.sqrt(var_slope)
|
|
slope_lo = slope - 2*se_slope
|
|
slope_hi = slope + 2*se_slope
|
|
tau_lo = -1.0/slope_hi if slope_hi < 0 else float('nan')
|
|
tau_hi = -1.0/slope_lo if slope_lo < 0 else float('nan')
|
|
return float(tau), (float(tau_lo), float(tau_hi))
|
|
|
|
def analytic_envelope(x: np.ndarray) -> np.ndarray:
|
|
return np.abs(hilbert(x))
|
|
|
|
def beat_period_from_envelope(env: np.ndarray, Fs: float, min_period_s: float = 0.5, max_period_s: float = 10.0) -> float:
|
|
ac = unbiased_autocorr(env)
|
|
lags = np.arange(len(ac))/Fs
|
|
lo = int(min_period_s*Fs)
|
|
hi = int(min(len(ac)-1, max_period_s*Fs))
|
|
if hi <= lo+3:
|
|
return float('nan')
|
|
peaks, _ = find_peaks(ac[lo:hi], height=0.2)
|
|
if len(peaks) == 0:
|
|
return float('nan')
|
|
first = peaks[0] + lo
|
|
return lags[first]
|
|
|
|
def node_trough_indices(env: np.ndarray, Tb: float, Fs: float) -> np.ndarray:
|
|
if not np.isfinite(Tb) or Tb <= 0:
|
|
idx, _ = find_peaks(-env, distance=int(0.25*Fs))
|
|
return idx
|
|
idx, _ = find_peaks(-env, distance=int(0.8*Tb*Fs))
|
|
return idx
|
|
|
|
def phase_from_analytic(x: np.ndarray) -> np.ndarray:
|
|
return np.unwrap(np.angle(hilbert(x)))
|
|
|
|
def energy_series(x: np.ndarray, Fs: float, win_periods: float = 1.0, carrier_Hz: Optional[float] = None) -> np.ndarray:
|
|
if carrier_Hz and carrier_Hz > 0:
|
|
win = int(max(1, round(Fs/carrier_Hz*win_periods)))
|
|
else:
|
|
win = int(max(1, round(Fs/10)))
|
|
kernel = np.ones(win)/win
|
|
rms = np.sqrt(np.convolve(x**2, kernel, mode='same'))
|
|
return rms**2
|
|
|
|
def wrap_pi(a):
|
|
return (a + np.pi) % (2*np.pi) - np.pi
|
|
|
|
@dataclass
|
|
class KeeperConfig:
|
|
Fs: float
|
|
Kp: float = 0.05
|
|
Ki: float = 0.001
|
|
agc_tau: float = 0.5
|
|
snap_thresh: float = 0.05
|
|
omega_init: Optional[float] = None
|
|
|
|
class BreathKeeper:
|
|
def __init__(self, cfg: KeeperConfig):
|
|
self.cfg = cfg
|
|
self._phi_int = 0.0
|
|
self._amp = 1.0
|
|
self.q = 0.0
|
|
self.p = 0.0
|
|
self.omega = cfg.omega_init if cfg.omega_init else 2*np.pi*1.0
|
|
|
|
def phase_est(self) -> float:
|
|
return np.arctan2(self.p, self.q + 1e-12)
|
|
|
|
def set_phase(self, phi: float):
|
|
A = max(1e-9, self._amp)
|
|
self.q = A * np.cos(phi)
|
|
self.p = A * np.sin(phi)
|
|
|
|
def step(self, x_t: float, env_t: float, phi_meas: float) -> float:
|
|
phi_err = wrap_pi(phi_meas - self.phase_est())
|
|
self._phi_int += self.cfg.Ki * phi_err
|
|
dphi = self.cfg.Kp * phi_err + self._phi_int
|
|
self.omega = max(1e-6, self.omega + dphi)
|
|
if env_t < self.cfg.snap_thresh * (self._amp + 1e-9):
|
|
self.set_phase(np.round(self.phase_est()/np.pi)*np.pi)
|
|
alpha = np.exp(-1.0/(self.cfg.agc_tau*self.cfg.Fs))
|
|
self._amp = alpha*self._amp + (1-alpha)*abs(x_t)
|
|
dt = 1.0/self.cfg.Fs
|
|
self.p -= (self.omega**2)*self.q*(dt*0.5)
|
|
self.q += self.p*dt
|
|
self.p -= (self.omega**2)*self.q*(dt*0.5)
|
|
return self.phase_est()
|
|
|
|
@dataclass
|
|
class Metrics:
|
|
Tb: float
|
|
tau_c: float
|
|
tau_ci: Tuple[float,float]
|
|
phase_slip_rad_per_beat: float
|
|
energy_drift_per_beat: float
|
|
|
|
def compute_metrics(x: np.ndarray, Fs: float) -> Metrics:
|
|
env = analytic_envelope(x)
|
|
Tb = beat_period_from_envelope(env, Fs)
|
|
ac = unbiased_autocorr(env)
|
|
tau_c, tau_ci = fit_coherence_half_life(ac, Fs)
|
|
nodes = node_trough_indices(env, Tb, Fs)
|
|
phi = phase_from_analytic(x)
|
|
if len(nodes) >= 2:
|
|
dphi = wrap_pi(np.diff(phi[nodes]))
|
|
phase_slip = float(np.mean(np.abs(dphi)))
|
|
else:
|
|
phase_slip = float('nan')
|
|
E = energy_series(x, Fs)
|
|
if len(nodes) >= 2:
|
|
Eb = E[nodes]
|
|
rel_changes = np.diff(Eb)/(Eb[:-1]+1e-12)
|
|
energy_drift = float(np.mean(rel_changes))
|
|
else:
|
|
energy_drift = float('nan')
|
|
return Metrics(Tb=Tb, tau_c=float(tau_c), tau_ci=(float(tau_ci[0]), float(tau_ci[1])), phase_slip_rad_per_beat=phase_slip, energy_drift_per_beat=energy_drift)
|
|
|
|
def keeper_follow(x: np.ndarray, Fs: float) -> np.ndarray:
|
|
env = analytic_envelope(x)
|
|
phi_meas = np.unwrap(np.angle(hilbert(x)))
|
|
cfg = KeeperConfig(Fs=Fs)
|
|
k = BreathKeeper(cfg)
|
|
y = np.zeros_like(x)
|
|
for n in range(len(x)):
|
|
phi = k.step(x[n], env[n], phi_meas[n])
|
|
y[n] = np.cos(phi)
|
|
return y
|
|
|
|
def analyze_ab(x: np.ndarray, Fs: float) -> Dict[str, Metrics]:
|
|
base_metrics = compute_metrics(x, Fs)
|
|
y = keeper_follow(x, Fs)
|
|
keep_metrics = compute_metrics(y, Fs)
|
|
return {"baseline": base_metrics, "keeper": keep_metrics}
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description="Breath keeper analysis: compute baseline and keeper metrics on CSV file.")
|
|
parser.add_argument("--csv", type=str, required=True, help="Path to CSV with columns t,x or x.")
|
|
parser.add_argument("--fs", type=float, required=False, help="Sampling rate in Hz if no t column.")
|
|
args = parser.parse_args()
|
|
import numpy as np
|
|
data = np.genfromtxt(args.csv, delimiter=",", names=True, dtype=None, encoding=None)
|
|
if "x" in data.dtype.names:
|
|
x = data["x"].astype(np.float64)
|
|
if "t" in data.dtype.names:
|
|
t = data["t"].astype(np.float64)
|
|
Fs_val = 1.0/np.mean(np.diff(t))
|
|
else:
|
|
if args.fs is None:
|
|
raise ValueError("Sampling rate must be provided if no t column.")
|
|
Fs_val = float(args.fs)
|
|
else:
|
|
raw = np.genfromtxt(args.csv, delimiter=",")
|
|
if raw.ndim == 1:
|
|
if args.fs is None:
|
|
raise ValueError("Sampling rate must be provided for single column CSV.")
|
|
x = raw.astype(np.float64)
|
|
Fs_val = float(args.fs)
|
|
else:
|
|
t = raw[:,0].astype(np.float64)
|
|
x = raw[:,1].astype(np.float64)
|
|
Fs_val = 1.0/np.mean(np.diff(t))
|
|
results = analyze_ab(x, Fs_val)
|
|
for label, m in results.items():
|
|
print(f"[{label}]")
|
|
print(f" Beat period Tb (s): {m.Tb:.6f}")
|
|
print(f" Coherence half-life \u03c4c (s): {m.tau_c:.6f} (CI ~ {m.tau_ci[0]:.3f}, {m.tau_ci[1]:.3f})")
|
|
print(f" Phase slip |\u03c6̇| (rad/beat): {m.phase_slip_rad_per_beat:.6e}")
|
|
print(f" Energy drift \u0110 (/beat): {m.energy_drift_per_beat:.6e}")
|