Source code for lrdbenchmark.analysis.uncertainty.coverage_analyzer

#!/usr/bin/env python3
"""
Coverage Probability Analyzer for LRDBenchmark.

Provides Monte Carlo estimation of confidence interval coverage probabilities
to assess the reliability of uncertainty quantification methods.
"""

import warnings
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type

import numpy as np


[docs] @dataclass class CoverageResult: """Results from coverage probability analysis.""" method: str nominal_level: float empirical_coverage: float n_trials: int n_covered: int coverage_error: float standard_error: float calibrated: bool
[docs] def to_dict(self) -> Dict[str, Any]: return { "method": self.method, "nominal_level": self.nominal_level, "empirical_coverage": self.empirical_coverage, "n_trials": self.n_trials, "n_covered": self.n_covered, "coverage_error": self.coverage_error, "standard_error": self.standard_error, "calibrated": self.calibrated, }
[docs] class CoverageAnalyzer: """ Monte Carlo analyzer for confidence interval coverage probabilities. Assesses whether UQ methods produce calibrated confidence intervals by measuring how often the true value falls within the reported CI. Example ------- >>> analyzer = CoverageAnalyzer(n_trials=200) >>> results = analyzer.analyze_estimator_coverage( ... estimator_cls=DFAEstimator, ... data_model_cls=FBMModel, ... true_H=0.7, ... length=1000 ... ) >>> print(f"Coverage: {results['block_bootstrap'].empirical_coverage:.2%}") """
[docs] def __init__( self, n_trials: int = 100, confidence_level: float = 0.95, tolerance: float = 0.05, random_state: Optional[int] = None, ): """ Initialize coverage analyzer. Parameters ---------- n_trials : int Number of Monte Carlo trials confidence_level : float Nominal confidence level tolerance : float Acceptable deviation from nominal coverage (for calibration check) random_state : int, optional Random seed """ self.n_trials = n_trials self.confidence_level = confidence_level self.tolerance = tolerance self.rng = np.random.default_rng(random_state)
[docs] def analyze_estimator_coverage( self, estimator_cls: Type, data_model_cls: Type, true_H: float, length: int, estimator_params: Optional[Dict[str, Any]] = None, data_model_params: Optional[Dict[str, Any]] = None, uq_methods: Optional[List[str]] = None, n_bootstrap: int = 64, ) -> Dict[str, CoverageResult]: """ Analyze coverage probability for an estimator-model combination. Parameters ---------- estimator_cls : Type Estimator class (e.g., DFAEstimator) data_model_cls : Type Data generation model class (e.g., FBMModel) true_H : float True Hurst parameter length : int Length of generated time series estimator_params : dict, optional Parameters for estimator data_model_params : dict, optional Additional parameters for data model uq_methods : list of str, optional UQ methods to test: 'block_bootstrap', 'percentile', 'studentized' n_bootstrap : int Number of bootstrap samples per trial Returns ------- dict Coverage results for each UQ method """ estimator_params = estimator_params or {} data_model_params = data_model_params or {} uq_methods = uq_methods or ["block_bootstrap", "percentile"] # Track coverage for each method coverage_counts = {method: 0 for method in uq_methods} successful_trials = {method: 0 for method in uq_methods} for trial in range(self.n_trials): # Generate data seed = int(self.rng.integers(0, 2**32)) try: model = data_model_cls(H=true_H, **data_model_params) data = model.generate(length=length, seed=seed) except Exception as e: warnings.warn(f"Data generation failed: {e}") continue # Create estimator and get point estimate try: estimator = estimator_cls(**estimator_params) result = estimator.estimate(data) point_estimate = result.get("hurst_parameter") if point_estimate is None or not np.isfinite(point_estimate): continue except Exception as e: continue # Compute confidence intervals for each method for method in uq_methods: try: ci = self._compute_ci( estimator_cls, estimator_params, data, method, n_bootstrap, seed ) if ci is not None: lower, upper = ci if lower <= true_H <= upper: coverage_counts[method] += 1 successful_trials[method] += 1 except Exception: continue # Compute coverage statistics results = {} for method in uq_methods: n_successful = successful_trials[method] n_covered = coverage_counts[method] if n_successful > 0: empirical_coverage = n_covered / n_successful coverage_error = empirical_coverage - self.confidence_level # Standard error of proportion se = np.sqrt( empirical_coverage * (1 - empirical_coverage) / n_successful ) calibrated = abs(coverage_error) <= self.tolerance else: empirical_coverage = 0.0 coverage_error = -self.confidence_level se = 0.0 calibrated = False results[method] = CoverageResult( method=method, nominal_level=self.confidence_level, empirical_coverage=empirical_coverage, n_trials=n_successful, n_covered=n_covered, coverage_error=coverage_error, standard_error=se, calibrated=calibrated, ) return results
[docs] def _compute_ci( self, estimator_cls: Type, estimator_params: Dict[str, Any], data: np.ndarray, method: str, n_bootstrap: int, seed: int, ) -> Optional[tuple]: """Compute confidence interval using specified method.""" rng = np.random.default_rng(seed + hash(method) % 2**16) n = len(data) if n < 32: return None # Block bootstrap block_size = max(8, int(np.sqrt(n))) n_blocks = int(np.ceil(n / block_size)) estimates = [] for _ in range(n_bootstrap): # Resample blocks starts = rng.integers(0, n - block_size + 1, size=n_blocks) blocks = [data[s : s + block_size] for s in starts] resampled = np.concatenate(blocks)[:n] try: estimator = estimator_cls(**estimator_params) result = estimator.estimate(resampled) h_est = result.get("hurst_parameter") if h_est is not None and np.isfinite(h_est): estimates.append(h_est) except Exception: continue if len(estimates) < 10: return None estimates = np.array(estimates) if method == "block_bootstrap" or method == "percentile": # Percentile method alpha = 1 - self.confidence_level lower = np.percentile(estimates, alpha / 2 * 100) upper = np.percentile(estimates, (1 - alpha / 2) * 100) return (lower, upper) elif method == "studentized": # Studentized bootstrap (bias-corrected and accelerated) point_est = np.mean(estimates) se = np.std(estimates, ddof=1) # Use t-distribution critical value from scipy import stats df = len(estimates) - 1 t_crit = stats.t.ppf((1 + self.confidence_level) / 2, df) lower = point_est - t_crit * se upper = point_est + t_crit * se return (lower, upper) return None
[docs] def generate_coverage_report( self, results: Dict[str, CoverageResult] ) -> Dict[str, Any]: """ Generate a summary report from coverage results. Returns ------- dict Summary with calibration status and recommendations """ report = { "methods": {}, "best_method": None, "all_calibrated": True, "recommendations": [], } best_error = float("inf") for method, result in results.items(): report["methods"][method] = result.to_dict() if not result.calibrated: report["all_calibrated"] = False if result.coverage_error < 0: report["recommendations"].append( f"{method}: Coverage too low ({result.empirical_coverage:.1%} vs " f"{self.confidence_level:.1%}). CIs are too narrow." ) else: report["recommendations"].append( f"{method}: Coverage too high ({result.empirical_coverage:.1%} vs " f"{self.confidence_level:.1%}). CIs are too wide." ) if abs(result.coverage_error) < best_error: best_error = abs(result.coverage_error) report["best_method"] = method if report["all_calibrated"]: report["recommendations"].append( "All methods are well-calibrated for this estimator-model combination." ) return report
[docs] def run_comprehensive_coverage_analysis( estimator_cls: Type, data_model_cls: Type, H_values: List[float], length: int = 1000, n_trials: int = 100, seed: int = 42, ) -> Dict[str, Any]: """ Run coverage analysis across multiple H values. Parameters ---------- estimator_cls : Type Estimator class data_model_cls : Type Data model class H_values : list of float H values to test length : int Series length n_trials : int Trials per H value seed : int Random seed Returns ------- dict Comprehensive coverage results """ analyzer = CoverageAnalyzer(n_trials=n_trials, random_state=seed) all_results = {} for H in H_values: results = analyzer.analyze_estimator_coverage( estimator_cls=estimator_cls, data_model_cls=data_model_cls, true_H=H, length=length, ) all_results[H] = { method: result.to_dict() for method, result in results.items() } # Summary statistics summary = { "H_values": H_values, "results_by_H": all_results, "mean_coverage": {}, "overall_calibrated": True, } methods = list(all_results[H_values[0]].keys()) for method in methods: coverages = [all_results[H][method]["empirical_coverage"] for H in H_values] summary["mean_coverage"][method] = np.mean(coverages) # Check if all H values are calibrated for H in H_values: if not all_results[H][method]["calibrated"]: summary["overall_calibrated"] = False return summary