"""
Neural Network Factory for Hurst Parameter Estimation
This module provides a factory for creating various neural network architectures
suitable for benchmarking Hurst parameter estimation in time series data.
"""
import json
import logging
import os
import pickle
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# Import utility function for package data paths
try:
from ...utils import get_neural_network_model_path
except ImportError:
# Fallback for development
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent.parent))
from utils import get_neural_network_model_path
logger = logging.getLogger(__name__)
[docs]
class NNArchitecture(Enum):
"""Available neural network architectures."""
FFN = "feedforward"
CNN = "convolutional"
LSTM = "lstm"
BILSTM = "bidirectional_lstm"
GRU = "gru"
TRANSFORMER = "transformer"
HYBRID_CNN_LSTM = "hybrid_cnn_lstm"
RESNET = "resnet"
[docs]
@dataclass
class NNConfig:
"""Configuration for neural network architecture."""
architecture: NNArchitecture
input_length: int
hidden_dims: List[int] = None
dropout_rate: float = 0.2
learning_rate: float = 0.001
batch_size: int = 32
epochs: int = 50
activation: str = "relu"
optimizer: str = "adam"
weight_decay: float = 1e-4
# Architecture-specific parameters
conv_filters: int = 64
conv_kernel_size: int = 3
lstm_units: int = 64
transformer_heads: int = 8
transformer_layers: int = 2
resnet_blocks: int = 2
def __post_init__(self):
if self.hidden_dims is None:
self.hidden_dims = [64, 32]
[docs]
class BaseNeuralNetwork(nn.Module):
"""Base class for all neural network architectures with train-once, apply-many workflow."""
[docs]
def __init__(self, config: NNConfig, model_name: str = None):
super().__init__()
self.config = config
self.device = self._get_optimal_device()
self.is_trained = False
self.training_history = None
self.model_name = model_name or self.__class__.__name__.lower()
self.model_dir = "models"
os.makedirs(self.model_dir, exist_ok=True)
# Don't move to device here, let subclasses handle it
[docs]
def _get_optimal_device(self):
"""Select optimal device with CUDA compatibility check."""
if torch.cuda.is_available():
try:
# Test CUDA with small operation
device = torch.device("cuda")
test_tensor = torch.zeros(1).to(device)
result = test_tensor + 1 # Test operation
_ = result.cpu() # Move result back to CPU to test full pipeline
logger.info(f"CUDA device selected: {torch.cuda.get_device_name()}")
return device
except RuntimeError as e:
if "CUDA" in str(e) or "kernel image" in str(e):
logger.warning(
f"CUDA available but incompatible: {e}. Falling back to CPU."
)
return torch.device("cpu")
else:
logger.warning(
f"CUDA test failed with unexpected error: {e}. Falling back to CPU."
)
return torch.device("cpu")
except Exception as e:
logger.warning(
f"CUDA test failed with exception: {e}. Falling back to CPU."
)
return torch.device("cpu")
logger.info("CUDA not available, using CPU device")
return torch.device("cpu")
[docs]
def forward(self, x):
raise NotImplementedError("Subclasses must implement forward method")
[docs]
def predict(self, x: np.ndarray, batch_size: int = 32) -> np.ndarray:
"""Make predictions on new data with automatic device handling."""
if not self.is_trained:
raise ValueError("Model must be trained before making predictions")
self.eval()
# Convert to PyTorch tensor and ensure correct device
if isinstance(x, np.ndarray):
x = torch.FloatTensor(x)
if x.dim() == 1:
x = x.unsqueeze(0)
n_samples = x.shape[0]
predictions = []
# Process in batches to avoid GPU memory issues
for i in range(0, n_samples, batch_size):
batch_end = min(i + batch_size, n_samples)
batch_x = x[i:batch_end]
with torch.no_grad():
# Ensure data is on correct device
batch_x = batch_x.to(self.device)
# Ensure correct input shape
if len(batch_x.shape) == 1:
batch_x = batch_x.unsqueeze(0) # Add batch dimension
elif len(batch_x.shape) == 2:
# For 2D input (batch, sequence_length), we need to add feature dimension
# This will be handled by the individual network's forward method
pass
batch_predictions = self.forward(batch_x)
predictions.extend(batch_predictions.cpu().numpy().flatten())
# Clear GPU memory
del batch_predictions
if torch.cuda.is_available():
torch.cuda.empty_cache()
return np.array(predictions)
[docs]
def train_model(
self, X: np.ndarray, y: np.ndarray, validation_split: float = 0.2
) -> Dict[str, List[float]]:
"""Train the neural network model."""
# Convert to PyTorch tensors
X_tensor = torch.FloatTensor(X).to(self.device)
y_tensor = torch.FloatTensor(y).to(self.device)
# Create data loaders
dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
train_size = int((1 - validation_split) * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
dataset, [train_size, val_size]
)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=self.config.batch_size, shuffle=True
)
val_loader = torch.utils.data.DataLoader(
val_dataset, batch_size=self.config.batch_size, shuffle=False
)
# Setup optimizer and loss
optimizer = self._get_optimizer()
criterion = nn.MSELoss()
# Training history
history = {"train_loss": [], "val_loss": []}
# Training loop
for epoch in range(self.config.epochs):
# Training
self.train()
train_loss = 0.0
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = self.forward(batch_X)
loss = criterion(outputs.squeeze(), batch_y)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation
self.eval()
val_loss = 0.0
with torch.no_grad():
for batch_X, batch_y in val_loader:
outputs = self.forward(batch_X)
loss = criterion(outputs.squeeze(), batch_y)
val_loss += loss.item()
# Record history
avg_train_loss = train_loss / len(train_loader)
avg_val_loss = val_loss / len(val_loader)
history["train_loss"].append(avg_train_loss)
history["val_loss"].append(avg_val_loss)
if epoch % 10 == 0:
logger.info(
f"Epoch {epoch}: Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}"
)
# Mark as trained and save model
self.is_trained = True
self.training_history = history
self.save_model()
logger.info(
f"Training completed: Final Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}"
)
return history
[docs]
def _get_optimizer(self):
"""Get optimizer based on configuration."""
if self.config.optimizer.lower() == "adam":
return torch.optim.Adam(
self.parameters(),
lr=self.config.learning_rate,
weight_decay=self.config.weight_decay,
)
elif self.config.optimizer.lower() == "sgd":
return torch.optim.SGD(
self.parameters(),
lr=self.config.learning_rate,
weight_decay=self.config.weight_decay,
)
else:
return torch.optim.Adam(
self.parameters(),
lr=self.config.learning_rate,
weight_decay=self.config.weight_decay,
)
[docs]
def save_model(self):
"""Save the trained model and configuration."""
if not self.is_trained:
logger.warning("Cannot save untrained model")
return
model_path = os.path.join(
self.model_dir, f"{self.model_name}_neural_network.pth"
)
config_path = os.path.join(
self.model_dir, f"{self.model_name}_neural_network_config.json"
)
# Save model state
torch.save(
{
"model_state_dict": self.state_dict(),
"config": self.config.__dict__,
"training_history": self.training_history,
"is_trained": self.is_trained,
},
model_path,
)
# Save config separately for easy loading
config_dict = {
"architecture": self.config.architecture.value,
"input_length": self.config.input_length,
"hidden_dims": self.config.hidden_dims,
"dropout_rate": self.config.dropout_rate,
"learning_rate": self.config.learning_rate,
"batch_size": self.config.batch_size,
"epochs": self.config.epochs,
"activation": self.config.activation,
"optimizer": self.config.optimizer,
"weight_decay": self.config.weight_decay,
"conv_filters": self.config.conv_filters,
"conv_kernel_size": self.config.conv_kernel_size,
"lstm_units": self.config.lstm_units,
"transformer_heads": self.config.transformer_heads,
"transformer_layers": self.config.transformer_layers,
"resnet_blocks": self.config.resnet_blocks,
}
with open(config_path, "w") as f:
json.dump(config_dict, f, indent=2)
logger.info(f"Model saved to {model_path}")
[docs]
def load_model(self, model_path: str = None):
"""Load a trained model with automatic device handling."""
if model_path is None:
# Try to get pretrained model from package first
(
pretrained_model_path,
pretrained_config_path,
) = get_neural_network_model_path(self.model_name)
if pretrained_model_path:
model_path = pretrained_model_path
logger.info(
f"Using pretrained {self.model_name} model from package: {model_path}"
)
else:
# Fallback to local model directory
model_path = os.path.join(
self.model_dir, f"{self.model_name}_neural_network.pth"
)
logger.info(f"Using local {self.model_name} model path: {model_path}")
if not os.path.exists(model_path):
logger.warning(f"Model file not found: {model_path}")
return False
try:
# Load with automatic device mapping
checkpoint = torch.load(
model_path, map_location=self.device, weights_only=False
)
self.load_state_dict(checkpoint["model_state_dict"])
self.to(self.device)
self.training_history = checkpoint["training_history"]
self.is_trained = checkpoint["is_trained"]
logger.info(f"Model loaded from {model_path} on device {self.device}")
return True
except Exception as e:
logger.error(f"Failed to load model: {e}")
return False
[docs]
def is_model_trained(self) -> bool:
"""Check if model is trained."""
return self.is_trained
[docs]
class FeedforwardNetwork(BaseNeuralNetwork):
"""Basic feedforward neural network."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
layers = []
input_dim = config.input_length
for i, hidden_dim in enumerate(config.hidden_dims):
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.ReLU() if config.activation == "relu" else nn.Tanh())
layers.append(nn.Dropout(config.dropout_rate))
input_dim = hidden_dim
layers.append(nn.Linear(input_dim, 1))
self.network = nn.Sequential(*layers)
self.to(self.device)
[docs]
def forward(self, x):
return self.network(x)
[docs]
class ConvolutionalNetwork(BaseNeuralNetwork):
"""Convolutional neural network for time series."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.conv1 = nn.Conv1d(
1, config.conv_filters, config.conv_kernel_size, padding=1
)
self.pool1 = nn.MaxPool1d(2)
self.conv2 = nn.Conv1d(
config.conv_filters,
config.conv_filters * 2,
config.conv_kernel_size,
padding=1,
)
self.pool2 = nn.MaxPool1d(2)
# Calculate the size after convolutions
conv_output_size = self._get_conv_output_size(config.input_length)
self.fc1 = nn.Linear(conv_output_size, config.hidden_dims[0])
self.dropout = nn.Dropout(config.dropout_rate)
self.fc2 = nn.Linear(config.hidden_dims[0], 1)
self.to(self.device)
[docs]
def _get_conv_output_size(self, input_length):
"""Calculate the output size after convolutions."""
# Simulate forward pass to get output size
x = torch.zeros(1, 1, input_length)
x = self.pool1(F.relu(self.conv1(x)))
x = self.pool2(F.relu(self.conv2(x)))
return x.view(1, -1).size(1)
[docs]
def forward(self, x):
# Ensure input has correct shape: (batch, channels, length)
if len(x.shape) == 2:
x = x.unsqueeze(1) # Add channel dimension
x = self.pool1(F.relu(self.conv1(x)))
x = self.pool2(F.relu(self.conv2(x)))
x = x.view(x.size(0), -1) # Flatten
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
[docs]
class LSTMNetwork(BaseNeuralNetwork):
"""LSTM neural network for time series."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.lstm = nn.LSTM(
input_size=1,
hidden_size=config.lstm_units,
batch_first=True,
dropout=config.dropout_rate,
)
self.dropout = nn.Dropout(config.dropout_rate)
self.fc = nn.Linear(config.lstm_units, 1)
self.to(self.device)
[docs]
def forward(self, x):
# Ensure input has correct shape: (batch, length, features)
if len(x.shape) == 2:
x = x.unsqueeze(-1) # Add feature dimension
lstm_out, _ = self.lstm(x)
# Take the last output
last_output = lstm_out[:, -1, :]
x = self.dropout(last_output)
x = self.fc(x)
return x
[docs]
class BidirectionalLSTMNetwork(BaseNeuralNetwork):
"""Bidirectional LSTM neural network."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.lstm = nn.LSTM(
input_size=1,
hidden_size=config.lstm_units,
batch_first=True,
dropout=config.dropout_rate,
bidirectional=True,
)
self.dropout = nn.Dropout(config.dropout_rate)
self.fc = nn.Linear(config.lstm_units * 2, 1) # *2 for bidirectional
self.to(self.device)
[docs]
def forward(self, x):
if len(x.shape) == 2:
x = x.unsqueeze(-1)
lstm_out, _ = self.lstm(x)
last_output = lstm_out[:, -1, :]
x = self.dropout(last_output)
x = self.fc(x)
return x
[docs]
class GRUNetwork(BaseNeuralNetwork):
"""GRU neural network for time series."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.gru = nn.GRU(
input_size=1,
hidden_size=config.lstm_units,
batch_first=True,
dropout=config.dropout_rate,
)
self.dropout = nn.Dropout(config.dropout_rate)
self.fc = nn.Linear(config.lstm_units, 1)
self.to(self.device)
[docs]
def forward(self, x):
if len(x.shape) == 2:
x = x.unsqueeze(-1)
gru_out, _ = self.gru(x)
last_output = gru_out[:, -1, :]
x = self.dropout(last_output)
x = self.fc(x)
return x
[docs]
class HybridCNNLSTMNetwork(BaseNeuralNetwork):
"""Hybrid CNN-LSTM network."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.conv1 = nn.Conv1d(
1, config.conv_filters, config.conv_kernel_size, padding=1
)
self.pool1 = nn.MaxPool1d(2)
self.conv2 = nn.Conv1d(
config.conv_filters,
config.conv_filters * 2,
config.conv_kernel_size,
padding=1,
)
self.pool2 = nn.MaxPool1d(2)
# Calculate LSTM input size
lstm_input_size = self._get_lstm_input_size(config.input_length)
self.lstm = nn.LSTM(
input_size=lstm_input_size,
hidden_size=config.lstm_units,
batch_first=True,
dropout=config.dropout_rate,
)
self.dropout = nn.Dropout(config.dropout_rate)
self.fc = nn.Linear(config.lstm_units, 1)
self.to(self.device)
[docs]
def forward(self, x):
if len(x.shape) == 2:
x = x.unsqueeze(1)
# CNN layers
x = self.pool1(F.relu(self.conv1(x)))
x = self.pool2(F.relu(self.conv2(x)))
# Reshape for LSTM: (batch, length, features)
x = x.permute(0, 2, 1)
x = x.contiguous().view(x.size(0), x.size(1), -1)
# LSTM layers
lstm_out, _ = self.lstm(x)
last_output = lstm_out[:, -1, :]
x = self.dropout(last_output)
x = self.fc(x)
return x
[docs]
class ResNetBlock(nn.Module):
"""Residual block for ResNet architecture."""
[docs]
def __init__(self, in_channels, out_channels, kernel_size=3):
super().__init__()
self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, padding=1)
self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, padding=1)
self.bn1 = nn.BatchNorm1d(out_channels)
self.bn2 = nn.BatchNorm1d(out_channels)
# Skip connection
self.skip = (
nn.Conv1d(in_channels, out_channels, 1)
if in_channels != out_channels
else nn.Identity()
)
[docs]
def forward(self, x):
residual = self.skip(x)
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += residual
out = F.relu(out)
return out
[docs]
class ResNetNetwork(BaseNeuralNetwork):
"""ResNet architecture for time series."""
[docs]
def __init__(self, config: NNConfig):
super().__init__(config)
self.conv1 = nn.Conv1d(1, 64, 7, padding=3)
self.bn1 = nn.BatchNorm1d(64)
self.pool1 = nn.MaxPool1d(2)
# Residual blocks
self.blocks = nn.ModuleList()
in_channels = 64
for i in range(config.resnet_blocks):
out_channels = 64 * (2**i)
self.blocks.append(ResNetBlock(in_channels, out_channels))
in_channels = out_channels
self.global_pool = nn.AdaptiveAvgPool1d(1)
self.dropout = nn.Dropout(config.dropout_rate)
self.fc = nn.Linear(in_channels, 1)
self.to(self.device)
[docs]
def forward(self, x):
if len(x.shape) == 2:
x = x.unsqueeze(1)
x = F.relu(self.bn1(self.conv1(x)))
x = self.pool1(x)
for block in self.blocks:
x = block(x)
x = self.global_pool(x)
x = x.view(x.size(0), -1)
x = self.dropout(x)
x = self.fc(x)
return x
[docs]
class NeuralNetworkFactory:
"""Factory for creating neural network architectures."""
_architectures = {
NNArchitecture.FFN: FeedforwardNetwork,
NNArchitecture.CNN: ConvolutionalNetwork,
NNArchitecture.LSTM: LSTMNetwork,
NNArchitecture.BILSTM: BidirectionalLSTMNetwork,
NNArchitecture.GRU: GRUNetwork,
NNArchitecture.TRANSFORMER: TransformerNetwork,
NNArchitecture.HYBRID_CNN_LSTM: HybridCNNLSTMNetwork,
NNArchitecture.RESNET: ResNetNetwork,
}
[docs]
@classmethod
def create_network(cls, config: NNConfig) -> BaseNeuralNetwork:
"""Create a neural network with the specified architecture."""
if config.architecture not in cls._architectures:
raise ValueError(f"Unknown architecture: {config.architecture}")
network_class = cls._architectures[config.architecture]
return network_class(config)
[docs]
@classmethod
def get_available_architectures(cls) -> List[NNArchitecture]:
"""Get list of available architectures."""
return list(cls._architectures.keys())
[docs]
@classmethod
def create_benchmark_networks(
cls, input_length: int, architectures: Optional[List[NNArchitecture]] = None
) -> Dict[str, BaseNeuralNetwork]:
"""Create a set of networks for benchmarking."""
if architectures is None:
architectures = list(NNArchitecture)
networks = {}
for arch in architectures:
config = NNConfig(
architecture=arch,
input_length=input_length,
hidden_dims=[64, 32],
dropout_rate=0.2,
learning_rate=0.001,
epochs=50,
)
# Pass model name to the network
network = cls.create_network(config)
network.model_name = arch.value
networks[arch.value] = network
return networks
# Convenience functions
[docs]
def create_feedforward_network(
input_length: int, hidden_dims: List[int] = [64, 32]
) -> BaseNeuralNetwork:
"""Create a feedforward neural network."""
config = NNConfig(
architecture=NNArchitecture.FFN,
input_length=input_length,
hidden_dims=hidden_dims,
)
return NeuralNetworkFactory.create_network(config)
[docs]
def create_cnn_network(input_length: int, conv_filters: int = 64) -> BaseNeuralNetwork:
"""Create a convolutional neural network."""
config = NNConfig(
architecture=NNArchitecture.CNN,
input_length=input_length,
conv_filters=conv_filters,
)
return NeuralNetworkFactory.create_network(config)
[docs]
def create_lstm_network(input_length: int, lstm_units: int = 64) -> BaseNeuralNetwork:
"""Create an LSTM neural network."""
config = NNConfig(
architecture=NNArchitecture.LSTM,
input_length=input_length,
lstm_units=lstm_units,
)
return NeuralNetworkFactory.create_network(config)
[docs]
def create_all_benchmark_networks(input_length: int) -> Dict[str, BaseNeuralNetwork]:
"""Create all available neural network architectures for benchmarking."""
return NeuralNetworkFactory.create_benchmark_networks(input_length)