Create breath_keeper.py

This commit is contained in:
blackboxprogramming
2025-08-10 21:52:43 -07:00
committed by GitHub
parent b8880fbdd4
commit ee45d018cb

219
breath_keeper.py Normal file
View File

@@ -0,0 +1,219 @@
"""
Breath Keeper A/B analysis and persistence metrics.
This module provides utilities for analyzing oscillator signals, including
calculation of unbiased autocorrelation, coherence half-life, beat period,
phase-slip, and energy drift. It also implements a breath keeper (phase-locked
loop with node snapping, amplitude control, and symplectic oscillator) to
maintain phase coherence and conserve energy. A command-line interface is
provided for running baseline vs keeper-enabled analysis on a CSV file of
time series data.
"""
from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional, Dict
try:
from scipy.signal import hilbert, find_peaks
except Exception as e:
raise SystemExit("Please install scipy via 'pip install scipy' to use breath_keeper.") from e
# Utility functions
def unbiased_autocorr(x: np.ndarray) -> np.ndarray:
x = x.astype(np.float64)
x = x - np.mean(x)
N = len(x)
ac = np.correlate(x, x, mode='full')
ac = ac[N-1:]
norm = np.arange(N, 0, -1, dtype=np.float64)
ac_unbiased = ac / norm
return ac_unbiased / ac_unbiased[0]
def fit_coherence_half_life(ac: np.ndarray, Fs: float, min_lag_s: float = 2.0, max_lag_frac: float = 0.5) -> Tuple[float, Tuple[float, float]]:
N = len(ac)
t = np.arange(N)/Fs
lo = int(min_lag_s*Fs)
hi = int(min(N-1, max_lag_frac*N))
if hi <= lo+5:
return float('nan'), (float('nan'), float('nan'))
y = np.clip(ac[lo:hi], 1e-12, 1.0)
tt = t[lo:hi]
A = np.vstack([tt, np.ones_like(tt)]).T
coeff, _, _, _ = np.linalg.lstsq(A, np.log(y), rcond=None)
slope, intercept = coeff
tau = -1.0 / slope if slope < 0 else float('nan')
residuals = np.log(y) - (A @ coeff)
sigma = np.std(residuals)
denom = np.sum((tt - np.mean(tt))**2)
if denom <= 0:
return tau, (float('nan'), float('nan'))
var_slope = sigma**2 / denom
se_slope = np.sqrt(var_slope)
slope_lo = slope - 2*se_slope
slope_hi = slope + 2*se_slope
tau_lo = -1.0/slope_hi if slope_hi < 0 else float('nan')
tau_hi = -1.0/slope_lo if slope_lo < 0 else float('nan')
return float(tau), (float(tau_lo), float(tau_hi))
def analytic_envelope(x: np.ndarray) -> np.ndarray:
return np.abs(hilbert(x))
def beat_period_from_envelope(env: np.ndarray, Fs: float, min_period_s: float = 0.5, max_period_s: float = 10.0) -> float:
ac = unbiased_autocorr(env)
lags = np.arange(len(ac))/Fs
lo = int(min_period_s*Fs)
hi = int(min(len(ac)-1, max_period_s*Fs))
if hi <= lo+3:
return float('nan')
peaks, _ = find_peaks(ac[lo:hi], height=0.2)
if len(peaks) == 0:
return float('nan')
first = peaks[0] + lo
return lags[first]
def node_trough_indices(env: np.ndarray, Tb: float, Fs: float) -> np.ndarray:
if not np.isfinite(Tb) or Tb <= 0:
idx, _ = find_peaks(-env, distance=int(0.25*Fs))
return idx
idx, _ = find_peaks(-env, distance=int(0.8*Tb*Fs))
return idx
def phase_from_analytic(x: np.ndarray) -> np.ndarray:
return np.unwrap(np.angle(hilbert(x)))
def energy_series(x: np.ndarray, Fs: float, win_periods: float = 1.0, carrier_Hz: Optional[float] = None) -> np.ndarray:
if carrier_Hz and carrier_Hz > 0:
win = int(max(1, round(Fs/carrier_Hz*win_periods)))
else:
win = int(max(1, round(Fs/10)))
kernel = np.ones(win)/win
rms = np.sqrt(np.convolve(x**2, kernel, mode='same'))
return rms**2
def wrap_pi(a):
return (a + np.pi) % (2*np.pi) - np.pi
@dataclass
class KeeperConfig:
Fs: float
Kp: float = 0.05
Ki: float = 0.001
agc_tau: float = 0.5
snap_thresh: float = 0.05
omega_init: Optional[float] = None
class BreathKeeper:
def __init__(self, cfg: KeeperConfig):
self.cfg = cfg
self._phi_int = 0.0
self._amp = 1.0
self.q = 0.0
self.p = 0.0
self.omega = cfg.omega_init if cfg.omega_init else 2*np.pi*1.0
def phase_est(self) -> float:
return np.arctan2(self.p, self.q + 1e-12)
def set_phase(self, phi: float):
A = max(1e-9, self._amp)
self.q = A * np.cos(phi)
self.p = A * np.sin(phi)
def step(self, x_t: float, env_t: float, phi_meas: float) -> float:
phi_err = wrap_pi(phi_meas - self.phase_est())
self._phi_int += self.cfg.Ki * phi_err
dphi = self.cfg.Kp * phi_err + self._phi_int
self.omega = max(1e-6, self.omega + dphi)
if env_t < self.cfg.snap_thresh * (self._amp + 1e-9):
self.set_phase(np.round(self.phase_est()/np.pi)*np.pi)
alpha = np.exp(-1.0/(self.cfg.agc_tau*self.cfg.Fs))
self._amp = alpha*self._amp + (1-alpha)*abs(x_t)
dt = 1.0/self.cfg.Fs
self.p -= (self.omega**2)*self.q*(dt*0.5)
self.q += self.p*dt
self.p -= (self.omega**2)*self.q*(dt*0.5)
return self.phase_est()
@dataclass
class Metrics:
Tb: float
tau_c: float
tau_ci: Tuple[float,float]
phase_slip_rad_per_beat: float
energy_drift_per_beat: float
def compute_metrics(x: np.ndarray, Fs: float) -> Metrics:
env = analytic_envelope(x)
Tb = beat_period_from_envelope(env, Fs)
ac = unbiased_autocorr(env)
tau_c, tau_ci = fit_coherence_half_life(ac, Fs)
nodes = node_trough_indices(env, Tb, Fs)
phi = phase_from_analytic(x)
if len(nodes) >= 2:
dphi = wrap_pi(np.diff(phi[nodes]))
phase_slip = float(np.mean(np.abs(dphi)))
else:
phase_slip = float('nan')
E = energy_series(x, Fs)
if len(nodes) >= 2:
Eb = E[nodes]
rel_changes = np.diff(Eb)/(Eb[:-1]+1e-12)
energy_drift = float(np.mean(rel_changes))
else:
energy_drift = float('nan')
return Metrics(Tb=Tb, tau_c=float(tau_c), tau_ci=(float(tau_ci[0]), float(tau_ci[1])), phase_slip_rad_per_beat=phase_slip, energy_drift_per_beat=energy_drift)
def keeper_follow(x: np.ndarray, Fs: float) -> np.ndarray:
env = analytic_envelope(x)
phi_meas = np.unwrap(np.angle(hilbert(x)))
cfg = KeeperConfig(Fs=Fs)
k = BreathKeeper(cfg)
y = np.zeros_like(x)
for n in range(len(x)):
phi = k.step(x[n], env[n], phi_meas[n])
y[n] = np.cos(phi)
return y
def analyze_ab(x: np.ndarray, Fs: float) -> Dict[str, Metrics]:
base_metrics = compute_metrics(x, Fs)
y = keeper_follow(x, Fs)
keep_metrics = compute_metrics(y, Fs)
return {"baseline": base_metrics, "keeper": keep_metrics}
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Breath keeper analysis: compute baseline and keeper metrics on CSV file.")
parser.add_argument("--csv", type=str, required=True, help="Path to CSV with columns t,x or x.")
parser.add_argument("--fs", type=float, required=False, help="Sampling rate in Hz if no t column.")
args = parser.parse_args()
import numpy as np
data = np.genfromtxt(args.csv, delimiter=",", names=True, dtype=None, encoding=None)
if "x" in data.dtype.names:
x = data["x"].astype(np.float64)
if "t" in data.dtype.names:
t = data["t"].astype(np.float64)
Fs_val = 1.0/np.mean(np.diff(t))
else:
if args.fs is None:
raise ValueError("Sampling rate must be provided if no t column.")
Fs_val = float(args.fs)
else:
raw = np.genfromtxt(args.csv, delimiter=",")
if raw.ndim == 1:
if args.fs is None:
raise ValueError("Sampling rate must be provided for single column CSV.")
x = raw.astype(np.float64)
Fs_val = float(args.fs)
else:
t = raw[:,0].astype(np.float64)
x = raw[:,1].astype(np.float64)
Fs_val = 1.0/np.mean(np.diff(t))
results = analyze_ab(x, Fs_val)
for label, m in results.items():
print(f"[{label}]")
print(f" Beat period Tb (s): {m.Tb:.6f}")
print(f" Coherence half-life \u03c4c (s): {m.tau_c:.6f} (CI ~ {m.tau_ci[0]:.3f}, {m.tau_ci[1]:.3f})")
print(f" Phase slip |\u03c6̇| (rad/beat): {m.phase_slip_rad_per_beat:.6e}")
print(f" Energy drift \u0110 (/beat): {m.energy_drift_per_beat:.6e}")