# Source code for lrdbenchmark.analytics.performance_monitor

"""
Performance Monitor for LRDBench

Monitors and analyzes performance metrics including:
- Execution time tracking
- Memory usage monitoring
- Performance trends over time
- Bottleneck identification
- Resource utilization
"""

import json
import threading
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import psutil


@dataclass
class PerformanceMetrics:
    """
    Performance metrics for a single execution

    One record describes one monitored run of an estimator. Field order is
    part of the constructor signature, so it must not be rearranged.
    """

    # When the record was created, as an ISO-8601 string
    # (the monitor writes ``datetime.now().isoformat()``).
    timestamp: str
    # Name of the estimator that was monitored.
    estimator_name: str
    # Elapsed time of the run, in seconds.
    execution_time: float
    # Resident memory of the process before the run, in MB.
    memory_before: float
    # Resident memory of the process after the run, in MB.
    memory_after: float
    # Peak memory figure recorded for the run, in MB.
    memory_peak: float
    # CPU utilisation value reported for the process.
    cpu_percent: float
    # Length of the input data handed to the estimator.
    data_length: int
    # Estimator parameters, stringified.
    parameters: Dict[str, str]
@dataclass
class PerformanceSummary:
    """
    Aggregated performance statistics

    Summarises a set of execution records. Field order is part of the
    constructor signature, so it must not be rearranged.
    """

    # Number of execution records that went into the summary.
    total_executions: int
    # Mean / standard deviation / extremes of the execution times.
    avg_execution_time: float
    std_execution_time: float
    min_execution_time: float
    max_execution_time: float
    # Mean of (memory_after - memory_before) over the records.
    avg_memory_usage: float
    # avg_memory_usage divided by avg_execution_time (0 when no time elapsed).
    memory_efficiency: float
    performance_trend: str  # "improving", "stable", "degrading"
    # Estimator names flagged as slow, slowest first.
    bottleneck_estimators: List[str]
class PerformanceMonitor:
    """
    Comprehensive performance monitoring system

    Features:
    - Real-time performance tracking
    - Memory usage monitoring
    - CPU utilization tracking
    - Performance trend analysis
    - Bottleneck identification

    Two independent facilities are offered:

    * ``start_monitoring`` / ``stop_monitoring`` produce one
      ``PerformanceMetrics`` record per monitored execution.
    * ``timer`` / ``get_stats`` collect named elapsed-time samples.

    Shared state (``_metrics``, ``_session_info``, ``_timings`` and the
    session counter) is guarded by ``_lock``, a non-reentrant
    ``threading.Lock``; no method calls another lock-taking method while
    holding it.
    """

    # psutil reports byte counts; metrics store them divided by 1024 * 1024.
    _BYTES_PER_MB = 1024 * 1024
    # Trend analysis needs at least this many records to say anything.
    _TREND_MIN_SAMPLES = 10
    # Late-half mean below/above these multiples of the early-half mean
    # counts as a 10% improvement / 10% degradation.
    _IMPROVING_RATIO = 0.9
    _DEGRADING_RATIO = 1.1
    # Estimators whose mean time exceeds this percentile are bottlenecks.
    _BOTTLENECK_PERCENTILE = 75

    def __init__(self, storage_path: str = "~/.lrdbench/analytics"):
        """
        Initialize the performance monitor

        Args:
            storage_path: Directory holding ``performance_metrics.json``;
                ``~`` is expanded and the directory is created if missing.
        """
        self.storage_path = Path(storage_path).expanduser()
        self.storage_path.mkdir(parents=True, exist_ok=True)

        self._lock = threading.Lock()
        self._metrics: List[PerformanceMetrics] = []
        self._process = psutil.Process()
        self._session_info: Dict[str, Dict] = {}
        self._timings: Dict[str, List[float]] = {}
        # Monotonically increasing suffix that keeps session IDs unique.
        self._session_counter = 0

        # Load existing data
        self._load_existing_data()

    def _load_existing_data(self):
        """
        Load existing performance data

        Reads ``performance_metrics.json`` from the storage directory if it
        exists. The expected content is a JSON list of objects whose keys are
        the ``PerformanceMetrics`` fields (the format ``export_metrics``
        writes). Loading is best effort: problems are reported with a printed
        warning and never raised, and a malformed entry is skipped without
        discarding the valid entries around it.
        """
        metrics_file = self.storage_path / "performance_metrics.json"
        try:
            if not metrics_file.exists():
                return
            with open(metrics_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            print(f"Warning: Could not load existing performance data: {e}")
            return

        if not isinstance(data, list):
            print(
                "Warning: Could not load existing performance data: "
                f"expected a list of records, got {type(data).__name__}"
            )
            return

        loaded = []
        skipped = 0
        for entry in data:
            try:
                loaded.append(PerformanceMetrics(**entry))
            except TypeError:
                # Not a mapping, or its keys do not match the dataclass fields.
                skipped += 1

        with self._lock:
            self._metrics.extend(loaded)

        if skipped:
            print(
                "Warning: Could not load existing performance data: "
                f"skipped {skipped} malformed record(s)"
            )

    def _new_session_id(self, estimator_name: str) -> str:
        """
        Build a session ID that is unique within this monitor.

        The ID keeps the ``<estimator>_<unix seconds>`` prefix and appends a
        per-monitor sequence number, so two sessions for the same estimator
        started within the same second no longer overwrite each other.
        """
        with self._lock:
            self._session_counter += 1
            sequence = self._session_counter
        return f"{estimator_name}_{int(time.time())}_{sequence}"

    def _peak_memory_mb(
        self, mem_info, memory_before: float, memory_after: float
    ) -> float:
        """
        Return the peak memory figure in MB for a finished session.

        ``peak_wset`` is only present on the memory info psutil returns on
        Windows. Where it is missing, the larger of the two sampled values is
        used instead of raising ``AttributeError``.

        Args:
            mem_info: Object returned by ``psutil.Process.memory_info()``.
            memory_before: Memory sampled at session start, in MB.
            memory_after: Memory sampled at session end, in MB.
        """
        peak_bytes = getattr(mem_info, "peak_wset", None)
        if peak_bytes is None:
            return max(memory_before, memory_after)
        return peak_bytes / self._BYTES_PER_MB

    def start_monitoring(
        self, estimator_name: str, data_length: int, parameters: Dict[str, str]
    ) -> str:
        """
        Start monitoring a new execution

        Args:
            estimator_name: Name of the estimator
            data_length: Length of input data
            parameters: Estimator parameters

        Returns:
            Monitoring session ID (an opaque string to hand back to
            ``stop_monitoring``)
        """
        session_id = self._new_session_id(estimator_name)

        # Record initial state
        memory_before = self._process.memory_info().rss / self._BYTES_PER_MB  # MB

        # cpu_percent() measures since the previous call and psutil documents
        # the very first result as a meaningless 0.0, so take a throwaway
        # reading here. With overlapping sessions the value recorded at stop
        # covers the time since the most recent call, whichever session made it.
        self._process.cpu_percent()

        # Store session info
        with self._lock:
            self._session_info[session_id] = {
                "estimator_name": estimator_name,
                # Monotonic clock: unaffected by system clock adjustments.
                "start_time": time.perf_counter(),
                "memory_before": memory_before,
                "data_length": data_length,
                "parameters": parameters,
            }

        return session_id

    def stop_monitoring(self, session_id: str) -> None:
        """
        Stop monitoring and record metrics

        Unknown (or already stopped) session IDs are ignored.

        Args:
            session_id: Monitoring session ID
        """
        stop_time = time.perf_counter()

        # Claim the session atomically. Popping up front also guarantees the
        # entry is released even if taking the measurements below fails.
        with self._lock:
            session = self._session_info.pop(session_id, None)
        if session is None:
            return

        execution_time = stop_time - session["start_time"]

        # Record final state
        mem_info = self._process.memory_info()
        memory_after = mem_info.rss / self._BYTES_PER_MB  # MB
        memory_peak = self._peak_memory_mb(
            mem_info, session["memory_before"], memory_after
        )
        cpu_percent = self._process.cpu_percent()

        # Create metrics
        metrics = PerformanceMetrics(
            timestamp=datetime.now().isoformat(),
            estimator_name=session["estimator_name"],
            execution_time=execution_time,
            memory_before=session["memory_before"],
            memory_after=memory_after,
            memory_peak=memory_peak,
            cpu_percent=cpu_percent,
            data_length=session["data_length"],
            parameters=session["parameters"],
        )

        # Store metrics
        with self._lock:
            self._metrics.append(metrics)

    def timer(self, name: str):
        """
        Context manager for timing code blocks.

        The elapsed time is recorded under ``name`` even when the timed block
        raises.

        Args:
            name: Name of the timer

        Usage:
            with monitor.timer('my_operation'):
                # code to time
                pass
        """
        from contextlib import contextmanager

        @contextmanager
        def _timer():
            start = time.perf_counter()
            try:
                yield
            finally:
                elapsed = time.perf_counter() - start
                with self._lock:
                    self._timings.setdefault(name, []).append(elapsed)

        return _timer()

    def get_stats(self) -> Dict[str, Dict[str, float]]:
        """
        Get statistics for all timers.

        Returns:
            Dictionary mapping timer names to statistics
            (mean, std, min, max, count, total). Timers with no samples are
            omitted.
        """
        # Snapshot under the lock, compute outside it.
        with self._lock:
            snapshot = {
                name: list(times) for name, times in self._timings.items() if times
            }

        return {
            name: {
                "mean": float(np.mean(times)),
                "std": float(np.std(times)),
                "min": float(np.min(times)),
                "max": float(np.max(times)),
                "count": len(times),
                "total": float(np.sum(times)),
            }
            for name, times in snapshot.items()
        }

    def _recent_metrics(
        self, days: int, estimator_name: Optional[str] = None
    ) -> List["PerformanceMetrics"]:
        """
        Return the records newer than ``days`` days, optionally for one estimator.

        Args:
            days: Size of the look-back window, in days.
            estimator_name: When given, only records of this estimator are kept.
        """
        cutoff_time = datetime.now() - timedelta(days=days)
        with self._lock:
            snapshot = list(self._metrics)
        return [
            m
            for m in snapshot
            if (estimator_name is None or m.estimator_name == estimator_name)
            and datetime.fromisoformat(m.timestamp) > cutoff_time
        ]

    def get_performance_summary(self, days: int = 30) -> "PerformanceSummary":
        """
        Get performance summary for the specified time period

        Args:
            days: Number of days to analyze

        Returns:
            PerformanceSummary object; all-zero with a "stable" trend when no
            record falls inside the window.
        """
        recent_metrics = self._recent_metrics(days)

        if not recent_metrics:
            return PerformanceSummary(
                total_executions=0,
                avg_execution_time=0.0,
                std_execution_time=0.0,
                min_execution_time=0.0,
                max_execution_time=0.0,
                avg_memory_usage=0.0,
                memory_efficiency=0.0,
                performance_trend="stable",
                bottleneck_estimators=[],
            )

        # Calculate statistics
        execution_times = [m.execution_time for m in recent_metrics]
        memory_usage = [m.memory_after - m.memory_before for m in recent_metrics]

        avg_execution_time = float(np.mean(execution_times))
        avg_memory_usage = float(np.mean(memory_usage))
        memory_efficiency = (
            avg_memory_usage / avg_execution_time if avg_execution_time > 0 else 0.0
        )

        return PerformanceSummary(
            total_executions=len(recent_metrics),
            avg_execution_time=avg_execution_time,
            std_execution_time=float(np.std(execution_times)),
            min_execution_time=float(np.min(execution_times)),
            max_execution_time=float(np.max(execution_times)),
            avg_memory_usage=avg_memory_usage,
            memory_efficiency=memory_efficiency,
            performance_trend=self._analyze_performance_trend(recent_metrics),
            bottleneck_estimators=self._identify_bottlenecks(recent_metrics),
        )

    def _analyze_performance_trend(self, metrics: List["PerformanceMetrics"]) -> str:
        """
        Analyze performance trend over time

        Records are ordered by timestamp and split into an early and a late
        half; the mean execution times of the two halves are compared.

        Returns:
            "improving" when the late half is more than 10% faster,
            "degrading" when it is more than 10% slower, otherwise "stable"
            (also "stable" when there are fewer than 10 records).
        """
        if len(metrics) < self._TREND_MIN_SAMPLES:
            return "stable"

        # Sort by timestamp
        ordered = sorted(metrics, key=lambda m: m.timestamp)

        # Split into early and late periods
        mid_point = len(ordered) // 2
        early_avg = np.mean([m.execution_time for m in ordered[:mid_point]])
        late_avg = np.mean([m.execution_time for m in ordered[mid_point:]])

        # Calculate trend
        if late_avg < early_avg * self._IMPROVING_RATIO:
            return "improving"
        if late_avg > early_avg * self._DEGRADING_RATIO:
            return "degrading"
        return "stable"

    def _identify_bottlenecks(self, metrics: List["PerformanceMetrics"]) -> List[str]:
        """
        Identify estimators with performance bottlenecks

        An estimator is a bottleneck when its mean execution time is strictly
        above the 75th percentile of all estimators' mean execution times.

        Returns:
            Bottleneck estimator names, slowest first.
        """
        if not metrics:
            return []

        # Group execution times by estimator
        times_by_estimator: Dict[str, List[float]] = {}
        for metric in metrics:
            times_by_estimator.setdefault(metric.estimator_name, []).append(
                metric.execution_time
            )

        # Calculate average execution time for each estimator
        avg_times = {
            name: float(np.mean(times)) for name, times in times_by_estimator.items()
        }

        # Find estimators above 75th percentile
        threshold = np.percentile(
            list(avg_times.values()), self._BOTTLENECK_PERCENTILE
        )
        bottlenecks = [name for name, avg in avg_times.items() if avg > threshold]

        return sorted(bottlenecks, key=avg_times.__getitem__, reverse=True)

    def get_estimator_performance(
        self, estimator_name: str, days: int = 30
    ) -> Dict[str, float]:
        """
        Get performance metrics for a specific estimator

        Args:
            estimator_name: Name of the estimator to report on
            days: Number of days to analyze

        Returns:
            Dictionary with avg/std/min/max execution time, average memory
            usage and the number of executions; empty when the estimator has
            no record inside the window.
        """
        estimator_metrics = self._recent_metrics(days, estimator_name)

        if not estimator_metrics:
            return {}

        execution_times = [m.execution_time for m in estimator_metrics]
        memory_usage = [m.memory_after - m.memory_before for m in estimator_metrics]

        return {
            "avg_execution_time": float(np.mean(execution_times)),
            "std_execution_time": float(np.std(execution_times)),
            "min_execution_time": float(np.min(execution_times)),
            "max_execution_time": float(np.max(execution_times)),
            "avg_memory_usage": float(np.mean(memory_usage)),
            "total_executions": len(estimator_metrics),
        }

    def export_metrics(self, output_path: str, days: int = 30) -> None:
        """
        Export performance metrics to file

        Writes the records of the last ``days`` days as a JSON list of
        objects, one per record, keyed by the ``PerformanceMetrics`` fields.

        Args:
            output_path: Destination file; overwritten if it exists
            days: Number of days to export
        """
        metrics_data = [asdict(m) for m in self._recent_metrics(days)]

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(metrics_data, f, indent=2)
# Global performance monitor instance
_global_monitor: Optional["PerformanceMonitor"] = None


def get_performance_monitor() -> "PerformanceMonitor":
    """
    Get the global performance monitor instance

    The instance is created with default arguments on first use and reused
    by every later call.
    """
    global _global_monitor
    if _global_monitor is None:
        _global_monitor = PerformanceMonitor()
    return _global_monitor


def _infer_data_length(args) -> int:
    """
    Return the length of the first positional argument that supports ``len()``.

    Unsized arguments are skipped rather than raising, so decorating a method
    (whose first positional argument is ``self``) still reports the length of
    the data argument that follows. Returns 0 when nothing is sized.
    """
    for arg in args:
        try:
            return len(arg)
        except TypeError:
            continue
    return 0


def monitor_performance(estimator_name: str):
    """
    Decorator for monitoring estimator performance

    Each call of the decorated function is wrapped in a monitoring session on
    the global performance monitor. The session is stopped even when the
    function raises, and the function's return value or exception passes
    through unchanged.

    Args:
        estimator_name: Name under which the executions are recorded

    Returns:
        A decorator that preserves the wrapped function's metadata
        (``__name__``, ``__doc__``, ...).
    """
    from functools import wraps

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            monitor = get_performance_monitor()

            # Start monitoring
            session_id = monitor.start_monitoring(
                estimator_name=estimator_name,
                data_length=_infer_data_length(args),
                parameters={k: str(v) for k, v in kwargs.items()},
            )

            try:
                return func(*args, **kwargs)
            finally:
                # Stop monitoring
                monitor.stop_monitoring(session_id)

        return wrapper

    return decorator