# DTSO-Mtech_2025/2q

import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple
import yaml
import logging
from tqdm import tqdm
import json
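
# ---------------------------------------------------------------------------
# Assumed configuration layout (illustrative sketch only, not part of the
# original file). TrafficTrainer reads the keys below from the YAML file
# passed to __init__; the values shown are placeholders, not recommended
# settings, and the `experiment.name` subkey is an assumption.
#
#   experiment:
#     name: "dqn_traffic_signal"
#   training:
#     episodes: 1000
#     max_steps_per_episode: 3600
#     save_freq: 100
#     eval_freq: 50
#     log_freq: 10
#   evaluation:
#     test_episodes: 5
#   paths:
#     logs: "logs/"
#     models: "models/"
#     results: "results/"
# ---------------------------------------------------------------------------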

class TrafficTrainer:
    """
    Advanced training framework for traffic signal optimization.
    Includes comprehensive logging, evaluation, and analysis.
    """

    def __init__(self, config_path: str):
        # Load configuration
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        # Training parameters
        self.episodes = self.config['training']['episodes']
        self.max_steps = self.config['training']['max_steps_per_episode']
        self.save_freq = self.config['training']['save_freq']
        self.eval_freq = self.config['training']['eval_freq']
        self.log_freq = self.config['training']['log_freq']

        # Setup logging and directories
        self.logger = self._setup_logger()
        self._setup_directories()

        # Training statistics
        self.training_history = {
            'episodes': [],
            'rewards': [],
            'steps': [],
            'epsilon': [],
            'loss': [],
            'eval_scores': [],
            'metrics': []
        }

        # Best model tracking
        self.best_reward = float('-inf')
        self.best_eval_score = float('-inf')

    def _setup_logger(self) -> logging.Logger:
        """Setup comprehensive logging"""
        logger = logging.getLogger('Trainer')
        logger.setLevel(logging.INFO)

        # Create file handler
        os.makedirs(self.config['paths']['logs'], exist_ok=True)
        fh = logging.FileHandler(
            os.path.join(self.config['paths']['logs'], 'training.log')
        )
        fh.setLevel(logging.INFO)

        # Create console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)
        return logger

    def _setup_directories(self):
        """Create necessary directories"""
        for path in self.config['paths'].values():
            os.makedirs(path, exist_ok=True)

        # Create subdirectories
        os.makedirs(os.path.join(self.config['paths']['results'], 'plots'), exist_ok=True)
        os.makedirs(os.path.join(self.config['paths']['results'], 'analysis'), exist_ok=True)
        os.makedirs(os.path.join(self.config['paths']['models'], 'checkpoints'), exist_ok=True)

    def train(self, env, agent) -> Dict:
        """Main training loop with comprehensive monitoring"""
        self.logger.info("Starting advanced training...")
        self.logger.info(f"Configuration: {self.config['experiment']}")
        start_time = time.time()

        # Training loop
        for episode in tqdm(range(self.episodes), desc="Training Episodes"):
            episode_start_time = time.time()

            # Run episode
            episode_stats = self._run_episode(episode, env, agent)

            # Update training history
            self._update_training_history(episode, episode_stats)

            # Periodic evaluation
            if episode % self.eval_freq == 0 and episode > 0:
                eval_score = self._evaluate_agent(episode, env, agent)
                self.training_history['eval_scores'].append(eval_score)

                # Save best model based on evaluation
                if eval_score > self.best_eval_score:
                    self.best_eval_score = eval_score
                    self._save_best_model(agent, episode, "eval")

            # Periodic model saving
            if episode % self.save_freq == 0 and episode > 0:
                self._save_checkpoint(agent, episode)

            # Periodic logging
            if episode % self.log_freq == 0:
                self._log_progress(episode, episode_stats, time.time() - episode_start_time)

            # Early stopping check
            if self._should_early_stop(episode):
                self.logger.info(f"Early stopping at episode {episode}")
                break

        total_time = time.time() - start_time

        # Final evaluation and analysis
        final_stats = self._finalize_training(agent, total_time)
        self.logger.info("Training completed successfully!")
        return final_stats

    def _run_episode(self, episode: int, env, agent) -> Dict:
        """Run a single training episode"""
        state = env.reset()
        total_reward = 0
        steps = 0
        losses = []

        for step in range(self.max_steps):
            # Agent action
            action = agent.act(state, training=True)

            # Environment step
            next_state, reward, done, info = env.step(action)

            # Store experience
            agent.remember(state, action, reward, next_state, done)

            # Train agent
            loss = agent.replay()
            if loss is not None:
                losses.append(loss)

            # Update state and metrics
            state = next_state
            total_reward += reward
            steps += 1

            if done:
                break

        # Get episode summary
        episode_summary = env.get_episode_summary()

        # Compile episode statistics
        episode_stats = {
            'reward': total_reward,
            'steps': steps,
            'average_loss': np.mean(losses) if losses else 0,
            'epsilon': agent.epsilon,
            'episode_summary': episode_summary,
            'agent_stats': agent.get_training_stats()
        }
        return episode_stats

    def _update_training_history(self, episode: int, episode_stats: Dict):
        """Update comprehensive training history"""
        self.training_history['episodes'].append(episode)
        self.training_history['rewards'].append(episode_stats['reward'])
        self.training_history['steps'].append(episode_stats['steps'])
        self.training_history['epsilon'].append(episode_stats['epsilon'])
        self.training_history['loss'].append(episode_stats['average_loss'])
        self.training_history['metrics'].append(episode_stats['episode_summary'])

        # Update best reward
        if episode_stats['reward'] > self.best_reward:
            self.best_reward = episode_stats['reward']

    def _evaluate_agent(self, episode: int, env, agent) -> float:
        """Evaluate agent performance"""
        self.logger.info(f"Evaluating agent at episode {episode}...")
        eval_episodes = self.config['evaluation']['test_episodes']
        eval_rewards = []
        eval_metrics = []

        for eval_ep in range(eval_episodes):
            state = env.reset()
            total_reward = 0
            for step in range(self.max_steps):
                action = agent.act(state, training=False)  # No exploration
                next_state, reward, done, info = env.step(action)
                state = next_state
                total_reward += reward
                if done:
                    break
            eval_rewards.append(total_reward)
            eval_metrics.append(env.get_episode_summary())

        # Calculate evaluation score
        avg_reward = np.mean(eval_rewards)
        avg_delay = np.mean([m.get('average_delay', 0) for m in eval_metrics])
        avg_throughput = np.mean([m.get('total_throughput', 0) for m in eval_metrics])

        # Composite evaluation score
        eval_score = avg_reward - 0.1 * avg_delay + 0.01 * avg_throughput
        self.logger.info(f"Evaluation - Avg Reward: {avg_reward:.2f}, "
                         f"Avg Delay: {avg_delay:.2f}, Score: {eval_score:.2f}")
        return eval_score

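    # Illustrative arithmetic for the composite score above (made-up numbers,
    # not results): with avg_reward = -120.0, avg_delay = 45.0 s and
    # avg_throughput = 800 vehicles,
    #   eval_score = -120.0 - 0.1 * 45.0 + 0.01 * 800 = -116.5
    # so shorter delays and higher throughput both raise the score.
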
    def _save_checkpoint(self, agent, episode: int):
        """Save training checkpoint"""
        checkpoint_path = os.path.join(
            self.config['paths']['models'], 'checkpoints',
            f'checkpoint_episode_{episode}.pth'
        )
        agent.save(checkpoint_path, episode)

        # Save training history
        history_path = os.path.join(
            self.config['paths']['results'],
            f'training_history_episode_{episode}.json'
        )
        with open(history_path, 'w') as f:
            # Convert numpy scalars to plain floats for JSON serialization
            history_json = {}
            for key, value in self.training_history.items():
                if key == 'metrics':
                    history_json[key] = value  # Per-episode summary dicts
                else:
                    history_json[key] = [
                        float(v) if isinstance(v, (np.integer, np.floating)) else v
                        for v in value
                    ]
            # default=str guards against non-serializable values inside the
            # metric dicts (e.g. numpy types)
            json.dump(history_json, f, indent=2, default=str)

    def _save_best_model(self, agent, episode: int, criteria: str):
        """Save best performing model"""
        best_model_path = os.path.join(
            self.config['paths']['models'],
            f'best_model_{criteria}.pth'
        )
        agent.save(best_model_path, episode)
        self.logger.info(f"New best model saved (criteria: {criteria}) at episode {episode}")

    def _log_progress(self, episode: int, episode_stats: Dict, episode_time: float):
        """Log detailed training progress"""
        recent_rewards = self.training_history['rewards'][-50:]
        avg_reward = np.mean(recent_rewards)

        self.logger.info(
            f"Episode {episode:4d} | "
            f"Reward: {episode_stats['reward']:8.2f} | "
            f"Avg(50): {avg_reward:8.2f} | "
            f"Steps: {episode_stats['steps']:4d} | "
            f"Epsilon: {episode_stats['epsilon']:.3f} | "
            f"Loss: {episode_stats['average_loss']:.4f} | "
            f"Time: {episode_time:.2f}s"
        )

        # Log episode summary metrics
        summary = episode_stats['episode_summary']
        if summary:
            self.logger.info(
                f"  Metrics - Delay: {summary.get('average_delay', 0):.2f}s | "
                f"Queue: {summary.get('average_queue_length', 0):.1f} | "
                f"Throughput: {summary.get('total_throughput', 0):.0f} | "
                f"Fuel: {summary.get('fuel_efficiency', 0):.3f}L/veh"
            )

    def _should_early_stop(self, episode: int) -> bool:
        """Check if training should stop early"""
        if episode < 100:  # Minimum episodes before considering early stop
            return False

        # Check if reward has plateaued
        recent_rewards = self.training_history['rewards'][-50:]
        if len(recent_rewards) >= 50:
            improvement = np.mean(recent_rewards[-25:]) - np.mean(recent_rewards[:25])
            if improvement < 1.0:  # Less than 1.0 reward improvement
                return True
        return False

    def _finalize_training(self, agent, total_time: float) -> Dict:
        """Finalize training with comprehensive analysis"""
        self.logger.info("Finalizing training...")

        # Save final model
        final_model_path = os.path.join(
            self.config['paths']['models'], 'final_model.pth'
        )
        agent.save(final_model_path, len(self.training_history['episodes']))

        # Generate comprehensive plots
        self._generate_training_plots()

        # Save final training history
        final_history_path = os.path.join(
            self.config['paths']['results'], 'final_training_history.json'
        )
        with open(final_history_path, 'w') as f:
            history_json = {}
            for key, value in self.training_history.items():
                if key == 'metrics':
                    history_json[key] = value
                else:
                    history_json[key] = [
                        float(v) if isinstance(v, (np.integer, np.floating)) else v
                        for v in value
                    ]
            json.dump(history_json, f, indent=2, default=str)

        # Compile final statistics
        final_stats = {
            'total_episodes': len(self.training_history['episodes']),
            'total_training_time': total_time,
            'best_reward': self.best_reward,
            'best_eval_score': self.best_eval_score,
            'final_epsilon': agent.epsilon,
            'average_reward_last_100': np.mean(self.training_history['rewards'][-100:]),
            'training_efficiency': len(self.training_history['episodes']) / (total_time / 3600)  # episodes per hour
        }

        # Save final stats
        stats_path = os.path.join(
            self.config['paths']['results'], 'final_training_stats.json'
        )
        with open(stats_path, 'w') as f:
            json.dump(final_stats, f, indent=2, default=str)
        return final_stats

    def _generate_training_plots(self):
        """Generate comprehensive training visualization"""
        plt.style.use('seaborn-v0_8')

        # Create subplot layout
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Advanced Traffic Signal RL Training Analysis', fontsize=16)
        episodes = self.training_history['episodes']

        # 1. Reward progression
        axes[0, 0].plot(episodes, self.training_history['rewards'], alpha=0.7, label='Episode Reward')
        # Moving average
        if len(self.training_history['rewards']) > 50:
            moving_avg = pd.Series(self.training_history['rewards']).rolling(50).mean()
            axes[0, 0].plot(episodes, moving_avg, 'r-', linewidth=2, label='Moving Average (50)')
        axes[0, 0].set_title('Training Reward Progression')
        axes[0, 0].set_xlabel('Episode')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # 2. Loss progression (keep episode/loss pairs aligned when filtering
        # out episodes that produced no training updates)
        loss_pairs = [(e, l) for e, l in zip(episodes, self.training_history['loss']) if l > 0]
        if loss_pairs:
            valid_episodes, valid_losses = zip(*loss_pairs)
            axes[0, 1].plot(valid_episodes, valid_losses, alpha=0.7)
            if len(valid_losses) > 20:
                loss_avg = pd.Series(valid_losses).rolling(20).mean()
                axes[0, 1].plot(valid_episodes, loss_avg, 'r-', linewidth=2)
        axes[0, 1].set_title('Training Loss')
        axes[0, 1].set_xlabel('Episode')
        axes[0, 1].set_ylabel('Loss')
        axes[0, 1].set_yscale('log')
        axes[0, 1].grid(True, alpha=0.3)

        # 3. Epsilon decay
        axes[0, 2].plot(episodes, self.training_history['epsilon'])
        axes[0, 2].set_title('Exploration Rate (Epsilon)')
        axes[0, 2].set_xlabel('Episode')
        axes[0, 2].set_ylabel('Epsilon')
        axes[0, 2].grid(True, alpha=0.3)

        # 4. Episode length
        axes[1, 0].plot(episodes, self.training_history['steps'])
        if len(self.training_history['steps']) > 20:
            steps_avg = pd.Series(self.training_history['steps']).rolling(20).mean()
            axes[1, 0].plot(episodes, steps_avg, 'r-', linewidth=2)
        axes[1, 0].set_title('Episode Length')
        axes[1, 0].set_xlabel('Episode')
        axes[1, 0].set_ylabel('Steps')
        axes[1, 0].grid(True, alpha=0.3)

        # 5. Evaluation scores (the first evaluation runs at episode eval_freq,
        # not episode 0)
        if self.training_history['eval_scores']:
            eval_episodes = [(i + 1) * self.eval_freq for i in range(len(self.training_history['eval_scores']))]
            axes[1, 1].plot(eval_episodes, self.training_history['eval_scores'], 'go-')
        axes[1, 1].set_title('Evaluation Scores')
        axes[1, 1].set_xlabel('Episode')
        axes[1, 1].set_ylabel('Eval Score')
        axes[1, 1].grid(True, alpha=0.3)

        # 6. Performance metrics over time (keep episode/metric pairs aligned
        # when skipping empty summaries)
        if self.training_history['metrics']:
            delay_pairs = [(e, m.get('average_delay', 0))
                           for e, m in zip(episodes, self.training_history['metrics']) if m]
            if delay_pairs:
                delay_episodes, delays = zip(*delay_pairs)
                axes[1, 2].plot(delay_episodes, delays)
        axes[1, 2].set_title('Average Delay Over Time')
        axes[1, 2].set_xlabel('Episode')
        axes[1, 2].set_ylabel('Delay (s)')
        axes[1, 2].grid(True, alpha=0.3)

        plt.tight_layout()

        # Save plots
        plots_dir = os.path.join(self.config['paths']['results'], 'plots')
        plt.savefig(os.path.join(plots_dir, 'training_analysis.png'), dpi=300, bbox_inches='tight')
        plt.savefig(os.path.join(plots_dir, 'training_analysis.pdf'), bbox_inches='tight')
        plt.close()
        self.logger.info("Training plots generated successfully")