import os import time import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from typing import Dict, List, Tuple import yaml import logging from tqdm import tqdm import json class TrafficTrainer: """ Advanced training framework for traffic signal optimization Includes comprehensive logging, evaluation, and analysis """ def __init__(self, config_path: str): # Load configuration with open(config_path, 'r') as f: self.config = yaml.safe_load(f) # Training parameters self.episodes = self.config['training']['episodes'] self.max_steps = self.config['training']['max_steps_per_episode'] self.save_freq = self.config['training']['save_freq'] self.eval_freq = self.config['training']['eval_freq'] self.log_freq = self.config['training']['log_freq'] # Setup logging and directories self.logger = self._setup_logger() self._setup_directories() # Training statistics self.training_history = { 'episodes': [], 'rewards': [], 'steps': [], 'epsilon': [], 'loss': [], 'eval_scores': [], 'metrics': [] } # Best model tracking self.best_reward = float('-inf') self.best_eval_score = float('-inf') def _setup_logger(self) -> logging.Logger: """Setup comprehensive logging""" logger = logging.getLogger('Trainer') logger.setLevel(logging.INFO) # Create file handler os.makedirs(self.config['paths']['logs'], exist_ok=True) fh = logging.FileHandler( os.path.join(self.config['paths']['logs'], 'training.log') ) fh.setLevel(logging.INFO) # Create console handler ch = logging.StreamHandler() ch.setLevel(logging.INFO) # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) fh.setFormatter(formatter) ch.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) return logger def _setup_directories(self): """Create necessary directories""" for path in self.config['paths'].values(): os.makedirs(path, exist_ok=True) # Create subdirectories os.makedirs(os.path.join(self.config['paths']['results'], 'plots'), exist_ok=True) os.makedirs(os.path.join(self.config['paths']['results'], 'analysis'), exist_ok=True) os.makedirs(os.path.join(self.config['paths']['models'], 'checkpoints'), exist_ok=True) def train(self, env, agent) -> Dict: """Main training loop with comprehensive monitoring""" self.logger.info("Starting advanced training...") self.logger.info(f"Configuration: {self.config['experiment']}") start_time = time.time() # Training loop for episode in tqdm(range(self.episodes), desc="Training Episodes"): episode_start_time = time.time() # Run episode episode_stats = self._run_episode(episode, env, agent) # Update training history self._update_training_history(episode, episode_stats) # Periodic evaluation if episode % self.eval_freq == 0 and episode > 0: eval_score = self._evaluate_agent(episode, env, agent) self.training_history['eval_scores'].append(eval_score) # Save best model based on evaluation if eval_score > self.best_eval_score: self.best_eval_score = eval_score self._save_best_model(agent, episode, "eval") # Periodic model saving if episode % self.save_freq == 0 and episode > 0: self._save_checkpoint(agent, episode) # Periodic logging if episode % self.log_freq == 0: self._log_progress(episode, episode_stats, time.time() - episode_start_time) # Early stopping check if self._should_early_stop(episode): self.logger.info(f"Early stopping at episode {episode}") break total_time = time.time() - start_time # Final evaluation and analysis final_stats = self._finalize_training(agent, total_time) self.logger.info("Training completed successfully!") return final_stats def _run_episode(self, episode: int, env, agent) -> Dict: """Run a single training episode""" state = env.reset() total_reward = 0 steps = 0 losses = [] for step in range(self.max_steps): # Agent action action = agent.act(state, training=True) # Environment step next_state, reward, done, info = env.step(action) # Store experience agent.remember(state, action, reward, next_state, done) # Train agent loss = agent.replay() if loss is not None: losses.append(loss) # Update state and metrics state = next_state total_reward += reward steps += 1 if done: break # Get episode summary episode_summary = env.get_episode_summary() # Compile episode statistics episode_stats = { 'reward': total_reward, 'steps': steps, 'average_loss': np.mean(losses) if losses else 0, 'epsilon': agent.epsilon, 'episode_summary': episode_summary, 'agent_stats': agent.get_training_stats() } return episode_stats def _update_training_history(self, episode: int, episode_stats: Dict): """Update comprehensive training history""" self.training_history['episodes'].append(episode) self.training_history['rewards'].append(episode_stats['reward']) self.training_history['steps'].append(episode_stats['steps']) self.training_history['epsilon'].append(episode_stats['epsilon']) self.training_history['loss'].append(episode_stats['average_loss']) self.training_history['metrics'].append(episode_stats['episode_summary']) # Update best reward if episode_stats['reward'] > self.best_reward: self.best_reward = episode_stats['reward'] def _evaluate_agent(self, episode: int, env, agent) -> float: """Evaluate agent performance""" self.logger.info(f"Evaluating agent at episode {episode}...") eval_episodes = self.config['evaluation']['test_episodes'] eval_rewards = [] eval_metrics = [] for eval_ep in range(eval_episodes): state = env.reset() total_reward = 0 for step in range(self.max_steps): action = agent.act(state, training=False) # No exploration next_state, reward, done, info = env.step(action) state = next_state total_reward += reward if done: break eval_rewards.append(total_reward) eval_metrics.append(env.get_episode_summary()) # Calculate evaluation score avg_reward = np.mean(eval_rewards) avg_delay = np.mean([m.get('average_delay', 0) for m in eval_metrics]) avg_throughput = np.mean([m.get('total_throughput', 0) for m in eval_metrics]) # Composite evaluation score eval_score = avg_reward - 0.1 * avg_delay + 0.01 * avg_throughput self.logger.info(f"Evaluation - Avg Reward: {avg_reward:.2f}, " f"Avg Delay: {avg_delay:.2f}, Score: {eval_score:.2f}") return eval_score def _save_checkpoint(self, agent, episode: int): """Save training checkpoint""" checkpoint_path = os.path.join( self.config['paths']['models'], 'checkpoints', f'checkpoint_episode_{episode}.pth' ) agent.save(checkpoint_path, episode) # Save training history history_path = os.path.join( self.config['paths']['results'], f'training_history_episode_{episode}.json' ) with open(history_path, 'w') as f: # Convert numpy arrays to lists for JSON serialization history_json = {} for key, value in self.training_history.items(): if key == 'metrics': history_json[key] = value # Keep as is for now else: history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value] json.dump(history_json, f, indent=2) def _save_best_model(self, agent, episode: int, criteria: str): """Save best performing model""" best_model_path = os.path.join( self.config['paths']['models'], f'best_model_{criteria}.pth' ) agent.save(best_model_path, episode) self.logger.info(f"New best model saved (criteria: {criteria}) at episode {episode}") def _log_progress(self, episode: int, episode_stats: Dict, episode_time: float): """Log detailed training progress""" recent_rewards = self.training_history['rewards'][-50:] avg_reward = np.mean(recent_rewards) self.logger.info( f"Episode {episode:4d} | " f"Reward: {episode_stats['reward']:8.2f} | " f"Avg(50): {avg_reward:8.2f} | " f"Steps: {episode_stats['steps']:4d} | " f"Epsilon: {episode_stats['epsilon']:.3f} | " f"Loss: {episode_stats['average_loss']:.4f} | " f"Time: {episode_time:.2f}s" ) # Log episode summary metrics summary = episode_stats['episode_summary'] if summary: self.logger.info( f" Metrics - Delay: {summary.get('average_delay', 0):.2f}s | " f"Queue: {summary.get('average_queue_length', 0):.1f} | " f"Throughput: {summary.get('total_throughput', 0):.0f} | " f"Fuel: {summary.get('fuel_efficiency', 0):.3f}L/veh" ) def _should_early_stop(self, episode: int) -> bool: """Check if training should stop early""" if episode < 100: # Minimum episodes before considering early stop return False # Check if reward has plateaued recent_rewards = self.training_history['rewards'][-50:] if len(recent_rewards) >= 50: improvement = np.mean(recent_rewards[-25:]) - np.mean(recent_rewards[:25]) if improvement < 1.0: # Less than 1.0 reward improvement return True return False def _finalize_training(self, agent, total_time: float) -> Dict: """Finalize training with comprehensive analysis""" self.logger.info("Finalizing training...") # Save final model final_model_path = os.path.join( self.config['paths']['models'], 'final_model.pth' ) agent.save(final_model_path, len(self.training_history['episodes'])) # Generate comprehensive plots self._generate_training_plots() # Save final training history final_history_path = os.path.join( self.config['paths']['results'], 'final_training_history.json' ) with open(final_history_path, 'w') as f: history_json = {} for key, value in self.training_history.items(): if key == 'metrics': history_json[key] = value else: history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value] json.dump(history_json, f, indent=2) # Compile final statistics final_stats = { 'total_episodes': len(self.training_history['episodes']), 'total_training_time': total_time, 'best_reward': self.best_reward, 'best_eval_score': self.best_eval_score, 'final_epsilon': agent.epsilon, 'average_reward_last_100': np.mean(self.training_history['rewards'][-100:]), 'training_efficiency': len(self.training_history['episodes']) / (total_time / 3600) # episodes per hour } # Save final stats stats_path = os.path.join( self.config['paths']['results'], 'final_training_stats.json' ) with open(stats_path, 'w') as f: json.dump(final_stats, f, indent=2, default=str) return final_stats def _generate_training_plots(self): """Generate comprehensive training visualization""" plt.style.use('seaborn-v0_8') # Create subplot layout fig, axes = plt.subplots(2, 3, figsize=(18, 12)) fig.suptitle('Advanced Traffic Signal RL Training Analysis', fontsize=16) episodes = self.training_history['episodes'] # 1. Reward progression axes[0, 0].plot(episodes, self.training_history['rewards'], alpha=0.7, label='Episode Reward') # Moving average if len(self.training_history['rewards']) > 50: moving_avg = pd.Series(self.training_history['rewards']).rolling(50).mean() axes[0, 0].plot(episodes, moving_avg, 'r-', linewidth=2, label='Moving Average (50)') axes[0, 0].set_title('Training Reward Progression') axes[0, 0].set_xlabel('Episode') axes[0, 0].set_ylabel('Reward') axes[0, 0].legend() axes[0, 0].grid(True, alpha=0.3) # 2. Loss progression valid_losses = [l for l in self.training_history['loss'] if l > 0] valid_episodes = episodes[:len(valid_losses)] if valid_losses: axes[0, 1].plot(valid_episodes, valid_losses, alpha=0.7) if len(valid_losses) > 20: loss_avg = pd.Series(valid_losses).rolling(20).mean() axes[0, 1].plot(valid_episodes, loss_avg, 'r-', linewidth=2) axes[0, 1].set_title('Training Loss') axes[0, 1].set_xlabel('Episode') axes[0, 1].set_ylabel('Loss') axes[0, 1].set_yscale('log') axes[0, 1].grid(True, alpha=0.3) # 3. Epsilon decay axes[0, 2].plot(episodes, self.training_history['epsilon']) axes[0, 2].set_title('Exploration Rate (Epsilon)') axes[0, 2].set_xlabel('Episode') axes[0, 2].set_ylabel('Epsilon') axes[0, 2].grid(True, alpha=0.3) # 4. Episode length axes[1, 0].plot(episodes, self.training_history['steps']) if len(self.training_history['steps']) > 20: steps_avg = pd.Series(self.training_history['steps']).rolling(20).mean() axes[1, 0].plot(episodes, steps_avg, 'r-', linewidth=2) axes[1, 0].set_title('Episode Length') axes[1, 0].set_xlabel('Episode') axes[1, 0].set_ylabel('Steps') axes[1, 0].grid(True, alpha=0.3) # 5. Evaluation scores if self.training_history['eval_scores']: eval_episodes = [i * self.eval_freq for i in range(len(self.training_history['eval_scores']))] axes[1, 1].plot(eval_episodes, self.training_history['eval_scores'], 'go-') axes[1, 1].set_title('Evaluation Scores') axes[1, 1].set_xlabel('Episode') axes[1, 1].set_ylabel('Eval Score') axes[1, 1].grid(True, alpha=0.3) # 6. Performance metrics over time if self.training_history['metrics']: delays = [m.get('average_delay', 0) for m in self.training_history['metrics'] if m] if delays: axes[1, 2].plot(episodes[:len(delays)], delays) axes[1, 2].set_title('Average Delay Over Time') axes[1, 2].set_xlabel('Episode') axes[1, 2].set_ylabel('Delay (s)') axes[1, 2].grid(True, alpha=0.3) plt.tight_layout() # Save plots plots_dir = os.path.join(self.config['paths']['results'], 'plots') plt.savefig(os.path.join(plots_dir, 'training_analysis.png'), dpi=300, bbox_inches='tight') plt.savefig(os.path.join(plots_dir, 'training_analysis.pdf'), bbox_inches='tight') plt.close() self.logger.info("Training plots generated successfully")