Source code for lrdbenchmark.analysis.temporal.rs_estimator

#!/usr/bin/env python3
"""
Unified R/S (Rescaled Range) Estimator.
Refactored to use modular backends (NumPy, JAX, Numba).
"""

import math
import os
import warnings
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
from scipy import stats

from lrdbenchmark.analysis.backend_utils import (
    JAX_AVAILABLE,
    NUMBA_AVAILABLE,
    select_backend,
)
from lrdbenchmark.analysis.base_estimator import BaseEstimator

from .rs_backends import numpy_backend

# Optional backends
try:
    from .rs_backends import jax_backend
except ImportError:
    jax_backend = None

try:
    from .rs_backends import numba_backend
except ImportError:
    numba_backend = None


[docs] class RSEstimator(BaseEstimator): """ Unified R/S (Rescaled Range) Estimator for Long-Range Dependence Analysis. Backends: - 'jax': GPU/TPU accelerated (if available) - 'numba': CPU JIT compiled - 'numpy': Reference implementation """
[docs] def __init__( self, min_block_size: Optional[int] = None, max_block_size: Optional[int] = None, num_blocks: Optional[int] = None, use_optimization: str = "auto", *, min_window_size: Optional[int] = None, max_window_size: Optional[int] = None, num_windows: Optional[int] = None, window_sizes: Optional[Sequence[int]] = None, overlap: bool = False, ) -> None: # Legacy parameter bridging if min_window_size is not None: min_block_size = min_window_size if max_window_size is not None: max_block_size = max_window_size if num_windows is not None: num_blocks = num_windows # Defaults min_block_size = 10 if min_block_size is None else int(min_block_size) num_blocks = 10 if num_blocks is None else int(num_blocks) sanitized_windows = None if window_sizes is not None: sanitized_windows = self._sanitize_window_sizes(window_sizes) params_window = sanitized_windows.tolist() else: params_window = None super().__init__() self.parameters = { "min_block_size": int(min_block_size), "max_block_size": int(max_block_size) if max_block_size is not None else None, "num_blocks": int(num_blocks), "window_sizes": params_window, "overlap": bool(overlap), } self.optimization_framework = select_backend(use_optimization) self.results = {} self._validate_parameters()
def _validate_parameters(self) -> None: if self.parameters["min_block_size"] < 4: raise ValueError("min_block_size must be at least 4") if ( self.parameters["max_block_size"] is not None and self.parameters["max_block_size"] <= self.parameters["min_block_size"] ): raise ValueError("max_block_size must be greater than min_block_size") if ( self.parameters["num_blocks"] < 3 and self.parameters["window_sizes"] is None ): raise ValueError("num_blocks must be at least 3") if self.parameters["window_sizes"] is not None: windows = np.asarray(self.parameters["window_sizes"], dtype=int) if len(windows) < 3: raise ValueError("Need at least 3 window sizes")
[docs] def estimate(self, data: Union[np.ndarray, list]) -> Dict[str, Any]: """ Estimate Hurst parameter using R/S analysis. """ data = np.asarray(data) n = len(data) if n < 50: warnings.warn("Data length is very small, results may be unreliable") # Resolve block sizes block_sizes = self._resolve_block_sizes(n) # Select Backend Strategy backend_name = self.optimization_framework compute_func = self._get_compute_function(backend_name) # Execute try: rs_values = compute_func(data, block_sizes) except Exception as e: warnings.warn( f"Backend '{backend_name}' failed: {e}. Falling back to NumPy." ) rs_values = numpy_backend.compute_rs(data, block_sizes) backend_name = "numpy (fallback)" # Post-Processing rs_values = np.asarray(rs_values) block_sizes = np.asarray(block_sizes) valid_mask = (rs_values > 0) & ~np.isnan(rs_values) if np.sum(valid_mask) < 3: raise ValueError("Insufficient valid R/S values for regression") valid_blocks = block_sizes[valid_mask] valid_rs = rs_values[valid_mask] return self._build_results( block_sizes=valid_blocks, rs_values=valid_rs, method="rs_analysis", framework=backend_name, )
def _get_compute_function(self, backend: str): if backend == "jax": if jax_backend and jax_backend.JAX_AVAILABLE: return jax_backend.compute_rs warnings.warn("JAX requested but not available. Falling back to NumPy.") return numpy_backend.compute_rs if backend == "numba": if numba_backend and numba_backend.NUMBA_AVAILABLE: return numba_backend.compute_rs warnings.warn("Numba requested but not available. Falling back to NumPy.") return numpy_backend.compute_rs return numpy_backend.compute_rs
[docs] def _resolve_block_sizes(self, n: int) -> np.ndarray: """Construct block sizes.""" if self.parameters["window_sizes"] is not None: windows = np.array(self.parameters["window_sizes"], dtype=int) else: max_block = self.parameters["max_block_size"] if max_block is None: max_block = max(self.parameters["min_block_size"] + 1, n // 4) # Logspace windows = np.logspace( np.log10(self.parameters["min_block_size"]), np.log10(max_block), self.parameters["num_blocks"], dtype=int, ) windows = np.unique(windows) # Filter valid = windows[ (windows >= self.parameters["min_block_size"]) & (windows <= n // 2) ] if len(valid) < 3: # Fallback if aggressive expectations failed? # Or raise error raise ValueError( f"Need at least 3 valid window sizes (found {len(valid)}). Data length {n} might be too short for current parameters." ) return valid
def _sanitize_window_sizes(self, window_sizes: Sequence[int]) -> np.ndarray: windows = np.array(window_sizes, dtype=int) if np.any(windows <= 0): raise ValueError("Window sizes must be positive integers") return windows def _build_results(self, block_sizes, rs_values, method, framework): log_blocks = np.log(block_sizes) log_rs = np.log(rs_values) slope, intercept, r_squared, p_value, std_err = self._linear_regression( log_blocks, log_rs ) hurst = slope conf_int = self._compute_confidence_interval(hurst, std_err, len(block_sizes)) results = { "hurst_parameter": float(hurst), "r_squared": float(r_squared), "slope": float(slope), "intercept": float(intercept), "p_value": float(p_value), "std_error": float(std_err), "block_sizes": block_sizes.tolist(), "rs_values": rs_values.tolist(), "log_block_sizes": log_blocks.tolist(), "log_rs_values": log_rs.tolist(), "confidence_interval": conf_int, "method": method, "optimization_framework": framework, } self.results = results return results def _linear_regression(self, x, y): slope, intercept, r_value, p_value, std_err = stats.linregress(x, y) return slope, intercept, r_value**2, p_value, std_err def _compute_confidence_interval(self, hurst, std_err, n, confidence=0.95): if not np.isfinite(std_err) or std_err <= 0 or n < 3: return [float("nan"), float("nan")] alpha = 1.0 - confidence dof = max(n - 2, 1) critical = stats.t.ppf(1 - alpha / 2, dof) margin = critical * std_err return [float(hurst - margin), float(hurst + margin)]
[docs] def get_optimization_info(self) -> Dict[str, Any]: return { "current_framework": self.optimization_framework, "jax_available": getattr(jax_backend, "JAX_AVAILABLE", False), "numba_available": getattr(numba_backend, "NUMBA_AVAILABLE", False), "recommended_framework": "jax" if getattr(jax_backend, "JAX_AVAILABLE", False) else ( "numba" if getattr(numba_backend, "NUMBA_AVAILABLE", False) else "numpy" ), }
# API Compatibility
[docs] def get_confidence_intervals(self, confidence_level=0.95): # Stub or implement if needed. # Original just returned dict? # Leaving minimal impl return {}