#!/usr/bin/env python3
"""
Comprehensive Benchmark Module for LRDBench
A unified interface for running all types of benchmarks and analyses
"""
import json
import os
import sys
import time
import warnings
from datetime import datetime
from itertools import combinations
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy import stats
from ...analytics.error_analyzer import ErrorAnalyzer
from ...analytics.provenance import ProvenanceTracker, create_provenance_bundle
from ...analytics.stratified_report_generator import StratifiedReportGenerator
from ...models.data_models.arfima_model import ARFIMAModel
# Import data models
from ...models.data_models.fbm_model import FractionalBrownianMotion as FBMModel
from ...models.data_models.fgn_model import FractionalGaussianNoise as FGNModel
from ...models.data_models.mrw_model import MultifractalRandomWalk as MRWModel
from ...random_manager import get_random_manager
from ...robustness.adaptive_preprocessor import AdaptiveDataPreprocessor
# Import advanced metrics
from ..advanced_metrics import (
AdvancedPerformanceProfiler,
ConvergenceAnalyzer,
MeanSignedErrorAnalyzer,
RobustnessStressTester,
ScalingInfluenceAnalyzer,
calculate_convergence_rate,
calculate_mean_signed_error,
profile_estimator_performance,
)
# Import new diagnostic and reporting systems
from ..diagnostics import (
PowerLawDiagnostics,
ScaleWindowSensitivityAnalyser,
run_comprehensive_diagnostics,
)
from ..multifractal.mfdfa_estimator import MFDFAEstimator
from ..multifractal.wavelet_leaders_estimator import MultifractalWaveletLeadersEstimator
from ..spectral.gph_estimator import GPHEstimator
from ..spectral.periodogram_estimator import PeriodogramEstimator
from ..spectral.whittle_estimator import WhittleEstimator
from ..temporal.dfa_estimator import DFAEstimator
from ..temporal.dma_estimator import DMAEstimator
from ..temporal.higuchi_estimator import HiguchiEstimator
# Import estimators
from ..temporal.rs_estimator import RSEstimator
from ..uncertainty.quantifier import UncertaintyQuantifier
from ..wavelet.cwt_estimator import CWTEstimator
from ..wavelet.log_variance_estimator import WaveletLogVarianceEstimator
from ..wavelet.variance_estimator import WaveletVarianceEstimator
from ..wavelet.whittle_estimator import WaveletWhittleEstimator
# Note: Neural network estimators now use pretrained models instead of unified estimators
# Import contamination models
# Simple contamination classes for benchmarking
class AdditiveGaussianNoise:
def __init__(self, rng: np.random.Generator, noise_level=0.1, std=0.1):
self.noise_level = noise_level
self.std = std
self.rng = rng
def apply(self, data):
noise = self.rng.normal(0, self.std * self.noise_level, len(data))
return data + noise
class MultiplicativeNoise:
def __init__(self, rng: np.random.Generator, noise_level=0.05, std=0.05):
self.noise_level = noise_level
self.std = std
self.rng = rng
def apply(self, data):
noise = self.rng.normal(1, self.std * self.noise_level, len(data))
return data * noise
class OutlierContamination:
def __init__(
self,
rng: np.random.Generator,
outlier_fraction=0.1,
outlier_magnitude=3.0,
):
self.outlier_fraction = outlier_fraction
self.outlier_magnitude = outlier_magnitude
self.rng = rng
def apply(self, data):
contaminated = data.copy()
n_outliers = int(len(data) * self.outlier_fraction)
outlier_indices = self.rng.choice(len(data), n_outliers, replace=False)
contaminated[outlier_indices] += self.rng.normal(
0, self.outlier_magnitude, n_outliers
)
return contaminated
class TrendContamination:
def __init__(
self,
rng: Optional[np.random.Generator] = None,
trend_strength=0.1,
trend_type="linear",
):
self.trend_strength = trend_strength
self.trend_type = trend_type
self.rng = rng
def apply(self, data):
n = len(data)
if self.trend_type == "linear":
trend = np.linspace(0, self.trend_strength, n)
else:
trend = np.zeros(n)
return data + trend
class SeasonalContamination:
def __init__(
self,
rng: Optional[np.random.Generator] = None,
seasonal_strength=0.1,
period=None,
):
self.seasonal_strength = seasonal_strength
self.period = period
self.rng = rng
def apply(self, data):
n = len(data)
if self.period is None:
self.period = n // 4
t = np.arange(n)
seasonal = self.seasonal_strength * np.sin(2 * np.pi * t / self.period)
return data + seasonal
class MissingDataContamination:
def __init__(
self,
rng: np.random.Generator,
missing_fraction=0.1,
missing_pattern="random",
):
self.missing_fraction = missing_fraction
self.missing_pattern = missing_pattern
self.rng = rng
def apply(self, data):
contaminated = data.copy()
n_missing = int(len(data) * self.missing_fraction)
if self.missing_pattern == "random":
missing_indices = self.rng.choice(len(data), n_missing, replace=False)
contaminated[missing_indices] = np.nan
return contaminated
# Import pre-trained ML models for production use
try:
# Import pre-trained neural models
from ...models.pretrained_models.cnn_pretrained import CNNPretrainedModel
from ...models.pretrained_models.gru_pretrained import GRUPretrainedModel
from ...models.pretrained_models.lstm_pretrained import LSTMPretrainedModel
from ...models.pretrained_models.ml_pretrained import (
GradientBoostingPretrainedModel,
RandomForestPretrainedModel,
SVREstimatorPretrainedModel,
)
from ...models.pretrained_models.transformer_pretrained import (
TransformerPretrainedModel,
)
PRETRAINED_MODELS_AVAILABLE = True
except ImportError:
# Pretrained models not available
RandomForestPretrainedModel = None
SVREstimatorPretrainedModel = None
GradientBoostingPretrainedModel = None
CNNPretrainedModel = None
TransformerPretrainedModel = None
LSTMPretrainedModel = None
GRUPretrainedModel = None
PRETRAINED_MODELS_AVAILABLE = False
[docs]
class ComprehensiveBenchmark:
"""
Comprehensive benchmark class for testing all estimators and data models.
"""
[docs]
def __init__(self, output_dir: Optional[str] = None, runtime_profile: str = "auto"):
"""
Initialize the benchmark system.
Parameters
----------
output_dir : str, optional
Directory to save benchmark results
runtime_profile : str, optional
Runtime profile to control computational intensity. Options:
- "auto": determine automatically (default)
- "quick": minimise expensive diagnostics (useful for tests)
- "full": enable all diagnostics and resampling routines
"""
self.output_dir = Path(output_dir) if output_dir else Path("benchmark_results")
self.output_dir.mkdir(exist_ok=True)
self.protocol_config_path = Path("config/benchmark_protocol.yaml")
self.protocol_config = self._load_protocol_config(self.protocol_config_path)
self.runtime_profile = self._resolve_runtime_profile(runtime_profile)
data_generation_cfg = self.protocol_config.get("data_generation", {})
self.random_manager = get_random_manager()
self.random_manager.initialise(data_generation_cfg.get("random_seed"))
preprocessing_cfg = self.protocol_config.get("preprocessing", {})
winsor_limits = preprocessing_cfg.get("winsorize_limits", (0.01, 0.99))
if isinstance(winsor_limits, list):
winsor_limits = tuple(winsor_limits)
self.data_preprocessor = AdaptiveDataPreprocessor(
outlier_threshold=preprocessing_cfg.get("outlier_threshold", 3.0),
winsorize_limits=winsor_limits,
enable_winsorize=preprocessing_cfg.get("apply_winsorize", True),
enable_detrend=preprocessing_cfg.get("detrend", True),
)
# Initialize all estimator categories
self.all_estimators = self._initialize_all_estimators()
benchmark_cfg = self.protocol_config.get("benchmark", {})
diagnostics_cfg = self.protocol_config.get("diagnostics", {})
advanced_metrics_cfg = benchmark_cfg.get("advanced_metrics", {})
uncertainty_cfg = benchmark_cfg.get("uncertainty", {})
robustness_cfg = benchmark_cfg.get("robustness", {})
self.enable_advanced_metrics = bool(advanced_metrics_cfg.get("enabled", True))
self.enable_uncertainty = bool(uncertainty_cfg.get("enabled", True))
self.enable_robustness = bool(robustness_cfg.get("enabled", True))
log_log_cfg = diagnostics_cfg.get("log_log_checks", {})
self.enable_diagnostics = bool(log_log_cfg.get("enabled", True))
if self.runtime_profile == "quick":
self.enable_advanced_metrics = False
self.enable_uncertainty = False
self.enable_robustness = False
self.enable_diagnostics = False
# Initialize advanced metrics analyzers
if self.enable_advanced_metrics:
self.convergence_analyzer = ConvergenceAnalyzer()
self.mse_analyzer = MeanSignedErrorAnalyzer()
else:
self.convergence_analyzer = None
self.mse_analyzer = None
profiler_seed = self.random_manager.spawn_seed("benchmark:advanced_profiler")
self.advanced_profiler = AdvancedPerformanceProfiler(random_state=profiler_seed)
if self.enable_uncertainty:
uncertainty_seed = self.random_manager.spawn_seed(
"benchmark:uncertainty", seed=uncertainty_cfg.get("random_state")
)
self.uncertainty_quantifier = UncertaintyQuantifier(
n_block_bootstrap=uncertainty_cfg.get("n_block_bootstrap", 64),
block_size=uncertainty_cfg.get("block_size"),
n_wavelet_bootstrap=uncertainty_cfg.get("n_wavelet_bootstrap", 64),
wavelet=uncertainty_cfg.get("wavelet", "db4"),
max_wavelet_level=uncertainty_cfg.get("max_wavelet_level"),
n_parametric=uncertainty_cfg.get("n_parametric", 48),
confidence_level=benchmark_cfg.get("confidence_level", 0.95),
random_state=uncertainty_seed,
)
else:
self.uncertainty_quantifier = None
self.error_analyzer = ErrorAnalyzer()
# Initialize robustness stress tester
if self.enable_robustness:
robustness_config = robustness_cfg.get("config", {})
robustness_seed = self.random_manager.spawn_seed(
"benchmark:robustness", seed=robustness_cfg.get("random_state")
)
self.robustness_tester = RobustnessStressTester(
random_state=robustness_seed, config=robustness_config
)
else:
self.robustness_tester = None
# Initialize new diagnostic and reporting systems
if self.enable_diagnostics:
self.power_law_diagnostics = PowerLawDiagnostics(
min_r_squared=log_log_cfg.get("min_r_squared", 0.5),
min_points=log_log_cfg.get("min_points", 6),
)
sensitivity_cfg = diagnostics_cfg.get("scale_window_sensitivity", {})
self.scale_sensitivity_analyser = ScaleWindowSensitivityAnalyser(
perturbation_levels=sensitivity_cfg.get(
"perturbation_levels", [0.9, 0.95, 1.05, 1.1]
),
leave_one_out=sensitivity_cfg.get("leave_one_out", True),
)
self.scaling_influence_analyser = ScalingInfluenceAnalyzer()
else:
self.power_law_diagnostics = None
self.scale_sensitivity_analyser = None
self.scaling_influence_analyser = None
# Initialize provenance tracker
self.provenance_tracker = ProvenanceTracker(self.protocol_config)
# Initialize stratified report generator
self.stratified_report_generator = StratifiedReportGenerator(
self.protocol_config
)
# Initialize data models
self.data_models = self._initialize_data_models()
# Initialize contamination models
self.contamination_models = self._initialize_contamination_models()
# Results storage
self.results = {}
self.performance_metrics = {}
[docs]
def _resolve_runtime_profile(self, runtime_profile: Optional[str]) -> str:
"""
Determine the runtime profile controlling benchmark intensity.
"""
env_override = os.getenv("LRDBENCHMARK_RUNTIME_PROFILE")
if env_override:
runtime_profile = env_override
if not runtime_profile:
runtime_profile = "auto"
runtime_profile = runtime_profile.lower()
if runtime_profile == "auto":
quick_flag = os.getenv("LRDBENCHMARK_QUICK_MODE")
if quick_flag and quick_flag.strip().lower() in {"1", "true", "yes", "on"}:
return "quick"
if os.getenv("PYTEST_CURRENT_TEST"):
return "quick"
if "pytest" in sys.modules:
return "quick"
return "full"
if runtime_profile in {"quick", "full"}:
return runtime_profile
warnings.warn(
f"Unknown runtime_profile '{runtime_profile}'. Falling back to 'full'."
)
return "full"
[docs]
def _load_protocol_config(self, path: Path) -> Dict[str, Any]:
"""Load benchmark protocol configuration from YAML/JSON file."""
default_config: Dict[str, Any] = {
"version": "2.0",
"preprocessing": {
"outlier_threshold": 3.0,
"winsorize_limits": [0.01, 0.99],
"detrend": True,
"apply_winsorize": True,
},
"scale_selection": {
"spectral": {
"min_freq_ratio": 0.01,
"max_freq_ratio": 0.1,
"low_freq_trim": 0.0,
},
"temporal": {
"min_window": 16,
"max_window": 256,
"window_density": "log",
},
"wavelet": {"min_level": 1, "max_level": 8, "wavelet": "db4"},
},
"data_models": {},
"estimator_overrides": {},
"benchmark": {
"confidence_level": 0.95,
"uncertainty": {
"n_block_bootstrap": 64,
"n_wavelet_bootstrap": 64,
"n_parametric": 48,
},
"robustness": {
"enabled": True,
"config": {},
},
},
}
try:
with open(path, "r") as f:
# Try YAML first, then JSON
if path.suffix.lower() in [".yaml", ".yml"]:
try:
import yaml
config_data = yaml.safe_load(f)
except ImportError:
warnings.warn(
"PyYAML not installed. Install with 'pip install pyyaml' to use YAML configs. "
"Falling back to JSON parsing."
)
f.seek(0)
config_data = json.load(f)
else:
config_data = json.load(f)
# Merge with defaults, handling both new unified schema and legacy format
merged = self._deep_merge_dicts(default_config, config_data)
# Map new unified schema to legacy format for backward compatibility
if "data_generation" in merged:
# Extract data models from new schema
if "models" in merged["data_generation"]:
merged["data_models"] = merged["data_generation"]["models"]
if "estimators" in merged and "overrides" in merged["estimators"]:
merged["estimator_overrides"] = merged["estimators"]["overrides"]
# Ensure robustness config exists
if "benchmark" in merged:
if "robustness" not in merged["benchmark"]:
merged["benchmark"]["robustness"] = {"enabled": True, "config": {}}
return merged
except FileNotFoundError:
warnings.warn(f"Protocol configuration '{path}' not found. Using defaults.")
except (json.JSONDecodeError, ValueError) as exc:
warnings.warn(
f"Failed to parse protocol configuration '{path}': {exc}. Using defaults."
)
return default_config
[docs]
def _deep_merge_dicts(
self, base: Dict[str, Any], updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Recursively merge dictionaries without mutating inputs."""
merged = dict(base)
for key, value in updates.items():
if (
key in merged
and isinstance(merged[key], dict)
and isinstance(value, dict)
):
merged[key] = self._deep_merge_dicts(merged[key], value)
else:
merged[key] = value
return merged
[docs]
def _initialize_all_estimators(self) -> Dict[str, Dict[str, Any]]:
"""Initialize all available estimators organized by category."""
estimators = {
"classical": {
# Temporal estimators
"R/S": RSEstimator(),
"DFA": DFAEstimator(),
"DMA": DMAEstimator(),
"Higuchi": HiguchiEstimator(),
# Spectral estimators
"GPH": GPHEstimator(),
"Whittle": WhittleEstimator(),
"Periodogram": PeriodogramEstimator(),
# Wavelet estimators
"CWT": CWTEstimator(),
"WaveletVar": WaveletVarianceEstimator(),
"WaveletLogVar": WaveletLogVarianceEstimator(),
"WaveletWhittle": WaveletWhittleEstimator(),
# Multifractal estimators
"MFDFA": MFDFAEstimator(),
"WaveletLeaders": MultifractalWaveletLeadersEstimator(),
},
"ML": {
"RandomForest": RandomForestPretrainedModel()
if RandomForestPretrainedModel is not None
else None,
"GradientBoosting": GradientBoostingPretrainedModel()
if GradientBoostingPretrainedModel is not None
else None,
"SVR": SVREstimatorPretrainedModel()
if SVREstimatorPretrainedModel is not None
else None,
},
"neural": {
"CNN": CNNPretrainedModel(input_length=500)
if CNNPretrainedModel is not None
else None,
"LSTM": LSTMPretrainedModel(input_length=500)
if LSTMPretrainedModel is not None
else None,
"GRU": GRUPretrainedModel(input_length=500)
if GRUPretrainedModel is not None
else None,
"Transformer": TransformerPretrainedModel(input_length=500)
if TransformerPretrainedModel is not None
else None,
},
}
overrides = self.protocol_config.get("estimator_overrides", {})
if overrides:
estimators = self._apply_estimator_overrides(estimators, overrides)
return estimators
[docs]
def _apply_estimator_overrides(
self,
estimators: Dict[str, Dict[str, Any]],
overrides: Dict[str, Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
"""Apply protocol-defined parameter overrides to initialized estimators."""
for estimator_map in estimators.values():
for name, estimator in estimator_map.items():
if estimator is None:
continue
override = overrides.get(name)
if override:
setter = getattr(estimator, "set_params", None)
if callable(setter):
try:
setter(**override)
except Exception as exc:
warnings.warn(
f"Failed to apply override for estimator '{name}': {exc}"
)
return estimators
[docs]
def _initialize_data_models(self) -> Dict[str, Any]:
"""Initialize all available data models."""
data_models = {
"fBm": FBMModel,
"fGn": FGNModel,
"ARFIMAModel": ARFIMAModel,
"MRW": MRWModel,
}
return data_models
[docs]
def _initialize_contamination_models(self) -> Dict[str, Any]:
"""Initialize all available contamination models."""
contamination_models = {
"additive_gaussian": AdditiveGaussianNoise,
"multiplicative_noise": MultiplicativeNoise,
"outliers": OutlierContamination,
"trend": TrendContamination,
"seasonal": SeasonalContamination,
"missing_data": MissingDataContamination,
}
return contamination_models
def _infer_estimator_family(self, estimator_name: str) -> Optional[str]:
"""Return the estimator family/category for a given estimator name."""
for family, estimator_map in self.all_estimators.items():
if estimator_name in estimator_map:
return family
return None
[docs]
def get_estimators_by_type(
self, benchmark_type: str = "comprehensive", data_length: int = 1000
) -> Dict[str, Any]:
"""
Get estimators based on the specified benchmark type.
Parameters
----------
benchmark_type : str
Type of benchmark to run:
- 'comprehensive': All estimators (default)
- 'classical': Only classical statistical estimators
- 'ML': Only machine learning estimators (non-neural)
- 'neural': Only neural network estimators
data_length : int
Length of data to be tested (used for adaptive wavelet estimators)
Returns
-------
dict
Dictionary of estimators for the specified type
"""
if benchmark_type == "comprehensive":
# Combine all estimators
all_est = {}
for category in self.all_estimators.values():
all_est.update(category)
# Filter out None estimators
all_est = {
name: estimator
for name, estimator in all_est.items()
if estimator is not None
}
return all_est
elif benchmark_type in self.all_estimators:
estimators = self.all_estimators[benchmark_type].copy()
# Filter out None estimators
estimators = {
name: estimator
for name, estimator in estimators.items()
if estimator is not None
}
return estimators
else:
raise ValueError(
f"Unknown benchmark type: {benchmark_type}. "
f"Available types: {list(self.all_estimators.keys()) + ['comprehensive']}"
)
[docs]
def generate_test_data(
self, model_name: str, data_length: int = 1000, **kwargs
) -> Tuple[np.ndarray, Dict[str, Any]]:
"""
Generate test data using specified model.
Parameters
----------
model_name : str
Name of the data model to use
data_length : int
Length of data to generate
**kwargs : dict
Additional parameters for the data model
Returns
-------
tuple
(data, parameters)
"""
if model_name not in self.data_models:
raise ValueError(f"Unknown data model: {model_name}")
model_class = self.data_models[model_name]
protocol_defaults = {
key: value
for key, value in self.protocol_config.get("data_models", {})
.get(model_name, {})
.items()
if key != "enabled"
}
# Set default parameters if not provided
if model_name == "fBm":
base_params = {"H": 0.7, "sigma": 1.0}
params = {**base_params, **protocol_defaults, **kwargs}
elif model_name == "fGn":
base_params = {"H": 0.7, "sigma": 1.0}
params = {**base_params, **protocol_defaults, **kwargs}
elif model_name == "ARFIMAModel":
base_params = {"d": 0.3, "ar_params": [0.5], "ma_params": [0.2]}
params = {**base_params, **protocol_defaults, **kwargs}
elif model_name == "MRW":
base_params = {"H": 0.7, "lambda_param": 0.5, "sigma": 1.0}
params = {**base_params, **protocol_defaults, **kwargs}
else:
params = {**protocol_defaults, **kwargs}
seed_override = params.pop("random_state", None)
model = model_class(**params)
stream_name = f"data_model:{model_name}"
data_seed = self.random_manager.spawn_seed(stream_name, seed=seed_override)
data_rng = self.random_manager.spawn_generator(stream_name, seed=data_seed)
data = model.generate(length=data_length, rng=data_rng)
params["model_name"] = model_name
if seed_override is not None:
params["requested_random_seed"] = seed_override
params["random_seed"] = data_seed
return data, params
[docs]
def apply_contamination(
self,
data: np.ndarray,
contamination_type: str,
contamination_level: float = 0.1,
**kwargs,
) -> Tuple[np.ndarray, Dict[str, Any]]:
"""
Apply contamination to the data.
Parameters
----------
data : np.ndarray
Original clean data
contamination_type : str
Type of contamination to apply
contamination_level : float
Level/intensity of contamination (0.0 to 1.0)
**kwargs : dict
Additional parameters for specific contamination types
Returns
-------
tuple
(contaminated_data, contamination_info)
"""
if contamination_type not in self.contamination_models:
raise ValueError(
f"Unknown contamination type: {contamination_type}. "
f"Available types: {list(self.contamination_models.keys())}"
)
contamination_class = self.contamination_models[contamination_type]
# Set default parameters based on contamination type
if contamination_type == "additive_gaussian":
default_params = {"noise_level": contamination_level, "std": 0.1}
elif contamination_type == "multiplicative_noise":
default_params = {"noise_level": contamination_level, "std": 0.05}
elif contamination_type == "outliers":
default_params = {
"outlier_fraction": contamination_level,
"outlier_magnitude": 3.0,
}
elif contamination_type == "trend":
default_params = {
"trend_strength": contamination_level,
"trend_type": "linear",
}
elif contamination_type == "seasonal":
default_params = {
"seasonal_strength": contamination_level,
"period": len(data) // 4,
}
elif contamination_type == "missing_data":
default_params = {
"missing_fraction": contamination_level,
"missing_pattern": "random",
}
else:
default_params = {}
# Update with provided kwargs
contamination_params = {**default_params, **kwargs}
# Apply contamination
stream_name = f"contamination:{contamination_type}"
seed_override = contamination_params.get("random_state")
contamination_seed = self.random_manager.spawn_seed(
stream_name, seed=seed_override
)
contamination_rng = self.random_manager.spawn_generator(
stream_name, seed=contamination_seed
)
model_kwargs = dict(contamination_params)
model_kwargs.pop("random_state", None)
model_kwargs.setdefault("rng", contamination_rng)
contamination_model = contamination_class(**model_kwargs)
contaminated_data = contamination_model.apply(data)
contamination_info = {
"type": contamination_type,
"level": contamination_level,
"parameters": contamination_params,
"original_data_shape": data.shape,
"contaminated_data_shape": contaminated_data.shape,
"random_seed": contamination_seed,
}
return contaminated_data, contamination_info
[docs]
def run_single_estimator_test(
self, estimator_name: str, data: np.ndarray, true_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Run a single estimator test.
Parameters
----------
estimator_name : str
Name of the estimator to test
data : np.ndarray
Test data
true_params : dict
True parameters of the data
Returns
-------
dict
Test results
"""
# Get the estimator from the comprehensive list
all_estimators = self.get_estimators_by_type("comprehensive")
estimator = all_estimators[estimator_name]
estimator_family = self._infer_estimator_family(estimator_name)
# Measure execution time
start_time = time.time()
processed_data = np.asarray(data, dtype=np.float64)
preprocessing_metadata: Dict[str, Any] = {}
try:
processed_data, preprocessing_metadata = self.data_preprocessor.preprocess(
processed_data
)
preprocessing_metadata["config"] = self.protocol_config.get(
"preprocessing", {}
)
except Exception as exc:
warnings.warn(f"Preprocessing failed for estimator {estimator_name}: {exc}")
preprocessing_metadata = {
"status": "failed",
"error": str(exc),
"config": self.protocol_config.get("preprocessing", {}),
}
try:
result = estimator.estimate(processed_data)
execution_time = time.time() - start_time
# Extract key metrics
hurst_est = result.get("hurst_parameter", None)
if hurst_est is not None:
# Try different parameter names for different data models
true_hurst = None
if "H" in true_params:
true_hurst = true_params["H"]
elif "d" in true_params:
# For ARFIMA models, use d parameter directly
true_hurst = true_params["d"]
if true_hurst is not None:
error = abs(hurst_est - true_hurst)
else:
# If we can't calculate error, still mark as successful but with no error metric
error = None
# Update true_hurst in the result for consistency
if true_hurst is not None:
true_params["H"] = true_hurst
else:
error = None
# Advanced metrics analysis
advanced_metrics: Dict[str, Any] = {}
if (
self.enable_advanced_metrics
and self.convergence_analyzer is not None
and true_params.get("H") is not None
):
try:
# Convergence analysis
convergence_results = (
self.convergence_analyzer.analyze_convergence_rate(
estimator, processed_data, true_params.get("H")
)
)
advanced_metrics["convergence_rate"] = convergence_results.get(
"convergence_rate"
)
advanced_metrics["convergence_achieved"] = convergence_results.get(
"convergence_achieved"
)
advanced_metrics["stability_metric"] = convergence_results.get(
"stability_metric"
)
# Mean signed error analysis (Monte Carlo)
mse_results = self._calculate_monte_carlo_mse(
estimator, processed_data, true_params.get("H")
)
advanced_metrics["mean_signed_error"] = mse_results.get(
"mean_signed_error"
)
advanced_metrics["bias_percentage"] = mse_results.get(
"bias_percentage"
)
advanced_metrics["significant_bias"] = mse_results.get(
"significant_bias"
)
except Exception as e:
warnings.warn(f"Advanced metrics calculation failed: {e}")
advanced_metrics = {
"convergence_rate": None,
"convergence_achieved": None,
"stability_metric": None,
"mean_signed_error": None,
"bias_percentage": None,
"significant_bias": None,
}
elif not self.enable_advanced_metrics:
advanced_metrics = {
"status": "skipped",
"reason": (
"Advanced metrics disabled by runtime profile."
if self.runtime_profile == "quick"
else "Advanced metrics disabled by benchmark configuration."
),
}
# Uncertainty quantification
if hurst_est is not None:
if self.enable_uncertainty and self.uncertainty_quantifier is not None:
try:
uncertainty_details = (
self.uncertainty_quantifier.compute_intervals(
estimator=estimator,
data=processed_data,
base_result=result,
true_value=true_params.get("H"),
data_model_name=true_params.get("model_name"),
data_model_params=true_params,
data_model_registry=self.data_models,
)
)
except Exception as exc:
warnings.warn(f"Uncertainty quantification failed: {exc}")
uncertainty_details = {
"status": "failed",
"error": str(exc),
}
else:
reason = (
"Uncertainty quantification disabled by runtime profile."
if self.runtime_profile == "quick"
else "Uncertainty quantification disabled in benchmark configuration."
)
uncertainty_details = {
"status": "disabled",
"reason": reason,
}
else:
uncertainty_details = {
"status": "unavailable",
"reason": "No hurst estimate available for uncertainty quantification.",
}
if not self.enable_uncertainty:
result["confidence_interval"] = None
if isinstance(uncertainty_details, dict):
result["uncertainty"] = uncertainty_details
primary_interval = uncertainty_details.get("primary_interval")
if (
primary_interval
and isinstance(primary_interval, dict)
and primary_interval.get("confidence_interval") is not None
):
result.setdefault(
"confidence_interval",
tuple(primary_interval["confidence_interval"]),
)
# Robustness stress panels: before/after H comparisons
robustness_panel = None
if (
self.enable_robustness
and self.robustness_tester is not None
and hurst_est is not None
):
try:
# Use original data (before preprocessing) for robustness tests
# The stress tester will apply its own transformations
robustness_panel = self.robustness_tester.run_panels(
estimator=estimator,
data=data, # Original data before preprocessing
baseline_result=result, # Baseline result with H estimate
true_value=true_params.get("H"),
)
except Exception as exc:
warnings.warn(
f"Robustness stress panel failed for {estimator_name}: {exc}"
)
robustness_panel = {
"status": "failed",
"error": str(exc),
}
elif not self.enable_robustness:
robustness_panel = {
"status": "skipped",
"reason": (
"Robustness analysis disabled by runtime profile."
if self.runtime_profile == "quick"
else "Robustness analysis disabled by benchmark configuration."
),
}
# Run comprehensive diagnostics on power-law fit
diagnostics_results: Dict[str, Any] = {}
diagnostics_cfg = self.protocol_config.get("diagnostics", {})
if (
self.enable_diagnostics
and self.power_law_diagnostics is not None
and hurst_est is not None
):
try:
# Try to extract scale data from result
scales, statistics = self._extract_scale_data(result, estimator)
if scales is not None and statistics is not None:
# Run power-law diagnostics
diag_results = self.power_law_diagnostics.diagnose(
scales,
statistics,
slope=result.get("slope"),
intercept=result.get("intercept"),
)
diagnostics_results["power_law"] = diag_results
# Run scaling influence analysis if enabled
if (
diagnostics_cfg.get("log_log_checks", {}).get(
"influence_analysis", True
)
and self.scaling_influence_analyser is not None
):
try:
influence_results = (
self.scaling_influence_analyser.analyse(
scales, statistics, min_points=6
)
)
diagnostics_results[
"scaling_influence"
] = influence_results
except Exception as exc_inf:
diagnostics_results["scaling_influence"] = {
"status": "failed",
"error": str(exc_inf),
}
# Run scale window sensitivity if enabled
if (
diagnostics_cfg.get("scale_window_sensitivity", {}).get(
"enabled", True
)
and self.scale_sensitivity_analyser is not None
):
try:
sensitivity_results = (
self.scale_sensitivity_analyser.analyse(
estimator, processed_data, result, scales
)
)
diagnostics_results[
"scale_sensitivity"
] = sensitivity_results
except Exception as exc_sens:
diagnostics_results["scale_sensitivity"] = {
"status": "failed",
"error": str(exc_sens),
}
except Exception as exc:
warnings.warn(
f"Diagnostic analysis failed for {estimator_name}: {exc}"
)
diagnostics_results = {"status": "failed", "error": str(exc)}
elif not self.enable_diagnostics:
diagnostics_results = {
"status": "skipped",
"reason": (
"Diagnostics disabled by runtime profile."
if self.runtime_profile == "quick"
else "Diagnostics disabled by benchmark configuration."
),
}
# Build provenance for this result row
result_provenance = self._build_result_row_provenance(
{
"estimator": estimator_name,
"timestamp": datetime.now().isoformat(),
"estimated_hurst": hurst_est,
"true_hurst": true_params.get("H", None),
"error": error,
"preprocessing": preprocessing_metadata,
"robustness_panel": robustness_panel,
"uncertainty": uncertainty_details,
},
true_params,
)
test_result = {
"estimator": estimator_name,
"success": True,
"execution_time": execution_time,
"estimated_hurst": hurst_est,
"true_hurst": true_params.get("H", None),
"error": error,
"r_squared": result.get("r_squared", None),
"p_value": result.get("p_value", None),
"intercept": result.get("intercept", None),
"slope": result.get("slope", None),
"std_error": result.get("std_error", None),
"confidence_interval": result.get("confidence_interval"),
"advanced_metrics": advanced_metrics,
"uncertainty": uncertainty_details,
"robustness_panel": robustness_panel,
"diagnostics": diagnostics_results,
"preprocessing": preprocessing_metadata,
"provenance": result_provenance, # Per-row provenance artifact
"full_result": result,
}
if self.enable_uncertainty:
self._record_uncertainty_event(
estimator_name=estimator_name,
data_model=true_params.get("model_name"),
uncertainty=uncertainty_details,
estimate=hurst_est,
true_value=true_params.get("H"),
data_length=len(processed_data),
estimator_family=estimator_family,
)
except Exception as e:
execution_time = time.time() - start_time
test_result = {
"estimator": estimator_name,
"success": False,
"execution_time": execution_time,
"error_message": str(e),
"estimated_hurst": None,
"true_hurst": true_params.get("H", None),
"error": None,
"r_squared": None,
"p_value": None,
"intercept": None,
"slope": None,
"std_error": None,
"full_result": None,
}
return test_result
[docs]
def _calculate_monte_carlo_mse(
self, estimator, data: np.ndarray, true_value: float, n_simulations: int = 50
) -> Dict[str, Any]:
"""
Calculate mean signed error using Monte Carlo simulations.
Parameters
----------
estimator : BaseEstimator
Estimator instance
data : np.ndarray
Original dataset
true_value : float
True parameter value
n_simulations : int
Number of Monte Carlo simulations
Returns
-------
dict
Mean signed error analysis results
"""
if not self.enable_advanced_metrics or self.mse_analyzer is None:
return {
"mean_signed_error": None,
"bias_percentage": None,
"significant_bias": None,
}
if self.runtime_profile == "quick":
n_simulations = min(n_simulations, 5)
estimates = []
rng = self.random_manager.spawn_generator(
f"benchmark:mse:{estimator.__class__.__name__}"
)
for i in range(n_simulations):
# Add small random noise to create variations
noise_level = 0.01 * np.std(data)
noisy_data = data + rng.normal(0, noise_level, len(data))
try:
result = estimator.estimate(noisy_data)
estimate = result.get("hurst_parameter", None)
if estimate is not None:
estimates.append(estimate)
except:
continue
if len(estimates) == 0:
return {
"mean_signed_error": None,
"bias_percentage": None,
"significant_bias": None,
}
# Create true values list (all same value)
true_values = [true_value] * len(estimates)
# Calculate mean signed error
mse_results = self.mse_analyzer.calculate_mean_signed_error(
estimates, true_values
)
return {
"mean_signed_error": mse_results.get("mean_signed_error"),
"bias_percentage": mse_results.get("bias_percentage"),
"significant_bias": mse_results.get("significant_bias"),
}
[docs]
def _compute_significance_tests(
self,
results: Dict[str, Any],
alpha: float = 0.05,
) -> Dict[str, Any]:
"""
Compute omnibus and post-hoc significance tests across estimators.
Parameters
----------
results : Dict[str, Any]
Raw benchmark results grouped by data model.
alpha : float
Significance level for hypothesis testing.
Returns
-------
Dict[str, Any]
Significance testing outcomes including Friedman statistics and
Holm-adjusted pairwise Wilcoxon tests.
"""
alpha = float(alpha)
if not results:
return {
"status": "insufficient_data",
"reason": "No benchmark results available for significance testing.",
}
records: List[Dict[str, Any]] = []
for model_name, model_data in results.items():
estimator_results = model_data.get("estimator_results", [])
if not estimator_results:
continue
row: Dict[str, Any] = {"data_model": model_name}
for est_result in estimator_results:
if est_result.get("success") and est_result.get("error") is not None:
row[est_result["estimator"]] = est_result["error"]
if len(row) > 2:
records.append(row)
if not records:
return {
"status": "insufficient_data",
"reason": "No comparable estimator errors available across data models.",
}
performance_df = pd.DataFrame(records).set_index("data_model")
performance_df = performance_df.dropna(axis=1, how="any")
performance_df = performance_df.dropna(axis=0, how="any")
if performance_df.shape[0] < 2:
return {
"status": "insufficient_data",
"reason": "At least two data models with complete estimator coverage are required.",
}
if performance_df.shape[1] < 2:
return {
"status": "insufficient_data",
"reason": "At least two estimators with complete coverage are required.",
}
rank_df = performance_df.rank(axis=1, method="average")
estimator_names = list(rank_df.columns)
significance_payload: Dict[str, Any] = {
"status": "ok",
"considered_data_models": list(performance_df.index),
"considered_estimators": estimator_names,
"error_table": {
str(idx): {col: float(val) for col, val in row.items()}
for idx, row in performance_df.iterrows()
},
"rank_table": {
str(idx): {col: float(val) for col, val in row.items()}
for idx, row in rank_df.iterrows()
},
"alpha": alpha,
}
if performance_df.shape[1] >= 3:
try:
friedman_stat, friedman_p = stats.friedmanchisquare(
*[performance_df[col].values for col in performance_df.columns]
)
significance_payload["friedman"] = {
"statistic": float(friedman_stat),
"p_value": float(friedman_p),
"n_data_models": int(performance_df.shape[0]),
"n_estimators": int(performance_df.shape[1]),
"alpha": alpha,
}
except Exception as exc:
significance_payload["friedman"] = {
"statistic": None,
"p_value": None,
"n_data_models": int(performance_df.shape[0]),
"n_estimators": int(performance_df.shape[1]),
"error": str(exc),
"alpha": alpha,
}
else:
significance_payload["friedman"] = {
"statistic": None,
"p_value": None,
"n_data_models": int(performance_df.shape[0]),
"n_estimators": int(performance_df.shape[1]),
"error": "Friedman test requires at least three estimators.",
"alpha": alpha,
}
friedman_p_value = significance_payload["friedman"].get("p_value")
significance_payload["global_null_rejected"] = (
friedman_p_value is not None and friedman_p_value < alpha
)
mean_ranks = {
estimator: float(rank_df[estimator].mean()) for estimator in estimator_names
}
significance_payload["mean_ranks"] = mean_ranks
def _determine_better(est_a: str, est_b: str) -> Optional[str]:
rank_a = mean_ranks.get(est_a)
rank_b = mean_ranks.get(est_b)
if rank_a is None or rank_b is None:
return None
if np.isclose(rank_a, rank_b, atol=1e-9):
return None
return est_a if rank_a < rank_b else est_b
if len(estimator_names) >= 3 and performance_df.shape[0] >= 1:
try:
q_alpha = stats.studentized_range.ppf(
1 - alpha / 2.0, len(estimator_names), np.inf
)
critical_difference = float(
q_alpha
/ np.sqrt(2.0)
* np.sqrt(
len(estimator_names)
* (len(estimator_names) + 1)
/ (6.0 * performance_df.shape[0])
)
)
nemenyi_results: List[Dict[str, Any]] = []
for est_a, est_b in combinations(estimator_names, 2):
diff = abs(mean_ranks[est_a] - mean_ranks[est_b])
better = _determine_better(est_a, est_b)
nemenyi_results.append(
{
"pair": [est_a, est_b],
"mean_rank_difference": float(diff),
"critical_difference": critical_difference,
"significant": bool(diff > critical_difference),
"better_estimator": better,
}
)
significance_payload["nemenyi"] = {
"status": "ok",
"alpha": alpha,
"q_critical": float(q_alpha),
"critical_difference": critical_difference,
"results": nemenyi_results,
}
except Exception as exc:
significance_payload["nemenyi"] = {
"status": "failed",
"alpha": alpha,
"reason": str(exc),
"results": [],
}
else:
significance_payload["nemenyi"] = {
"status": "insufficient_data",
"alpha": alpha,
"reason": "Nemenyi test requires at least three estimators and one data model.",
"results": [],
}
def _apply_holm(
records: List[Dict[str, Any]],
alpha_level: float,
p_key: str = "p_value",
) -> None:
indices = [
idx
for idx, record in enumerate(records)
if record.get(p_key) is not None
]
if not indices:
for record in records:
record.setdefault("holm_p_value", record.get(p_key))
record.setdefault("significant", False)
return
sorted_indices = sorted(indices, key=lambda idx: float(records[idx][p_key]))
m = len(sorted_indices)
previous = 0.0
for order, idx in enumerate(sorted_indices):
raw_p = float(records[idx][p_key])
adjusted = min(1.0, raw_p * (m - order))
adjusted = max(adjusted, previous)
records[idx]["holm_p_value"] = adjusted
records[idx]["significant"] = adjusted < alpha_level
previous = adjusted
for record in records:
record.setdefault("holm_p_value", record.get(p_key))
record.setdefault("significant", False)
wilcoxon_results: List[Dict[str, Any]] = []
for est_a, est_b in combinations(estimator_names, 2):
ranks_a = rank_df[est_a].values
ranks_b = rank_df[est_b].values
if np.allclose(ranks_a, ranks_b):
wilcoxon_results.append(
{
"pair": [est_a, est_b],
"statistic": 0.0,
"p_value": 1.0,
"holm_p_value": 1.0,
"significant": False,
"note": "Identical rank distributions",
"n": int(len(ranks_a)),
"better_estimator": None,
}
)
continue
try:
stat, p_value = stats.wilcoxon(
ranks_a,
ranks_b,
zero_method="pratt",
alternative="two-sided",
)
wilcoxon_results.append(
{
"pair": [est_a, est_b],
"statistic": float(stat),
"p_value": float(p_value),
"n": int(len(ranks_a)),
"better_estimator": _determine_better(est_a, est_b),
}
)
except ValueError as exc:
wilcoxon_results.append(
{
"pair": [est_a, est_b],
"error": str(exc),
"statistic": None,
"p_value": None,
"holm_p_value": None,
"significant": False,
"n": int(len(ranks_a)),
"better_estimator": None,
}
)
_apply_holm(wilcoxon_results, alpha)
significance_payload["wilcoxon_signed_rank"] = wilcoxon_results
significance_payload["post_hoc"] = wilcoxon_results
sign_results: List[Dict[str, Any]] = []
for est_a, est_b in combinations(estimator_names, 2):
diffs = performance_df[est_a].values - performance_df[est_b].values
diffs = diffs[np.isfinite(diffs)]
diffs = diffs[np.abs(diffs) > 1e-12]
n = len(diffs)
if n == 0:
sign_results.append(
{
"pair": [est_a, est_b],
"p_value": None,
"holm_p_value": None,
"significant": False,
"n": 0,
"note": "No non-zero paired differences.",
"better_estimator": None,
"n_positive": 0,
"n_negative": 0,
}
)
continue
n_positive = int(np.sum(diffs > 0))
n_negative = int(np.sum(diffs < 0))
try:
binom_res = stats.binomtest(
n_positive, n_positive + n_negative, 0.5, alternative="two-sided"
)
p_value = float(binom_res.pvalue)
except Exception as exc:
sign_results.append(
{
"pair": [est_a, est_b],
"p_value": None,
"holm_p_value": None,
"significant": False,
"n": int(n_positive + n_negative),
"error": str(exc),
"better_estimator": None,
"n_positive": n_positive,
"n_negative": n_negative,
}
)
continue
better = None
if n_positive != n_negative:
better = est_a if n_negative > n_positive else est_b
sign_results.append(
{
"pair": [est_a, est_b],
"p_value": p_value,
"n": int(n_positive + n_negative),
"n_positive": n_positive,
"n_negative": n_negative,
"better_estimator": better,
}
)
_apply_holm(sign_results, alpha)
significance_payload["sign_test"] = {
"status": "ok" if sign_results else "insufficient_data",
"alpha": alpha,
"results": sign_results,
}
estimator_markers: Dict[str, Dict[str, List[str]]] = {
estimator: {"nemenyi": [], "wilcoxon": [], "sign_test": []}
for estimator in estimator_names
}
def _register_marker(
winner: Optional[str],
pair: List[str],
test_key: str,
) -> None:
if winner is None or winner not in pair or len(pair) != 2:
return
loser = pair[0] if winner == pair[1] else pair[1]
bucket = estimator_markers.setdefault(
winner, {"nemenyi": [], "wilcoxon": [], "sign_test": []}
).setdefault(test_key, [])
if loser not in bucket:
bucket.append(loser)
nemenyi_results = significance_payload["nemenyi"].get("results", [])
if isinstance(nemenyi_results, list):
for record in nemenyi_results:
if record.get("significant"):
_register_marker(
record.get("better_estimator"),
record.get("pair", []),
"nemenyi",
)
for record in wilcoxon_results:
if record.get("significant"):
_register_marker(
record.get("better_estimator"), record.get("pair", []), "wilcoxon"
)
sign_section = significance_payload["sign_test"].get("results", [])
if isinstance(sign_section, list):
for record in sign_section:
if record.get("significant"):
_register_marker(
record.get("better_estimator"),
record.get("pair", []),
"sign_test",
)
for estimator, buckets in estimator_markers.items():
for key, values in buckets.items():
buckets[key] = sorted(set(values))
significance_payload["estimator_markers"] = estimator_markers
return significance_payload
[docs]
def _compute_stratified_metrics(
self,
results: Dict[str, Any],
data_length: int,
contamination_type: Optional[str],
contamination_level: float,
) -> Dict[str, Any]:
"""
Produce stratified summaries across H bands, tail classes, data length, and contamination regime.
"""
if not results:
return {
"status": "insufficient_data",
"reason": "No benchmark results available for stratified analysis.",
}
def init_bucket() -> Dict[str, Any]:
return {
"count": 0,
"success": 0,
"errors": [],
"ci_widths": [],
"coverage": [],
"estimated_values": [],
"true_values": [],
"data_models": set(),
"estimators": set(),
}
hurst_bands: Dict[str, Dict[str, Any]] = {}
tail_classes: Dict[str, Dict[str, Any]] = {}
length_bands: Dict[str, Dict[str, Any]] = {}
contamination_bands: Dict[str, Dict[str, Any]] = {}
length_band = self._categorise_length_band(data_length)
contamination_key = (
"clean"
if contamination_type is None
else f"{contamination_type} (level={contamination_level})"
)
def update_bucket(
container: Dict[str, Dict[str, Any]], key: str
) -> Dict[str, Any]:
if key not in container:
container[key] = init_bucket()
return container[key]
total_observations = 0
for model_name, payload in results.items():
estimator_results = payload.get("estimator_results", [])
tail_class = self._infer_tail_class(model_name, payload.get("data_params"))
for est_result in estimator_results:
if not est_result.get("success"):
continue
error = est_result.get("error")
if error is None or not np.isfinite(error):
continue
hurst_true = est_result.get("true_hurst")
hurst_est = est_result.get("estimated_hurst")
hurst_value = hurst_true if hurst_true is not None else hurst_est
hurst_band = self._categorise_hurst_band(hurst_value)
bucket_h = update_bucket(hurst_bands, hurst_band)
bucket_tail = update_bucket(tail_classes, tail_class)
bucket_length = update_bucket(length_bands, length_band)
bucket_contamination = update_bucket(
contamination_bands, contamination_key
)
for bucket in (
bucket_h,
bucket_tail,
bucket_length,
bucket_contamination,
):
bucket["count"] += 1
bucket["errors"].append(float(error))
bucket["data_models"].add(model_name)
bucket["estimators"].add(est_result.get("estimator"))
if est_result.get("success"):
bucket["success"] += 1
if hurst_est is not None and np.isfinite(hurst_est):
bucket["estimated_values"].append(float(hurst_est))
if hurst_true is not None and np.isfinite(hurst_true):
bucket["true_values"].append(float(hurst_true))
ci = est_result.get("confidence_interval")
if (
isinstance(ci, (list, tuple))
and len(ci) == 2
and ci[0] is not None
and ci[1] is not None
):
try:
ci_width = float(ci[1]) - float(ci[0])
if np.isfinite(ci_width):
bucket["ci_widths"].append(ci_width)
except (TypeError, ValueError):
pass
uncertainty = est_result.get("uncertainty")
if isinstance(uncertainty, dict):
coverage = uncertainty.get("coverage")
primary = uncertainty.get("primary_interval")
method = (
primary.get("method") if isinstance(primary, dict) else None
)
coverage_flag = None
if isinstance(coverage, dict):
if method and method in coverage:
coverage_flag = coverage.get(method)
else:
for value in coverage.values():
if value is not None:
coverage_flag = value
break
if coverage_flag is not None:
try:
bucket["coverage"].append(bool(coverage_flag))
except Exception:
pass
total_observations += 1
if total_observations == 0:
return {
"status": "insufficient_data",
"reason": "No successful estimator runs with comparable errors.",
}
def summarise(container: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
summary: Dict[str, Any] = {}
for key, bucket in container.items():
count = bucket["count"]
success = bucket["success"]
errors = bucket["errors"]
ci_widths = bucket["ci_widths"]
coverage = bucket["coverage"]
estimates = bucket["estimated_values"]
true_values = bucket["true_values"]
if count == 0:
continue
summary[key] = {
"n": int(count),
"success_rate": float(success / count) if count else 0.0,
"mean_error": float(np.mean(errors)) if errors else None,
"median_error": float(np.median(errors)) if errors else None,
"std_error": float(np.std(errors)) if len(errors) > 1 else 0.0,
"mean_ci_width": float(np.mean(ci_widths)) if ci_widths else None,
"coverage_rate": float(np.mean(coverage)) if coverage else None,
"mean_estimated_h": float(np.mean(estimates))
if estimates
else None,
"mean_true_h": float(np.mean(true_values)) if true_values else None,
"data_models": sorted(bucket["data_models"]),
"estimators": sorted(
est for est in bucket["estimators"] if est is not None
),
}
return summary
return {
"status": "ok",
"total_observations": int(total_observations),
"hurst_bands": summarise(hurst_bands),
"tail_classes": summarise(tail_classes),
"data_length_bands": summarise(length_bands),
"contamination": summarise(contamination_bands),
}
[docs]
def _categorise_hurst_band(self, hurst_value: Optional[float]) -> str:
"""Assign H estimates to qualitative persistence bands."""
if hurst_value is None or not np.isfinite(hurst_value):
return "unknown"
if hurst_value < 0.4:
return "short-range (Hโค0.40)"
if hurst_value < 0.55:
return "borderline (0.40<Hโค0.55)"
if hurst_value < 0.7:
return "moderate persistence (0.55<Hโค0.70)"
if hurst_value < 0.85:
return "persistent (0.70<Hโค0.85)"
return "ultra-persistent (H>0.85)"
[docs]
def _categorise_length_band(self, data_length: Optional[int]) -> str:
"""Bucket data length into interpretable regimes."""
if data_length is None:
return "unknown length"
if data_length < 512:
return "short (โค512)"
if data_length < 2048:
return "medium (513โ2048)"
if data_length < 8192:
return "long (2049โ8192)"
return "ultra-long (>8192)"
[docs]
def _infer_estimator_family(self, estimator_name: str) -> str:
"""
Infer the family (classical, ML, neural) from estimator name.
Parameters
----------
estimator_name : str
Name of the estimator
Returns
-------
str
Estimator family
"""
family_keywords = {
"classical": [
"GPH",
"Whittle",
"Periodogram",
"DFA",
"DMA",
"R/S",
"Higuchi",
"CWT",
"WaveletVar",
"WaveletLogVar",
"WaveletWhittle",
"MFDFA",
"WaveletLeaders",
],
"ML": ["RandomForest", "GradientBoosting", "SVR", "XGBoost", "LightGBM"],
"neural": ["CNN", "LSTM", "GRU", "Transformer", "ResNet"],
}
for family, keywords in family_keywords.items():
if any(kw in estimator_name for kw in keywords):
return family
return "other"
[docs]
def _infer_tail_class(
self,
model_name: Optional[str],
data_params: Optional[Dict[str, Any]] = None,
) -> str:
"""Infer a qualitative tail/heaviness class based on the data model."""
if model_name is None:
return "unknown"
mapping = {
"fBm": "gaussian",
"fGn": "gaussian",
"ARFIMAModel": "linear-LRD",
"MRW": "multifractal-heavy-tail",
"alphaStable": "alpha-stable",
"neural_fsde": "neural-fSDE",
}
if model_name in mapping:
return mapping[model_name]
if isinstance(model_name, str) and model_name.lower().startswith("alpha"):
return "alpha-stable"
return "unknown"
[docs]
def _build_provenance_bundle(self, summary: Dict[str, Any]) -> Dict[str, Any]:
"""
Construct a comprehensive provenance bundle using ProvenanceTracker.
This bundle includes all settings needed to reproduce the experiment:
- Data generation parameters
- Estimator configuration
- Preprocessing settings
- Scale selection parameters
- Analytics configuration
- Environment information
"""
estimator_listing = {
category: [
name
for name, estimator in self.all_estimators.get(category, {}).items()
if estimator is not None
]
for category in self.all_estimators
}
# Use the new provenance tracker
benchmark_metadata = {
"timestamp": summary.get("timestamp"),
"benchmark_type": summary.get("benchmark_type"),
"data_length": summary.get("data_length"),
"contamination_type": summary.get("contamination_type"),
"contamination_level": summary.get("contamination_level"),
"total_tests": summary.get("total_tests"),
"successful_tests": summary.get("successful_tests"),
"success_rate": summary.get("success_rate"),
"data_models_tested": summary.get("data_models_tested"),
"estimators_tested": summary.get("estimators_tested"),
}
rng_snapshot = self.random_manager.snapshot()
benchmark_metadata["random_state"] = {
"global_seed": rng_snapshot.global_seed,
"streams": rng_snapshot.child_seeds,
}
return self.provenance_tracker.capture_provenance(
benchmark_metadata, estimator_listing
)
[docs]
def _attach_uncertainty_calibration_summary(
self, summary: Dict[str, Any], lookback_days: int = 90
) -> None:
"""Augment benchmark summaries with uncertainty calibration diagnostics."""
analyzer = getattr(self, "error_analyzer", None)
if analyzer is None:
return
summariser = getattr(analyzer, "summarise_uncertainty_calibration", None)
if summariser is None:
return
try:
calibration_records = summariser(days=lookback_days)
except Exception:
return
if not calibration_records:
return
summary["uncertainty_calibration"] = calibration_records
plotter = getattr(analyzer, "plot_uncertainty_calibration", None)
if plotter is None:
return
figures_dir = self.output_dir / "figures"
figures_dir.mkdir(parents=True, exist_ok=True)
plot_filename = (
f"uncertainty_calibration_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
)
plot_path = figures_dir / plot_filename
try:
generated_path = plotter(str(plot_path), days=lookback_days)
except Exception:
generated_path = None
if generated_path:
summary["uncertainty_calibration_plot"] = generated_path
[docs]
def _build_result_row_provenance(
self, result: Dict[str, Any], data_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Build provenance bundle for a single result row.
This creates a lightweight provenance artifact per result that includes:
- Experiment-level provenance (reference)
- Row-specific parameters (data model, estimator, etc.)
- Result metadata
"""
return {
"experiment_id": self.protocol_config.get("experiment_id"),
"timestamp": result.get("timestamp") or datetime.now().isoformat(),
"estimator": result.get("estimator"),
"data_model": data_params.get("model_name"),
"data_params": data_params,
"preprocessing": result.get("preprocessing", {}),
"estimated_hurst": result.get("estimated_hurst"),
"true_hurst": result.get("true_hurst"),
"error": result.get("error"),
"robustness_panel": result.get("robustness_panel"),
"uncertainty": result.get("uncertainty", {}),
"protocol_version": self.protocol_config.get("version", "2.0"),
"protocol_path": str(self.protocol_config_path),
}
[docs]
def _record_uncertainty_event(
self,
estimator_name: str,
data_model: Optional[str],
uncertainty: Any,
estimate: Optional[float],
true_value: Optional[float],
data_length: int,
estimator_family: Optional[str],
) -> None:
"""Persist uncertainty calibration data via the error analyzer."""
analyzer = getattr(self, "error_analyzer", None)
if analyzer is None:
return
if not isinstance(uncertainty, dict):
return
coverage_data = uncertainty.get("coverage")
if not isinstance(coverage_data, dict):
coverage_data = {}
primary_interval = uncertainty.get("primary_interval")
if isinstance(primary_interval, dict):
primary_method = primary_interval.get("method")
else:
primary_method = None
method_entries = [
("block_bootstrap", uncertainty.get("block_bootstrap")),
("wavelet_bootstrap", uncertainty.get("wavelet_bootstrap")),
("parametric_monte_carlo", uncertainty.get("parametric_monte_carlo")),
]
for method_name, method_summary in method_entries:
if not isinstance(method_summary, dict):
continue
ci = method_summary.get("confidence_interval")
if not (isinstance(ci, (list, tuple)) and len(ci) == 2):
continue
try:
ci_lower = float(ci[0])
ci_upper = float(ci[1])
except (TypeError, ValueError):
continue
if not np.isfinite(ci_lower) or not np.isfinite(ci_upper):
continue
coverage_flag = coverage_data.get(method_name)
if coverage_flag is None:
for value in coverage_data.values():
if value is not None:
coverage_flag = value
break
metadata: Dict[str, Any] = {
"confidence_level": uncertainty.get("confidence_level"),
"n_samples": method_summary.get("n_samples"),
"status": method_summary.get("status"),
"data_length": data_length,
"estimator_family": estimator_family,
"is_primary": method_name == primary_method,
"method_metadata": method_summary.get("metadata"),
}
if estimate is not None and np.isfinite(estimate):
metadata["central_estimate"] = float(estimate)
if true_value is not None and np.isfinite(true_value):
metadata["true_value"] = float(true_value)
try:
analyzer.record_uncertainty_calibration(
estimator_name=estimator_name,
data_model=data_model,
ci_lower=ci_lower,
ci_upper=ci_upper,
estimate=float(estimate) if estimate is not None else None,
true_value=float(true_value) if true_value is not None else None,
method=method_name,
coverage_flag=coverage_flag,
metadata=metadata,
)
except Exception:
continue
[docs]
def run_comprehensive_benchmark(
self,
data_length: int = 1000,
benchmark_type: str = "comprehensive",
contamination_type: Optional[str] = None,
contamination_level: float = 0.1,
save_results: bool = True,
) -> Dict[str, Any]:
"""
Run comprehensive benchmark across all estimators and data models.
Parameters
----------
data_length : int
Length of test data to generate
benchmark_type : str
Type of benchmark to run:
- 'comprehensive': All estimators (default)
- 'classical': Only classical statistical estimators
- 'ML': Only machine learning estimators (non-neural)
- 'neural': Only neural network estimators
contamination_type : str, optional
Type of contamination to apply to the data
contamination_level : float
Level/intensity of contamination (0.0 to 1.0)
save_results : bool
Whether to save results to file
Returns
-------
dict
Comprehensive benchmark results
"""
print("๐ Starting LRDBench Benchmark")
print("=" * 60)
print(f"Benchmark Type: {benchmark_type.upper()}")
if contamination_type:
print(f"Contamination: {contamination_type} (level: {contamination_level})")
print("=" * 60)
# Get estimators based on benchmark type
estimators = self.get_estimators_by_type(benchmark_type, data_length)
print(f"Testing {len(estimators)} estimators...")
all_results = {}
total_tests = 0
successful_tests = 0
# Test with different data models
for model_name in self.data_models:
print(f"\n๐ Testing with {model_name} data model...")
try:
# Generate clean data
data, params = self.generate_test_data(
model_name, data_length=data_length
)
print(f" Generated {len(data)} clean data points")
# Apply contamination if specified
if contamination_type:
data, contamination_info = self.apply_contamination(
data, contamination_type, contamination_level
)
print(
f" Applied {contamination_type} contamination (level: {contamination_level})"
)
params["contamination"] = contamination_info
model_results = []
# Test all estimators
for estimator_name in estimators:
print(f" ๐ Testing {estimator_name}...", end=" ")
result = self.run_single_estimator_test(
estimator_name, data, params
)
model_results.append(result)
if result["success"]:
print("โ
")
successful_tests += 1
else:
print(f"โ ({result['error_message']})")
total_tests += 1
all_results[model_name] = {
"data_params": params,
"estimator_results": model_results,
}
except Exception as e:
print(f" โ Error with {model_name}: {e}")
all_results[model_name] = {
"data_params": None,
"estimator_results": [],
"error": str(e),
}
# Compile summary
summary = {
"timestamp": datetime.now().isoformat(),
"benchmark_type": benchmark_type,
"contamination_type": contamination_type,
"contamination_level": contamination_level,
"data_length": data_length,
"total_tests": total_tests,
"successful_tests": successful_tests,
"success_rate": successful_tests / total_tests if total_tests > 0 else 0,
"data_models_tested": len(self.data_models),
"estimators_tested": len(estimators),
"results": all_results,
"protocol_config": json.loads(json.dumps(self.protocol_config)),
"protocol_path": str(self.protocol_config_path),
}
rng_snapshot = self.random_manager.snapshot()
summary["random_state"] = {
"global_seed": rng_snapshot.global_seed,
"streams": rng_snapshot.child_seeds,
}
summary["stratified_metrics"] = self._compute_stratified_metrics(
all_results,
data_length=data_length,
contamination_type=contamination_type,
contamination_level=contamination_level,
)
summary["provenance"] = self._build_provenance_bundle(summary)
# Compute statistical significance analysis
significance_results = self._compute_significance_tests(all_results)
summary["significance_analysis"] = significance_results
# Attach uncertainty calibration diagnostics
self._attach_uncertainty_calibration_summary(summary)
# Save results if requested
if save_results:
self.save_results(summary)
# Print summary
self.print_summary(summary)
return summary
[docs]
def run_classical_benchmark(
self,
data_length: int = 1000,
contamination_type: Optional[str] = None,
contamination_level: float = 0.1,
save_results: bool = True,
) -> Dict[str, Any]:
"""Run benchmark with only classical statistical estimators."""
return self.run_comprehensive_benchmark(
data_length=data_length,
benchmark_type="classical",
contamination_type=contamination_type,
contamination_level=contamination_level,
save_results=save_results,
)
[docs]
def run_ml_benchmark(
self,
data_length: int = 1000,
contamination_type: Optional[str] = None,
contamination_level: float = 0.1,
save_results: bool = True,
) -> Dict[str, Any]:
"""Run benchmark with only machine learning estimators (non-neural)."""
return self.run_comprehensive_benchmark(
data_length=data_length,
benchmark_type="ML",
contamination_type=contamination_type,
contamination_level=contamination_level,
save_results=save_results,
)
[docs]
def run_neural_benchmark(
self,
data_length: int = 1000,
contamination_type: Optional[str] = None,
contamination_level: float = 0.1,
save_results: bool = True,
) -> Dict[str, Any]:
"""Run benchmark with only neural network estimators."""
return self.run_comprehensive_benchmark(
data_length=data_length,
benchmark_type="neural",
contamination_type=contamination_type,
contamination_level=contamination_level,
save_results=save_results,
)
[docs]
def run_classical_estimators(
self,
data_models: Optional[list] = None,
n_samples: int = 1000,
n_trials: int = 10,
save_results: bool = True,
) -> Dict[str, Any]:
"""
Backward-compatible alias for run_classical_benchmark.
This method maintains the old API for compatibility with existing code.
"""
# Old API used n_samples for data_length
return self.run_classical_benchmark(
data_length=n_samples,
save_results=save_results,
)
[docs]
def run_advanced_metrics_benchmark(
self,
data_length: int = 1000,
benchmark_type: str = "comprehensive",
n_monte_carlo: int = 100,
convergence_threshold: float = 1e-6,
save_results: bool = True,
) -> Dict[str, Any]:
"""
Run advanced metrics benchmark focusing on convergence and bias analysis.
Parameters
----------
data_length : int
Length of test data to generate
benchmark_type : str
Type of benchmark to run
n_monte_carlo : int
Number of Monte Carlo simulations for bias analysis
convergence_threshold : float
Threshold for convergence detection
save_results : bool
Whether to save results to file
Returns
-------
dict
Advanced metrics benchmark results
"""
print("๐ Starting Advanced Metrics Benchmark")
print("=" * 60)
print(f"Benchmark Type: {benchmark_type.upper()}")
print(f"Monte Carlo Simulations: {n_monte_carlo}")
print(f"Convergence Threshold: {convergence_threshold}")
print("=" * 60)
# Get estimators
estimators = self.get_estimators_by_type(benchmark_type, data_length)
print(f"Testing {len(estimators)} estimators...")
# Initialize advanced profiler
advanced_profiler = AdvancedPerformanceProfiler(
convergence_threshold=convergence_threshold, max_iterations=100
)
all_results = {}
total_tests = 0
successful_tests = 0
# Test with different data models
for model_name in self.data_models:
print(f"\n๐ Testing with {model_name} data model...")
try:
# Generate clean data
data, params = self.generate_test_data(
model_name, data_length=data_length
)
print(f" Generated {len(data)} clean data points")
true_value = params.get("H", None)
if true_value is None:
print(
f" โ ๏ธ No true H value available for {model_name}, skipping advanced metrics"
)
continue
model_results = []
# Test all estimators with advanced profiling
for estimator_name in estimators:
print(f" ๐ Testing {estimator_name}...", end=" ")
estimator = estimators[estimator_name]
# Run advanced performance profiling
profile_results = advanced_profiler.profile_estimator_performance(
estimator, data, true_value, n_monte_carlo
)
# Extract key metrics
basic_perf = profile_results["basic_performance"]
convergence_analysis = profile_results["convergence_analysis"]
bias_analysis = profile_results["bias_analysis"]
scaling_diagnostics = profile_results.get("scaling_diagnostics")
robustness_panel = profile_results.get("robustness_panel")
comprehensive_score = profile_results["comprehensive_score"]
if basic_perf["success"]:
print("โ
")
successful_tests += 1
result = {
"estimator": estimator_name,
"success": True,
"execution_time": basic_perf["execution_time"],
"estimated_hurst": basic_perf["result"].get(
"hurst_parameter"
),
"true_hurst": true_value,
"comprehensive_score": comprehensive_score,
"convergence_analysis": convergence_analysis,
"bias_analysis": bias_analysis,
"scaling_diagnostics": scaling_diagnostics,
"robustness_panel": robustness_panel,
"full_result": basic_perf["result"],
}
else:
print(f"โ ({basic_perf['error_message']})")
result = {
"estimator": estimator_name,
"success": False,
"execution_time": basic_perf["execution_time"],
"error_message": basic_perf["error_message"],
"comprehensive_score": 0.0,
"scaling_diagnostics": None,
"robustness_panel": None,
}
model_results.append(result)
total_tests += 1
all_results[model_name] = {
"data_params": params,
"estimator_results": model_results,
}
except Exception as e:
print(f" โ Error with {model_name}: {e}")
all_results[model_name] = {
"data_params": None,
"estimator_results": [],
"error": str(e),
}
# Compile summary
summary = {
"timestamp": datetime.now().isoformat(),
"benchmark_type": f"advanced_{benchmark_type}",
"n_monte_carlo": n_monte_carlo,
"convergence_threshold": convergence_threshold,
"total_tests": total_tests,
"successful_tests": successful_tests,
"success_rate": successful_tests / total_tests if total_tests > 0 else 0,
"data_models_tested": len(self.data_models),
"estimators_tested": len(estimators),
"results": all_results,
}
# Save results if requested
if save_results:
self.save_advanced_results(summary)
# Print advanced summary
self.print_advanced_summary(summary)
return summary
[docs]
def save_advanced_results(self, results: Dict[str, Any]) -> None:
"""Save advanced benchmark results to files."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Save detailed results as JSON
json_file = self.output_dir / f"advanced_benchmark_{timestamp}.json"
with open(json_file, "w") as f:
json.dump(results, f, indent=2, default=str)
# Save summary as CSV
csv_data = []
for model_name, model_data in results["results"].items():
if "estimator_results" in model_data:
for est_result in model_data["estimator_results"]:
convergence_analysis = est_result.get("convergence_analysis", {})
bias_analysis = est_result.get("bias_analysis", {})
scaling_diagnostics = (
est_result.get("scaling_diagnostics", {}) or {}
)
robustness_panel = est_result.get("robustness_panel", {}) or {}
robustness_summary = (
robustness_panel.get("summary", {})
if isinstance(robustness_panel, dict)
else {}
)
csv_data.append(
{
"data_model": model_name,
"estimator": est_result["estimator"],
"success": est_result["success"],
"execution_time": est_result["execution_time"],
"estimated_hurst": est_result.get("estimated_hurst"),
"true_hurst": est_result.get("true_hurst"),
"comprehensive_score": est_result.get(
"comprehensive_score"
),
# Convergence metrics
"convergence_rate": convergence_analysis.get(
"convergence_rate"
),
"convergence_achieved": convergence_analysis.get(
"convergence_achieved"
),
"convergence_iteration": convergence_analysis.get(
"convergence_iteration"
),
"stability_metric": convergence_analysis.get(
"stability_metric"
),
# Bias metrics
"mean_signed_error": bias_analysis.get("mean_signed_error"),
"mean_absolute_error": bias_analysis.get(
"mean_absolute_error"
),
"root_mean_squared_error": bias_analysis.get(
"root_mean_squared_error"
),
"bias_percentage": bias_analysis.get("bias_percentage"),
"significant_bias": bias_analysis.get("significant_bias"),
"t_statistic": bias_analysis.get("t_statistic"),
"p_value": bias_analysis.get("p_value"),
# Scaling diagnostics
"scaling_status": scaling_diagnostics.get("status"),
"scaling_slope": scaling_diagnostics.get("slope"),
"scaling_r_squared": scaling_diagnostics.get("r_squared"),
"scaling_std_err": scaling_diagnostics.get("std_err"),
"scaling_break_scale": (
scaling_diagnostics.get("breakpoint", {}).get(
"break_scale"
)
if isinstance(
scaling_diagnostics.get("breakpoint"), dict
)
else None
),
"scaling_n_points": scaling_diagnostics.get("n_points"),
# Robustness panels
"robust_successful_scenarios": robustness_summary.get(
"successful_scenarios"
),
"robust_n_scenarios": robustness_summary.get("n_scenarios"),
"robust_mean_abs_delta": robustness_summary.get(
"mean_abs_delta"
),
"robust_max_abs_delta": robustness_summary.get(
"max_abs_delta"
),
}
)
if csv_data:
df = pd.DataFrame(csv_data)
csv_file = self.output_dir / f"advanced_benchmark_summary_{timestamp}.csv"
df.to_csv(csv_file, index=False)
print(f"\n๐พ Advanced results saved to:")
print(f" JSON: {json_file}")
print(f" CSV: {csv_file}")
[docs]
def print_advanced_summary(self, summary: Dict[str, Any]) -> None:
"""Print advanced benchmark summary."""
print("\n" + "=" * 60)
print("๐ ADVANCED METRICS BENCHMARK SUMMARY")
print("=" * 60)
print(f"Benchmark Type: {summary.get('benchmark_type', 'Unknown').upper()}")
print(f"Monte Carlo Simulations: {summary['n_monte_carlo']}")
print(f"Convergence Threshold: {summary['convergence_threshold']}")
print(f"Total Tests: {summary['total_tests']}")
print(f"Successful: {summary['successful_tests']}")
print(f"Success Rate: {summary['success_rate']:.1%}")
print(f"Data Models: {summary['data_models_tested']}")
print(f"Estimators: {summary['estimators_tested']}")
# Show top performers by comprehensive score
print("\n๐ TOP PERFORMING ESTIMATORS (By Comprehensive Score):")
# Aggregate results by estimator
estimator_scores = {}
for model_name, model_data in summary["results"].items():
if "estimator_results" in model_data:
for est_result in model_data["estimator_results"]:
if est_result["success"]:
estimator_name = est_result["estimator"]
score = est_result.get("comprehensive_score", 0.0)
if estimator_name not in estimator_scores:
estimator_scores[estimator_name] = []
estimator_scores[estimator_name].append(score)
if estimator_scores:
# Calculate average comprehensive score for each estimator
avg_scores = []
for estimator_name, scores in estimator_scores.items():
avg_score = np.mean(scores)
avg_scores.append(
{
"estimator": estimator_name,
"avg_comprehensive_score": avg_score,
"data_models_tested": len(scores),
}
)
# Sort by average comprehensive score (higher is better)
avg_scores.sort(key=lambda x: x["avg_comprehensive_score"], reverse=True)
for i, score_data in enumerate(avg_scores[:5]):
print(f" {i+1}. {score_data['estimator']}")
print(
f" Comprehensive Score: {score_data['avg_comprehensive_score']:.4f}"
)
print(f" Data Models Tested: {score_data['data_models_tested']}")
# Scaling diagnostics overview
scaling_entries = []
robustness_entries = []
for model_data in summary["results"].values():
if "estimator_results" not in model_data:
continue
for est_result in model_data["estimator_results"]:
scaling_diag = est_result.get("scaling_diagnostics")
if (
isinstance(scaling_diag, dict)
and scaling_diag.get("status") == "ok"
):
scaling_entries.append(
(
est_result["estimator"],
scaling_diag.get("slope"),
scaling_diag.get("r_squared"),
scaling_diag.get("n_points"),
)
)
robustness_panel = est_result.get("robustness_panel")
if isinstance(robustness_panel, dict):
summary_info = robustness_panel.get("summary", {})
if summary_info:
robustness_entries.append(
(
est_result["estimator"],
summary_info.get("successful_scenarios"),
summary_info.get("n_scenarios"),
summary_info.get("mean_abs_delta"),
summary_info.get("max_abs_delta"),
)
)
if scaling_entries:
scaling_entries.sort(
key=lambda item: (
item[2] is None,
-(item[2] or 0.0),
)
)
print("\n๐ฌ SCALING DIAGNOSTICS SNAPSHOT:")
for estimator, slope, r_squared, n_points in scaling_entries[:5]:
slope_display = f"{slope:.4f}" if slope is not None else "n/a"
r2_display = f"{r_squared:.3f}" if r_squared is not None else "n/a"
print(
f" {estimator}: slope={slope_display}, Rยฒ={r2_display}, points={n_points}"
)
if robustness_entries:
robustness_entries.sort(
key=lambda item: (
item[3] is None,
-(item[3] or 0.0),
)
)
print("\n๐ก๏ธ ROBUSTNESS STRESS PANELS:")
for estimator, success, total, mean_delta, max_delta in robustness_entries[
:5
]:
success_display = f"{success}/{total}" if success is not None else "n/a"
mean_display = f"{mean_delta:.4f}" if mean_delta is not None else "n/a"
max_display = f"{max_delta:.4f}" if max_delta is not None else "n/a"
print(
f" {estimator}: success={success_display}, mean |ฮH|={mean_display}, max |ฮH|={max_display}"
)
[docs]
def save_results(self, results: Dict[str, Any]) -> None:
"""Save benchmark results to files."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Save detailed results as JSON
json_file = self.output_dir / f"comprehensive_benchmark_{timestamp}.json"
with open(json_file, "w") as f:
json.dump(results, f, indent=2, default=str)
# Save summary as CSV
csv_data = []
for model_name, model_data in results["results"].items():
if "estimator_results" in model_data:
for est_result in model_data["estimator_results"]:
# Extract advanced metrics
advanced_metrics = est_result.get("advanced_metrics", {})
uncertainty = est_result.get("uncertainty", {})
primary_interval = None
coverage_flag = None
if isinstance(uncertainty, dict):
primary_interval = uncertainty.get("primary_interval")
coverage_data = uncertainty.get("coverage", {})
if isinstance(primary_interval, dict):
method_name = primary_interval.get("method")
else:
method_name = None
if isinstance(coverage_data, dict):
if method_name and method_name in coverage_data:
coverage_flag = coverage_data.get(method_name)
else:
for value in coverage_data.values():
if value is not None:
coverage_flag = value
break
else:
method_name = None
if isinstance(primary_interval, dict):
primary_ci = primary_interval.get("confidence_interval")
if primary_ci is not None and len(primary_ci) == 2:
ci_lower = float(primary_ci[0])
ci_upper = float(primary_ci[1])
else:
ci_lower = None
ci_upper = None
uncertainty_samples = primary_interval.get("n_samples")
else:
ci_lower = None
ci_upper = None
uncertainty_samples = None
csv_data.append(
{
"data_model": model_name,
"estimator": est_result["estimator"],
"success": est_result["success"],
"execution_time": est_result["execution_time"],
"estimated_hurst": est_result["estimated_hurst"],
"true_hurst": est_result["true_hurst"],
"error": est_result["error"],
"r_squared": est_result["r_squared"],
"p_value": est_result["p_value"],
"intercept": est_result["intercept"],
"slope": est_result["slope"],
"std_error": est_result["std_error"],
# Advanced metrics
"convergence_rate": advanced_metrics.get(
"convergence_rate"
),
"convergence_achieved": advanced_metrics.get(
"convergence_achieved"
),
"stability_metric": advanced_metrics.get(
"stability_metric"
),
"mean_signed_error": advanced_metrics.get(
"mean_signed_error"
),
"bias_percentage": advanced_metrics.get("bias_percentage"),
"significant_bias": advanced_metrics.get(
"significant_bias"
),
"ci_lower": ci_lower,
"ci_upper": ci_upper,
"uncertainty_method": method_name,
"uncertainty_samples": uncertainty_samples,
"uncertainty_status": uncertainty.get("status")
if isinstance(uncertainty, dict)
else None,
"coverage_primary": coverage_flag,
}
)
if csv_data:
df = pd.DataFrame(csv_data)
csv_file = self.output_dir / f"benchmark_summary_{timestamp}.csv"
df.to_csv(csv_file, index=False)
print(f"\n๐พ Results saved to:")
print(f" JSON: {json_file}")
print(f" CSV: {csv_file}")
else:
print(f"\n๐พ Results saved to:")
print(f" JSON: {json_file}")
stratified_metrics = results.get("stratified_metrics")
if isinstance(stratified_metrics, dict):
stratified_file = self.output_dir / f"stratified_metrics_{timestamp}.json"
with open(stratified_file, "w") as f:
json.dump(stratified_metrics, f, indent=2, default=str)
print(f" Stratified: {stratified_file}")
# Generate enhanced stratified reports
try:
stratified_report = self.stratified_report_generator.generate_report(
results, output_dir=self.output_dir, formats=["markdown", "json", "csv"]
)
print(f" Enhanced stratified reports generated")
except Exception as exc:
warnings.warn(f"Enhanced stratified report generation failed: {exc}")
provenance = results.get("provenance")
if provenance:
provenance_file = self.output_dir / f"benchmark_provenance_{timestamp}.json"
with open(provenance_file, "w") as f:
json.dump(provenance, f, indent=2, default=str)
print(f" Provenance: {provenance_file}")
calibration_records = results.get("uncertainty_calibration")
if calibration_records:
calibration_dir = self.output_dir / "calibration"
calibration_dir.mkdir(parents=True, exist_ok=True)
calibration_file = (
calibration_dir / f"uncertainty_calibration_{timestamp}.json"
)
with open(calibration_file, "w") as f:
json.dump(calibration_records, f, indent=2, default=str)
print(f" Calibration: {calibration_file}")
calibration_plot = results.get("uncertainty_calibration_plot")
if calibration_plot:
print(f" Calibration plot: {calibration_plot}")
[docs]
def print_summary(self, summary: Dict[str, Any]) -> None:
"""Print benchmark summary."""
print("\n" + "=" * 60)
print("๐ BENCHMARK SUMMARY")
print("=" * 60)
print(f"Benchmark Type: {summary.get('benchmark_type', 'Unknown').upper()}")
if summary.get("contamination_type"):
print(
f"Contamination: {summary['contamination_type']} (level: {summary['contamination_level']})"
)
protocol_info = summary.get("protocol_config", {})
protocol_version = protocol_info.get("version", "unknown")
print(
f"Protocol Config: {summary.get('protocol_path', 'config/benchmark_protocol.yaml')} "
f"(version {protocol_version})"
)
preprocessing_cfg = protocol_info.get("preprocessing", {})
print(
f"Preprocessing settings: winsorize={preprocessing_cfg.get('winsorize_limits')} | "
f"outlier_threshold={preprocessing_cfg.get('outlier_threshold')}"
)
print(f"Total Tests: {summary['total_tests']}")
print(f"Successful: {summary['successful_tests']}")
print(f"Success Rate: {summary['success_rate']:.1%}")
print(f"Data Models: {summary['data_models_tested']}")
print(f"Estimators: {summary['estimators_tested']}")
# Show top performers (aggregated by estimator across all data models)
print("\n๐ TOP PERFORMING ESTIMATORS (Average across all data models):")
# Aggregate results by estimator
estimator_performance = {}
for model_name, model_data in summary["results"].items():
if "estimator_results" in model_data:
for est_result in model_data["estimator_results"]:
if est_result["success"] and est_result["error"] is not None:
estimator_name = est_result["estimator"]
if estimator_name not in estimator_performance:
estimator_performance[estimator_name] = {
"errors": [],
"execution_times": [],
"data_models": [],
"convergence_rates": [],
"mean_signed_errors": [],
"bias_percentages": [],
"stability_metrics": [],
"ci_widths": [],
"coverage_flags": [],
}
estimator_performance[estimator_name]["errors"].append(
est_result["error"]
)
estimator_performance[estimator_name]["execution_times"].append(
est_result["execution_time"]
)
estimator_performance[estimator_name]["data_models"].append(
model_name
)
# Add advanced metrics
advanced_metrics = est_result.get("advanced_metrics", {})
if advanced_metrics.get("convergence_rate") is not None:
estimator_performance[estimator_name][
"convergence_rates"
].append(advanced_metrics["convergence_rate"])
if advanced_metrics.get("mean_signed_error") is not None:
estimator_performance[estimator_name][
"mean_signed_errors"
].append(advanced_metrics["mean_signed_error"])
if advanced_metrics.get("bias_percentage") is not None:
estimator_performance[estimator_name][
"bias_percentages"
].append(advanced_metrics["bias_percentage"])
if advanced_metrics.get("stability_metric") is not None:
estimator_performance[estimator_name][
"stability_metrics"
].append(advanced_metrics["stability_metric"])
# Confidence interval statistics
ci = est_result.get("confidence_interval")
if (
isinstance(ci, (list, tuple))
and len(ci) == 2
and ci[0] is not None
and ci[1] is not None
):
try:
width = float(ci[1]) - float(ci[0])
if np.isfinite(width):
estimator_performance[estimator_name][
"ci_widths"
].append(width)
except (TypeError, ValueError):
pass
uncertainty = est_result.get("uncertainty", {})
if isinstance(uncertainty, dict):
coverage_data = uncertainty.get("coverage", {})
primary = uncertainty.get("primary_interval")
method = (
primary.get("method")
if isinstance(primary, dict)
else None
)
coverage_flag = None
if isinstance(coverage_data, dict):
if method and method in coverage_data:
coverage_flag = coverage_data.get(method)
else:
for value in coverage_data.values():
if value is not None:
coverage_flag = value
break
if coverage_flag is not None:
estimator_performance[estimator_name][
"coverage_flags"
].append(bool(coverage_flag))
if estimator_performance:
# Calculate average performance for each estimator
aggregated_performance = []
for estimator_name, perf_data in estimator_performance.items():
avg_error = np.mean(perf_data["errors"])
avg_time = np.mean(perf_data["execution_times"])
data_models_tested = len(perf_data["data_models"])
# Calculate advanced metrics averages
avg_convergence_rate = (
np.mean(perf_data["convergence_rates"])
if perf_data["convergence_rates"]
else None
)
avg_mean_signed_error = (
np.mean(perf_data["mean_signed_errors"])
if perf_data["mean_signed_errors"]
else None
)
avg_bias_percentage = (
np.mean(perf_data["bias_percentages"])
if perf_data["bias_percentages"]
else None
)
avg_stability_metric = (
np.mean(perf_data["stability_metrics"])
if perf_data["stability_metrics"]
else None
)
avg_ci_width = (
np.mean(perf_data["ci_widths"]) if perf_data["ci_widths"] else None
)
coverage_rate = (
np.mean(perf_data["coverage_flags"])
if perf_data["coverage_flags"]
else None
)
aggregated_performance.append(
{
"estimator": estimator_name,
"avg_error": avg_error,
"avg_time": avg_time,
"data_models_tested": data_models_tested,
"min_error": min(perf_data["errors"]),
"max_error": max(perf_data["errors"]),
"avg_convergence_rate": avg_convergence_rate,
"avg_mean_signed_error": avg_mean_signed_error,
"avg_bias_percentage": avg_bias_percentage,
"avg_stability_metric": avg_stability_metric,
"avg_ci_width": avg_ci_width,
"coverage_rate": coverage_rate,
}
)
# Sort by average error (lower is better)
aggregated_performance.sort(key=lambda x: x["avg_error"])
for i, perf in enumerate(aggregated_performance[:5]):
print(f" {i+1}. {perf['estimator']}")
print(
f" Avg Error: {perf['avg_error']:.4f} (Range: {perf['min_error']:.4f}-{perf['max_error']:.4f})"
)
print(
f" Avg Time: {perf['avg_time']:.3f}s | Data Models: {perf['data_models_tested']}"
)
# Display advanced metrics
if perf["avg_convergence_rate"] is not None:
print(f" Convergence Rate: {perf['avg_convergence_rate']:.4f}")
if perf["avg_mean_signed_error"] is not None:
print(
f" Mean Signed Error: {perf['avg_mean_signed_error']:.4f}"
)
if perf["avg_bias_percentage"] is not None:
print(f" Bias: {perf['avg_bias_percentage']:.2f}%")
if perf["avg_stability_metric"] is not None:
print(f" Stability: {perf['avg_stability_metric']:.4f}")
if perf["avg_ci_width"] is not None:
print(f" Mean 95% CI width: {perf['avg_ci_width']:.4f}")
if perf["coverage_rate"] is not None:
print(f" Empirical coverage: {perf['coverage_rate']:.2%}")
# Show estimated H values for this estimator across data models
estimator_name = perf["estimator"]
print(f" Estimated H values:")
for model_name, model_data in summary["results"].items():
if "estimator_results" in model_data:
for est_result in model_data["estimator_results"]:
if (
est_result["estimator"] == estimator_name
and est_result["success"]
):
true_h = est_result["true_hurst"]
est_h = est_result["estimated_hurst"]
if est_h is not None:
print(
f" {model_name}: H_est={est_h:.4f}, H_true={true_h:.4f}"
)
print()
stratified = summary.get("stratified_metrics", {})
if stratified:
print("\n๐ STRATIFIED INSIGHTS:")
status = stratified.get("status", "unavailable")
print(f"Status: {status}")
if status == "ok":
def _print_band(label: str, data: Dict[str, Any]) -> None:
if not data:
return
print(f"\n {label}:")
sorted_items = sorted(
data.items(),
key=lambda kv: (
kv[1].get("mean_error") is None,
kv[1].get("mean_error", 0.0),
),
)
for band, metrics in sorted_items:
mean_error = metrics.get("mean_error")
coverage_rate = metrics.get("coverage_rate")
success_rate = metrics.get("success_rate")
print(f" โข {band}:")
if mean_error is not None:
print(f" mean error={mean_error:.4f}")
if coverage_rate is not None:
print(f" coverage={coverage_rate:.2%}")
if success_rate is not None:
print(f" success={success_rate:.2%}")
mean_ci = metrics.get("mean_ci_width")
if mean_ci is not None:
print(f" mean CI width={mean_ci:.4f}")
data_models = metrics.get("data_models") or []
if data_models:
print(f" data models: {', '.join(data_models)}")
_print_band("Hurst regimes", stratified.get("hurst_bands", {}))
_print_band("Tail classes", stratified.get("tail_classes", {}))
_print_band("Length regimes", stratified.get("data_length_bands", {}))
_print_band(
"Contamination regimes", stratified.get("contamination", {})
)
else:
print(stratified.get("reason", "No stratified analysis available."))
significance = summary.get("significance_analysis", {})
if significance:
print("\n๐งช SIGNIFICANCE ANALYSIS:")
status = significance.get("status", "unavailable")
print(f"Status: {status}")
if status == "ok":
friedman = significance.get("friedman", {})
friedman_stat = friedman.get("statistic")
friedman_p = friedman.get("p_value")
if friedman_stat is not None and friedman_p is not None:
print(
f"Friedman ฯยฒ={friedman_stat:.4f} "
f"(p={friedman_p:.4f}) "
f"across {friedman.get('n_data_models', 0)} data models "
f"and {friedman.get('n_estimators', 0)} estimators"
)
else:
reason = friedman.get("error", "Friedman test not available.")
print(
f"Friedman test unavailable: {reason} "
f"(considered {friedman.get('n_data_models', 0)} data models, "
f"{friedman.get('n_estimators', 0)} estimators)"
)
print("Mean ranks (lower is better):")
for estimator, rank in significance.get("mean_ranks", {}).items():
print(f" {estimator}: {rank:.3f}")
significant_pairs = [
res
for res in significance.get("post_hoc", [])
if res.get("significant")
]
if significant_pairs:
print("Significant pairwise differences after Holm correction:")
for res in significant_pairs:
pair = res.get("pair", [])
print(
f" {pair[0]} vs {pair[1]}: "
f"p={res.get('p_value'):.4f}, "
f"Holm-adjusted p={res.get('holm_p_value'):.4f}"
)
else:
print(
"No pairwise differences remained significant after Holm correction."
)
else:
reason = significance.get("reason", "insufficient data")
print(f"Significance testing not performed: {reason}")
calibration = summary.get("uncertainty_calibration")
if isinstance(calibration, list) and calibration:
print("\n๐ฏ UNCERTAINTY CALIBRATION:")
method_stats: Dict[str, Dict[str, float]] = {}
family_stats: Dict[str, Dict[str, float]] = {}
for entry in calibration:
coverage = entry.get("empirical_coverage")
n = entry.get("n")
if coverage is None or n in (None, 0):
continue
method = entry.get("method") or "unknown"
family = entry.get("estimator_family") or "unspecified"
method_bucket = method_stats.setdefault(
method, {"weighted": 0.0, "total": 0.0}
)
method_bucket["weighted"] += float(coverage) * float(n)
method_bucket["total"] += float(n)
family_bucket = family_stats.setdefault(
family, {"weighted": 0.0, "total": 0.0}
)
family_bucket["weighted"] += float(coverage) * float(n)
family_bucket["total"] += float(n)
if method_stats:
print(" Method-level empirical coverage:")
for method, mstats in sorted(
method_stats.items(), key=lambda kv: kv[0]
):
if mstats["total"] > 0:
rate = mstats["weighted"] / mstats["total"]
print(
f" โข {method}: {rate:.2%} across {int(mstats['total'])} runs"
)
if family_stats:
print(" Family-level coverage (all methods combined):")
for family, fstats in sorted(
family_stats.items(), key=lambda kv: kv[0]
):
if fstats["total"] > 0:
rate = fstats["weighted"] / fstats["total"]
print(
f" โข {family}: {rate:.2%} across {int(fstats['total'])} runs"
)
calibration_plot = summary.get("uncertainty_calibration_plot")
if calibration_plot:
print(f" Calibration plot: {calibration_plot}")
# Show detailed breakdown by data model
print("\n๐ DETAILED PERFORMANCE BY DATA MODEL:")
for model_name, model_data in summary["results"].items():
if "estimator_results" in model_data and model_data["estimator_results"]:
print(f"\n {model_name}:")
successful_results = [
r
for r in model_data["estimator_results"]
if r["success"] and r["error"] is not None
]
if successful_results:
# Sort by error for this data model
successful_results.sort(key=lambda x: x["error"])
for i, result in enumerate(
successful_results[:3]
): # Top 3 for each model
print(
f" {i+1}. {result['estimator']}: Error {result['error']:.4f}, Time {result['execution_time']:.3f}s"
)
else:
print(" No successful estimators")
print("\n๐ฏ Benchmark completed successfully!")
[docs]
def export_results(self, results: Dict[str, Any], output_path: str) -> None:
"""
Export benchmark results to a file.
Parameters
----------
results : dict
Benchmark results dictionary
output_path : str
Path to save the results (JSON format)
"""
import json
from pathlib import Path
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"๐พ Results exported to: {output_file}")
def main():
"""
Main function for running comprehensive benchmarks.
This serves as the entry point for the lrdbench command.
"""
print("๐ LRDBench - Comprehensive Benchmark System")
print("=" * 50)
# Initialize benchmark system
benchmark = ComprehensiveBenchmark()
# Run comprehensive benchmark (default)
print("\n๐ Running COMPREHENSIVE benchmark (all estimators)...")
results = benchmark.run_comprehensive_benchmark(
data_length=1000, benchmark_type="comprehensive", save_results=True
)
print(f"\nโ
Benchmark completed with {results['success_rate']:.1%} success rate!")
print("\n๐ก Available benchmark types:")
print(" - 'comprehensive': All estimators (default)")
print(" - 'classical': Only classical statistical estimators")
print(" - 'ML': Only machine learning estimators (non-neural)")
print(" - 'neural': Only neural network estimators")
print("\n๐ก Available contamination types:")
print(" - 'additive_gaussian': Add Gaussian noise")
print(" - 'multiplicative_noise': Multiplicative noise")
print(" - 'outliers': Add outliers")
print(" - 'trend': Add trend")
print(" - 'seasonal': Add seasonal patterns")
print(" - 'missing_data': Remove data points")
print("\n Use the ComprehensiveBenchmark class in your own code:")
print(" from analysis.benchmark import ComprehensiveBenchmark")
print(" benchmark = ComprehensiveBenchmark()")
print(" results = benchmark.run_comprehensive_benchmark(")
print(" benchmark_type='classical', # or 'ML', 'neural'")
print(" contamination_type='additive_gaussian', # optional")
print(" contamination_level=0.2 # 0.0 to 1.0")
print(" )")
if __name__ == "__main__":
main()