Source code for lrdbenchmark.models.contamination.contamination_factory

#!/usr/bin/env python3
"""
Advanced Contamination Factory for Realistic Data Confounding

This module provides a sophisticated contamination factory that creates realistic
confounding profiles based on real-world scenarios encountered in time series analysis.
It goes beyond simple noise addition to simulate complex, realistic data artifacts.

Real-world scenarios include:
1. Financial time series (market crashes, volatility clustering, regime changes)
2. Physiological signals (sensor drift, motion artifacts, equipment failures)
3. Environmental data (seasonal effects, extreme events, measurement drift)
4. Network traffic (bursts, congestion, equipment failures)
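5. Industrial sensors (calibration drift, sensor aging, environmental interference)
6. EEG recordings (ocular, muscle and cardiac artifacts, electrode popping/drift, line noise)
7. Mixed realistic profiles (light, moderate and severe combinations of the above)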
"""

import random
import warnings
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from scipy import signal, stats


class ConfoundingScenario(Enum):
    """Real-world confounding scenarios.

    Every member's value is the lowercase form of its own name. The members
    are grouped by application domain below; the grouping is a reading aid
    only.
    """

    # Financial scenarios
    FINANCIAL_CRASH = "financial_crash"
    FINANCIAL_VOLATILITY_CLUSTERING = "financial_volatility_clustering"
    FINANCIAL_REGIME_CHANGE = "financial_regime_change"

    # Physiological scenarios
    PHYSIOLOGICAL_SENSOR_DRIFT = "physiological_sensor_drift"
    PHYSIOLOGICAL_MOTION_ARTIFACTS = "physiological_motion_artifacts"
    PHYSIOLOGICAL_EQUIPMENT_FAILURE = "physiological_equipment_failure"

    # Environmental scenarios
    ENVIRONMENTAL_SEASONAL = "environmental_seasonal"
    ENVIRONMENTAL_EXTREME_EVENTS = "environmental_extreme_events"
    ENVIRONMENTAL_MEASUREMENT_DRIFT = "environmental_measurement_drift"

    # Network scenarios
    NETWORK_BURSTS = "network_bursts"
    NETWORK_CONGESTION = "network_congestion"
    NETWORK_EQUIPMENT_FAILURE = "network_equipment_failure"

    # Industrial scenarios
    INDUSTRIAL_CALIBRATION_DRIFT = "industrial_calibration_drift"
    INDUSTRIAL_SENSOR_AGING = "industrial_sensor_aging"
    INDUSTRIAL_ENVIRONMENTAL_INTERFERENCE = "industrial_environmental_interference"

    # EEG scenarios
    EEG_OCULAR_ARTIFACTS = "eeg_ocular_artifacts"
    EEG_MUSCLE_ARTIFACTS = "eeg_muscle_artifacts"
    EEG_CARDIAC_ARTIFACTS = "eeg_cardiac_artifacts"
    EEG_ELECTRODE_POPPING = "eeg_electrode_popping"
    EEG_ELECTRODE_DRIFT = "eeg_electrode_drift"
    EEG_60HZ_NOISE = "eeg_60hz_noise"
    EEG_SWEAT_ARTIFACTS = "eeg_sweat_artifacts"
    EEG_MOVEMENT_ARTIFACTS = "eeg_movement_artifacts"

    # Mixed realistic scenarios
    MIXED_REALISTIC_LIGHT = "mixed_realistic_light"
    MIXED_REALISTIC_MODERATE = "mixed_realistic_moderate"
    MIXED_REALISTIC_SEVERE = "mixed_realistic_severe"
@dataclass
class ConfoundingProfile:
    """Settings bundle describing one confounding profile.

    Instances are hashable: the hash covers the scenario, the intensity, the
    parameter items in sorted order, and the description.
    """

    scenario: ConfoundingScenario
    intensity: float  # 0.0 to 1.0
    parameters: Dict[str, float]
    description: str

    def __hash__(self):
        # A dict is unhashable, so freeze it into an order-independent tuple.
        frozen_parameters = tuple(sorted(self.parameters.items()))
        identity = (
            self.scenario,
            self.intensity,
            frozen_parameters,
            self.description,
        )
        return hash(identity)
class ContaminationFactory:
    """
    Advanced contamination factory that creates realistic confounding profiles.

    This factory generates complex, realistic data artifacts that mimic
    real-world scenarios encountered in time series analysis across various
    domains.

    The predefined profiles are built by ``_initialize_profiles`` and stored
    on ``self.profiles``; ``apply_confounding`` routes each scenario to its
    matching ``_apply_*`` method.
    """
def __init__(self, random_seed: Optional[int] = None):
    """
    Set up the factory and its table of predefined profiles.

    Parameters
    ----------
    random_seed : int, optional
        When given, seeds both NumPy's global generator and the standard
        library ``random`` module so results are reproducible. When omitted,
        neither generator is touched.
    """
    if random_seed is not None:
        # NumPy first, then the stdlib generator.
        for reseed in (np.random.seed, random.seed):
            reseed(random_seed)

    self.profiles = self._initialize_profiles()
def _initialize_profiles(self) -> Dict[ConfoundingScenario, ConfoundingProfile]:
    """Build the table of predefined profiles, keyed by scenario.

    Each catalogue row is ``(scenario, default intensity, description,
    parameters)``. Rows are inserted in the order listed.
    """
    S = ConfoundingScenario

    catalogue = [
        # Financial scenarios
        (
            S.FINANCIAL_CRASH,
            0.8,
            "Simulates market crashes with increased volatility and gradual recovery",
            {
                "crash_probability": 0.05,
                "crash_magnitude": 3.0,
                "recovery_time": 0.3,
                "volatility_increase": 2.0,
            },
        ),
        (
            S.FINANCIAL_VOLATILITY_CLUSTERING,
            0.6,
            "Simulates volatility clustering common in financial time series",
            {
                "cluster_duration": 0.2,
                "volatility_multiplier": 1.5,
                "cluster_probability": 0.1,
            },
        ),
        (
            S.FINANCIAL_REGIME_CHANGE,
            0.7,
            "Simulates regime changes in financial markets",
            {
                "regime_change_probability": 0.02,
                "mean_shift": 0.5,
                "volatility_shift": 0.3,
                "transition_smoothness": 0.1,
            },
        ),
        # Physiological scenarios
        (
            S.PHYSIOLOGICAL_SENSOR_DRIFT,
            0.4,
            "Simulates gradual sensor drift with occasional recalibration",
            {
                "drift_rate": 0.001,
                "drift_variability": 0.1,
                "calibration_events": 0.05,
            },
        ),
        (
            S.PHYSIOLOGICAL_MOTION_ARTIFACTS,
            0.6,
            "Simulates motion artifacts in physiological recordings",
            {
                "motion_probability": 0.1,
                "motion_duration": 0.05,
                "motion_amplitude": 2.0,
                "motion_frequency": 0.3,
            },
        ),
        (
            S.PHYSIOLOGICAL_EQUIPMENT_FAILURE,
            0.8,
            "Simulates equipment failures with temporary signal loss",
            {
                "failure_probability": 0.01,
                "failure_duration": 0.1,
                "failure_magnitude": 5.0,
                "recovery_time": 0.2,
            },
        ),
        # Environmental scenarios
        (
            S.ENVIRONMENTAL_SEASONAL,
            0.5,
            "Simulates seasonal effects with trend and noise",
            {
                "seasonal_amplitude": 0.3,
                "seasonal_period": 0.25,
                "trend_component": 0.1,
                "noise_level": 0.05,
            },
        ),
        (
            S.ENVIRONMENTAL_EXTREME_EVENTS,
            0.7,
            "Simulates extreme environmental events (storms, etc.)",
            {
                "extreme_probability": 0.02,
                "extreme_magnitude": 4.0,
                "extreme_duration": 0.05,
                "recovery_rate": 0.1,
            },
        ),
        (
            S.ENVIRONMENTAL_MEASUREMENT_DRIFT,
            0.3,
            "Simulates measurement drift due to environmental conditions",
            {
                "drift_rate": 0.0005,
                "temperature_dependence": 0.2,
                "humidity_dependence": 0.1,
            },
        ),
        # Network scenarios
        (
            S.NETWORK_BURSTS,
            0.6,
            "Simulates network traffic bursts",
            {
                "burst_probability": 0.05,
                "burst_duration": 0.02,
                "burst_amplitude": 3.0,
                "burst_frequency": 0.1,
            },
        ),
        (
            S.NETWORK_CONGESTION,
            0.5,
            "Simulates network congestion effects",
            {
                "congestion_probability": 0.1,
                "congestion_duration": 0.15,
                "congestion_severity": 0.5,
                "recovery_time": 0.1,
            },
        ),
        (
            S.NETWORK_EQUIPMENT_FAILURE,
            0.8,
            "Simulates network equipment failures",
            {
                "failure_probability": 0.005,
                "failure_duration": 0.2,
                "failure_magnitude": 10.0,
                "recovery_time": 0.3,
            },
        ),
        # Industrial scenarios
        (
            S.INDUSTRIAL_CALIBRATION_DRIFT,
            0.4,
            "Simulates industrial sensor calibration drift",
            {
                "drift_rate": 0.0002,
                "drift_variability": 0.05,
                "calibration_interval": 0.1,
                "calibration_accuracy": 0.02,
            },
        ),
        (
            S.INDUSTRIAL_SENSOR_AGING,
            0.3,
            "Simulates sensor aging effects",
            {
                "aging_rate": 0.0001,
                "aging_variability": 0.02,
                "noise_increase": 0.1,
                "sensitivity_decrease": 0.05,
            },
        ),
        (
            S.INDUSTRIAL_ENVIRONMENTAL_INTERFERENCE,
            0.5,
            "Simulates environmental interference in industrial sensors",
            {
                "interference_frequency": 0.05,
                "interference_amplitude": 0.3,
                "interference_duration": 0.1,
                "interference_probability": 0.2,
            },
        ),
        # EEG scenarios
        (
            S.EEG_OCULAR_ARTIFACTS,
            0.7,
            "Simulates ocular artifacts (blinks, saccades) in EEG recordings",
            {
                "blink_probability": 0.1,
                "saccade_probability": 0.05,
                "eye_movement_amplitude": 50.0,  # μV
                "blink_duration": 0.3,  # seconds
                "saccade_duration": 0.1,  # seconds
                "frequency_min": 0.5,  # Hz
                "frequency_max": 4.0,  # Hz
            },
        ),
        (
            S.EEG_MUSCLE_ARTIFACTS,
            0.6,
            "Simulates muscle artifacts (EMG) in EEG recordings",
            {
                "muscle_activity_probability": 0.15,
                "muscle_amplitude": 100.0,  # μV
                "muscle_duration": 0.5,  # seconds
                "frequency_min": 20.0,  # Hz
                "frequency_max": 100.0,  # Hz
                "burst_probability": 0.3,
            },
        ),
        (
            S.EEG_CARDIAC_ARTIFACTS,
            0.4,
            "Simulates cardiac artifacts (ECG) in EEG recordings",
            {
                "heart_rate": 1.2,  # Hz (72 BPM)
                "ecg_amplitude": 20.0,  # μV
                "qrs_duration": 0.1,  # seconds
                "variability": 0.1,  # heart rate variability
                "conduction_delay": 0.05,  # seconds
            },
        ),
        (
            S.EEG_ELECTRODE_POPPING,
            0.8,
            "Simulates electrode popping artifacts in EEG recordings",
            {
                "pop_probability": 0.02,
                "pop_amplitude": 200.0,  # μV
                "pop_duration": 0.01,  # seconds
                "recovery_time": 0.1,  # seconds
                "impedance_threshold": 0.5,
            },
        ),
        (
            S.EEG_ELECTRODE_DRIFT,
            0.3,
            "Simulates electrode drift and impedance changes in EEG recordings",
            {
                "drift_rate": 0.5,  # μV/second
                "drift_variability": 0.1,
                "baseline_shift": 10.0,  # μV
                "impedance_change": 0.2,
                "calibration_drift": 0.05,
            },
        ),
        (
            S.EEG_60HZ_NOISE,
            0.5,
            "Simulates 60Hz power line noise and harmonics in EEG recordings",
            {
                "line_frequency": 60.0,  # Hz
                "noise_amplitude": 5.0,  # μV
                "harmonic_amplitude": 2.0,  # μV
                "phase_noise": 0.1,
                "frequency_drift": 0.1,  # Hz
            },
        ),
        (
            S.EEG_SWEAT_ARTIFACTS,
            0.4,
            "Simulates sweat artifacts and impedance changes in EEG recordings",
            {
                "sweat_probability": 0.1,
                "sweat_amplitude": 30.0,  # μV
                "sweat_duration": 2.0,  # seconds
                "impedance_reduction": 0.3,
                "slow_drift": 0.2,  # μV/second
            },
        ),
        (
            S.EEG_MOVEMENT_ARTIFACTS,
            0.6,
            "Simulates head/body movement artifacts in EEG recordings",
            {
                "movement_probability": 0.05,
                "movement_amplitude": 80.0,  # μV
                "movement_duration": 1.0,  # seconds
                "frequency_min": 0.1,  # Hz
                "frequency_max": 10.0,  # Hz
                "acceleration_effect": 0.5,
            },
        ),
        # Mixed realistic scenarios
        (
            S.MIXED_REALISTIC_LIGHT,
            0.3,
            "Light realistic contamination with minimal artifacts",
            {
                "noise_level": 0.05,
                "drift_rate": 0.001,
                "artifact_probability": 0.02,
                "trend_component": 0.02,
            },
        ),
        (
            S.MIXED_REALISTIC_MODERATE,
            0.5,
            "Moderate realistic contamination with common artifacts",
            {
                "noise_level": 0.1,
                "drift_rate": 0.002,
                "artifact_probability": 0.05,
                "trend_component": 0.05,
                "seasonal_amplitude": 0.1,
            },
        ),
        (
            S.MIXED_REALISTIC_SEVERE,
            0.8,
            "Severe realistic contamination with multiple artifacts",
            {
                "noise_level": 0.2,
                "drift_rate": 0.005,
                "artifact_probability": 0.1,
                "trend_component": 0.1,
                "seasonal_amplitude": 0.2,
                "extreme_events": 0.05,
            },
        ),
    ]

    return {
        scenario: ConfoundingProfile(
            scenario=scenario,
            intensity=intensity,
            parameters=parameters,
            description=description,
        )
        for scenario, intensity, description, parameters in catalogue
    }
def create_confounding_profile(
    self,
    scenario: ConfoundingScenario,
    intensity: Optional[float] = None,
    custom_parameters: Optional[Dict[str, float]] = None,
) -> ConfoundingProfile:
    """
    Derive a fresh profile for ``scenario`` from the predefined table.

    Parameters
    ----------
    scenario : ConfoundingScenario
        Scenario whose predefined profile serves as the template.
    intensity : float, optional
        Intensity of the confounding (0.0 to 1.0). Falls back to the
        template's intensity when None.
    custom_parameters : dict, optional
        Entries that override (or extend) the template's parameters before
        scaling.

    Returns
    -------
    ConfoundingProfile
        A new profile; the stored template is left untouched.

    Raises
    ------
    ValueError
        If ``scenario`` has no predefined profile.
    """
    template = self.profiles.get(scenario)
    if template is None:
        raise ValueError(f"Unknown scenario: {scenario}")

    strength = template.intensity if intensity is None else intensity

    merged = dict(template.parameters)
    if custom_parameters:
        merged.update(custom_parameters)

    # Every parameter, overrides included, is multiplied by the intensity.
    # NOTE(review): this also scales durations and frequencies (e.g.
    # "line_frequency", "heart_rate") -- confirm that is intended.
    scaled = {name: value * strength for name, value in merged.items()}

    return ConfoundingProfile(
        scenario=scenario,
        intensity=strength,
        parameters=scaled,
        description=template.description,
    )
def apply_confounding(
    self,
    data: np.ndarray,
    scenario: ConfoundingScenario,
    intensity: Optional[float] = None,
) -> Tuple[np.ndarray, str]:
    """
    Apply confounding to data based on a specific scenario.

    Parameters
    ----------
    data : np.ndarray
        Input time series data. It is never modified. Non-floating input
        (integers, booleans, plain sequences) is converted to a float array
        first, because the handlers add real-valued artifacts.
    scenario : ConfoundingScenario
        Confounding scenario to apply
    intensity : float, optional
        Intensity of confounding (0.0 to 1.0). If None, uses default.

    Returns
    -------
    tuple
        (contaminated_data, description)

    Raises
    ------
    ValueError
        Propagated from ``create_confounding_profile`` when the scenario
        has no predefined profile.
    """
    profile = self.create_confounding_profile(scenario, intensity)

    # Private working copy. Integer arrays would either reject the in-place
    # float additions done by the handlers or silently truncate them, so
    # promote anything that is not already float/complex.
    contaminated = np.array(data, copy=True)
    if contaminated.dtype.kind not in "fc":
        contaminated = contaminated.astype(float)

    # Scenario -> handler method name. Names are resolved lazily with
    # getattr so only the selected handler has to exist.
    handler_names = {
        ConfoundingScenario.FINANCIAL_CRASH: "_apply_financial_crash",
        ConfoundingScenario.FINANCIAL_VOLATILITY_CLUSTERING: "_apply_volatility_clustering",
        ConfoundingScenario.FINANCIAL_REGIME_CHANGE: "_apply_regime_change",
        ConfoundingScenario.PHYSIOLOGICAL_SENSOR_DRIFT: "_apply_sensor_drift",
        ConfoundingScenario.PHYSIOLOGICAL_MOTION_ARTIFACTS: "_apply_motion_artifacts",
        ConfoundingScenario.PHYSIOLOGICAL_EQUIPMENT_FAILURE: "_apply_equipment_failure",
        ConfoundingScenario.ENVIRONMENTAL_SEASONAL: "_apply_seasonal_effects",
        ConfoundingScenario.ENVIRONMENTAL_EXTREME_EVENTS: "_apply_extreme_events",
        ConfoundingScenario.ENVIRONMENTAL_MEASUREMENT_DRIFT: "_apply_measurement_drift",
        ConfoundingScenario.NETWORK_BURSTS: "_apply_network_bursts",
        ConfoundingScenario.NETWORK_CONGESTION: "_apply_network_congestion",
        ConfoundingScenario.NETWORK_EQUIPMENT_FAILURE: "_apply_network_equipment_failure",
        ConfoundingScenario.INDUSTRIAL_CALIBRATION_DRIFT: "_apply_calibration_drift",
        ConfoundingScenario.INDUSTRIAL_SENSOR_AGING: "_apply_sensor_aging",
        ConfoundingScenario.INDUSTRIAL_ENVIRONMENTAL_INTERFERENCE: "_apply_environmental_interference",
        ConfoundingScenario.MIXED_REALISTIC_LIGHT: "_apply_mixed_realistic_light",
        ConfoundingScenario.MIXED_REALISTIC_MODERATE: "_apply_mixed_realistic_moderate",
        ConfoundingScenario.MIXED_REALISTIC_SEVERE: "_apply_mixed_realistic_severe",
        ConfoundingScenario.EEG_OCULAR_ARTIFACTS: "_apply_eeg_ocular_artifacts",
        ConfoundingScenario.EEG_MUSCLE_ARTIFACTS: "_apply_eeg_muscle_artifacts",
        ConfoundingScenario.EEG_CARDIAC_ARTIFACTS: "_apply_eeg_cardiac_artifacts",
        ConfoundingScenario.EEG_ELECTRODE_POPPING: "_apply_eeg_electrode_popping",
        ConfoundingScenario.EEG_ELECTRODE_DRIFT: "_apply_eeg_electrode_drift",
        ConfoundingScenario.EEG_60HZ_NOISE: "_apply_eeg_60hz_noise",
        ConfoundingScenario.EEG_SWEAT_ARTIFACTS: "_apply_eeg_sweat_artifacts",
        ConfoundingScenario.EEG_MOVEMENT_ARTIFACTS: "_apply_eeg_movement_artifacts",
    }

    handler_name = handler_names.get(profile.scenario)
    if handler_name is None:
        # A profile exists but no handler is wired up: return the data as-is.
        warnings.warn(f"Unknown scenario: {profile.scenario}")
    else:
        handler = getattr(self, handler_name)
        contaminated = handler(contaminated, profile.parameters)

    return contaminated, profile.description
def _apply_financial_crash(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Inject crash shocks, each followed by a fading, noisy recovery tail.

    Reads ``crash_probability``, ``crash_magnitude``, ``recovery_time`` (a
    fraction of the series length) and ``volatility_increase`` from
    ``params``. Works on a copy of ``data``.
    """
    total = len(data)
    series = data.copy()

    # One uniform draw per sample decides where crashes start.
    crash_starts = np.flatnonzero(
        np.random.random(total) < params["crash_probability"]
    )

    for start in crash_starts:
        # Each crash gets a random direction that its tail inherits.
        shock = params["crash_magnitude"] * np.random.choice([-1, 1])
        series[start] += shock

        tail_span = int(params["recovery_time"] * total)
        for lag in range(1, min(tail_span, total - start)):
            position = start + lag
            # Linearly fading echo of the shock at a tenth of its strength.
            fade = 1 - (lag / tail_span)
            series[position] += shock * fade * 0.1
            # Extra Gaussian jitter during the recovery.
            series[position] += np.random.normal(
                0, params["volatility_increase"] * 0.1
            )

    return series
def _apply_volatility_clustering(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add runs of Gaussian noise that start at randomly chosen samples.

    Reads ``cluster_probability``, ``cluster_duration`` (a fraction of the
    series length) and ``volatility_multiplier`` from ``params``. Works on a
    copy of ``data``; overlapping clusters accumulate.
    """
    size = len(data)
    result = data.copy()

    cluster_starts = np.flatnonzero(
        np.random.random(size) < params["cluster_probability"]
    )
    span = int(params["cluster_duration"] * size)

    for begin in cluster_starts:
        # Clip the cluster at the end of the series.
        for position in range(begin, begin + min(span, size - begin)):
            sigma = params["volatility_multiplier"] * 0.1
            result[position] += np.random.normal(0, sigma)

    return result
def _apply_regime_change(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply regime change confounding.

    The series starts in regime 0 and toggles between regimes 0 and 1 at
    randomly chosen samples. Regime 0 subtracts ``mean_shift`` and uses a
    noise scale of ``(1 - volatility_shift) * 0.1``; regime 1 adds
    ``mean_shift`` and uses ``(1 + volatility_shift) * 0.1``.

    Reads ``regime_change_probability``, ``mean_shift`` and
    ``volatility_shift`` from ``params``. Works on a copy of ``data``.
    """
    n = len(data)
    contaminated = data.copy()

    # One uniform draw per sample marks the regime switches. The mask is
    # indexed directly: testing membership in an index array would rescan it
    # for every sample.
    regime_mask = np.random.random(n) < params["regime_change_probability"]

    mean_shift = params["mean_shift"]
    volatility_shift = params["volatility_shift"]

    current_regime = 0
    for i in range(n):
        if regime_mask[i]:
            current_regime = 1 - current_regime  # Switch regime

        # -1 in regime 0, +1 in regime 1.
        direction = 2 * current_regime - 1
        contaminated[i] += mean_shift * direction
        contaminated[i] += np.random.normal(
            0, (1 + volatility_shift * direction) * 0.1
        )

    return contaminated
def _apply_sensor_drift(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add a noisy linear drift that is re-zeroed at random recalibrations.

    Reads ``drift_rate``, ``drift_variability`` and ``calibration_events``
    (per-sample recalibration probability) from ``params``. Works on a copy
    of ``data``.
    """
    count = len(data)

    # Normalised time axis in [0, 1).
    timeline = np.arange(count) / count
    offset = params["drift_rate"] * timeline + np.random.normal(
        0, params["drift_variability"], count
    )

    recalibrations = np.flatnonzero(
        np.random.random(count) < params["calibration_events"]
    )
    for moment in recalibrations:
        # Shift the remaining drift so it is exactly zero at this sample.
        offset[moment:] -= offset[moment]

    result = data.copy()
    result += offset
    return result
def _apply_motion_artifacts(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Overlay short sinusoidal bursts at randomly chosen samples.

    Reads ``motion_probability``, ``motion_duration`` (a fraction of the
    series length), ``motion_amplitude`` and ``motion_frequency`` from
    ``params``. Works on a copy of ``data``; overlapping bursts accumulate.
    """
    size = len(data)
    result = data.copy()

    onsets = np.flatnonzero(np.random.random(size) < params["motion_probability"])
    span = int(params["motion_duration"] * size)
    if onsets.size == 0:
        return result

    # Every burst has the same shape, so compute it once.
    amplitude = params["motion_amplitude"]
    frequency = params["motion_frequency"]
    waveform = [
        amplitude * np.sin(2 * np.pi * frequency * step) for step in range(span)
    ]

    for onset in onsets:
        # The slice clips the burst at the end of the series.
        for step, value in enumerate(waveform[: size - onset]):
            result[onset + step] += value

    return result
def _apply_equipment_failure(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply equipment failure confounding.

    At randomly chosen samples a spike of ``failure_magnitude`` with a
    random sign is added, followed by a linearly fading tail at a tenth of
    the spike's size. The tail carries the same sign as its spike, matching
    the financial-crash handler.

    Reads ``failure_probability``, ``failure_magnitude`` and
    ``recovery_time`` (a fraction of the series length) from ``params``.
    Works on a copy of ``data``.
    """
    n = len(data)
    contaminated = data.copy()

    # Generate failure events
    failure_mask = np.random.random(n) < params["failure_probability"]
    recovery_length = int(params["recovery_time"] * n)

    for i in np.flatnonzero(failure_mask):
        # Draw the direction once so the spike and its tail agree.
        signed_magnitude = params["failure_magnitude"] * np.random.choice([-1, 1])
        contaminated[i] += signed_magnitude

        # Apply recovery, clipped at the end of the series.
        for j in range(1, min(recovery_length, n - i)):
            recovery_factor = 1 - (j / recovery_length)
            contaminated[i + j] += signed_magnitude * recovery_factor * 0.1

    return contaminated
def _apply_seasonal_effects(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add a two-harmonic seasonal cycle, a linear trend and Gaussian noise.

    Reads ``seasonal_amplitude``, ``seasonal_period`` (in units of the
    normalised time axis), ``trend_component`` and ``noise_level`` from
    ``params``. Works on a copy of ``data``.
    """
    count = len(data)
    amplitude = params["seasonal_amplitude"]
    period = params["seasonal_period"]

    # Normalised time axis in [0, 1).
    timeline = np.arange(count) / count

    fundamental = amplitude * np.sin(2 * np.pi * timeline / period)
    # Second harmonic at 30% of the fundamental's amplitude.
    harmonic = 0.3 * amplitude * np.sin(4 * np.pi * timeline / period)
    slope = params["trend_component"] * timeline
    jitter = np.random.normal(0, params["noise_level"], count)

    result = data.copy()
    result += fundamental + harmonic + slope + jitter
    return result
def _apply_extreme_events(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add randomly signed spikes that decay linearly to zero.

    Reads ``extreme_probability``, ``extreme_magnitude`` and
    ``extreme_duration`` (a fraction of the series length) from ``params``.
    Works on a copy of ``data``; overlapping events accumulate.
    """
    size = len(data)
    output = data.copy()

    onsets = np.flatnonzero(np.random.random(size) < params["extreme_probability"])
    span = int(params["extreme_duration"] * size)

    for onset in onsets:
        # Direction is drawn once per event.
        peak = params["extreme_magnitude"] * np.random.choice([-1, 1])
        reach = min(span, size - onset)
        for lag in range(reach):
            output[onset + lag] += peak * (1 - lag / span)

    return output
def _apply_measurement_drift(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add a linear drift plus simulated temperature and humidity effects.

    Temperature is drawn from N(20, 5) and humidity from N(50, 10), one value
    per sample; their deviations from 20 and 50 are scaled by
    ``temperature_dependence`` and ``humidity_dependence``. Also reads
    ``drift_rate``. Works on a copy of ``data``.
    """
    count = len(data)

    # Simulated ambient conditions (temperature is drawn first).
    ambient_temperature = np.random.normal(20, 5, count)
    ambient_humidity = np.random.normal(50, 10, count)

    ramp = params["drift_rate"] * np.arange(count) / count
    thermal = params["temperature_dependence"] * (ambient_temperature - 20) / 100
    moisture = params["humidity_dependence"] * (ambient_humidity - 50) / 100

    result = data.copy()
    result += ramp + thermal + moisture
    return result
def _apply_network_bursts(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add exponentially decaying positive bursts at random samples.

    Reads ``burst_probability``, ``burst_duration`` (a fraction of the series
    length) and ``burst_amplitude`` from ``params``. Works on a copy of
    ``data``; overlapping bursts accumulate.
    """
    size = len(data)
    output = data.copy()

    onsets = np.flatnonzero(np.random.random(size) < params["burst_probability"])
    span = int(params["burst_duration"] * size)
    if onsets.size == 0:
        return output

    # Shared burst shape: decay constant is half the burst length.
    peak = params["burst_amplitude"]
    envelope = [peak * np.exp(-lag / (span * 0.5)) for lag in range(span)]

    for onset in onsets:
        # The slice clips the burst at the end of the series.
        for lag, level in enumerate(envelope[: size - onset]):
            output[onset + lag] += level

    return output
def _apply_network_congestion(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add half-sine shaped congestion humps at random samples.

    Reads ``congestion_probability``, ``congestion_duration`` (a fraction of
    the series length) and ``congestion_severity`` from ``params``. Works on
    a copy of ``data``; overlapping humps accumulate.
    """
    size = len(data)
    output = data.copy()

    onsets = np.flatnonzero(
        np.random.random(size) < params["congestion_probability"]
    )
    span = int(params["congestion_duration"] * size)
    if onsets.size == 0:
        return output

    # Gradual onset and recovery: half a sine period over the event length.
    severity = params["congestion_severity"]
    hump = [severity * np.sin(np.pi * lag / span) for lag in range(span)]

    for onset in onsets:
        # The slice clips the hump at the end of the series.
        for lag, level in enumerate(hump[: size - onset]):
            output[onset + lag] += level

    return output
def _apply_network_equipment_failure(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add a constant offset during each failure, then ramp it back down.

    Reads ``failure_probability``, ``failure_duration``, ``recovery_time``
    (both fractions of the series length) and ``failure_magnitude`` from
    ``params``. Works on a copy of ``data``; overlapping failures accumulate.
    """
    size = len(data)
    output = data.copy()

    onsets = np.flatnonzero(np.random.random(size) < params["failure_probability"])
    outage_span = int(params["failure_duration"] * size)
    ramp_span = int(params["recovery_time"] * size)

    for onset in onsets:
        level = params["failure_magnitude"]
        remaining = size - onset

        # Outage plateau.
        for lag in range(min(outage_span, remaining)):
            output[onset + lag] += level

        # Linear ramp back towards the baseline after the plateau.
        for lag in range(outage_span, min(outage_span + ramp_span, remaining)):
            fraction_left = 1 - ((lag - outage_span) / ramp_span)
            output[onset + lag] += level * fraction_left

    return output
def _apply_calibration_drift(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply calibration drift confounding.

    A noisy linear drift is added, and at regular intervals (starting at
    sample 0) a random calibration offset is applied to the rest of the
    drift.

    Reads ``drift_rate``, ``drift_variability``, ``calibration_interval`` (a
    fraction of the series length) and ``calibration_accuracy`` from
    ``params``. Works on a copy of ``data``.
    """
    n = len(data)
    contaminated = data.copy()

    # Generate drift
    t = np.arange(n) / n
    drift = params["drift_rate"] * t + np.random.normal(
        0, params["drift_variability"], n
    )

    # For short series the interval rounds down to 0 samples, and range()
    # rejects a zero step. Clamp to one sample (calibrate every sample).
    calibration_interval = max(1, int(params["calibration_interval"] * n))

    # Add calibration events
    for start in range(0, n, calibration_interval):
        calibration_correction = np.random.normal(0, params["calibration_accuracy"])
        drift[start:] += calibration_correction

    contaminated += drift
    return contaminated
def _apply_sensor_aging(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Attenuate the signal over time and add growing drift and noise.

    Reads ``aging_rate``, ``aging_variability``, ``noise_increase`` and
    ``sensitivity_decrease`` from ``params``. Returns a new array; ``data``
    is not modified.
    """
    count = len(data)

    # Normalised time axis in [0, 1).
    timeline = np.arange(count) / count

    wear = params["aging_rate"] * timeline + np.random.normal(
        0, params["aging_variability"], count
    )

    # Noise whose standard deviation grows linearly along the series.
    growing_sigma = params["noise_increase"] * timeline
    hiss = np.random.normal(0, growing_sigma, count)

    # Gain falls linearly from 1.
    gain = 1 - params["sensitivity_decrease"] * timeline

    return data.copy() * gain + wear + hiss
def _apply_environmental_interference(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Overlay sinusoidal interference bursts at randomly chosen samples.

    Reads ``interference_probability``, ``interference_duration`` (a fraction
    of the series length), ``interference_amplitude`` and
    ``interference_frequency`` from ``params``. Works on a copy of ``data``;
    overlapping bursts accumulate.
    """
    size = len(data)
    output = data.copy()

    onsets = np.flatnonzero(
        np.random.random(size) < params["interference_probability"]
    )
    span = int(params["interference_duration"] * size)
    if onsets.size == 0:
        return output

    # Every burst has the same shape, so compute it once.
    strength = params["interference_amplitude"]
    frequency = params["interference_frequency"]
    waveform = [
        strength * np.sin(2 * np.pi * frequency * step) for step in range(span)
    ]

    for onset in onsets:
        # The slice clips the burst at the end of the series.
        for step, value in enumerate(waveform[: size - onset]):
            output[onset + step] += value

    return output
def _apply_mixed_realistic_light(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply the light mix: noise, drift, sparse artifacts and a trend.

    Reads ``noise_level``, ``drift_rate``, ``artifact_probability`` and
    ``trend_component`` from ``params``. Works on a copy of ``data``.
    """
    count = len(data)
    steps = np.arange(count)
    output = data.copy()

    # Background Gaussian noise.
    output += np.random.normal(0, params["noise_level"], count)

    # Linear drift.
    output += params["drift_rate"] * steps / count

    # Occasional point artifacts with standard deviation 0.5.
    hit = np.random.random(count) < params["artifact_probability"]
    output[hit] += np.random.normal(0, 0.5, int(hit.sum()))

    # Linear trend.
    output += params["trend_component"] * steps / count

    return output
def _apply_mixed_realistic_moderate(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply the moderate mix: noise, drift, artifacts, trend and a cycle.

    Reads ``noise_level``, ``drift_rate``, ``artifact_probability``,
    ``trend_component`` and ``seasonal_amplitude`` from ``params``. Works on
    a copy of ``data``.
    """
    count = len(data)
    steps = np.arange(count)
    output = data.copy()

    # Background Gaussian noise.
    output += np.random.normal(0, params["noise_level"], count)

    # Linear drift.
    output += params["drift_rate"] * steps / count

    # Point artifacts with standard deviation 1.0.
    hit = np.random.random(count) < params["artifact_probability"]
    output[hit] += np.random.normal(0, 1.0, int(hit.sum()))

    # Linear trend.
    output += params["trend_component"] * steps / count

    # Seasonal cycle with a fixed period of a quarter of the series.
    timeline = np.arange(count) / count
    output += params["seasonal_amplitude"] * np.sin(2 * np.pi * timeline / 0.25)

    return output
def _apply_mixed_realistic_severe(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Apply the severe mix: the moderate components plus extreme outliers.

    Reads ``noise_level``, ``drift_rate``, ``artifact_probability``,
    ``trend_component``, ``seasonal_amplitude`` and ``extreme_events``
    (per-sample outlier probability) from ``params``. Works on a copy of
    ``data``.
    """
    count = len(data)
    steps = np.arange(count)
    output = data.copy()

    # Background Gaussian noise.
    output += np.random.normal(0, params["noise_level"], count)

    # Linear drift.
    output += params["drift_rate"] * steps / count

    # Point artifacts with standard deviation 2.0.
    hit = np.random.random(count) < params["artifact_probability"]
    output[hit] += np.random.normal(0, 2.0, int(hit.sum()))

    # Linear trend.
    output += params["trend_component"] * steps / count

    # Seasonal cycle with a fixed period of a quarter of the series.
    timeline = np.arange(count) / count
    output += params["seasonal_amplitude"] * np.sin(2 * np.pi * timeline / 0.25)

    # Rare outliers with standard deviation 5.0.
    outlier = np.random.random(count) < params["extreme_events"]
    output[outlier] += np.random.normal(0, 5.0, int(outlier.sum()))

    return output
def _apply_eeg_ocular_artifacts(
    self, data: np.ndarray, params: Dict[str, float]
) -> np.ndarray:
    """Add blink and saccade transients to an EEG trace.

    Blinks decay exponentially; saccades are smaller (30% of the amplitude)
    and decay linearly. Reads ``blink_probability``, ``blink_duration``,
    ``saccade_probability``, ``saccade_duration`` and
    ``eye_movement_amplitude`` from ``params``. A sampling rate of 250 Hz is
    assumed. Works on a copy of ``data``.
    """
    size = len(data)
    trace = data.copy()

    # Assume 250 Hz sampling rate for EEG
    rate_hz = 250.0
    sample_period = 1.0 / rate_hz

    # --- Blinks ---------------------------------------------------------
    blink_onsets = np.flatnonzero(
        np.random.random(size) < params["blink_probability"] * sample_period
    )
    for onset in blink_onsets:
        blink_len = int(params["blink_duration"] * rate_hz)
        peak = params["eye_movement_amplitude"] * np.random.choice([-1, 1])
        # Exponential decay with a time constant of 30% of the blink length.
        for lag in range(min(blink_len, size - onset)):
            trace[onset + lag] += peak * np.exp(-lag / (blink_len * 0.3))

    # --- Saccades -------------------------------------------------------
    saccade_onsets = np.flatnonzero(
        np.random.random(size) < params["saccade_probability"] * sample_period
    )
    for onset in saccade_onsets:
        saccade_len = int(params["saccade_duration"] * rate_hz)
        peak = params["eye_movement_amplitude"] * 0.3 * np.random.choice([-1, 1])
        # Sharp transient that falls off linearly.
        for lag in range(min(saccade_len, size - onset)):
            trace[onset + lag] += peak * (1 - lag / saccade_len)

    return trace
[docs] def _apply_eeg_muscle_artifacts( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG muscle artifacts (EMG).""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 dt = 1.0 / sampling_rate # Generate muscle activity muscle_mask = np.random.random(n) < params["muscle_activity_probability"] * dt muscle_indices = np.where(muscle_mask)[0] for muscle_idx in muscle_indices: muscle_duration = int(params["muscle_duration"] * sampling_rate) muscle_amplitude = params["muscle_amplitude"] * np.random.uniform(0.5, 1.5) # Create muscle artifact (high-frequency noise) for i in range(min(muscle_duration, n - muscle_idx)): if muscle_idx + i < n: # High-frequency muscle activity freq = np.random.uniform( params["frequency_min"], params["frequency_max"] ) phase = 2 * np.pi * freq * i / sampling_rate muscle_signal = ( muscle_amplitude * np.sin(phase) * np.random.uniform(0.5, 1.0) ) contaminated[muscle_idx + i] += muscle_signal return contaminated
[docs] def _apply_eeg_cardiac_artifacts( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG cardiac artifacts (ECG).""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 dt = 1.0 / sampling_rate # Generate heart rate variability heart_rate = params["heart_rate"] * ( 1 + np.random.normal(0, params["variability"]) ) heart_period = 1.0 / heart_rate # Generate R-peaks current_time = 0 while current_time < n * dt: # Add R-peak r_peak_idx = int(current_time * sampling_rate) if r_peak_idx < n: r_peak_amplitude = params["ecg_amplitude"] * np.random.uniform(0.8, 1.2) contaminated[r_peak_idx] += r_peak_amplitude # Add QRS complex qrs_duration = int(params["qrs_duration"] * sampling_rate) for i in range(1, min(qrs_duration, n - r_peak_idx)): if r_peak_idx + i < n: qrs_decay = np.exp(-i / (qrs_duration * 0.3)) contaminated[r_peak_idx + i] += ( r_peak_amplitude * 0.3 * qrs_decay ) # Next heartbeat with variability current_time += heart_period * ( 1 + np.random.normal(0, params["variability"]) ) return contaminated
[docs] def _apply_eeg_electrode_popping( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG electrode popping artifacts.""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 dt = 1.0 / sampling_rate # Generate electrode pops pop_mask = np.random.random(n) < params["pop_probability"] * dt pop_indices = np.where(pop_mask)[0] for pop_idx in pop_indices: pop_duration = int(params["pop_duration"] * sampling_rate) pop_amplitude = params["pop_amplitude"] * np.random.uniform(0.5, 2.0) pop_sign = np.random.choice([-1, 1]) # Create sharp pop artifact for i in range(min(pop_duration, n - pop_idx)): if pop_idx + i < n: if i == 0: contaminated[pop_idx + i] += pop_amplitude * pop_sign else: # Exponential decay decay_factor = np.exp(-i / (pop_duration * 0.1)) contaminated[pop_idx + i] += ( pop_amplitude * pop_sign * decay_factor * 0.1 ) return contaminated
[docs] def _apply_eeg_electrode_drift( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG electrode drift and impedance changes.""" n = len(data) contaminated = data.copy() # Generate slow drift t = np.arange(n) / 250.0 # Assume 250 Hz sampling rate drift = params["drift_rate"] * t + np.random.normal( 0, params["drift_variability"], n ) # Add baseline shifts baseline_shifts = np.random.normal(0, params["baseline_shift"], n) drift += baseline_shifts * 0.01 # Slow baseline changes # Add impedance-related artifacts impedance_changes = np.random.normal(0, params["impedance_change"], n) impedance_effect = impedance_changes * np.random.normal(0, 0.1, n) contaminated += drift + impedance_effect return contaminated
[docs] def _apply_eeg_60hz_noise( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG 60Hz power line noise and harmonics.""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 t = np.arange(n) / sampling_rate # Generate 60Hz noise line_freq = params["line_frequency"] + np.random.normal( 0, params["frequency_drift"] ) phase_noise = np.random.normal(0, params["phase_noise"], n) # Fundamental frequency line_noise = params["noise_amplitude"] * np.sin( 2 * np.pi * line_freq * t + phase_noise ) # Harmonics (120Hz, 180Hz) harmonic_noise = params["harmonic_amplitude"] * ( np.sin(2 * np.pi * 2 * line_freq * t + phase_noise) + np.sin(2 * np.pi * 3 * line_freq * t + phase_noise) ) contaminated += line_noise + harmonic_noise return contaminated
[docs] def _apply_eeg_sweat_artifacts( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG sweat artifacts and impedance changes.""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 dt = 1.0 / sampling_rate # Generate sweat events sweat_mask = np.random.random(n) < params["sweat_probability"] * dt sweat_indices = np.where(sweat_mask)[0] for sweat_idx in sweat_indices: sweat_duration = int(params["sweat_duration"] * sampling_rate) sweat_amplitude = params["sweat_amplitude"] * np.random.uniform(0.5, 1.5) # Create sweat artifact (slow drift + noise) for i in range(min(sweat_duration, n - sweat_idx)): if sweat_idx + i < n: # Slow drift component drift_component = params["slow_drift"] * i / sampling_rate # Noise component noise_component = np.random.normal(0, sweat_amplitude * 0.1) contaminated[sweat_idx + i] += drift_component + noise_component return contaminated
[docs] def _apply_eeg_movement_artifacts( self, data: np.ndarray, params: Dict[str, float] ) -> np.ndarray: """Apply EEG head/body movement artifacts.""" n = len(data) contaminated = data.copy() # Assume 250 Hz sampling rate for EEG sampling_rate = 250.0 dt = 1.0 / sampling_rate # Generate movement events movement_mask = np.random.random(n) < params["movement_probability"] * dt movement_indices = np.where(movement_mask)[0] for movement_idx in movement_indices: movement_duration = int(params["movement_duration"] * sampling_rate) movement_amplitude = params["movement_amplitude"] * np.random.uniform( 0.5, 1.5 ) # Create movement artifact (low-frequency oscillation) for i in range(min(movement_duration, n - movement_idx)): if movement_idx + i < n: # Low-frequency movement freq = np.random.uniform( params["frequency_min"], params["frequency_max"] ) phase = 2 * np.pi * freq * i / sampling_rate movement_signal = ( movement_amplitude * np.sin(phase) * np.random.uniform(0.5, 1.0) ) contaminated[movement_idx + i] += movement_signal return contaminated
def get_available_scenarios(self) -> List[ConfoundingScenario]:
    """Return the scenarios that have a registered profile.

    Returns:
        A new list holding the keys of ``self.profiles`` in the mapping's
        iteration order. Mutating it does not affect the factory.
    """
    registered = [*self.profiles.keys()]
    return registered
def get_scenario_info(self, scenario: ConfoundingScenario) -> Dict[str, str]:
    """Summarise the profile registered for ``scenario``.

    Args:
        scenario: Key to look up in ``self.profiles``.

    Returns:
        A dict of strings with the keys ``scenario`` (the scenario's
        ``value``), ``description``, ``default_intensity`` and
        ``parameters``. The last two are ``str()`` renderings of the
        profile's ``intensity`` and ``parameters`` attributes.

    Raises:
        ValueError: If ``scenario`` has no registered profile.
    """
    if scenario in self.profiles:
        entry = self.profiles[scenario]
        summary = {}
        summary["scenario"] = scenario.value
        summary["description"] = entry.description
        summary["default_intensity"] = str(entry.intensity)
        summary["parameters"] = str(entry.parameters)
        return summary

    raise ValueError(f"Unknown scenario: {scenario}")