#!/usr/bin/env python3
"""
Comprehensive Diagnostics Module for LRD Estimation
Provides automated log-log checks, residual tests, goodness-of-fit analysis,
and scale-window sensitivity analysis for power-law fits.
"""
import warnings
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from scipy import stats
[docs]
class PowerLawDiagnostics:
"""
Comprehensive diagnostics for power-law fits in LRD estimation.
Includes log-log checks, residual analysis, and goodness-of-fit tests.
"""
[docs]
def __init__(self, min_r_squared: float = 0.5, min_points: int = 6):
"""
Initialise power-law diagnostics.
Parameters
----------
min_r_squared : float
Minimum R² threshold for acceptable fits
min_points : int
Minimum number of scale points required
"""
self.min_r_squared = min_r_squared
self.min_points = min_points
[docs]
def diagnose(
self,
scales: np.ndarray,
statistics: np.ndarray,
slope: Optional[float] = None,
intercept: Optional[float] = None,
) -> Dict[str, Any]:
"""
Run comprehensive diagnostic suite on power-law fit.
Parameters
----------
scales : np.ndarray
Scale values (e.g., frequencies, windows, wavelet scales)
statistics : np.ndarray
Corresponding statistics (e.g., PSD, fluctuation function)
slope : float, optional
Pre-computed slope of log-log fit
intercept : float, optional
Pre-computed intercept of log-log fit
Returns
-------
dict
Comprehensive diagnostic results
"""
scales = np.asarray(scales, dtype=np.float64)
statistics = np.asarray(statistics, dtype=np.float64)
# Validate inputs
valid_mask = (
np.isfinite(scales)
& np.isfinite(statistics)
& (scales > 0)
& (statistics > 0)
)
scales = scales[valid_mask]
statistics = statistics[valid_mask]
if len(scales) < self.min_points:
return {
"status": "insufficient_data",
"reason": f"Need at least {self.min_points} valid scale points, got {len(scales)}",
"n_points": len(scales),
}
# Transform to log-log space
log_scales = np.log2(scales)
log_stats = np.log2(statistics)
# Compute or validate linear fit
if slope is None or intercept is None:
slope, intercept, r_value, p_value, std_err = stats.linregress(
log_scales, log_stats
)
else:
# Recompute correlation metrics with provided fit
fitted_values = slope * log_scales + intercept
residuals = log_stats - fitted_values
ss_res = np.sum(residuals**2)
ss_tot = np.sum((log_stats - np.mean(log_stats)) ** 2)
r_value = np.sqrt(1 - ss_res / ss_tot) if ss_tot > 0 else 0
p_value = None # Would need full regression for p-value
std_err = (
np.sqrt(ss_res / (len(log_scales) - 2))
if len(log_scales) > 2
else np.inf
)
r_squared = r_value**2
# Run diagnostic tests
linearity_check = self._check_linearity(log_scales, log_stats, r_squared)
residual_analysis = self._analyse_residuals(
log_scales, log_stats, slope, intercept
)
goodness_of_fit = self._assess_goodness_of_fit(
log_scales, log_stats, slope, intercept
)
breakpoint_detection = self._detect_breakpoints(log_scales, log_stats)
# Overall assessment
passes_checks = (
linearity_check["passes"]
and residual_analysis["normality"]["passes"]
and not residual_analysis["autocorrelation"]["significant"]
and r_squared >= self.min_r_squared
)
return {
"status": "ok",
"n_points": len(scales),
"log_log_fit": {
"slope": float(slope),
"intercept": float(intercept),
"r_squared": float(r_squared),
"r_value": float(r_value),
"p_value": float(p_value) if p_value is not None else None,
"std_err": float(std_err),
},
"linearity_check": linearity_check,
"residual_analysis": residual_analysis,
"goodness_of_fit": goodness_of_fit,
"breakpoint_detection": breakpoint_detection,
"overall_assessment": {
"passes_diagnostics": passes_checks,
"quality_score": self._compute_quality_score(
linearity_check, residual_analysis, r_squared
),
"warnings": self._generate_warnings(
linearity_check, residual_analysis, breakpoint_detection
),
},
}
[docs]
def _check_linearity(
self, log_scales: np.ndarray, log_stats: np.ndarray, r_squared: float
) -> Dict[str, Any]:
"""Check linearity of log-log relationship."""
passes = r_squared >= self.min_r_squared
# Additional linearity checks
# Runs test for randomness of residuals
slope, intercept = np.polyfit(log_scales, log_stats, 1)
residuals = log_stats - (slope * log_scales + intercept)
# Count runs (sequences of same sign)
signs = np.sign(residuals)
runs = np.sum(signs[:-1] != signs[1:]) + 1
# Expected runs under randomness
n_pos = np.sum(signs > 0)
n_neg = np.sum(signs < 0)
n = len(signs)
if n_pos > 0 and n_neg > 0:
expected_runs = (2 * n_pos * n_neg) / n + 1
var_runs = (2 * n_pos * n_neg * (2 * n_pos * n_neg - n)) / (
n**2 * (n - 1)
)
z_runs = (runs - expected_runs) / np.sqrt(var_runs) if var_runs > 0 else 0
runs_test_p_value = 2 * (1 - stats.norm.cdf(abs(z_runs)))
else:
runs_test_p_value = None
z_runs = None
return {
"passes": passes,
"r_squared": float(r_squared),
"meets_threshold": passes,
"runs_test": {
"n_runs": int(runs),
"z_statistic": float(z_runs) if z_runs is not None else None,
"p_value": float(runs_test_p_value)
if runs_test_p_value is not None
else None,
"random_residuals": runs_test_p_value > 0.05
if runs_test_p_value is not None
else None,
},
}
[docs]
def _analyse_residuals(
self,
log_scales: np.ndarray,
log_stats: np.ndarray,
slope: float,
intercept: float,
) -> Dict[str, Any]:
"""Comprehensive residual analysis."""
# Compute residuals
fitted_values = slope * log_scales + intercept
residuals = log_stats - fitted_values
standardised_residuals = (residuals - np.mean(residuals)) / np.std(residuals)
# Normality test (Shapiro-Wilk)
if len(residuals) >= 3:
try:
shapiro_stat, shapiro_p = stats.shapiro(residuals)
normality_passes = shapiro_p > 0.05
except Exception:
shapiro_stat, shapiro_p = None, None
normality_passes = None
else:
shapiro_stat, shapiro_p = None, None
normality_passes = None
# Autocorrelation test (Ljung-Box)
n_lags = min(10, len(residuals) // 3)
if n_lags >= 1:
try:
acf_values = self._compute_acf(residuals, n_lags)
# Simplified Ljung-Box statistic
lb_stat = (
len(residuals)
* (len(residuals) + 2)
* np.sum(
acf_values[1:] ** 2
/ (len(residuals) - np.arange(1, len(acf_values)))
)
)
lb_p_value = 1 - stats.chi2.cdf(lb_stat, n_lags)
autocorr_significant = lb_p_value < 0.05
except Exception:
acf_values = None
lb_stat, lb_p_value = None, None
autocorr_significant = None
else:
acf_values = None
lb_stat, lb_p_value = None, None
autocorr_significant = None
# Heteroscedasticity test (Breusch-Pagan)
try:
# Regress squared residuals on log_scales
if len(residuals) >= 3:
bp_slope, bp_intercept = np.polyfit(log_scales, residuals**2, 1)
fitted_sq_resid = bp_slope * log_scales + bp_intercept
ss_res = np.sum((residuals**2 - fitted_sq_resid) ** 2)
ss_tot = np.sum((residuals**2 - np.mean(residuals**2)) ** 2)
bp_r_squared = 1 - ss_res / ss_tot if ss_tot > 0 else 0
bp_stat = len(residuals) * bp_r_squared
bp_p_value = 1 - stats.chi2.cdf(bp_stat, 1)
heteroscedasticity = bp_p_value < 0.05
else:
bp_stat, bp_p_value, heteroscedasticity = None, None, None
except Exception:
bp_stat, bp_p_value, heteroscedasticity = None, None, None
return {
"normality": {
"shapiro_statistic": float(shapiro_stat)
if shapiro_stat is not None
else None,
"shapiro_p_value": float(shapiro_p) if shapiro_p is not None else None,
"passes": normality_passes,
"interpretation": "Residuals appear normally distributed"
if normality_passes
else "Residuals may not be normally distributed",
},
"autocorrelation": {
"ljung_box_statistic": float(lb_stat) if lb_stat is not None else None,
"ljung_box_p_value": float(lb_p_value)
if lb_p_value is not None
else None,
"n_lags": int(n_lags) if n_lags >= 1 else None,
"significant": autocorr_significant,
"interpretation": "No significant autocorrelation"
if not autocorr_significant
else "Residuals show autocorrelation",
},
"heteroscedasticity": {
"breusch_pagan_statistic": float(bp_stat)
if bp_stat is not None
else None,
"breusch_pagan_p_value": float(bp_p_value)
if bp_p_value is not None
else None,
"present": heteroscedasticity,
"interpretation": "Homoscedastic residuals"
if not heteroscedasticity
else "Heteroscedastic residuals detected",
},
"statistics": {
"mean": float(np.mean(residuals)),
"std": float(np.std(residuals)),
"min": float(np.min(residuals)),
"max": float(np.max(residuals)),
"skewness": float(stats.skew(residuals))
if len(residuals) >= 3
else None,
"kurtosis": float(stats.kurtosis(residuals))
if len(residuals) >= 3
else None,
},
}
[docs]
def _compute_acf(self, data: np.ndarray, n_lags: int) -> np.ndarray:
"""Compute autocorrelation function."""
data = data - np.mean(data)
acf = np.correlate(data, data, mode="full")
acf = acf[len(acf) // 2 :]
acf = acf / acf[0]
return acf[: n_lags + 1]
[docs]
def _assess_goodness_of_fit(
self,
log_scales: np.ndarray,
log_stats: np.ndarray,
slope: float,
intercept: float,
) -> Dict[str, Any]:
"""Assess goodness-of-fit using multiple criteria."""
n = len(log_scales)
k = 2 # Number of parameters (slope and intercept)
# Compute residuals and metrics
fitted_values = slope * log_scales + intercept
residuals = log_stats - fitted_values
rss = np.sum(residuals**2)
tss = np.sum((log_stats - np.mean(log_stats)) ** 2)
r_squared = 1 - rss / tss if tss > 0 else 0
adjusted_r_squared = (
1 - (1 - r_squared) * (n - 1) / (n - k) if n > k else r_squared
)
# AIC and BIC
log_likelihood = -n / 2 * np.log(2 * np.pi * rss / n) - n / 2
aic = 2 * k - 2 * log_likelihood
bic = k * np.log(n) - 2 * log_likelihood
# Mean absolute error in log-log space
mae = np.mean(np.abs(residuals))
return {
"r_squared": float(r_squared),
"adjusted_r_squared": float(adjusted_r_squared),
"aic": float(aic),
"bic": float(bic),
"mae_log_space": float(mae),
"rmse_log_space": float(np.sqrt(rss / n)),
"interpretation": {
"aic_interpretation": "Lower AIC indicates better fit",
"bic_interpretation": "Lower BIC indicates better fit with parsimony",
"adjusted_r_squared_interpretation": f"Explains {adjusted_r_squared * 100:.1f}% of variance (adjusted)",
},
}
[docs]
def _detect_breakpoints(
self, log_scales: np.ndarray, log_stats: np.ndarray
) -> Dict[str, Any]:
"""Detect potential breakpoints in log-log relationship."""
n_points = len(log_scales)
if n_points < 6:
return {
"status": "insufficient_data",
"reason": "Need at least 6 points for breakpoint detection",
}
best_score = np.inf
best_breakpoint: Optional[Dict[str, Any]] = None
# Try all possible breakpoints
for split_idx in range(2, n_points - 2):
left_x = log_scales[:split_idx]
left_y = log_stats[:split_idx]
right_x = log_scales[split_idx:]
right_y = log_stats[split_idx:]
try:
left_fit = stats.linregress(left_x, left_y)
right_fit = stats.linregress(right_x, right_y)
except Exception:
continue
# Compute residual sum of squares for two-segment fit
left_resid = left_y - (left_fit.intercept + left_fit.slope * left_x)
right_resid = right_y - (right_fit.intercept + right_fit.slope * right_x)
rss_two_segment = np.sum(left_resid**2) + np.sum(right_resid**2)
if rss_two_segment < best_score:
best_score = rss_two_segment
best_breakpoint = {
"break_index": int(split_idx),
"break_scale": float(2 ** log_scales[split_idx]),
"left_slope": float(left_fit.slope),
"right_slope": float(right_fit.slope),
"slope_difference": float(abs(left_fit.slope - right_fit.slope)),
"left_r_squared": float(left_fit.rvalue**2),
"right_r_squared": float(right_fit.rvalue**2),
"rss": float(rss_two_segment),
}
# Compare with single-segment fit
single_fit = stats.linregress(log_scales, log_stats)
single_resid = log_stats - (
single_fit.intercept + single_fit.slope * log_scales
)
rss_single = np.sum(single_resid**2)
# F-test for improvement
if best_breakpoint is not None:
f_stat = ((rss_single - best_score) / 2) / (best_score / (n_points - 4))
f_p_value = 1 - stats.f.cdf(f_stat, 2, n_points - 4)
breakpoint_significant = f_p_value < 0.05
return {
"status": "ok",
"breakpoint_detected": breakpoint_significant,
"best_breakpoint": best_breakpoint if breakpoint_significant else None,
"f_statistic": float(f_stat),
"f_p_value": float(f_p_value),
"rss_single_segment": float(rss_single),
"rss_two_segment": float(best_score),
"improvement_ratio": float(rss_single / best_score)
if best_score > 0
else None,
}
else:
return {
"status": "ok",
"breakpoint_detected": False,
"reason": "No suitable breakpoint found",
}
[docs]
def _compute_quality_score(
self,
linearity_check: Dict[str, Any],
residual_analysis: Dict[str, Any],
r_squared: float,
) -> float:
"""Compute overall quality score (0-1 scale)."""
score = 0.0
weights = 0.0
# R² contribution (40% weight)
if r_squared is not None:
score += 0.4 * min(1.0, r_squared / 0.9)
weights += 0.4
# Normality contribution (20% weight)
if residual_analysis["normality"]["passes"] is not None:
score += 0.2 if residual_analysis["normality"]["passes"] else 0.0
weights += 0.2
# No autocorrelation contribution (20% weight)
if residual_analysis["autocorrelation"]["significant"] is not None:
score += (
0.2 if not residual_analysis["autocorrelation"]["significant"] else 0.0
)
weights += 0.2
# Homoscedasticity contribution (20% weight)
if residual_analysis["heteroscedasticity"]["present"] is not None:
score += (
0.2 if not residual_analysis["heteroscedasticity"]["present"] else 0.0
)
weights += 0.2
return score / weights if weights > 0 else 0.0
[docs]
def _generate_warnings(
self,
linearity_check: Dict[str, Any],
residual_analysis: Dict[str, Any],
breakpoint_detection: Dict[str, Any],
) -> List[str]:
"""Generate human-readable warnings based on diagnostics."""
warnings_list = []
if not linearity_check.get("passes", False):
warnings_list.append(
f"Low R² ({linearity_check.get('r_squared', 0):.3f}) indicates poor log-log linearity"
)
if residual_analysis["normality"].get("passes") is False:
warnings_list.append("Residuals deviate from normality")
if residual_analysis["autocorrelation"].get("significant"):
warnings_list.append("Significant residual autocorrelation detected")
if residual_analysis["heteroscedasticity"].get("present"):
warnings_list.append("Heteroscedastic residuals detected")
if breakpoint_detection.get("breakpoint_detected"):
bp = breakpoint_detection.get("best_breakpoint", {})
warnings_list.append(
f"Significant breakpoint at scale {bp.get('break_scale', 'unknown'):.2f}"
)
return warnings_list
[docs]
class ScaleWindowSensitivityAnalyser:
"""
Analyse sensitivity of H estimates to scale window selection.
"""
[docs]
def __init__(
self,
perturbation_levels: Optional[List[float]] = None,
leave_one_out: bool = True,
):
"""
Initialise scale window sensitivity analyser.
Parameters
----------
perturbation_levels : list of float, optional
Multiplicative perturbation factors to apply to scale bounds
leave_one_out : bool
Whether to perform leave-one-out analysis
"""
self.perturbation_levels = perturbation_levels or [0.9, 0.95, 1.05, 1.1]
self.leave_one_out = leave_one_out
[docs]
def analyse(
self,
estimator,
data: np.ndarray,
base_result: Dict[str, Any],
original_scales: Optional[np.ndarray] = None,
) -> Dict[str, Any]:
"""
Analyse sensitivity to scale window perturbations.
Parameters
----------
estimator : BaseEstimator
Estimator instance to test
data : np.ndarray
Input data
base_result : dict
Baseline estimation result
original_scales : np.ndarray, optional
Original scale values used
Returns
-------
dict
Sensitivity analysis results
"""
base_h = base_result.get("hurst_parameter")
if base_h is None:
return {
"status": "unavailable",
"reason": "No baseline Hurst parameter available",
}
perturbation_results = []
leave_one_out_results = []
# Perturbation analysis
for factor in self.perturbation_levels:
try:
# Attempt to modify estimator parameters
perturbed_result = self._perturb_and_estimate(estimator, data, factor)
if perturbed_result.get("hurst_parameter") is not None:
delta_h = perturbed_result["hurst_parameter"] - base_h
perturbation_results.append(
{
"factor": float(factor),
"h_estimate": float(perturbed_result["hurst_parameter"]),
"delta_h": float(delta_h),
"success": True,
}
)
else:
perturbation_results.append(
{
"factor": float(factor),
"success": False,
"reason": "Estimation failed",
}
)
except Exception as e:
perturbation_results.append(
{"factor": float(factor), "success": False, "reason": str(e)}
)
# Leave-one-out analysis
if self.leave_one_out and original_scales is not None:
leave_one_out_results = self._leave_one_out_analysis(
estimator, data, base_h, original_scales
)
# Summary statistics
successful_perturbations = [
r for r in perturbation_results if r.get("success", False)
]
if successful_perturbations:
delta_h_values = [r["delta_h"] for r in successful_perturbations]
sensitivity_summary = {
"mean_abs_delta": float(np.mean(np.abs(delta_h_values))),
"max_abs_delta": float(np.max(np.abs(delta_h_values))),
"std_delta": float(np.std(delta_h_values)),
"n_successful": len(successful_perturbations),
"n_total": len(perturbation_results),
}
else:
sensitivity_summary = {
"mean_abs_delta": None,
"max_abs_delta": None,
"std_delta": None,
"n_successful": 0,
"n_total": len(perturbation_results),
}
return {
"status": "ok",
"base_h": float(base_h),
"perturbation_results": perturbation_results,
"leave_one_out_results": leave_one_out_results,
"sensitivity_summary": sensitivity_summary,
"interpretation": self._interpret_sensitivity(sensitivity_summary),
}
[docs]
def _perturb_and_estimate(
self, estimator, data: np.ndarray, factor: float
) -> Dict[str, Any]:
"""Apply perturbation and re-estimate."""
# Try to modify scale parameters if estimator supports it
# This is a simplified approach; specific estimators may need custom handling
# Attempt to extract and modify scale parameters
params = {}
# Common parameter names across estimators
param_names = [
"min_window",
"max_window",
"min_freq_ratio",
"max_freq_ratio",
"min_level",
"max_level",
]
for param_name in param_names:
if hasattr(estimator, param_name):
original_value = getattr(estimator, param_name)
if (
isinstance(original_value, (int, float))
and original_value is not None
):
params[param_name] = original_value
# Apply perturbation
if params:
# Store original values
original_params = params.copy()
# Modify parameters
for param_name, value in params.items():
if isinstance(value, int):
setattr(estimator, param_name, int(value * factor))
else:
setattr(estimator, param_name, value * factor)
try:
# Re-estimate
result = estimator.estimate(data)
finally:
# Restore original parameters
for param_name, value in original_params.items():
setattr(estimator, param_name, value)
return result
else:
# If we can't perturb, just return the original estimate
return estimator.estimate(data)
[docs]
def _leave_one_out_analysis(
self, estimator, data: np.ndarray, base_h: float, scales: np.ndarray
) -> List[Dict[str, Any]]:
"""
Leave-one-out influence analysis.
Note: This is a placeholder; full implementation would require
access to internal scale computation in each estimator.
"""
# This is a simplified version
# Full implementation would need estimator-specific logic
return []
[docs]
def _interpret_sensitivity(self, summary: Dict[str, Any]) -> str:
"""Generate interpretation of sensitivity results."""
max_delta = summary.get("max_abs_delta")
if max_delta is None:
return "Sensitivity analysis unavailable"
if max_delta < 0.05:
return "Low sensitivity: estimates are robust to scale window perturbations"
elif max_delta < 0.1:
return "Moderate sensitivity: estimates show some variability with scale window changes"
else:
return "High sensitivity: estimates are highly sensitive to scale window selection"
[docs]
def run_comprehensive_diagnostics(
scales: np.ndarray,
statistics: np.ndarray,
estimator=None,
data: Optional[np.ndarray] = None,
config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Run complete diagnostic suite on power-law fit.
Parameters
----------
scales : np.ndarray
Scale values
statistics : np.ndarray
Corresponding statistics
estimator : BaseEstimator, optional
Estimator instance for sensitivity analysis
data : np.ndarray, optional
Original data for sensitivity analysis
config : dict, optional
Configuration dict with diagnostic settings
Returns
-------
dict
Complete diagnostic results
"""
if config is None:
config = {}
diagnostics_cfg = config.get("diagnostics", {})
# Power-law diagnostics
log_log_cfg = diagnostics_cfg.get("log_log_checks", {})
power_law_diag = PowerLawDiagnostics(
min_r_squared=log_log_cfg.get("min_r_squared", 0.5),
min_points=log_log_cfg.get("min_points", 6),
)
power_law_results = power_law_diag.diagnose(scales, statistics)
# Scale window sensitivity (if estimator and data provided)
sensitivity_results = {}
if estimator is not None and data is not None:
sensitivity_cfg = diagnostics_cfg.get("scale_window_sensitivity", {})
if sensitivity_cfg.get("enabled", True):
sensitivity_analyser = ScaleWindowSensitivityAnalyser(
perturbation_levels=sensitivity_cfg.get("perturbation_levels"),
leave_one_out=sensitivity_cfg.get("leave_one_out", True),
)
# Need to get base result from estimator
try:
base_result = {
"hurst_parameter": estimator.results.get("hurst_parameter")
}
sensitivity_results = sensitivity_analyser.analyse(
estimator, data, base_result, scales
)
except Exception as e:
sensitivity_results = {"status": "failed", "error": str(e)}
return {
"power_law_diagnostics": power_law_results,
"scale_window_sensitivity": sensitivity_results
if sensitivity_results
else {"status": "not_performed"},
}
[docs]
class StructuralBreakDetector:
"""
Detector for structural breaks in time series.
Implements multiple tests for detecting change points that would invalidate
the stationarity assumptions required by classical LRD estimators.
Methods
-------
cusum_test : Standard CUSUM test for mean shifts
recursive_cusum : Sequential/online CUSUM detection
chow_test : Test for known break point
icss_algorithm : Iterative Cumulative Sum of Squares for variance changes
detect_all : Run all tests and return comprehensive results
"""
[docs]
def __init__(self, significance_level: float = 0.05, min_segment_length: int = 50):
"""
Initialize structural break detector.
Parameters
----------
significance_level : float
Significance level for hypothesis tests
min_segment_length : int
Minimum segment length for break detection
"""
self.significance_level = significance_level
self.min_segment_length = min_segment_length
[docs]
def cusum_test(self, data: np.ndarray, normalize: bool = True) -> Dict[str, Any]:
"""
Standard CUSUM test for detecting mean shifts.
The CUSUM (Cumulative Sum) test detects changes in the mean of a
time series by monitoring cumulative deviations from the sample mean.
Parameters
----------
data : np.ndarray
Input time series
normalize : bool
Whether to normalize the CUSUM statistic
Returns
-------
dict
Test results including:
- statistic: Maximum absolute CUSUM value
- critical_value: Critical value at significance level
- break_detected: Whether a break was detected
- break_index: Estimated break location (if detected)
- cusum_path: Full CUSUM path for visualization
"""
n = len(data)
if n < 2 * self.min_segment_length:
return {
"status": "insufficient_data",
"reason": f"Need at least {2 * self.min_segment_length} points",
}
# Compute CUSUM
mean_val = np.mean(data)
cumsum = np.cumsum(data - mean_val)
if normalize:
std_val = np.std(data)
if std_val > 0:
cumsum = cumsum / (std_val * np.sqrt(n))
# Test statistic: maximum absolute deviation
statistic = np.max(np.abs(cumsum))
break_index = np.argmax(np.abs(cumsum))
# Critical value (Brownian bridge approximation)
# Using Andrew (1993) critical values for significance level
critical_values = {0.01: 1.63, 0.05: 1.36, 0.10: 1.22}
critical_value = critical_values.get(
self.significance_level, 1.36 # Default to 0.05
)
break_detected = statistic > critical_value
return {
"status": "ok",
"test_name": "CUSUM",
"statistic": float(statistic),
"critical_value": float(critical_value),
"break_detected": break_detected,
"break_index": int(break_index) if break_detected else None,
"break_position": float(break_index / n) if break_detected else None,
"p_value_approx": self._cusum_pvalue(statistic),
"cusum_path": cumsum.tolist(),
}
[docs]
def _cusum_pvalue(self, statistic: float) -> float:
"""Approximate p-value for CUSUM statistic."""
# Kolmogorov distribution approximation
if statistic <= 0:
return 1.0
# Using asymptotic formula
k = statistic
p_value = 2 * np.sum(
[(-1) ** (j - 1) * np.exp(-2 * j**2 * k**2) for j in range(1, 101)]
)
return max(0, min(1, 1 - p_value))
[docs]
def recursive_cusum(
self, data: np.ndarray, window_size: Optional[int] = None
) -> Dict[str, Any]:
"""
Recursive CUSUM for sequential/online break detection.
Monitors the process sequentially and detects when the cumulative
sum exceeds threshold, suitable for online monitoring.
Parameters
----------
data : np.ndarray
Input time series
window_size : int, optional
Initial window for establishing baseline
Returns
-------
dict
Detection results including break points and timing
"""
n = len(data)
if window_size is None:
window_size = min(100, n // 4)
if n < window_size + self.min_segment_length:
return {
"status": "insufficient_data",
"reason": f"Need at least {window_size + self.min_segment_length} points",
}
# Establish baseline from initial window
baseline_mean = np.mean(data[:window_size])
baseline_std = np.std(data[:window_size])
if baseline_std <= 0:
baseline_std = 1.0
# Compute recursive residuals and CUSUM
cusum_plus = np.zeros(n - window_size)
cusum_minus = np.zeros(n - window_size)
# Control limit (typically 4-5 for detecting 1-sigma shifts)
control_limit = 5.0
slack = 0.5 # Allowance parameter
breaks_detected = []
for i in range(window_size, n):
idx = i - window_size
z = (data[i] - baseline_mean) / baseline_std
# Two-sided CUSUM
cusum_plus[idx] = (
max(0, cusum_plus[idx - 1] + z - slack)
if idx > 0
else max(0, z - slack)
)
cusum_minus[idx] = (
max(0, cusum_minus[idx - 1] - z - slack)
if idx > 0
else max(0, -z - slack)
)
# Check for break
if cusum_plus[idx] > control_limit or cusum_minus[idx] > control_limit:
breaks_detected.append(i)
# Reset after detection
cusum_plus[idx] = 0
cusum_minus[idx] = 0
return {
"status": "ok",
"test_name": "Recursive CUSUM",
"breaks_detected": len(breaks_detected) > 0,
"n_breaks": len(breaks_detected),
"break_indices": breaks_detected,
"break_positions": [b / n for b in breaks_detected],
"control_limit": control_limit,
"cusum_plus": cusum_plus.tolist(),
"cusum_minus": cusum_minus.tolist(),
}
[docs]
def chow_test(
self, data: np.ndarray, break_index: Optional[int] = None
) -> Dict[str, Any]:
"""
Chow test for structural break at a known or estimated point.
Tests whether regression coefficients (here, just the mean) differ
before and after a specified break point.
Parameters
----------
data : np.ndarray
Input time series
break_index : int, optional
Index of hypothesized break. If None, tests at midpoint.
Returns
-------
dict
Test results including F-statistic and p-value
"""
n = len(data)
if break_index is None:
break_index = n // 2
if (
break_index < self.min_segment_length
or n - break_index < self.min_segment_length
):
return {
"status": "invalid_break_point",
"reason": "Break point too close to boundary",
}
# Split data
data_before = data[:break_index]
data_after = data[break_index:]
n1, n2 = len(data_before), len(data_after)
# Compute means and variances
mean_before = np.mean(data_before)
mean_after = np.mean(data_after)
mean_pooled = np.mean(data)
# Sum of squared residuals
ssr_unrestricted = np.sum((data_before - mean_before) ** 2) + np.sum(
(data_after - mean_after) ** 2
)
ssr_restricted = np.sum((data - mean_pooled) ** 2)
# F-statistic
# k = 1 (testing difference in means)
k = 1
if ssr_unrestricted > 0:
f_statistic = ((ssr_restricted - ssr_unrestricted) / k) / (
ssr_unrestricted / (n - 2 * k)
)
else:
f_statistic = 0.0
# P-value from F-distribution
p_value = 1 - stats.f.cdf(f_statistic, k, n - 2 * k)
break_detected = p_value < self.significance_level
return {
"status": "ok",
"test_name": "Chow Test",
"break_index": int(break_index),
"break_position": float(break_index / n),
"f_statistic": float(f_statistic),
"p_value": float(p_value),
"break_detected": break_detected,
"mean_before": float(mean_before),
"mean_after": float(mean_after),
"mean_difference": float(mean_after - mean_before),
}
[docs]
def icss_algorithm(
self, data: np.ndarray, max_iterations: int = 100
) -> Dict[str, Any]:
"""
Iterative Cumulative Sum of Squares (ICSS) algorithm.
Detects multiple variance change points using the algorithm of
Inclán and Tiao (1994).
Parameters
----------
data : np.ndarray
Input time series
max_iterations : int
Maximum iterations for refinement
Returns
-------
dict
Detected variance change points
"""
n = len(data)
if n < 2 * self.min_segment_length:
return {
"status": "insufficient_data",
"reason": f"Need at least {2 * self.min_segment_length} points",
}
# Center the data
data_centered = data - np.mean(data)
# Compute cumulative sum of squares
css = np.cumsum(data_centered**2)
total_ss = css[-1]
if total_ss <= 0:
return {"status": "constant_variance", "break_detected": False}
# Compute D_k statistic (normalized deviation from linear growth)
k = np.arange(1, n + 1)
expected_ss = (k / n) * total_ss
d_k = (css - expected_ss) / np.sqrt(total_ss)
# Find maximum absolute deviation
max_d = np.max(np.abs(d_k))
break_index = np.argmax(np.abs(d_k))
# Critical value (asymptotic distribution)
# Using sqrt(2 * log(log(n))) * 1.358 as approximation
if n > 10:
critical_value = np.sqrt(2 * np.log(np.log(n))) * 1.358
else:
critical_value = 1.36
# Simple single-break detection
break_detected = max_d > critical_value
# For multiple breaks, iterate (simplified version)
break_points = []
if break_detected:
break_points.append(int(break_index))
return {
"status": "ok",
"test_name": "ICSS",
"statistic": float(max_d),
"critical_value": float(critical_value),
"break_detected": break_detected,
"n_breaks": len(break_points),
"break_indices": break_points,
"break_positions": [bp / n for bp in break_points],
"d_k_path": d_k.tolist(),
}
[docs]
def detect_all(
self, data: np.ndarray, include_paths: bool = False
) -> Dict[str, Any]:
"""
Run all structural break tests and return comprehensive results.
Parameters
----------
data : np.ndarray
Input time series
include_paths : bool
Whether to include full paths (can be large)
Returns
-------
dict
Comprehensive break detection results with warnings
"""
data = np.asarray(data, dtype=np.float64)
# Run all tests
cusum_result = self.cusum_test(data)
recursive_result = self.recursive_cusum(data)
chow_result = self.chow_test(data)
icss_result = self.icss_algorithm(data)
# Remove paths if not requested
if not include_paths:
for result in [cusum_result, recursive_result, icss_result]:
for key in ["cusum_path", "cusum_plus", "cusum_minus", "d_k_path"]:
result.pop(key, None)
# Summary
any_break_detected = (
cusum_result.get("break_detected", False)
or recursive_result.get("breaks_detected", False)
or chow_result.get("break_detected", False)
or icss_result.get("break_detected", False)
)
# Generate warnings
warnings_list = []
if cusum_result.get("break_detected"):
warnings_list.append(
f"CUSUM detected mean shift at position {cusum_result.get('break_position', 0):.2%}"
)
if recursive_result.get("n_breaks", 0) > 0:
warnings_list.append(
f"Recursive CUSUM detected {recursive_result['n_breaks']} break(s)"
)
if chow_result.get("break_detected"):
warnings_list.append(
f"Chow test detected break (p={chow_result.get('p_value', 1):.4f})"
)
if icss_result.get("break_detected"):
warnings_list.append(
f"ICSS detected {icss_result.get('n_breaks', 0)} variance change point(s)"
)
if any_break_detected:
warnings_list.insert(
0,
"⚠️ STATIONARITY WARNING: Structural breaks detected. "
"Classical LRD estimator results may be unreliable.",
)
return {
"status": "ok",
"any_break_detected": any_break_detected,
"stationarity_valid": not any_break_detected,
"cusum": cusum_result,
"recursive_cusum": recursive_result,
"chow": chow_result,
"icss": icss_result,
"warnings": warnings_list,
"recommendation": (
"Consider segmented analysis or nonstationary methods"
if any_break_detected
else "Data appears stationary; proceed with classical estimation"
),
}