#!/usr/bin/env python3
"""
Critical Regime Generators for LRDBenchmark.
Physics-motivated generators for testing LRD estimators in critical,
nonequilibrium, and heavy-tailed regimes where classical assumptions fail.
Classes:
- OrnsteinUhlenbeckProcess: OU with time-varying friction (transient criticality)
- SubordinatedProcess: Subordinated Brownian motion (nonequilibrium)
- FractionalLevyMotion: Heavy-tailed, non-Gaussian LRD (α<2 stable)
- SOCAvalancheModel: Bak-Tang-Wiesenfeld self-organized criticality
"""
from enum import Enum
from typing import Any, Dict, Optional, Tuple
import numpy as np
from scipy.special import gamma as gamma_func
[docs]
class OrnsteinUhlenbeckProcess:
"""
Ornstein-Uhlenbeck process with time-varying friction coefficient.
Models transient criticality where the system transitions between
different relaxation regimes.
The SDE is:
dX_t = -θ(t) * X_t * dt + σ * dW_t
where θ(t) is the time-varying friction coefficient.
Example
-------
>>> gen = OrnsteinUhlenbeckProcess(theta_start=0.1, theta_end=1.0)
>>> result = gen.generate(1000)
"""
[docs]
def __init__(
self,
theta_start: float = 0.1,
theta_end: float = 1.0,
sigma: float = 1.0,
transition_type: str = "linear",
dt: float = 0.01,
random_state: Optional[int] = None,
):
"""
Initialize OU process with time-varying friction.
Parameters
----------
theta_start : float
Initial friction coefficient (low = critical-like)
theta_end : float
Final friction coefficient
sigma : float
Noise intensity
transition_type : str
How θ transitions: 'linear', 'exponential', 'step'
dt : float
Time step for simulation
random_state : int, optional
Random seed
"""
self.theta_start = theta_start
self.theta_end = theta_end
self.sigma = sigma
self.transition_type = transition_type
self.dt = dt
self.rng = np.random.default_rng(random_state)
[docs]
def _get_theta_trajectory(self, length: int) -> np.ndarray:
"""Get time-varying friction coefficient."""
t = np.linspace(0, 1, length)
if self.transition_type == "linear":
return self.theta_start + (self.theta_end - self.theta_start) * t
elif self.transition_type == "exponential":
rate = 3.0
return self.theta_start + (self.theta_end - self.theta_start) * (
1 - np.exp(-rate * t)
)
elif self.transition_type == "step":
theta = np.full(length, self.theta_start)
theta[length // 2 :] = self.theta_end
return theta
else:
raise ValueError(f"Unknown transition type: {self.transition_type}")
[docs]
def generate(self, length: int, seed: Optional[int] = None) -> Dict[str, Any]:
"""
Generate OU process with time-varying friction.
Returns dict with 'signal', 'theta_trajectory', 'metadata'.
"""
local_rng = np.random.default_rng(seed) if seed is not None else self.rng
theta_traj = self._get_theta_trajectory(length)
# Euler-Maruyama simulation
x = np.zeros(length)
x[0] = local_rng.normal(0, self.sigma / np.sqrt(2 * self.theta_start))
sqrt_dt = np.sqrt(self.dt)
for i in range(1, length):
theta = theta_traj[i - 1]
drift = -theta * x[i - 1] * self.dt
diffusion = self.sigma * sqrt_dt * local_rng.normal()
x[i] = x[i - 1] + drift + diffusion
return {
"signal": x,
"theta_trajectory": theta_traj,
"metadata": {
"process_type": "OrnsteinUhlenbeck",
"theta_start": self.theta_start,
"theta_end": self.theta_end,
"sigma": self.sigma,
"transition_type": self.transition_type,
"stationary": False,
},
}
[docs]
class SubordinatedProcess:
"""
Subordinated Brownian motion for modeling nonequilibrium phenomena.
The process X(S(t)) where X is Brownian motion and S(t) is an
inverse stable subordinator, producing subdiffusive behavior.
This models systems with trapping events and anomalous diffusion
where classical ergodicity breaks down.
Example
-------
>>> gen = SubordinatedProcess(alpha=0.7)
>>> result = gen.generate(1000)
"""
[docs]
def __init__(
self, alpha: float = 0.7, sigma: float = 1.0, random_state: Optional[int] = None
):
"""
Initialize subordinated process.
Parameters
----------
alpha : float
Subordinator index (0 < alpha < 1). Lower = more trapping.
sigma : float
Diffusion coefficient of parent Brownian motion
random_state : int, optional
Random seed
"""
if not 0 < alpha < 1:
raise ValueError("alpha must be in (0, 1)")
self.alpha = alpha
self.sigma = sigma
self.rng = np.random.default_rng(random_state)
[docs]
def _generate_stable_subordinator(
self, length: int, rng: np.random.Generator
) -> np.ndarray:
"""Generate one-sided stable Lévy process (subordinator)."""
# Use Chambers-Mallows-Stuck algorithm for positive stable
u = rng.uniform(0, np.pi, length)
e = rng.exponential(1.0, length)
# Positive stable with index alpha using the
# Chambers-Mallows-Stuck one-sided stable formula.
s = (np.sin(self.alpha * u) / np.sin(u) ** (1 / self.alpha)) * (
np.sin((1 - self.alpha) * u) / e
) ** ((1 - self.alpha) / self.alpha)
return np.cumsum(s)
[docs]
def generate(self, length: int, seed: Optional[int] = None) -> Dict[str, Any]:
"""
Generate subordinated Brownian motion.
Returns dict with 'signal', 'operational_time', 'metadata'.
"""
local_rng = np.random.default_rng(seed) if seed is not None else self.rng
# Generate subordinator (operational time)
subordinator = self._generate_stable_subordinator(length * 2, local_rng)
# Generate parent Brownian motion (on operational time scale)
bm = np.cumsum(local_rng.normal(0, self.sigma, length * 2))
# Inverse subordinator (first passage times)
physical_times = np.linspace(0, subordinator[-1], length)
# Interpolate BM at inverse subordinator times
indices = np.searchsorted(subordinator, physical_times)
indices = np.clip(indices, 0, len(bm) - 1)
signal = bm[indices]
return {
"signal": signal,
"operational_time": subordinator[:length],
"metadata": {
"process_type": "SubordinatedBrownian",
"alpha": self.alpha,
"sigma": self.sigma,
"subdiffusive": True,
"ergodic": False,
},
}
[docs]
class FractionalLevyMotion:
"""
Linear Fractional Stable Motion (LFSM) via FFT-based spectral method.
Generates heavy-tailed, non-Gaussian processes with long-range dependence
by applying fractional integration to symmetric α-stable noise in the
frequency domain.
The algorithm:
1. Generate symmetric α-stable noise Z
2. FFT to frequency domain: Z̃ = FFT(Z)
3. Apply spectral kernel: X̃ = Z̃ * |ω|^{-d} where d = H - 1/α
4. IFFT back to time domain: X = IFFT(X̃)
When α = 2 (Gaussian case), d = H - 0.5, recovering fractional Brownian motion.
Parameters
----------
H : float
Hurst (self-similarity) parameter, 0 < H < 1
alpha : float
Stability index, 0 < alpha <= 2. α=2 is Gaussian (fBm).
beta : float
Skewness parameter, -1 <= beta <= 1. Use 0 for symmetric.
scale : float
Scale parameter for the stable distribution
use_hpfracc : bool
If True, attempt to use hpfracc library for optimized operations.
Falls back to NumPy if hpfracc is not available.
random_state : int, optional
Random seed for reproducibility
Example
-------
>>> gen = FractionalLevyMotion(H=0.7, alpha=1.5)
>>> result = gen.generate(1000)
>>> signal = result['signal']
Notes
-----
The relationship between H (Hurst parameter), α (stability index), and
d (fractional integration order) is: d = H - 1/α
For LFSM, the valid parameter range requires: 0 < H < 1 and 1/α < H < 1
to ensure d > 0 (fractional integration, not differentiation).
References
----------
Samorodnitsky, G. & Taqqu, M. S. (1994). Stable Non-Gaussian Random
Processes: Stochastic Models with Infinite Variance. Chapman & Hall.
"""
# Try to import hpfracc at class level
_hpfracc_available = None
_hpfracc_module = None
[docs]
@classmethod
def _check_hpfracc(cls) -> bool:
"""Check if hpfracc is available (cached)."""
if cls._hpfracc_available is None:
try:
import hpfracc
cls._hpfracc_module = hpfracc
cls._hpfracc_available = True
except ImportError:
cls._hpfracc_available = False
return cls._hpfracc_available
[docs]
def __init__(
self,
H: float = 0.7,
alpha: float = 1.5,
beta: float = 0.0,
scale: float = 1.0,
use_hpfracc: bool = True,
random_state: Optional[int] = None,
):
"""
Initialize Linear Fractional Stable Motion generator.
Parameters
----------
H : float
Hurst-like self-similarity parameter (0 < H < 1)
alpha : float
Stability index (0 < alpha <= 2). α=2 is Gaussian.
beta : float
Skewness parameter (-1 <= beta <= 1)
scale : float
Scale parameter
use_hpfracc : bool
Whether to use hpfracc library if available
random_state : int, optional
Random seed
"""
if not 0 < H < 1:
raise ValueError("H must be in (0, 1)")
if not 0 < alpha <= 2:
raise ValueError("alpha must be in (0, 2]")
if not -1 <= beta <= 1:
raise ValueError("beta must be in [-1, 1]")
self.H = H
self.alpha = alpha
self.beta = beta
self.scale = scale
self.use_hpfracc = use_hpfracc and self._check_hpfracc()
self.rng = np.random.default_rng(random_state)
# Compute fractional integration order: d = H - 1/alpha
# This is the key relationship for LFSM
self.d = H - 1.0 / alpha
[docs]
def _generate_stable_rv(self, size: int, rng: np.random.Generator) -> np.ndarray:
"""
Generate stable random variables using Chambers-Mallows-Stuck algorithm.
For symmetric α-stable (beta=0), this produces the Lévy driver noise.
"""
u = rng.uniform(-np.pi / 2, np.pi / 2, size)
e = rng.exponential(1.0, size)
if self.alpha == 2:
# Gaussian case
return rng.normal(0, self.scale * np.sqrt(2), size)
if self.alpha == 1:
# Cauchy case
return self.scale * np.tan(u)
# General stable case (Chambers-Mallows-Stuck)
b = np.arctan(self.beta * np.tan(np.pi * self.alpha / 2)) / self.alpha
s = (1 + self.beta**2 * np.tan(np.pi * self.alpha / 2) ** 2) ** (
1 / (2 * self.alpha)
)
x = (
s
* (np.sin(self.alpha * (u + b)) / np.cos(u) ** (1 / self.alpha))
* (np.cos(u - self.alpha * (u + b)) / e) ** ((1 - self.alpha) / self.alpha)
)
return self.scale * x
[docs]
def _apply_spectral_kernel(self, noise: np.ndarray) -> np.ndarray:
"""
Apply fractional integration kernel |ω|^{-d} in frequency domain.
This is the core of the spectral method for LFSM generation.
"""
n = len(noise)
# FFT of the stable noise
noise_fft = np.fft.fft(noise)
# Construct frequency array
freq = np.fft.fftfreq(n)
# Build spectral kernel: |ω|^{-d}
# Handle DC component (ω=0) to avoid division by zero
omega = 2 * np.pi * freq
with np.errstate(divide="ignore", invalid="ignore"):
kernel = np.abs(omega) ** (-self.d)
# Set DC component to 0 (removes mean, standard for LFSM)
kernel[0] = 0.0
# Handle any remaining infinities at very low frequencies
kernel = np.nan_to_num(kernel, nan=0.0, posinf=0.0, neginf=0.0)
# Apply kernel in frequency domain
result_fft = noise_fft * kernel
# Inverse FFT to get time-domain signal
result = np.fft.ifft(result_fft)
# Take real part (imaginary should be negligible for real input)
return np.real(result)
[docs]
def _apply_spectral_kernel_hpfracc(self, noise: np.ndarray) -> np.ndarray:
"""
Apply fractional integration using hpfracc's optimized methods.
Uses Riemann-Liouville fractional integral when available.
"""
try:
hpfracc = self._hpfracc_module
# Use hpfracc's fractional integral if available
# The RL integral of order d corresponds to |ω|^{-d} in frequency domain
if hasattr(hpfracc, "riemann_liouville_integral"):
t = np.linspace(0, 1, len(noise))
result = hpfracc.riemann_liouville_integral(t, noise, self.d)
return result
elif hasattr(hpfracc, "optimized_riemann_liouville_integral"):
t = np.linspace(0, 1, len(noise))
from hpfracc import FractionalOrder
order = FractionalOrder(self.d)
result = hpfracc.optimized_riemann_liouville_integral(t, noise, order)
return result
else:
# Fall back to numpy implementation
return self._apply_spectral_kernel(noise)
except Exception:
# On any error, fall back to numpy
return self._apply_spectral_kernel(noise)
[docs]
def generate(self, length: int, seed: Optional[int] = None) -> Dict[str, Any]:
"""
Generate Linear Fractional Stable Motion.
Uses FFT-based spectral method with kernel |ω|^{-d} where d = H - 1/α.
Parameters
----------
length : int
Length of the time series to generate
seed : int, optional
Random seed for this generation (overrides constructor seed)
Returns
-------
dict
Dictionary containing:
- 'signal': The generated LFSM time series
- 'metadata': Process parameters and properties
"""
local_rng = np.random.default_rng(seed) if seed is not None else self.rng
# Step 1: Generate symmetric α-stable innovations (Lévy driver)
innovations = self._generate_stable_rv(length, local_rng)
# Steps 2-4: Apply spectral fractional integration
if self.use_hpfracc and self._hpfracc_available:
signal = self._apply_spectral_kernel_hpfracc(innovations)
else:
signal = self._apply_spectral_kernel(innovations)
return {
"signal": signal,
"metadata": {
"process_type": "LinearFractionalStableMotion",
"H": self.H,
"alpha": self.alpha,
"beta": self.beta,
"scale": self.scale,
"d": self.d, # Fractional integration order
"heavy_tailed": self.alpha < 2,
"infinite_variance": self.alpha < 2,
"method": "spectral_fft",
"used_hpfracc": self.use_hpfracc and self._hpfracc_available,
},
}
[docs]
class SOCAvalancheModel:
"""
Self-Organized Criticality avalanche model (Bak-Tang-Wiesenfeld).
Simulates a sandpile model producing scale-free avalanche dynamics.
The resulting time series of avalanche sizes exhibits power-law
correlations characteristic of critical systems.
Example
-------
>>> gen = SOCAvalancheModel(grid_size=64)
>>> result = gen.generate(1000)
"""
[docs]
def __init__(
self,
grid_size: int = 32,
threshold: int = 4,
random_state: Optional[int] = None,
):
"""
Initialize SOC sandpile model.
Parameters
----------
grid_size : int
Size of square lattice
threshold : int
Toppling threshold (typically 4 for 2D)
random_state : int, optional
Random seed
"""
self.grid_size = grid_size
self.threshold = threshold
self.rng = np.random.default_rng(random_state)
[docs]
def _run_sandpile(self, n_avalanches: int, rng: np.random.Generator) -> np.ndarray:
"""Run sandpile simulation and record avalanche sizes."""
grid = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)
avalanche_sizes = []
for _ in range(n_avalanches):
# Add grain at random site
x = rng.integers(0, self.grid_size)
y = rng.integers(0, self.grid_size)
grid[x, y] += 1
# Topple until stable
avalanche_size = 0
while np.any(grid >= self.threshold):
unstable = grid >= self.threshold
avalanche_size += np.sum(unstable)
# Topple unstable sites
topple_count = grid[unstable] // self.threshold
grid[unstable] -= topple_count * self.threshold
# Distribute to neighbors
unstable_coords = np.argwhere(unstable)
for cx, cy in unstable_coords:
if cx > 0:
grid[cx - 1, cy] += 1
if cx < self.grid_size - 1:
grid[cx + 1, cy] += 1
if cy > 0:
grid[cx, cy - 1] += 1
if cy < self.grid_size - 1:
grid[cx, cy + 1] += 1
avalanche_sizes.append(avalanche_size)
return np.array(avalanche_sizes)
[docs]
def generate(
self, length: int, seed: Optional[int] = None, warmup: int = 1000
) -> Dict[str, Any]:
"""
Generate time series of avalanche sizes from SOC sandpile.
Parameters
----------
length : int
Number of avalanche events to generate
seed : int, optional
Random seed
warmup : int
Number of initial events to discard (reach critical state)
Returns dict with 'signal', 'metadata'.
"""
local_rng = np.random.default_rng(seed) if seed is not None else self.rng
# Run with warmup
total_events = warmup + length
all_sizes = self._run_sandpile(total_events, local_rng)
# Discard warmup
signal = all_sizes[warmup:].astype(np.float64)
# Normalize for analysis
signal = (signal - np.mean(signal)) / (np.std(signal) + 1e-10)
return {
"signal": signal,
"raw_avalanche_sizes": all_sizes[warmup:],
"metadata": {
"process_type": "SOCAvalanche",
"grid_size": self.grid_size,
"threshold": self.threshold,
"warmup": warmup,
"critical": True,
"power_law_distributed": True,
},
}
# Convenience factory function
[docs]
def create_critical_regime_process(process_type: str, **kwargs) -> Any:
"""
Factory function for critical regime processes.
Parameters
----------
process_type : str
'ornstein_uhlenbeck', 'subordinated', 'fractional_levy', 'soc_avalanche'
**kwargs
Process-specific parameters
"""
process_map = {
"ornstein_uhlenbeck": OrnsteinUhlenbeckProcess,
"ou": OrnsteinUhlenbeckProcess,
"subordinated": SubordinatedProcess,
"subordinated_brownian": SubordinatedProcess,
"fractional_levy": FractionalLevyMotion,
"levy": FractionalLevyMotion,
"lfsm": FractionalLevyMotion,
"linear_fractional_stable_motion": FractionalLevyMotion,
"soc_avalanche": SOCAvalancheModel,
"soc": SOCAvalancheModel,
"sandpile": SOCAvalancheModel,
}
process_type = process_type.lower().replace(" ", "_").replace("-", "_")
if process_type not in process_map:
raise ValueError(
f"Unknown process type '{process_type}'. "
f"Available: ornstein_uhlenbeck, subordinated, fractional_levy, lfsm, soc_avalanche"
)
return process_map[process_type](**kwargs)
# Alias for explicit naming
LinearFractionalStableMotion = FractionalLevyMotion