Source code for lrdbenchmark.analysis.temporal.ghe_estimator

#!/usr/bin/env python3
"""
Unified GHE (Generalized Hurst Exponent) Estimator for Long-Range Dependence Analysis.

This module implements the GHE estimator based on the paper:
"Typical Algorithms for Estimating Hurst Exponent of Time Sequence: A Data Analyst's Perspective"
by HONG-YAN ZHANG, ZHI-QIANG FENG, SI-YU FENG, AND YU ZHOU
IEEE ACCESS 2024, DOI: 10.1109/ACCESS.2024.3512542

The GHE method analyzes the scaling properties of time series data by computing
q-th order moments of increments and estimating the generalized Hurst exponent.
"""

import warnings
from typing import Any, Dict, List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
from scipy import stats
from scipy.optimize import curve_fit

# Import optimization frameworks
try:
    import jax
    import jax.numpy as jnp
    from jax import jit, vmap

    JAX_AVAILABLE = True
except ImportError:
    JAX_AVAILABLE = False

try:
    import numba
    from numba import jit as numba_jit
    from numba import prange

    NUMBA_AVAILABLE = True
except ImportError:
    NUMBA_AVAILABLE = False

    # Create a dummy decorator when numba is not available
    def numba_jit(*args, **kwargs):
        def decorator(func):
            return func

        return decorator

    prange = range  # Dummy prange

# Import base estimator
from lrdbenchmark.analysis.base_estimator import BaseEstimator


[docs] class GHEEstimator(BaseEstimator): """ Unified GHE (Generalized Hurst Exponent) Estimator for Long-Range Dependence Analysis. The GHE estimator analyzes the scaling properties of time series data by computing q-th order moments of increments and estimating the generalized Hurst exponent. Features: - Automatic framework selection (JAX, Numba, NumPy) - Multiple q values for multifractal analysis - Robust error handling and fallback mechanisms - Comprehensive result reporting - Visualization capabilities Parameters ---------- q_values : array-like, optional Array of q values for multifractal analysis. Default: [1, 2, 3, 4, 5] tau_min : int, optional Minimum time lag. Default: 2 tau_max : int, optional Maximum time lag. Default: min(N//4, 50) where N is data length tau_step : int, optional Step size for time lags. Default: 1 use_jax : bool, optional Force JAX backend if available. Default: None (auto-select) use_numba : bool, optional Force Numba backend if available. Default: None (auto-select) """
[docs] def __init__(self, **kwargs): """ Initialize the GHE estimator. Parameters ---------- **kwargs : dict Estimator parameters """ super().__init__(**kwargs) # Set default parameters self.parameters.setdefault("q_values", np.array([1, 2, 3, 4, 5])) self.parameters.setdefault("tau_min", 2) self.parameters.setdefault("tau_max", None) self.parameters.setdefault("tau_step", 1) self.parameters.setdefault("use_jax", None) self.parameters.setdefault("use_numba", None) # Initialize results self.results = {} self.name = "GHE" self.category = "Temporal" # Validate parameters self._validate_parameters()
[docs] def _validate_parameters(self) -> None: """Validate estimator parameters.""" q_values = self.parameters["q_values"] if not isinstance(q_values, (list, np.ndarray)): raise ValueError("q_values must be a list or numpy array") q_values = np.array(q_values) if len(q_values) == 0: raise ValueError("q_values cannot be empty") if np.any(q_values <= 0): raise ValueError("All q_values must be positive") if self.parameters["tau_min"] < 1: raise ValueError("tau_min must be at least 1") if self.parameters["tau_step"] < 1: raise ValueError("tau_step must be at least 1")
[docs] def _compute_qth_moments_numpy( self, data: np.ndarray, q_values: np.ndarray, tau_values: np.ndarray ) -> np.ndarray: """ Compute q-th order moments using NumPy. Parameters ---------- data : np.ndarray Time series data q_values : np.ndarray Array of q values tau_values : np.ndarray Array of time lags Returns ------- np.ndarray Array of shape (len(q_values), len(tau_values)) containing q-th moments """ N = len(data) moments = np.zeros((len(q_values), len(tau_values))) for i, q in enumerate(q_values): for j, tau in enumerate(tau_values): if tau >= N: moments[i, j] = np.nan continue # Compute increments increments = data[tau:] - data[:-tau] # Compute q-th moment if q == 1: moments[i, j] = np.mean(np.abs(increments)) else: moments[i, j] = np.mean(np.abs(increments) ** q) return moments
[docs] def _compute_qth_moments_numba( self, data: np.ndarray, q_values: np.ndarray, tau_values: np.ndarray ) -> np.ndarray: """ Compute q-th order moments using Numba. Parameters ---------- data : np.ndarray Time series data q_values : np.ndarray Array of q values tau_values : np.ndarray Array of time lags Returns ------- np.ndarray Array of shape (len(q_values), len(tau_values)) containing q-th moments """ if not NUMBA_AVAILABLE: return self._compute_qth_moments_numpy(data, q_values, tau_values) @numba_jit(nopython=True, parallel=True) def compute_moments_numba(data, q_values, tau_values): N = len(data) n_q = len(q_values) n_tau = len(tau_values) moments = np.zeros((n_q, n_tau)) for i in prange(n_q): q = q_values[i] for j in prange(n_tau): tau = int(tau_values[j]) if tau >= N: moments[i, j] = np.nan continue # Compute increments increments = data[tau:] - data[:-tau] # Compute q-th moment if q == 1.0: moments[i, j] = np.mean(np.abs(increments)) else: moments[i, j] = np.mean(np.abs(increments) ** q) return moments return compute_moments_numba(data, q_values, tau_values)
[docs] def _compute_qth_moments_jax( self, data: np.ndarray, q_values: np.ndarray, tau_values: np.ndarray ) -> np.ndarray: """ Compute q-th order moments using JAX. Parameters ---------- data : np.ndarray Time series data q_values : np.ndarray Array of q values tau_values : np.ndarray Array of time lags Returns ------- np.ndarray Array of shape (len(q_values), len(tau_values)) containing q-th moments """ if not JAX_AVAILABLE: return self._compute_qth_moments_numpy(data, q_values, tau_values) @jit def compute_moments_jax(data, q_values, tau_values): N = len(data) moments = jnp.zeros((len(q_values), len(tau_values))) def compute_for_tau(tau_idx): tau = tau_values[tau_idx] if tau >= N: return jnp.full(len(q_values), jnp.nan) # Compute increments increments = data[tau:] - data[:-tau] # Compute q-th moments for all q values def compute_for_q(q_idx): q = q_values[q_idx] if q == 1.0: return jnp.mean(jnp.abs(increments)) else: return jnp.mean(jnp.abs(increments) ** q) return jnp.array([compute_for_q(i) for i in range(len(q_values))]) return jnp.array([compute_for_tau(i) for i in range(len(tau_values))]).T return np.array(compute_moments_jax(data, q_values, tau_values))
[docs] def _estimate_hurst_exponents( self, tau_values: np.ndarray, moments: np.ndarray, q_values: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Estimate Hurst exponents from q-th moments. Parameters ---------- tau_values : np.ndarray Array of time lags moments : np.ndarray Array of q-th moments q_values : np.ndarray Array of q values Returns ------- Tuple[np.ndarray, np.ndarray, np.ndarray] Hurst exponents, R-squared values, and standard errors """ hurst_exponents = np.zeros(len(q_values)) r_squared = np.zeros(len(q_values)) std_errors = np.zeros(len(q_values)) for i, q in enumerate(q_values): # Get valid data points (non-NaN) valid_mask = ~np.isnan(moments[i, :]) if np.sum(valid_mask) < 2: hurst_exponents[i] = np.nan r_squared[i] = np.nan std_errors[i] = np.nan continue tau_valid = tau_values[valid_mask] moments_valid = moments[i, valid_mask] # Log transform for linear regression log_tau = np.log(tau_valid) log_moments = np.log(moments_valid) # Linear regression: log(K_q(tau)) = q*H(q)*log(tau) + C # So: log_moments = q*H(q)*log_tau + C # Therefore: H(q) = slope / q try: slope, intercept, r_value, p_value, std_err = stats.linregress( log_tau, log_moments ) hurst_exponents[i] = slope / q r_squared[i] = r_value**2 std_errors[i] = std_err / q except: hurst_exponents[i] = np.nan r_squared[i] = np.nan std_errors[i] = np.nan return hurst_exponents, r_squared, std_errors
[docs] def _select_backend(self, data_length: int) -> str: """ Select the best available backend for computation. Parameters ---------- data_length : int Length of the input data Returns ------- str Selected backend ('jax', 'numba', or 'numpy') """ use_jax = self.parameters["use_jax"] use_numba = self.parameters["use_numba"] if use_jax is True and JAX_AVAILABLE: return "jax" elif use_numba is True and NUMBA_AVAILABLE: return "numba" elif use_jax is False and use_numba is False: return "numpy" # Auto-select based on data size and availability if JAX_AVAILABLE and data_length > 1000: return "jax" elif NUMBA_AVAILABLE and data_length > 100: return "numba" else: return "numpy"
[docs] def estimate(self, data: np.ndarray) -> Dict[str, Any]: """ Estimate the generalized Hurst exponent using the GHE method. Parameters ---------- data : np.ndarray Time series data to analyze Returns ------- Dict[str, Any] Dictionary containing estimation results """ # Validate input if not isinstance(data, np.ndarray): data = np.array(data) if len(data) < 10: raise ValueError("Data must have at least 10 points") # Get parameters q_values = np.array(self.parameters["q_values"]) tau_min = self.parameters["tau_min"] tau_max = self.parameters["tau_max"] or min(len(data) // 4, 50) tau_step = self.parameters["tau_step"] # Generate time lags tau_values = np.arange(tau_min, tau_max + 1, tau_step) if len(tau_values) < 2: raise ValueError("Not enough time lags for analysis") # Select backend backend = self._select_backend(len(data)) # Compute q-th moments try: if backend == "jax": moments = self._compute_qth_moments_jax(data, q_values, tau_values) elif backend == "numba": moments = self._compute_qth_moments_numba(data, q_values, tau_values) else: moments = self._compute_qth_moments_numpy(data, q_values, tau_values) except Exception as e: warnings.warn(f"Backend {backend} failed, falling back to NumPy: {e}") moments = self._compute_qth_moments_numpy(data, q_values, tau_values) backend = "numpy" # Estimate Hurst exponents hurst_exponents, r_squared, std_errors = self._estimate_hurst_exponents( tau_values, moments, q_values ) # Compute average Hurst exponent (for q=2, which corresponds to standard Hurst) q2_idx = np.where(np.abs(q_values - 2.0) < 1e-6)[0] if len(q2_idx) > 0: main_hurst = hurst_exponents[q2_idx[0]] else: # If q=2 not available, use the closest q value q2_idx = np.argmin(np.abs(q_values - 2.0)) main_hurst = hurst_exponents[q2_idx] # Store results self.results = { "hurst_parameter": main_hurst, "generalized_hurst_exponents": hurst_exponents, "q_values": q_values, "tau_values": tau_values, "moments": moments, "r_squared": r_squared, "std_errors": std_errors, "backend_used": backend, "success": True, "method": "GHE", "data_length": len(data), "n_tau": len(tau_values), "n_q": len(q_values), } return self.results
[docs] def plot_scaling_behavior(self, figsize: Tuple[int, int] = (12, 8)) -> plt.Figure: """ Plot the scaling behavior of q-th moments. Parameters ---------- figsize : Tuple[int, int], optional Figure size. Default: (12, 8) Returns ------- matplotlib.figure.Figure The created figure """ if not self.results or not self.results.get("success", False): raise ValueError("No successful estimation results available") fig, axes = plt.subplots(2, 2, figsize=figsize) fig.suptitle("GHE Scaling Behavior Analysis", fontsize=16) q_values = self.results["q_values"] tau_values = self.results["tau_values"] moments = self.results["moments"] hurst_exponents = self.results["generalized_hurst_exponents"] # Plot 1: Log-log plot of moments vs tau ax1 = axes[0, 0] for i, q in enumerate(q_values): valid_mask = ~np.isnan(moments[i, :]) if np.sum(valid_mask) > 1: ax1.loglog( tau_values[valid_mask], moments[i, valid_mask], "o-", label=f"q={q:.1f}", alpha=0.7, ) ax1.set_xlabel("Time Lag τ") ax1.set_ylabel("K_q(τ)") ax1.set_title("Scaling of q-th Moments") ax1.legend() ax1.grid(True, alpha=0.3) # Plot 2: Hurst exponents vs q ax2 = axes[0, 1] valid_mask = ~np.isnan(hurst_exponents) ax2.plot( q_values[valid_mask], hurst_exponents[valid_mask], "bo-", linewidth=2, markersize=6, ) ax2.axhline( y=0.5, color="r", linestyle="--", alpha=0.7, label="H=0.5 (no correlation)" ) ax2.set_xlabel("q") ax2.set_ylabel("H(q)") ax2.set_title("Generalized Hurst Exponent") ax2.legend() ax2.grid(True, alpha=0.3) # Plot 3: R-squared values ax3 = axes[1, 0] r_squared = self.results["r_squared"] valid_mask = ~np.isnan(r_squared) ax3.bar(q_values[valid_mask], r_squared[valid_mask], alpha=0.7) ax3.set_xlabel("q") ax3.set_ylabel("R²") ax3.set_title("Goodness of Fit (R²)") ax3.grid(True, alpha=0.3) # Plot 4: Standard errors ax4 = axes[1, 1] std_errors = self.results["std_errors"] valid_mask = ~np.isnan(std_errors) ax4.bar(q_values[valid_mask], std_errors[valid_mask], alpha=0.7) ax4.set_xlabel("q") ax4.set_ylabel("Standard Error") ax4.set_title("Estimation Uncertainty") ax4.grid(True, alpha=0.3) plt.tight_layout() return fig
[docs] def get_multifractal_spectrum(self) -> Dict[str, np.ndarray]: """ Compute the multifractal spectrum from generalized Hurst exponents. Returns ------- Dict[str, np.ndarray] Dictionary containing multifractal spectrum parameters """ if not self.results or not self.results.get("success", False): raise ValueError("No successful estimation results available") q_values = self.results["q_values"] hurst_exponents = self.results["generalized_hurst_exponents"] # Remove NaN values valid_mask = ~np.isnan(hurst_exponents) q_valid = q_values[valid_mask] h_valid = hurst_exponents[valid_mask] if len(q_valid) < 3: return {"alpha": np.array([]), "f_alpha": np.array([])} # Compute multifractal spectrum # α = H(q) + q * H'(q) # f(α) = q * α - τ(q) # where τ(q) = q * H(q) - 1 # Compute derivatives using finite differences if len(q_valid) > 1: dH_dq = np.gradient(h_valid, q_valid) alpha = h_valid + q_valid * dH_dq tau_q = q_valid * h_valid - 1 f_alpha = q_valid * alpha - tau_q else: alpha = h_valid f_alpha = np.zeros_like(alpha) return { "alpha": alpha, "f_alpha": f_alpha, "q_values": q_valid, "hurst_exponents": h_valid, }
# Example usage and testing if __name__ == "__main__": # Generate test data np.random.seed(42) N = 1000 # Generate fractional Brownian motion H_true = 0.7 t = np.linspace(0, 1, N) fbm = np.cumsum(np.random.normal(0, 1, N) * (t[1] - t[0]) ** H_true) # Test GHE estimator ghe = GHEEstimator(q_values=[1, 2, 3, 4, 5], tau_min=2, tau_max=50) results = ghe.estimate(fbm) print("GHE Estimation Results:") print(f"Main Hurst Parameter (q=2): {results['hurst_parameter']:.4f}") print(f"True Hurst Parameter: {H_true:.4f}") print(f"Backend Used: {results['backend_used']}") print(f"Generalized Hurst Exponents: {results['generalized_hurst_exponents']}") # Plot results fig = ghe.plot_scaling_behavior() plt.show()