import os
import time
import json
import logging
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yaml
from tqdm import tqdm

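# The trainer expects a YAML configuration file. A minimal illustrative sketch
# (key names are taken from the accesses in TrafficTrainer below; the values
# shown are placeholder assumptions, not values from this project):
#
#   experiment:
#     name: traffic_signal_rl
#   training:
#     episodes: 1000
#     max_steps_per_episode: 3600
#     save_freq: 100
#     eval_freq: 50
#     log_freq: 10
#   evaluation:
#     test_episodes: 5
#   paths:
#     logs: logs
#     models: models
#     results: results
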
class TrafficTrainer:
    """
    Advanced training framework for traffic signal optimization.
    Includes comprehensive logging, evaluation, and analysis.
    """

    def __init__(self, config_path: str):
        # Load configuration
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        # Training parameters
        self.episodes = self.config['training']['episodes']
        self.max_steps = self.config['training']['max_steps_per_episode']
        self.save_freq = self.config['training']['save_freq']
        self.eval_freq = self.config['training']['eval_freq']
        self.log_freq = self.config['training']['log_freq']

        # Setup logging and directories
        self.logger = self._setup_logger()
        self._setup_directories()

        # Training statistics
        self.training_history = {
            'episodes': [],
            'rewards': [],
            'steps': [],
            'epsilon': [],
            'loss': [],
            'eval_scores': [],
            'metrics': []
        }

        # Best model tracking
        self.best_reward = float('-inf')
        self.best_eval_score = float('-inf')

    def _setup_logger(self) -> logging.Logger:
        """Setup comprehensive logging"""
        logger = logging.getLogger('Trainer')
        logger.setLevel(logging.INFO)

        # Create file handler
        os.makedirs(self.config['paths']['logs'], exist_ok=True)
        fh = logging.FileHandler(
            os.path.join(self.config['paths']['logs'], 'training.log')
        )
        fh.setLevel(logging.INFO)

        # Create console handler
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)

        return logger

    def _setup_directories(self):
        """Create necessary directories"""
        for path in self.config['paths'].values():
            os.makedirs(path, exist_ok=True)

        # Create subdirectories
        os.makedirs(os.path.join(self.config['paths']['results'], 'plots'), exist_ok=True)
        os.makedirs(os.path.join(self.config['paths']['results'], 'analysis'), exist_ok=True)
        os.makedirs(os.path.join(self.config['paths']['models'], 'checkpoints'), exist_ok=True)

    def train(self, env, agent) -> Dict:
        """Main training loop with comprehensive monitoring"""

        self.logger.info("Starting advanced training...")
        self.logger.info(f"Configuration: {self.config['experiment']}")

        start_time = time.time()

        # Training loop
        for episode in tqdm(range(self.episodes), desc="Training Episodes"):
            episode_start_time = time.time()

            # Run episode
            episode_stats = self._run_episode(episode, env, agent)

            # Update training history
            self._update_training_history(episode, episode_stats)

            # Periodic evaluation
            if episode % self.eval_freq == 0 and episode > 0:
                eval_score = self._evaluate_agent(episode, env, agent)
                self.training_history['eval_scores'].append(eval_score)

                # Save best model based on evaluation
                if eval_score > self.best_eval_score:
                    self.best_eval_score = eval_score
                    self._save_best_model(agent, episode, "eval")

            # Periodic model saving
            if episode % self.save_freq == 0 and episode > 0:
                self._save_checkpoint(agent, episode)

            # Periodic logging
            if episode % self.log_freq == 0:
                self._log_progress(episode, episode_stats, time.time() - episode_start_time)

            # Early stopping check
            if self._should_early_stop(episode):
                self.logger.info(f"Early stopping at episode {episode}")
                break

        total_time = time.time() - start_time

        # Final evaluation and analysis
        final_stats = self._finalize_training(agent, total_time)

        self.logger.info("Training completed successfully!")
        return final_stats

    def _run_episode(self, episode: int, env, agent) -> Dict:
        """Run a single training episode"""
        state = env.reset()
        total_reward = 0
        steps = 0
        losses = []

        for step in range(self.max_steps):
            # Agent action
            action = agent.act(state, training=True)

            # Environment step
            next_state, reward, done, info = env.step(action)

            # Store experience
            agent.remember(state, action, reward, next_state, done)

            # Train agent
            loss = agent.replay()
            if loss is not None:
                losses.append(loss)

            # Update state and metrics
            state = next_state
            total_reward += reward
            steps += 1

            if done:
                break

        # Get episode summary
        episode_summary = env.get_episode_summary()

        # Compile episode statistics
        episode_stats = {
            'reward': total_reward,
            'steps': steps,
            'average_loss': np.mean(losses) if losses else 0,
            'epsilon': agent.epsilon,
            'episode_summary': episode_summary,
            'agent_stats': agent.get_training_stats()
        }

        return episode_stats

    def _update_training_history(self, episode: int, episode_stats: Dict):
        """Update comprehensive training history"""
        self.training_history['episodes'].append(episode)
        self.training_history['rewards'].append(episode_stats['reward'])
        self.training_history['steps'].append(episode_stats['steps'])
        self.training_history['epsilon'].append(episode_stats['epsilon'])
        self.training_history['loss'].append(episode_stats['average_loss'])
        self.training_history['metrics'].append(episode_stats['episode_summary'])

        # Update best reward
        if episode_stats['reward'] > self.best_reward:
            self.best_reward = episode_stats['reward']

    def _evaluate_agent(self, episode: int, env, agent) -> float:
        """Evaluate agent performance"""
        self.logger.info(f"Evaluating agent at episode {episode}...")

        eval_episodes = self.config['evaluation']['test_episodes']
        eval_rewards = []
        eval_metrics = []

        for eval_ep in range(eval_episodes):
            state = env.reset()
            total_reward = 0

            for step in range(self.max_steps):
                action = agent.act(state, training=False)  # No exploration
                next_state, reward, done, info = env.step(action)

                state = next_state
                total_reward += reward

                if done:
                    break

            eval_rewards.append(total_reward)
            eval_metrics.append(env.get_episode_summary())

        # Calculate evaluation score
        avg_reward = np.mean(eval_rewards)
        avg_delay = np.mean([m.get('average_delay', 0) for m in eval_metrics])
        avg_throughput = np.mean([m.get('total_throughput', 0) for m in eval_metrics])

        # Composite evaluation score
        eval_score = avg_reward - 0.1 * avg_delay + 0.01 * avg_throughput

        self.logger.info(f"Evaluation - Avg Reward: {avg_reward:.2f}, "
                         f"Avg Delay: {avg_delay:.2f}, Score: {eval_score:.2f}")

        return eval_score

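    # Worked example for the composite evaluation score above, with purely
    # illustrative numbers (assumed, not measured):
    #   avg_reward = 120.0, avg_delay = 45.0 s, avg_throughput = 900 vehicles
    #   eval_score = 120.0 - 0.1 * 45.0 + 0.01 * 900 = 120.0 - 4.5 + 9.0 = 124.5
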
    def _save_checkpoint(self, agent, episode: int):
        """Save training checkpoint"""
        checkpoint_path = os.path.join(
            self.config['paths']['models'], 'checkpoints',
            f'checkpoint_episode_{episode}.pth'
        )
        agent.save(checkpoint_path, episode)

        # Save training history
        history_path = os.path.join(
            self.config['paths']['results'],
            f'training_history_episode_{episode}.json'
        )
        with open(history_path, 'w') as f:
            # Convert numpy scalars to native Python types for JSON serialization
            history_json = {}
            for key, value in self.training_history.items():
                if key == 'metrics':
                    history_json[key] = value  # Per-episode summary dicts, kept as-is
                else:
                    history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
            # default=str guards against non-serializable values inside 'metrics'
            json.dump(history_json, f, indent=2, default=str)

    def _save_best_model(self, agent, episode: int, criteria: str):
        """Save best performing model"""
        best_model_path = os.path.join(
            self.config['paths']['models'],
            f'best_model_{criteria}.pth'
        )
        agent.save(best_model_path, episode)
        self.logger.info(f"New best model saved (criteria: {criteria}) at episode {episode}")

    def _log_progress(self, episode: int, episode_stats: Dict, episode_time: float):
        """Log detailed training progress"""
        recent_rewards = self.training_history['rewards'][-50:]
        avg_reward = np.mean(recent_rewards)

        self.logger.info(
            f"Episode {episode:4d} | "
            f"Reward: {episode_stats['reward']:8.2f} | "
            f"Avg(50): {avg_reward:8.2f} | "
            f"Steps: {episode_stats['steps']:4d} | "
            f"Epsilon: {episode_stats['epsilon']:.3f} | "
            f"Loss: {episode_stats['average_loss']:.4f} | "
            f"Time: {episode_time:.2f}s"
        )

        # Log episode summary metrics
        summary = episode_stats['episode_summary']
        if summary:
            self.logger.info(
                f"  Metrics - Delay: {summary.get('average_delay', 0):.2f}s | "
                f"Queue: {summary.get('average_queue_length', 0):.1f} | "
                f"Throughput: {summary.get('total_throughput', 0):.0f} | "
                f"Fuel: {summary.get('fuel_efficiency', 0):.3f}L/veh"
            )

    def _should_early_stop(self, episode: int) -> bool:
        """Check if training should stop early"""
        if episode < 100:  # Minimum episodes before considering early stop
            return False

        # Check if reward has plateaued
        recent_rewards = self.training_history['rewards'][-50:]
        if len(recent_rewards) >= 50:
            improvement = np.mean(recent_rewards[-25:]) - np.mean(recent_rewards[:25])
            if improvement < 1.0:  # Less than 1.0 reward improvement
                return True

        return False

    def _finalize_training(self, agent, total_time: float) -> Dict:
        """Finalize training with comprehensive analysis"""
        self.logger.info("Finalizing training...")

        # Save final model
        final_model_path = os.path.join(
            self.config['paths']['models'], 'final_model.pth'
        )
        agent.save(final_model_path, len(self.training_history['episodes']))

        # Generate comprehensive plots
        self._generate_training_plots()

        # Save final training history
        final_history_path = os.path.join(
            self.config['paths']['results'], 'final_training_history.json'
        )
        with open(final_history_path, 'w') as f:
            history_json = {}
            for key, value in self.training_history.items():
                if key == 'metrics':
                    history_json[key] = value
                else:
                    history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
            # default=str guards against non-serializable values inside 'metrics'
            json.dump(history_json, f, indent=2, default=str)

        # Compile final statistics
        final_stats = {
            'total_episodes': len(self.training_history['episodes']),
            'total_training_time': total_time,
            'best_reward': self.best_reward,
            'best_eval_score': self.best_eval_score,
            'final_epsilon': agent.epsilon,
            'average_reward_last_100': np.mean(self.training_history['rewards'][-100:]),
            'training_efficiency': len(self.training_history['episodes']) / (total_time / 3600)  # episodes per hour
        }

        # Save final stats
        stats_path = os.path.join(
            self.config['paths']['results'], 'final_training_stats.json'
        )
        with open(stats_path, 'w') as f:
            json.dump(final_stats, f, indent=2, default=str)

        return final_stats

    def _generate_training_plots(self):
        """Generate comprehensive training visualization"""
        plt.style.use('seaborn-v0_8')

        # Create subplot layout
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Advanced Traffic Signal RL Training Analysis', fontsize=16)

        episodes = self.training_history['episodes']

        # 1. Reward progression
        axes[0, 0].plot(episodes, self.training_history['rewards'], alpha=0.7, label='Episode Reward')
        # Moving average
        if len(self.training_history['rewards']) > 50:
            moving_avg = pd.Series(self.training_history['rewards']).rolling(50).mean()
            axes[0, 0].plot(episodes, moving_avg, 'r-', linewidth=2, label='Moving Average (50)')
        axes[0, 0].set_title('Training Reward Progression')
        axes[0, 0].set_xlabel('Episode')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # 2. Loss progression (pair each loss with its episode so the x-axis stays aligned)
        loss_points = [(ep, loss) for ep, loss in zip(episodes, self.training_history['loss']) if loss > 0]
        if loss_points:
            valid_episodes, valid_losses = zip(*loss_points)
            axes[0, 1].plot(valid_episodes, valid_losses, alpha=0.7)
            if len(valid_losses) > 20:
                loss_avg = pd.Series(valid_losses).rolling(20).mean()
                axes[0, 1].plot(valid_episodes, loss_avg, 'r-', linewidth=2)
        axes[0, 1].set_title('Training Loss')
        axes[0, 1].set_xlabel('Episode')
        axes[0, 1].set_ylabel('Loss')
        axes[0, 1].set_yscale('log')
        axes[0, 1].grid(True, alpha=0.3)

        # 3. Epsilon decay
        axes[0, 2].plot(episodes, self.training_history['epsilon'])
        axes[0, 2].set_title('Exploration Rate (Epsilon)')
        axes[0, 2].set_xlabel('Episode')
        axes[0, 2].set_ylabel('Epsilon')
        axes[0, 2].grid(True, alpha=0.3)

        # 4. Episode length
        axes[1, 0].plot(episodes, self.training_history['steps'])
        if len(self.training_history['steps']) > 20:
            steps_avg = pd.Series(self.training_history['steps']).rolling(20).mean()
            axes[1, 0].plot(episodes, steps_avg, 'r-', linewidth=2)
        axes[1, 0].set_title('Episode Length')
        axes[1, 0].set_xlabel('Episode')
        axes[1, 0].set_ylabel('Steps')
        axes[1, 0].grid(True, alpha=0.3)

        # 5. Evaluation scores (the first evaluation runs at episode eval_freq, not 0)
        if self.training_history['eval_scores']:
            eval_episodes = [(i + 1) * self.eval_freq for i in range(len(self.training_history['eval_scores']))]
            axes[1, 1].plot(eval_episodes, self.training_history['eval_scores'], 'go-')
            axes[1, 1].set_title('Evaluation Scores')
            axes[1, 1].set_xlabel('Episode')
            axes[1, 1].set_ylabel('Eval Score')
            axes[1, 1].grid(True, alpha=0.3)

        # 6. Performance metrics over time (pair delays with their episodes)
        if self.training_history['metrics']:
            delay_points = [(ep, m.get('average_delay', 0))
                            for ep, m in zip(episodes, self.training_history['metrics']) if m]
            if delay_points:
                delay_episodes, delays = zip(*delay_points)
                axes[1, 2].plot(delay_episodes, delays)
                axes[1, 2].set_title('Average Delay Over Time')
                axes[1, 2].set_xlabel('Episode')
                axes[1, 2].set_ylabel('Delay (s)')
                axes[1, 2].grid(True, alpha=0.3)

        plt.tight_layout()

        # Save plots
        plots_dir = os.path.join(self.config['paths']['results'], 'plots')
        plt.savefig(os.path.join(plots_dir, 'training_analysis.png'), dpi=300, bbox_inches='tight')
        plt.savefig(os.path.join(plots_dir, 'training_analysis.pdf'), bbox_inches='tight')
        plt.close()

        self.logger.info("Training plots generated successfully")
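
# A minimal usage sketch (assumptions: the environment and agent classes are
# defined elsewhere in the project and expose the interface used above --
# env.reset()/step()/get_episode_summary() and agent.act()/remember()/replay()/
# save()/get_training_stats() plus an `epsilon` attribute. The module and class
# names below are hypothetical placeholders, not part of this file):
if __name__ == "__main__":
    from traffic_env import TrafficEnvironment   # hypothetical module/class
    from dqn_agent import DQNAgent               # hypothetical module/class

    trainer = TrafficTrainer("config.yaml")
    env = TrafficEnvironment(trainer.config)
    agent = DQNAgent(trainer.config)

    stats = trainer.train(env, agent)
    print(json.dumps(stats, indent=2, default=str))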