"""
Workflow Analyzer for LRDBench
Analyzes user workflows and usage patterns:
- Workflow sequence analysis
- Common parameter combinations
- User behavior patterns
- Workflow optimization recommendations
- Feature usage analysis
"""
import functools
import json
import threading
import uuid
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

import networkx as nx
@dataclass
class WorkflowStep:
    """Represents a single step in a user workflow.

    Steps are written out with ``dataclasses.asdict`` and rebuilt with
    ``WorkflowStep(**data)``, so the field names double as the stored format.
    """

    timestamp: str  # ISO-8601 timestamp of when the step was recorded
    step_type: str  # 'estimator_usage', 'benchmark_run', 'data_generation', etc.
    estimator_name: Optional[str]  # None for steps not tied to an estimator
    parameters: Dict[str, str]  # step parameters with stringified values
    data_length: int  # length of the input data; 0 when unknown
    session_id: str  # session this step belongs to
    user_id: Optional[str]  # optional user identifier
@dataclass
class Workflow:
    """Represents a complete user workflow: the ordered steps of one session."""

    workflow_id: str  # unique identifier assigned when the session ends
    session_id: str  # session the steps were recorded under
    user_id: Optional[str]  # optional user identifier
    steps: List[WorkflowStep]  # steps in the order they were recorded
    start_time: str  # ISO-8601 timestamp of the first step
    end_time: str  # ISO-8601 timestamp of the last step
    total_duration: float  # seconds between start_time and end_time
    step_count: int  # number of steps in ``steps``
@dataclass
class WorkflowSummary:
    """Aggregated workflow statistics over a time window.

    Pattern and sequence entries are ``(tuple_of_names, count)`` pairs as
    returned by ``Counter.most_common``; the names are stored as tuples
    (they are Counter keys), not lists.
    """

    total_workflows: int
    unique_users: int
    avg_workflow_duration: float  # seconds
    avg_steps_per_workflow: float
    common_workflow_patterns: List[Tuple[Tuple[str, ...], int]]
    popular_estimator_sequences: List[Tuple[Tuple[str, ...], int]]
    workflow_complexity_distribution: Dict[str, int]
class WorkflowAnalyzer:
    """
    Comprehensive workflow analysis system

    Features:
    - Workflow pattern recognition
    - Sequence analysis
    - User behavior modeling
    - Optimization recommendations
    - Feature usage analysis

    Steps are buffered per session id until ``end_workflow_session`` turns
    them into a completed ``Workflow``. Completed workflows are held in
    memory; ``save_workflows`` writes them to ``<storage_path>/workflows.json``,
    which is the file read back when an analyzer is constructed.
    """

    # Name of the persisted workflow file inside ``storage_path``.
    _WORKFLOWS_FILENAME = "workflows.json"

    def __init__(self, storage_path: str = "~/.lrdbench/analytics"):
        """Initialize the workflow analyzer.

        Args:
            storage_path: Directory for persisted analytics data. ``~`` is
                expanded and the directory is created if it does not exist.
        """
        self.storage_path = Path(storage_path).expanduser()
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._workflows: List[Workflow] = []
        self._current_sessions: Dict[str, List[WorkflowStep]] = defaultdict(list)
        # user_id given to start_workflow_session; used as the fallback for
        # steps that are recorded without an explicit user_id.
        self._session_users: Dict[str, Optional[str]] = {}
        # Load existing data
        self._load_existing_data()

    def _load_existing_data(self) -> None:
        """Load previously saved workflows (best effort).

        A missing file is not an error. An unreadable or malformed file, and
        individual malformed entries, are reported with a warning and skipped
        so a corrupt analytics file never prevents the analyzer from starting.
        """
        workflows_file = self.storage_path / self._WORKFLOWS_FILENAME
        try:
            if not workflows_file.exists():
                return
            with open(workflows_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # I/O errors, invalid JSON/encoding
            print(f"Warning: Could not load existing workflow data: {e}")
            return
        if not isinstance(data, list):
            print(
                "Warning: Could not load existing workflow data: "
                f"expected a JSON list, got {type(data).__name__}"
            )
            return
        for workflow_data in data:
            # Malformed entries come back as None and are skipped.
            workflow = self._reconstruct_workflow(workflow_data)
            if workflow is not None:
                self._workflows.append(workflow)

    def _reconstruct_workflow(self, workflow_data: Dict) -> Optional[Workflow]:
        """Rebuild a ``Workflow`` from a dict produced by ``asdict``.

        Args:
            workflow_data: One entry of the persisted workflow list.

        Returns:
            The reconstructed workflow, or None (after printing a warning) if
            the entry is malformed -- including a ``start_time`` that is not an
            ISO-8601 timestamp, which would otherwise make every time-window
            query raise later on.
        """
        try:
            steps = [
                WorkflowStep(**step_data)
                for step_data in workflow_data.get("steps", [])
            ]
            # Validate eagerly: the analysis methods parse start_time on every query.
            datetime.fromisoformat(workflow_data["start_time"])
            return Workflow(
                workflow_id=workflow_data["workflow_id"],
                session_id=workflow_data["session_id"],
                user_id=workflow_data.get("user_id"),
                steps=steps,
                start_time=workflow_data["start_time"],
                end_time=workflow_data["end_time"],
                total_duration=workflow_data["total_duration"],
                step_count=workflow_data["step_count"],
            )
        except Exception as e:  # best effort: skip any malformed entry
            print(f"Warning: Could not reconstruct workflow: {e}")
            return None

    def save_workflows(self) -> None:
        """Persist all completed workflows to ``<storage_path>/workflows.json``.

        This is the file loaded by the constructor. The JSON is written to a
        temporary sibling file and then moved over the target, so an
        interrupted write does not truncate an existing ``workflows.json``.
        """
        target = self.storage_path / self._WORKFLOWS_FILENAME
        tmp_path = target.with_name(target.name + ".tmp")
        with self._lock:
            workflows_data = [asdict(w) for w in self._workflows]
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(workflows_data, f, indent=2)
            tmp_path.replace(target)

    def start_workflow_session(
        self, session_id: str, user_id: Optional[str] = None
    ) -> None:
        """Start tracking a new workflow session.

        Starting an already-open session is a no-op.

        Args:
            session_id: Session identifier.
            user_id: Optional user identifier, applied to steps of this
                session that are added without their own ``user_id``.
        """
        with self._lock:
            if session_id not in self._current_sessions:
                self._current_sessions[session_id] = []
                self._session_users[session_id] = user_id

    def add_workflow_step(
        self,
        session_id: str,
        step_type: str,
        estimator_name: Optional[str] = None,
        parameters: Optional[Dict[str, str]] = None,
        data_length: int = 0,
        user_id: Optional[str] = None,
    ) -> None:
        """
        Add a step to the current workflow session

        Steps for a session that was never started (or has already ended)
        are silently dropped.

        Args:
            session_id: Session identifier
            step_type: Type of workflow step
            estimator_name: Name of estimator used (if applicable)
            parameters: Parameters for the step
            data_length: Length of input data
            user_id: Optional user identifier; defaults to the user given to
                ``start_workflow_session`` for this session
        """
        timestamp = datetime.now().isoformat()
        with self._lock:
            steps = self._current_sessions.get(session_id)
            if steps is None:
                return
            if user_id is None:
                user_id = self._session_users.get(session_id)
            steps.append(
                WorkflowStep(
                    timestamp=timestamp,
                    step_type=step_type,
                    estimator_name=estimator_name,
                    parameters=parameters or {},
                    data_length=data_length,
                    session_id=session_id,
                    user_id=user_id,
                )
            )

    def end_workflow_session(self, session_id: str) -> Optional[str]:
        """
        End a workflow session and create workflow record

        Args:
            session_id: Session identifier
        Returns:
            Workflow ID if successful, None if the session is unknown or has
            no steps (the session is discarded either way)
        """
        with self._lock:
            steps = self._current_sessions.pop(session_id, None)
            session_user = self._session_users.pop(session_id, None)
            if not steps:
                return None
            workflow_id = f"workflow_{int(datetime.now().timestamp())}_{session_id}"
            start_time = steps[0].timestamp
            end_time = steps[-1].timestamp
            total_duration = (
                datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)
            ).total_seconds()
            # First step that carries a user wins; fall back to the session user.
            user_id = next(
                (step.user_id for step in steps if step.user_id is not None),
                session_user,
            )
            self._workflows.append(
                Workflow(
                    workflow_id=workflow_id,
                    session_id=session_id,
                    user_id=user_id,
                    steps=steps,
                    start_time=start_time,
                    end_time=end_time,
                    total_duration=total_duration,
                    step_count=len(steps),
                )
            )
            return workflow_id

    def _recent_workflows(self, days: int) -> List[Workflow]:
        """Return a snapshot of completed workflows started in the last *days* days."""
        cutoff_time = datetime.now() - timedelta(days=days)
        with self._lock:
            return [
                w
                for w in self._workflows
                if datetime.fromisoformat(w.start_time) > cutoff_time
            ]

    def get_workflow_summary(self, days: int = 30) -> WorkflowSummary:
        """
        Get workflow summary for the specified time period

        Args:
            days: Number of days to analyze
        Returns:
            WorkflowSummary object (all zeros/empty when there is no data)
        """
        recent_workflows = self._recent_workflows(days)
        if not recent_workflows:
            return WorkflowSummary(
                total_workflows=0,
                unique_users=0,
                avg_workflow_duration=0.0,
                avg_steps_per_workflow=0.0,
                common_workflow_patterns=[],
                popular_estimator_sequences=[],
                workflow_complexity_distribution={},
            )
        total_workflows = len(recent_workflows)
        return WorkflowSummary(
            total_workflows=total_workflows,
            unique_users=len({w.user_id for w in recent_workflows if w.user_id}),
            avg_workflow_duration=sum(w.total_duration for w in recent_workflows)
            / total_workflows,
            avg_steps_per_workflow=sum(w.step_count for w in recent_workflows)
            / total_workflows,
            common_workflow_patterns=self._analyze_workflow_patterns(recent_workflows),
            popular_estimator_sequences=self._analyze_estimator_sequences(
                recent_workflows
            ),
            workflow_complexity_distribution=self._analyze_workflow_complexity(
                recent_workflows
            ),
        )

    def _analyze_workflow_patterns(
        self, workflows: List[Workflow]
    ) -> List[Tuple[Tuple[str, ...], int]]:
        """Return the 10 most common step-type sequences with their counts."""
        patterns = Counter(
            tuple(step.step_type for step in workflow.steps) for workflow in workflows
        )
        return patterns.most_common(10)

    def _analyze_estimator_sequences(
        self, workflows: List[Workflow]
    ) -> List[Tuple[Tuple[str, ...], int]]:
        """Return the 10 most common estimator sequences with their counts.

        Only workflows that used more than one estimator are counted.
        """
        sequences = Counter()
        for workflow in workflows:
            estimator_sequence = tuple(
                step.estimator_name for step in workflow.steps if step.estimator_name
            )
            if len(estimator_sequence) > 1:
                sequences[estimator_sequence] += 1
        return sequences.most_common(10)

    def _analyze_workflow_complexity(self, workflows: List[Workflow]) -> Dict[str, int]:
        """Bucket workflows by their step count."""
        complexity_distribution = {
            "simple": 0,  # 1-2 steps
            "moderate": 0,  # 3-5 steps
            "complex": 0,  # 6-10 steps
            "very_complex": 0,  # 11+ steps
        }
        for workflow in workflows:
            step_count = workflow.step_count
            if step_count <= 2:
                complexity_distribution["simple"] += 1
            elif step_count <= 5:
                complexity_distribution["moderate"] += 1
            elif step_count <= 10:
                complexity_distribution["complex"] += 1
            else:
                complexity_distribution["very_complex"] += 1
        return complexity_distribution

    def get_user_workflow_patterns(
        self, user_id: str, days: int = 30
    ) -> Dict[str, Any]:
        """Get workflow patterns for a specific user.

        Returns an empty dict when the user has no workflows in the window.
        """
        user_workflows = [
            w for w in self._recent_workflows(days) if w.user_id == user_id
        ]
        if not user_workflows:
            return {}
        all_steps = [step for workflow in user_workflows for step in workflow.steps]
        step_types = Counter(step.step_type for step in all_steps)
        estimators = Counter(
            step.estimator_name for step in all_steps if step.estimator_name
        )
        return {
            "total_workflows": len(user_workflows),
            "avg_duration": sum(w.total_duration for w in user_workflows)
            / len(user_workflows),
            "avg_steps": sum(w.step_count for w in user_workflows)
            / len(user_workflows),
            "favorite_step_types": step_types.most_common(5),
            "favorite_estimators": estimators.most_common(5),
        }

    def get_workflow_optimization_recommendations(self, days: int = 30) -> List[str]:
        """Get recommendations for workflow optimization.

        Always returns at least one message.
        """
        summary = self.get_workflow_summary(days)
        recommendations = []
        # Analyze workflow duration
        if summary.avg_workflow_duration > 300:  # 5 minutes
            recommendations.append(
                "Workflows are taking a long time on average. Consider "
                "implementing parallel processing or caching."
            )
        # Analyze workflow complexity
        distribution = summary.workflow_complexity_distribution
        complex_workflows = distribution.get("complex", 0) + distribution.get(
            "very_complex", 0
        )
        if complex_workflows > summary.total_workflows * 0.3:  # 30% are complex
            recommendations.append(
                "Many workflows are complex. Consider creating workflow templates "
                "or automated workflows for common tasks."
            )
        # Analyze step patterns
        if summary.common_workflow_patterns:
            most_common = summary.common_workflow_patterns[0]
            if most_common[1] > summary.total_workflows * 0.5:  # 50% share a pattern
                recommendations.append(
                    f"Most workflows follow the same pattern: {most_common[0]}. "
                    "Consider creating a dedicated function for this workflow."
                )
        # Analyze estimator usage
        if summary.popular_estimator_sequences:
            most_popular = summary.popular_estimator_sequences[0]
            if most_popular[1] > summary.total_workflows * 0.4:  # 40% same sequence
                recommendations.append(
                    f"Many users use the same estimator sequence: {most_popular[0]}. "
                    "Consider creating a combined estimator or pipeline."
                )
        if not recommendations:
            recommendations.append("No specific optimization opportunities detected.")
        return recommendations

    def export_workflows(self, output_path: str, days: int = 30) -> None:
        """Export workflows started in the last *days* days to a JSON file."""
        workflows_data = [asdict(w) for w in self._recent_workflows(days)]
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(workflows_data, f, indent=2)

    def get_feature_usage_analysis(self, days: int = 30) -> Dict[str, Any]:
        """Analyze feature usage patterns."""
        recent_workflows = self._recent_workflows(days)
        feature_usage = {
            "estimators": Counter(),
            "step_types": Counter(),
            "parameter_combinations": Counter(),
            "data_length_ranges": Counter(),
        }
        for workflow in recent_workflows:
            for step in workflow.steps:
                feature_usage["step_types"][step.step_type] += 1
                if step.estimator_name:
                    feature_usage["estimators"][step.estimator_name] += 1
                if step.parameters:
                    # Sorted items make the key independent of dict insertion order.
                    param_key = tuple(sorted(step.parameters.items()))
                    feature_usage["parameter_combinations"][param_key] += 1
                if step.data_length > 0:
                    length_range = self._get_length_range(step.data_length)
                    feature_usage["data_length_ranges"][length_range] += 1
        return {
            "total_workflows": len(recent_workflows),
            "feature_usage": feature_usage,
            "top_estimators": feature_usage["estimators"].most_common(10),
            "top_step_types": feature_usage["step_types"].most_common(10),
            "top_parameter_combinations": feature_usage[
                "parameter_combinations"
            ].most_common(10),
            "data_length_distribution": dict(feature_usage["data_length_ranges"]),
        }

    def _get_length_range(self, length: int) -> str:
        """Convert data length to range category"""
        if length < 100:
            return "<100"
        elif length < 1000:
            return "100-1000"
        elif length < 10000:
            return "1000-10000"
        else:
            return ">10000"
# Global workflow analyzer instance, created lazily by get_workflow_analyzer().
_global_workflow_analyzer: Optional[WorkflowAnalyzer] = None
# Serializes first-time creation so concurrent callers share one instance.
_global_workflow_analyzer_lock = threading.Lock()


def get_workflow_analyzer() -> WorkflowAnalyzer:
    """Return the process-wide WorkflowAnalyzer, creating it on first use.

    Creation happens under a lock. Without it, two threads making their first
    call at the same time could each build an analyzer; workflows recorded on
    the instance that gets overwritten would be lost.
    """
    global _global_workflow_analyzer
    if _global_workflow_analyzer is None:
        with _global_workflow_analyzer_lock:
            # Re-check: another thread may have created it while we waited.
            if _global_workflow_analyzer is None:
                _global_workflow_analyzer = WorkflowAnalyzer()
    return _global_workflow_analyzer
def track_workflow(step_type: str):
    """Decorator that records every call of the wrapped function as a workflow.

    Each call opens its own session and records one step:
    - step type: ``step_type``
    - estimator name: the function's ``__name__``
    - parameters: the keyword arguments, stringified
    - data length: ``len`` of the first positional argument, or 0 when there
      is none or it has no length

    The function then runs, and the session is closed in a ``finally`` block,
    so a workflow is recorded even if the function raises. Exceptions from the
    wrapped function propagate unchanged.

    Args:
        step_type: Value stored as ``WorkflowStep.step_type`` for each call.

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            analyzer = get_workflow_analyzer()
            # One unique session per call. A seconds-resolution timestamp would
            # collide for nested or concurrent calls made within the same
            # second, merging their steps and ending each other's sessions.
            session_id = f"session_{uuid.uuid4().hex}"
            analyzer.start_workflow_session(session_id)
            try:
                data_length = len(args[0]) if args else 0
            except TypeError:
                # First argument has no length (e.g. ``self`` or a scalar);
                # analytics must never break the wrapped call.
                data_length = 0
            analyzer.add_workflow_step(
                session_id=session_id,
                step_type=step_type,
                estimator_name=getattr(func, "__name__", None),
                parameters={k: str(v) for k, v in kwargs.items()},
                data_length=data_length,
            )
            try:
                return func(*args, **kwargs)
            finally:
                analyzer.end_workflow_session(session_id)

        return wrapper

    return decorator