Complete project code update

This commit is contained in:
Jeevan P Rai 2025-08-03 20:50:29 +05:30
parent ce79c4743f
commit 33f7aac1ea
15 changed files with 3241 additions and 1 deletions

430
src/training/trainer.py Normal file
View File

@ -0,0 +1,430 @@
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple
import yaml
import logging
from tqdm import tqdm
import json
class TrafficTrainer:
"""
Advanced training framework for traffic signal optimization
Includes comprehensive logging, evaluation, and analysis
"""
def __init__(self, config_path: str):
# Load configuration
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
# Training parameters
self.episodes = self.config['training']['episodes']
self.max_steps = self.config['training']['max_steps_per_episode']
self.save_freq = self.config['training']['save_freq']
self.eval_freq = self.config['training']['eval_freq']
self.log_freq = self.config['training']['log_freq']
# Setup logging and directories
self.logger = self._setup_logger()
self._setup_directories()
# Training statistics
self.training_history = {
'episodes': [],
'rewards': [],
'steps': [],
'epsilon': [],
'loss': [],
'eval_scores': [],
'metrics': []
}
# Best model tracking
self.best_reward = float('-inf')
self.best_eval_score = float('-inf')
def _setup_logger(self) -> logging.Logger:
"""Setup comprehensive logging"""
logger = logging.getLogger('Trainer')
logger.setLevel(logging.INFO)
# Create file handler
os.makedirs(self.config['paths']['logs'], exist_ok=True)
fh = logging.FileHandler(
os.path.join(self.config['paths']['logs'], 'training.log')
)
fh.setLevel(logging.INFO)
# Create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def _setup_directories(self):
"""Create necessary directories"""
for path in self.config['paths'].values():
os.makedirs(path, exist_ok=True)
# Create subdirectories
os.makedirs(os.path.join(self.config['paths']['results'], 'plots'), exist_ok=True)
os.makedirs(os.path.join(self.config['paths']['results'], 'analysis'), exist_ok=True)
os.makedirs(os.path.join(self.config['paths']['models'], 'checkpoints'), exist_ok=True)
def train(self, env, agent) -> Dict:
"""Main training loop with comprehensive monitoring"""
self.logger.info("Starting advanced training...")
self.logger.info(f"Configuration: {self.config['experiment']}")
start_time = time.time()
# Training loop
for episode in tqdm(range(self.episodes), desc="Training Episodes"):
episode_start_time = time.time()
# Run episode
episode_stats = self._run_episode(episode, env, agent)
# Update training history
self._update_training_history(episode, episode_stats)
# Periodic evaluation
if episode % self.eval_freq == 0 and episode > 0:
eval_score = self._evaluate_agent(episode, env, agent)
self.training_history['eval_scores'].append(eval_score)
# Save best model based on evaluation
if eval_score > self.best_eval_score:
self.best_eval_score = eval_score
self._save_best_model(agent, episode, "eval")
# Periodic model saving
if episode % self.save_freq == 0 and episode > 0:
self._save_checkpoint(agent, episode)
# Periodic logging
if episode % self.log_freq == 0:
self._log_progress(episode, episode_stats, time.time() - episode_start_time)
# Early stopping check
if self._should_early_stop(episode):
self.logger.info(f"Early stopping at episode {episode}")
break
total_time = time.time() - start_time
# Final evaluation and analysis
final_stats = self._finalize_training(agent, total_time)
self.logger.info("Training completed successfully!")
return final_stats
def _run_episode(self, episode: int, env, agent) -> Dict:
"""Run a single training episode"""
state = env.reset()
total_reward = 0
steps = 0
losses = []
for step in range(self.max_steps):
# Agent action
action = agent.act(state, training=True)
# Environment step
next_state, reward, done, info = env.step(action)
# Store experience
agent.remember(state, action, reward, next_state, done)
# Train agent
loss = agent.replay()
if loss is not None:
losses.append(loss)
# Update state and metrics
state = next_state
total_reward += reward
steps += 1
if done:
break
# Get episode summary
episode_summary = env.get_episode_summary()
# Compile episode statistics
episode_stats = {
'reward': total_reward,
'steps': steps,
'average_loss': np.mean(losses) if losses else 0,
'epsilon': agent.epsilon,
'episode_summary': episode_summary,
'agent_stats': agent.get_training_stats()
}
return episode_stats
def _update_training_history(self, episode: int, episode_stats: Dict):
"""Update comprehensive training history"""
self.training_history['episodes'].append(episode)
self.training_history['rewards'].append(episode_stats['reward'])
self.training_history['steps'].append(episode_stats['steps'])
self.training_history['epsilon'].append(episode_stats['epsilon'])
self.training_history['loss'].append(episode_stats['average_loss'])
self.training_history['metrics'].append(episode_stats['episode_summary'])
# Update best reward
if episode_stats['reward'] > self.best_reward:
self.best_reward = episode_stats['reward']
def _evaluate_agent(self, episode: int, env, agent) -> float:
"""Evaluate agent performance"""
self.logger.info(f"Evaluating agent at episode {episode}...")
eval_episodes = self.config['evaluation']['test_episodes']
eval_rewards = []
eval_metrics = []
for eval_ep in range(eval_episodes):
state = env.reset()
total_reward = 0
for step in range(self.max_steps):
action = agent.act(state, training=False) # No exploration
next_state, reward, done, info = env.step(action)
state = next_state
total_reward += reward
if done:
break
eval_rewards.append(total_reward)
eval_metrics.append(env.get_episode_summary())
# Calculate evaluation score
avg_reward = np.mean(eval_rewards)
avg_delay = np.mean([m.get('average_delay', 0) for m in eval_metrics])
avg_throughput = np.mean([m.get('total_throughput', 0) for m in eval_metrics])
# Composite evaluation score
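        # (heuristic weighting: each second of average delay costs 0.1 reward, each served vehicle adds 0.01)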
eval_score = avg_reward - 0.1 * avg_delay + 0.01 * avg_throughput
self.logger.info(f"Evaluation - Avg Reward: {avg_reward:.2f}, "
f"Avg Delay: {avg_delay:.2f}, Score: {eval_score:.2f}")
return eval_score
def _save_checkpoint(self, agent, episode: int):
"""Save training checkpoint"""
checkpoint_path = os.path.join(
self.config['paths']['models'], 'checkpoints',
f'checkpoint_episode_{episode}.pth'
)
agent.save(checkpoint_path, episode)
# Save training history
history_path = os.path.join(
self.config['paths']['results'],
f'training_history_episode_{episode}.json'
)
with open(history_path, 'w') as f:
# Convert numpy arrays to lists for JSON serialization
history_json = {}
for key, value in self.training_history.items():
if key == 'metrics':
history_json[key] = value # Keep as is for now
else:
history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
json.dump(history_json, f, indent=2)
def _save_best_model(self, agent, episode: int, criteria: str):
"""Save best performing model"""
best_model_path = os.path.join(
self.config['paths']['models'],
f'best_model_{criteria}.pth'
)
agent.save(best_model_path, episode)
self.logger.info(f"New best model saved (criteria: {criteria}) at episode {episode}")
def _log_progress(self, episode: int, episode_stats: Dict, episode_time: float):
"""Log detailed training progress"""
recent_rewards = self.training_history['rewards'][-50:]
avg_reward = np.mean(recent_rewards)
self.logger.info(
f"Episode {episode:4d} | "
f"Reward: {episode_stats['reward']:8.2f} | "
f"Avg(50): {avg_reward:8.2f} | "
f"Steps: {episode_stats['steps']:4d} | "
f"Epsilon: {episode_stats['epsilon']:.3f} | "
f"Loss: {episode_stats['average_loss']:.4f} | "
f"Time: {episode_time:.2f}s"
)
# Log episode summary metrics
summary = episode_stats['episode_summary']
if summary:
self.logger.info(
f" Metrics - Delay: {summary.get('average_delay', 0):.2f}s | "
f"Queue: {summary.get('average_queue_length', 0):.1f} | "
f"Throughput: {summary.get('total_throughput', 0):.0f} | "
f"Fuel: {summary.get('fuel_efficiency', 0):.3f}L/veh"
)
def _should_early_stop(self, episode: int) -> bool:
"""Check if training should stop early"""
if episode < 100: # Minimum episodes before considering early stop
return False
# Check if reward has plateaued
recent_rewards = self.training_history['rewards'][-50:]
if len(recent_rewards) >= 50:
improvement = np.mean(recent_rewards[-25:]) - np.mean(recent_rewards[:25])
if improvement < 1.0: # Less than 1.0 reward improvement
return True
return False
def _finalize_training(self, agent, total_time: float) -> Dict:
"""Finalize training with comprehensive analysis"""
self.logger.info("Finalizing training...")
# Save final model
final_model_path = os.path.join(
self.config['paths']['models'], 'final_model.pth'
)
agent.save(final_model_path, len(self.training_history['episodes']))
# Generate comprehensive plots
self._generate_training_plots()
# Save final training history
final_history_path = os.path.join(
self.config['paths']['results'], 'final_training_history.json'
)
with open(final_history_path, 'w') as f:
history_json = {}
for key, value in self.training_history.items():
if key == 'metrics':
history_json[key] = value
else:
history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
json.dump(history_json, f, indent=2)
# Compile final statistics
final_stats = {
'total_episodes': len(self.training_history['episodes']),
'total_training_time': total_time,
'best_reward': self.best_reward,
'best_eval_score': self.best_eval_score,
'final_epsilon': agent.epsilon,
'average_reward_last_100': np.mean(self.training_history['rewards'][-100:]),
'training_efficiency': len(self.training_history['episodes']) / (total_time / 3600) # episodes per hour
}
# Save final stats
stats_path = os.path.join(
self.config['paths']['results'], 'final_training_stats.json'
)
with open(stats_path, 'w') as f:
json.dump(final_stats, f, indent=2, default=str)
return final_stats
def _generate_training_plots(self):
"""Generate comprehensive training visualization"""
plt.style.use('seaborn-v0_8')
# Create subplot layout
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Advanced Traffic Signal RL Training Analysis', fontsize=16)
episodes = self.training_history['episodes']
# 1. Reward progression
axes[0, 0].plot(episodes, self.training_history['rewards'], alpha=0.7, label='Episode Reward')
# Moving average
if len(self.training_history['rewards']) > 50:
moving_avg = pd.Series(self.training_history['rewards']).rolling(50).mean()
axes[0, 0].plot(episodes, moving_avg, 'r-', linewidth=2, label='Moving Average (50)')
axes[0, 0].set_title('Training Reward Progression')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Reward')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. Loss progression
valid_losses = [l for l in self.training_history['loss'] if l > 0]
valid_episodes = episodes[:len(valid_losses)]
if valid_losses:
axes[0, 1].plot(valid_episodes, valid_losses, alpha=0.7)
if len(valid_losses) > 20:
loss_avg = pd.Series(valid_losses).rolling(20).mean()
axes[0, 1].plot(valid_episodes, loss_avg, 'r-', linewidth=2)
axes[0, 1].set_title('Training Loss')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].set_ylabel('Loss')
axes[0, 1].set_yscale('log')
axes[0, 1].grid(True, alpha=0.3)
# 3. Epsilon decay
axes[0, 2].plot(episodes, self.training_history['epsilon'])
axes[0, 2].set_title('Exploration Rate (Epsilon)')
axes[0, 2].set_xlabel('Episode')
axes[0, 2].set_ylabel('Epsilon')
axes[0, 2].grid(True, alpha=0.3)
# 4. Episode length
axes[1, 0].plot(episodes, self.training_history['steps'])
if len(self.training_history['steps']) > 20:
steps_avg = pd.Series(self.training_history['steps']).rolling(20).mean()
axes[1, 0].plot(episodes, steps_avg, 'r-', linewidth=2)
axes[1, 0].set_title('Episode Length')
axes[1, 0].set_xlabel('Episode')
axes[1, 0].set_ylabel('Steps')
axes[1, 0].grid(True, alpha=0.3)
# 5. Evaluation scores
if self.training_history['eval_scores']:
            eval_episodes = [(i + 1) * self.eval_freq for i in range(len(self.training_history['eval_scores']))]
axes[1, 1].plot(eval_episodes, self.training_history['eval_scores'], 'go-')
axes[1, 1].set_title('Evaluation Scores')
axes[1, 1].set_xlabel('Episode')
axes[1, 1].set_ylabel('Eval Score')
axes[1, 1].grid(True, alpha=0.3)
# 6. Performance metrics over time
if self.training_history['metrics']:
delays = [m.get('average_delay', 0) for m in self.training_history['metrics'] if m]
if delays:
axes[1, 2].plot(episodes[:len(delays)], delays)
axes[1, 2].set_title('Average Delay Over Time')
axes[1, 2].set_xlabel('Episode')
axes[1, 2].set_ylabel('Delay (s)')
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
# Save plots
plots_dir = os.path.join(self.config['paths']['results'], 'plots')
plt.savefig(os.path.join(plots_dir, 'training_analysis.png'), dpi=300, bbox_inches='tight')
plt.savefig(os.path.join(plots_dir, 'training_analysis.pdf'), bbox_inches='tight')
plt.close()
self.logger.info("Training plots generated successfully")

README.md
View File

@ -13,6 +13,18 @@ This project implements an intelligent traffic signal control system using Deep
- **Performance Analytics**: Comprehensive metrics and visualization tools
- **Scalable Architecture**: Supports single intersection and network-level optimization
## 🛠️ **Quick Setup Guides**
### **📋 Choose Your Platform:**
- **🪟 Windows Users**: [**Complete Windows Setup Guide**](WINDOWS_SETUP.md) - Step-by-step installation for Windows 10/11
- **🐧 Linux Users**: Follow the instructions below
- **🍎 macOS Users**: Follow the instructions below with Homebrew modifications
> **⚠️ Windows users should follow the [Windows Setup Guide](WINDOWS_SETUP.md) for detailed platform-specific instructions including SUMO installation, environment setup, and troubleshooting.**
---
## 🏗️ System Architecture
```
@ -309,4 +321,4 @@ python main.py --mode evaluate
## 📚 References
Based on state-of-the-art research in traffic signal optimization and reinforcement learning, implementing novel approaches for urban traffic management.

497
WINDOWS_SETUP.md Normal file
View File

@ -0,0 +1,497 @@
# 🪟 Dynamic Traffic Signal Optimization - Windows Setup Guide
## Complete Step-by-Step Installation and Setup Guide for Windows
This guide provides detailed instructions for setting up the Dynamic Traffic Signal Optimization project on Windows systems.
---
## 📋 **Table of Contents**
1. [Prerequisites](#prerequisites)
2. [System Requirements](#system-requirements)
3. [Step 1: Install Python](#step-1-install-python)
4. [Step 2: Install Git](#step-2-install-git)
5. [Step 3: Install SUMO Traffic Simulator](#step-3-install-sumo-traffic-simulator)
6. [Step 4: Clone and Setup Project](#step-4-clone-and-setup-project)
7. [Step 5: Create Virtual Environment](#step-5-create-virtual-environment)
8. [Step 6: Install Dependencies](#step-6-install-dependencies)
9. [Step 7: Configure Environment Variables](#step-7-configure-environment-variables)
10. [Step 8: Verify Installation](#step-8-verify-installation)
11. [Step 9: Run the Project](#step-9-run-the-project)
12. [Troubleshooting](#troubleshooting)
13. [GPU Setup (Optional)](#gpu-setup-optional)
---
## 🔧 **Prerequisites**
- Windows 10 or Windows 11 (64-bit)
- Administrator access to install software
- At least 8GB RAM (16GB recommended)
- 10GB free disk space
- Internet connection for downloads
---
## 💻 **System Requirements**
| Component | Minimum | Recommended |
|-----------|---------|-------------|
| OS | Windows 10 64-bit | Windows 11 64-bit |
| RAM | 8GB | 16GB+ |
| Storage | 10GB free | 20GB+ free |
| GPU | Not required | NVIDIA GPU (for acceleration) |
| Python | 3.8+ | 3.10+ |
---
## 🐍 **Step 1: Install Python**
### Method 1: Download from Python.org (Recommended)
1. **Download Python**:
- Go to [https://www.python.org/downloads/](https://www.python.org/downloads/)
- Click "Download Python 3.11.x" (latest stable version)
- Download the Windows installer (.exe file)
2. **Install Python**:
- Run the downloaded installer as Administrator
- ⚠️ **IMPORTANT**: Check "Add Python to PATH" at the bottom
- Click "Install Now"
- Wait for installation to complete
- Click "Close"
3. **Verify Installation**:
```cmd
# Open Command Prompt (cmd) and run:
python --version
pip --version
```
### Method 2: Microsoft Store (Alternative)
1. Open Microsoft Store
2. Search for "Python 3.11"
3. Install the official Python version
---
## 🔧 **Step 2: Install Git**
1. **Download Git**:
- Go to [https://git-scm.com/download/win](https://git-scm.com/download/win)
- Download the 64-bit Git for Windows Setup
2. **Install Git**:
- Run the installer as Administrator
- Use default settings (click "Next" through all options)
- Complete the installation
3. **Verify Installation**:
```cmd
git --version
```
---
## 🚦 **Step 3: Install SUMO Traffic Simulator**
### Download and Install SUMO
1. **Download SUMO**:
- Go to [https://eclipse.org/sumo/](https://eclipse.org/sumo/)
- Click on "Download" in the top menu
- Download the Windows installer (sumo-win64-x.x.x.msi)
2. **Install SUMO**:
- Run the downloaded MSI file as Administrator
- Follow the installation wizard
- Choose installation directory (default: `C:\Program Files (x86)\Eclipse\Sumo`)
- Complete the installation
3. **Add SUMO to PATH**:
- Right-click "This PC" → Properties
- Click "Advanced system settings"
- Click "Environment Variables"
- Under "System Variables", find and select "Path"
- Click "Edit" → "New"
- Add: `C:\Program Files (x86)\Eclipse\Sumo\bin`
- Click "OK" to close all dialogs
4. **Set SUMO_HOME Environment Variable**:
- In Environment Variables window (still open)
- Under "System Variables", click "New"
- Variable name: `SUMO_HOME`
- Variable value: `C:\Program Files (x86)\Eclipse\Sumo`
- Click "OK"
5. **Verify SUMO Installation**:
```cmd
# Close and reopen Command Prompt, then run:
sumo --version
```
---
## 📁 **Step 4: Clone and Setup Project**
1. **Create Project Directory**:
```cmd
# Open Command Prompt and navigate to desired location
cd C:\
mkdir Projects
cd Projects
```
2. **Clone Repository**:
```cmd
git clone https://git.kronos-nexus.com/giteaAdmin/DTSO-Mtech_2025
cd DTSO-Mtech_2025
```
3. **Create Directory Structure**:
```cmd
# Create all necessary directories
mkdir src\environment src\agents src\training src\evaluation src\utils
mkdir config sumo_configs models\checkpoints
mkdir data\raw data\processed logs\tensorboard
mkdir results\plots results\analysis scripts notebooks tests
```
---
## 🐍 **Step 5: Create Virtual Environment**
1. **Create Virtual Environment**:
```cmd
# In the project directory
python -m venv venv
```
2. **Activate Virtual Environment**:
```cmd
# Activate the environment (Windows Command Prompt)
venv\Scripts\activate
# For PowerShell users:
venv\Scripts\Activate.ps1
```
3. **Verify Activation**:
- You should see `(venv)` at the beginning of your command prompt
---
## 📦 **Step 6: Install Dependencies**
1. **Create requirements.txt**:
```cmd
# Create the file with the needed dependencies
# (the ^ escapes keep cmd from treating ">=" as output redirection)
echo torch^>=1.9.0 > requirements.txt
echo torchvision^>=0.10.0 >> requirements.txt
echo numpy^>=1.21.0 >> requirements.txt
echo pandas^>=1.3.0 >> requirements.txt
echo matplotlib^>=3.4.0 >> requirements.txt
echo seaborn^>=0.11.0 >> requirements.txt
echo opencv-python^>=4.5.0 >> requirements.txt
echo gym^>=0.18.0 >> requirements.txt
echo traci^>=1.10.0 >> requirements.txt
echo sumolib^>=1.10.0 >> requirements.txt
echo scikit-learn^>=0.24.0 >> requirements.txt
echo tensorboard^>=2.6.0 >> requirements.txt
echo tqdm^>=4.62.0 >> requirements.txt
echo PyYAML^>=5.4.0 >> requirements.txt
echo imageio^>=2.9.0 >> requirements.txt
echo pillow^>=8.3.0 >> requirements.txt
```
2. **Install Dependencies**:
```cmd
# Make sure virtual environment is activated
pip install --upgrade pip
pip install -r requirements.txt
```
3. **Wait for Installation**:
- This may take 10-15 minutes depending on your internet speed
- PyTorch is a large package (~800MB)
---
## 🔧 **Step 7: Configure Environment Variables**
1. **Verify Environment Variables**:
```cmd
echo %SUMO_HOME%
echo %PATH%
```
2. **Test SUMO from Python**:
```cmd
python -c "import traci; print('TraCI imported successfully')"
python -c "import sumolib; print('SUMO library imported successfully')"
```
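If the `traci`/`sumolib` imports above fail even though SUMO is installed, the bindings can also be picked up from SUMO's own `tools` folder. A small sketch (assuming `SUMO_HOME` was set in Step 3):

```python
# Fallback import path for SUMO's bundled Python tools (sketch; assumes SUMO_HOME is set).
import os
import sys

sumo_home = os.environ.get("SUMO_HOME")
if sumo_home:
    sys.path.append(os.path.join(sumo_home, "tools"))  # makes traci/sumolib importable

import traci    # noqa: E402
import sumolib  # noqa: E402
print("TraCI and sumolib imported successfully")
```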
---
## ✅ **Step 8: Verify Installation**
1. **Create and Run Setup Script**:
```cmd
# Create setup verification script
python scripts/setup_project.py
```
2. **Test Basic Functionality**:
```cmd
# Test Python packages
python -c "import torch; print('PyTorch version:', torch.__version__)"
python -c "import numpy; print('NumPy version:', numpy.__version__)"
python -c "import pandas; print('Pandas version:', pandas.__version__)"
```
3. **Test SUMO Integration**:
```cmd
# Test SUMO command line
sumo --help
# Test SUMO with GUI (optional)
sumo-gui
```
---
## 🚀 **Step 9: Run the Project**
### 9.1 Basic Training
```cmd
# Make sure you're in the project directory with activated virtual environment
cd C:\Projects\DTSO-Mtech_2025
venv\Scripts\activate
# Run basic training
python main.py --mode train
```
### 9.2 Test Trained Model
```cmd
# Test with pre-trained model
python main.py --mode test --model models/final_model.pth --episodes 5
```
### 9.3 Monitor Training Progress
```cmd
# In a separate command prompt, start TensorBoard
venv\Scripts\activate
tensorboard --logdir logs/tensorboard
# Open browser and go to: http://localhost:6006
```
---
## 🔧 **Troubleshooting**
### Common Issues and Solutions
#### 1. **Python not found**
```cmd
# Error: 'python' is not recognized
# Solution: Add Python to PATH or use:
py --version
```
#### 2. **SUMO not found**
```cmd
# Error: 'sumo' is not recognized
# Solution: Check PATH and SUMO_HOME environment variables
set PATH=%PATH%;C:\Program Files (x86)\Eclipse\Sumo\bin
set SUMO_HOME=C:\Program Files (x86)\Eclipse\Sumo
```
#### 3. **Permission Issues**
```cmd
# Run Command Prompt as Administrator
# Right-click Command Prompt → "Run as administrator"
```
#### 4. **Virtual Environment Issues**
```cmd
# If activation fails, try:
python -m venv --clear venv
venv\Scripts\activate
```
#### 5. **Package Installation Errors**
```cmd
# If pip install fails, try:
pip install --upgrade pip setuptools wheel
pip install --no-cache-dir -r requirements.txt
```
#### 6. **SUMO GUI Issues**
```cmd
# If SUMO GUI doesn't work, check if you have:
# - Visual C++ Redistributable installed
# - Updated graphics drivers
```
#### 7. **Memory Issues**
```cmd
# If training crashes due to memory:
# Edit config/config.yaml and reduce:
# - batch_size: 32 (instead of 64)
# - memory_size: 50000 (instead of 100000)
```
### Windows-Specific Commands
```cmd
# Check system information
systeminfo
# Check available disk space
dir C:\
# Check running processes
tasklist | findstr python
# Kill Python processes if needed
taskkill /f /im python.exe
```
---
## 🎮 **GPU Setup (Optional)**
### NVIDIA GPU Setup
1. **Check GPU Compatibility**:
```cmd
# Check if you have NVIDIA GPU
nvidia-smi
```
2. **Install CUDA Toolkit**:
- Download from [NVIDIA CUDA Toolkit](https://developer.nvidia.com/cuda-downloads)
- Install CUDA 11.8 or 12.x
- Restart computer after installation
3. **Verify GPU Support**:
```cmd
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
python -c "import torch; print('GPU count:', torch.cuda.device_count())"
```
4. **Run with GPU**:
```cmd
python main.py --mode train --gpu
```
---
## 📊 **Performance Monitoring**
### System Monitoring During Training
1. **Task Manager**:
- Press `Ctrl + Shift + Esc`
- Monitor CPU, RAM, and GPU usage
2. **Command Line Monitoring**:
```cmd
# Monitor GPU usage (if NVIDIA)
nvidia-smi -l 5
# Monitor system resources
wmic cpu get loadpercentage /value
```
---
## 🔄 **Project Management**
### Regular Maintenance
```cmd
# Update packages
pip list --outdated
pip install --upgrade package_name
# Clean cache
pip cache purge
# Backup important files
xcopy models\ backup\models\ /E /I
xcopy results\ backup\results\ /E /I
```
### Development Workflow
```cmd
# Daily development routine
cd C:\Projects\DTSO-Mtech_2025
venv\Scripts\activate
git pull origin main
python main.py --mode train
```
---
## 📝 **Notes for Windows Users**
1. **File Paths**: Use backslashes (`\`) or forward slashes (`/`) in paths
2. **Command Prompt vs PowerShell**: Both work, but commands may differ slightly
3. **Antivirus**: Add project folder to antivirus exclusions for better performance
4. **Windows Defender**: May slow down file operations; consider temporary exclusion
5. **Updates**: Keep Windows, Python, and SUMO updated for best performance
---
## 📞 **Support and Resources**
### Getting Help
- **Project Issues**: Check GitHub Issues or create new issue
- **SUMO Help**: [SUMO Documentation](https://sumo.dlr.de/docs/)
- **Python Help**: [Python.org Documentation](https://docs.python.org/)
- **PyTorch Help**: [PyTorch Documentation](https://pytorch.org/docs/)
### Useful Links
- [SUMO Windows Installation](https://sumo.dlr.de/docs/Installing/Windows_Build.html)
- [Python Virtual Environments](https://docs.python.org/3/tutorial/venv.html)
- [Git for Windows](https://gitforwindows.org/)
- [Visual Studio Code](https://code.visualstudio.com/) (Recommended IDE)
---
## ✅ **Final Checklist**
Before starting development, ensure:
- [ ] Python 3.8+ installed and in PATH
- [ ] Git installed and configured
- [ ] SUMO installed with GUI working
- [ ] Environment variables set (SUMO_HOME, PATH)
- [ ] Virtual environment created and activated
- [ ] All dependencies installed successfully
- [ ] Basic functionality tests passing
- [ ] Project structure created
- [ ] Configuration files in place
---
**🎉 Congratulations! Your Windows development environment is ready!**
You can now proceed with training your traffic signal optimization model. Start with:
```cmd
python main.py --mode train --episodes 100
```
For any issues, refer to the troubleshooting section or check the main README.md file.

52
config/config.yaml Normal file
View File

@ -0,0 +1,52 @@
# Dynamic Traffic Signal Optimization Configuration
experiment:
name: "traffic_rl_mtech"
version: "1.0"
description: "M.Tech Dynamic Traffic Signal Optimization using Deep RL"
environment:
simulation_time: 3600 # 1 hour simulation
step_size: 1 # SUMO step size in seconds
yellow_time: 3
min_green_time: 10
max_green_time: 60
warmup_time: 300 # 5 minutes warmup
network:
type: "single_intersection"
lanes_per_direction: 2
max_speed: 50 # km/h
intersection_size: 50 # meters
agent:
algorithm: "D3QN" # Dueling Double DQN
state_size: 20
action_size: 8
learning_rate: 0.0001
gamma: 0.95
epsilon_start: 1.0
epsilon_end: 0.01
epsilon_decay: 0.995
memory_size: 100000
batch_size: 64
target_update_freq: 100
hidden_layers: [256, 128, 64]
training:
episodes: 2000
max_steps_per_episode: 1000
save_freq: 100
eval_freq: 50
log_freq: 10
evaluation:
test_episodes: 10
baseline_methods: ["fixed_time", "actuated", "random"]
metrics: ["delay", "queue_length", "throughput", "emissions", "fuel"]
paths:
models: "models/"
data: "data/"
logs: "logs/"
results: "results/"
sumo_configs: "sumo_configs/"
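
These values are consumed directly by the trainer and agent via `yaml.safe_load`. A quick way to sanity-check them (a sketch, assuming the file lives at `config/config.yaml` as in the paths above) is to load the file the same way the code does and inspect the implied exploration schedule; with `epsilon_decay: 0.995` the rate reaches the 0.01 floor after roughly ln(0.01)/ln(0.995) ≈ 919 `replay()` calls:

```python
# Sketch: inspect the epsilon schedule implied by config/config.yaml (assumes the file above).
import math
import yaml

with open("config/config.yaml") as f:
    cfg = yaml.safe_load(f)

agent = cfg["agent"]
steps_to_floor = math.log(agent["epsilon_end"] / agent["epsilon_start"]) / math.log(agent["epsilon_decay"])
print(f"Epsilon reaches {agent['epsilon_end']} after ~{steps_to_floor:.0f} replay() calls")
```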

304
main.py Normal file
View File

@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""
Main execution script for Dynamic Traffic Signal Optimization using RL
M.Tech Project Implementation
"""
import os
import sys
import argparse
import yaml
import logging
from datetime import datetime
import torch
import numpy as np
# Ensure the project root is on sys.path so the `src` package resolves
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.training.trainer import TrafficTrainer
from src.environment.traffic_environment import AdvancedTrafficEnv
from src.agents.advanced_dqn_agent import AdvancedDQNAgent
def setup_logging():
"""Setup global logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('main.log'),
logging.StreamHandler(sys.stdout)
]
)
def create_directories():
"""Create necessary project directories"""
directories = [
'models', 'models/checkpoints', 'data', 'logs', 'logs/tensorboard',
'results', 'results/plots', 'results/analysis', 'sumo_configs'
]
for directory in directories:
os.makedirs(directory, exist_ok=True)
def train_model(config_path: str, resume_checkpoint: str = None):
"""Train the RL model"""
print("="*80)
print("STARTING TRAINING MODE")
print("="*80)
# Initialize components
env = AdvancedTrafficEnv(config_path)
agent = AdvancedDQNAgent(config_path)
trainer = TrafficTrainer(config_path)
if resume_checkpoint:
print(f"Resuming training from checkpoint: {resume_checkpoint}")
agent.load(resume_checkpoint)
else:
print("Starting fresh training...")
# Start training
training_results = trainer.train(env, agent)
    # Cleanup
    env.close()
    agent.close()
    print("\n" + "="*80)
    print("TRAINING COMPLETED")
    print("="*80)
    print(f"Total Episodes: {training_results['total_episodes']}")
    print(f"Training Time: {training_results['total_training_time']:.2f} seconds")
    print(f"Best Reward: {training_results['best_reward']:.2f}")
    print(f"Best Eval Score: {training_results['best_eval_score']:.2f}")
    print(f"Final Epsilon: {training_results.get('final_epsilon', 0):.4f}")
def main():
parser = argparse.ArgumentParser(
description='Dynamic Traffic Signal Optimization using RL - M.Tech Project'
)
parser.add_argument(
'--mode',
choices=['train', 'test', 'evaluate', 'benchmark'],
required=True,
help='Execution mode'
)
parser.add_argument(
'--config',
type=str,
default='config/config.yaml',
help='Path to configuration file'
)
parser.add_argument(
'--model',
type=str,
default='models/final_model.pth',
help='Path to model file (for test/evaluate modes)'
)
parser.add_argument(
'--episodes',
type=int,
default=10,
help='Number of test episodes'
)
parser.add_argument(
'--resume',
type=str,
default=None,
help='Path to checkpoint for resuming training'
)
parser.add_argument(
'--gpu',
action='store_true',
help='Force GPU usage if available'
)
parser.add_argument(
'--debug',
action='store_true',
help='Enable debug logging'
)
args = parser.parse_args()
# Setup logging
setup_logging()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Create directories
create_directories()
# Check configuration file
if not os.path.exists(args.config):
print(f"Error: Configuration file not found at {args.config}")
print("Please create config/config.yaml or specify correct path with --config")
sys.exit(1)
# GPU setup
if args.gpu and torch.cuda.is_available():
print(f"Using GPU: {torch.cuda.get_device_name()}")
elif args.gpu:
print("GPU requested but not available, using CPU")
else:
print("Using CPU")
# Print system information
print(f"\nStarting execution at: {datetime.now()}")
print(f"Mode: {args.mode}")
print(f"Config: {args.config}")
if args.mode in ['test', 'evaluate']:
print(f"Model: {args.model}")
print(f"Python version: {sys.version}")
print(f"PyTorch version: {torch.__version__}")
# Execute based on mode
try:
if args.mode == 'train':
train_model(args.config, args.resume)
elif args.mode == 'test':
test_model(args.config, args.model, args.episodes)
elif args.mode == 'evaluate':
evaluate_model(args.config, args.model)
elif args.mode == 'benchmark':
benchmark_baselines(args.config)
except KeyboardInterrupt:
print("\n\nExecution interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\nError during execution: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
print(f"\nExecution completed at: {datetime.now()}")
def test_model(config_path: str, model_path: str, episodes: int = 10):
"""Test a trained model"""
print("="*80)
print("STARTING TESTING MODE")
print("="*80)
env = AdvancedTrafficEnv(config_path)
agent = AdvancedDQNAgent(config_path)
if not os.path.exists(model_path):
print(f"Error: Model file not found at {model_path}")
return
agent.load(model_path)
print(f"Model loaded from: {model_path}")
total_rewards = []
episode_summaries = []
for episode in range(episodes):
print(f"\nTesting Episode {episode + 1}/{episodes}")
state = env.reset()
total_reward = 0
steps = 0
while True:
action = agent.act(state, training=False)
next_state, reward, done, info = env.step(action)
state = next_state
total_reward += reward
steps += 1
if done:
break
episode_summary = env.get_episode_summary()
episode_summary['total_reward'] = total_reward
episode_summary['steps'] = steps
total_rewards.append(total_reward)
episode_summaries.append(episode_summary)
print(f" Reward: {total_reward:.2f}")
print(f" Steps: {steps}")
print(f" Avg Delay: {episode_summary.get('average_delay', 0):.2f}s")
print(f" Throughput: {episode_summary.get('total_throughput', 0):.0f}")
env.close()
agent.close()
# Print summary statistics
print("\n" + "="*80)
print("TESTING RESULTS SUMMARY")
print("="*80)
print(f"Average Reward: {np.mean(total_rewards):.2f} ± {np.std(total_rewards):.2f}")
print(f"Average Delay: {np.mean([s.get('average_delay', 0) for s in episode_summaries]):.2f}s")
print(f"Average Throughput: {np.mean([s.get('total_throughput', 0) for s in episode_summaries]):.0f}")
print(f"Average Queue Length: {np.mean([s.get('average_queue_length', 0) for s in episode_summaries]):.2f}")
def evaluate_model(config_path: str, model_path: str):
"""Comprehensive model evaluation"""
print("="*80)
print("STARTING COMPREHENSIVE EVALUATION")
print("="*80)
env = AdvancedTrafficEnv(config_path)
agent = AdvancedDQNAgent(config_path)
if not os.path.exists(model_path):
print(f"Error: Model file not found at {model_path}")
return
agent.load(model_path)
print(f"Model loaded from: {model_path}")
# Run comprehensive evaluation
# This would call the evaluator component
print("Running comprehensive evaluation...")
env.close()
agent.close()
print("\n" + "="*80)
print("EVALUATION COMPLETED")
print("="*80)
print("Results saved to results/ directory")
def benchmark_baselines(config_path: str):
"""Benchmark baseline methods only"""
print("="*80)
print("BENCHMARKING BASELINE METHODS")
print("="*80)
env = AdvancedTrafficEnv(config_path)
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
baseline_methods = config['evaluation']['baseline_methods']
baseline_results = {}
for baseline in baseline_methods:
print(f"\nEvaluating baseline: {baseline}")
# This would implement baseline evaluation logic
print(f" Results for {baseline}: [Implementation needed]")
    env.close()
    print("\n" + "="*80)
    print("BASELINE BENCHMARKING COMPLETED")
    print("="*80)

if __name__ == "__main__":
    main()
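
The `benchmark_baselines` loop above only prints a placeholder for each method. As a rough illustration of what a `fixed_time` baseline could look like (a hypothetical sketch, not code from this commit), one option is to cycle the environment's phases on a fixed timer and reuse `get_episode_summary()` so its metrics stay comparable with the RL agent's:

```python
# Hypothetical sketch of a fixed-time baseline (not part of this commit); assumes the
# AdvancedTrafficEnv API shown above (reset/step/get_episode_summary, Discrete action space).
def run_fixed_time_baseline(env, phase_duration: int = 30, max_steps: int = 1000) -> dict:
    """Cycle through all phases on a fixed timer and return the episode summary."""
    env.reset()
    action, steps_in_phase = 0, 0
    for _ in range(max_steps):
        _, _, done, _ = env.step(action)
        steps_in_phase += 1
        if steps_in_phase >= phase_duration:
            action = (action + 1) % env.action_space.n  # move to the next fixed phase
            steps_in_phase = 0
        if done:
            break
    return env.get_episode_summary()
```

An `actuated` or `random` baseline would follow the same pattern, differing only in how `action` is chosen each step.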

16
requirement.txt Normal file
View File

@ -0,0 +1,16 @@
torch>=1.9.0
torchvision>=0.10.0
numpy>=1.21.0
pandas>=1.3.0
matplotlib>=3.4.0
seaborn>=0.11.0
opencv-python>=4.5.0
gym>=0.18.0
traci>=1.10.0
sumolib>=1.10.0
scikit-learn>=0.24.0
tensorboard>=2.6.0
tqdm>=4.62.0
PyYAML>=5.4.0
imageio>=2.9.0
pillow>=8.3.0

353
scripts/setup_project.py Normal file
View File

@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
Project setup script for Dynamic Traffic Signal Optimization
Checks dependencies, creates directories, and validates SUMO installation
"""
import os
import sys
import subprocess
import platform
import pkg_resources
def check_python_version():
"""Check if Python version is compatible"""
version = sys.version_info
if version.major < 3 or (version.major == 3 and version.minor < 8):
print("❌ Error: Python 3.8 or higher is required")
print(f" Current version: {version.major}.{version.minor}.{version.micro}")
sys.exit(1)
print(f"✅ Python {version.major}.{version.minor}.{version.micro}")
def check_required_packages():
"""Check if required packages are installed"""
required_packages = [
'torch', 'numpy', 'pandas', 'matplotlib', 'seaborn',
'opencv-python', 'gym', 'scikit-learn', 'tensorboard',
'tqdm', 'PyYAML', 'pillow'
]
missing_packages = []
for package in required_packages:
try:
pkg_resources.get_distribution(package)
print(f"{package}")
except pkg_resources.DistributionNotFound:
missing_packages.append(package)
print(f"{package}")
if missing_packages:
print(f"\n⚠️ Missing packages: {', '.join(missing_packages)}")
print("Install them with: pip install -r requirements.txt")
return False
return True
def check_sumo_installation():
"""Check if SUMO is properly installed"""
try:
result = subprocess.run(["sumo", "--version"],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
version_line = result.stdout.strip().split('\n')[0]
print(f"✅ SUMO installed: {version_line}")
return True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
print("❌ SUMO not found in PATH")
print_sumo_installation_instructions()
return False
def print_sumo_installation_instructions():
"""Print SUMO installation instructions"""
system = platform.system()
print("\n📖 SUMO Installation Instructions:")
print("="*50)
if system == "Windows":
print("1. Download SUMO from: https://eclipse.org/sumo/")
print("2. Install the executable")
print("3. Add SUMO/bin to your PATH environment variable")
print("4. Set SUMO_HOME environment variable to SUMO installation directory")
elif system == "Darwin": # macOS
print("Option 1 - Homebrew:")
print(" brew install sumo")
print("\nOption 2 - Manual:")
print(" 1. Download from: https://eclipse.org/sumo/")
print(" 2. Follow macOS installation guide")
print(" 3. You may need to install XQuartz")
elif system == "Linux":
print("Ubuntu/Debian:")
print(" sudo apt-get install sumo sumo-tools sumo-doc")
print("\nCentOS/RHEL/Fedora:")
print(" sudo yum install sumo sumo-tools")
print("\nFrom source:")
print(" Follow instructions at: https://sumo.dlr.de/docs/Installing/Linux_Build.html")
print("\nAfter installation, ensure SUMO_HOME is set:")
print("export SUMO_HOME=/path/to/sumo")
def create_project_structure():
"""Create the project directory structure"""
directories = [
"src/environment",
"src/agents",
"src/training",
"src/evaluation",
"src/utils",
"config",
"sumo_configs",
"models/checkpoints",
"data/raw",
"data/processed",
"logs/tensorboard",
"results/plots",
"results/analysis",
"scripts",
"notebooks",
"tests"
]
print("\n📁 Creating project structure...")
for directory in directories:
os.makedirs(directory, exist_ok=True)
# Create __init__.py files for Python packages
if directory.startswith("src/"):
init_file = os.path.join(directory, "__init__.py")
if not os.path.exists(init_file):
with open(init_file, 'w') as f:
f.write("# Package initialization\n")
print("✅ Project structure created")
def create_sample_files():
"""Create sample configuration and documentation files"""
# Create .gitignore if it doesn't exist
gitignore_content = """# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
env/
ENV/
# IDE
.vscode/
.idea/
*.swp
*.swo
# Jupyter Notebook
.ipynb_checkpoints
# Models and Data
models/*.pth
data/raw/*
data/processed/*
!data/raw/.gitkeep
!data/processed/.gitkeep
# Logs
logs/*.log
logs/tensorboard/*
# Results
results/plots/*
results/analysis/*
# SUMO outputs
*.xml
*.csv
!sumo_configs/*.xml
# OS
.DS_Store
Thumbs.db
# Temporary files
*.tmp
*.temp
"""
if not os.path.exists(".gitignore"):
with open(".gitignore", 'w') as f:
f.write(gitignore_content)
print("✅ .gitignore created")
# Create README if it doesn't exist
if not os.path.exists("README.md"):
readme_content = """# Dynamic Traffic Signal Optimization using Reinforcement Learning
## M.Tech Project
### Overview
This project implements an intelligent traffic signal control system using Deep Reinforcement Learning (DRL) to optimize traffic flow at urban intersections.
### Quick Start
1. **Setup environment**
```bash
python scripts/setup_project.py
```
2. **Train the model**
```bash
python main.py --mode train
```
3. **Test the model**
```bash
python main.py --mode test --model models/final_model.pth
```
### Project Structure
```
src/ # Source code
environment/ # Traffic environment
agents/ # RL agents
training/ # Training framework
evaluation/ # Evaluation tools
config/ # Configuration files
sumo_configs/ # SUMO network files
models/ # Trained models
results/ # Results and analysis
scripts/ # Utility scripts
```
### Configuration
Edit `config/config.yaml` to customize:
- Training parameters
- Network architecture
- Evaluation settings
### Requirements
- Python 3.8+
- PyTorch
- SUMO Traffic Simulator
- See `requirements.txt` for full list
"""
with open("README.md", 'w') as f:
f.write(readme_content)
print("✅ README.md created")
def check_gpu_availability():
"""Check if GPU is available for PyTorch"""
try:
import torch
if torch.cuda.is_available():
gpu_count = torch.cuda.device_count()
gpu_name = torch.cuda.get_device_name(0)
print(f"✅ GPU available: {gpu_name} ({gpu_count} device(s))")
return True
else:
print(" GPU not available, will use CPU")
return False
except ImportError:
print("⚠️ PyTorch not installed, cannot check GPU")
return False
def run_basic_tests():
"""Run basic functionality tests"""
print("\n🧪 Running basic tests...")
# Test YAML loading
try:
import yaml
test_config = {
'test': 'value',
'nested': {'key': 123}
}
yaml.dump(test_config)
print("✅ YAML functionality")
except Exception as e:
print(f"❌ YAML test failed: {e}")
# Test NumPy
try:
import numpy as np
arr = np.random.random((3, 3))
assert arr.shape == (3, 3)
print("✅ NumPy functionality")
except Exception as e:
print(f"❌ NumPy test failed: {e}")
# Test PyTorch
try:
import torch
tensor = torch.randn(2, 3)
assert tensor.shape == (2, 3)
print("✅ PyTorch functionality")
except Exception as e:
print(f"❌ PyTorch test failed: {e}")
def main():
print("🚦 Dynamic Traffic Signal Optimization - Project Setup")
print("="*60)
# Check system requirements
print("\n1⃣ Checking Python version...")
check_python_version()
print("\n2⃣ Checking required packages...")
packages_ok = check_required_packages()
print("\n3⃣ Checking SUMO installation...")
sumo_ok = check_sumo_installation()
print("\n4⃣ Creating project structure...")
create_project_structure()
print("\n5⃣ Creating sample files...")
create_sample_files()
print("\n6⃣ Checking GPU availability...")
gpu_available = check_gpu_availability()
if packages_ok:
print("\n7⃣ Running basic tests...")
run_basic_tests()
print("\n" + "="*60)
print("🎉 Setup completed!")
if packages_ok and sumo_ok:
print("✅ Ready to run the project")
print("\nNext steps:")
print("1. Review config/config.yaml")
print("2. Run: python main.py --mode train")
else:
print("⚠️ Please fix the issues above before running the project")
if not packages_ok:
print(" - Install missing Python packages")
if not sumo_ok:
print(" - Install and configure SUMO")
if __name__ == "__main__":
main()

17
src/__init__.py Normal file
View File

@ -0,0 +1,17 @@
# Dynamic Traffic Signal Optimization using Reinforcement Learning
# M.Tech Project - Computer Science & Engineering
__version__ = "1.0.0"
__author__ = "M.Tech Student"
__email__ = "student@example.com"
# Import main components
from .environment.traffic_environment import AdvancedTrafficEnv
from .agents.advanced_dqn_agent import AdvancedDQNAgent
from .training.trainer import TrafficTrainer
__all__ = [
'AdvancedTrafficEnv',
'AdvancedDQNAgent',
'TrafficTrainer'
]

386
src/agents/advanced_dqn_agent.py Normal file
View File

@ -0,0 +1,386 @@
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import yaml
import os
from collections import deque, namedtuple
from typing import List, Tuple, Optional
import logging
from torch.utils.tensorboard import SummaryWriter
# Experience tuple for replay buffer
Experience = namedtuple('Experience',
['state', 'action', 'reward', 'next_state', 'done'])
class DuelingDQN(nn.Module):
"""
Dueling Deep Q-Network for advanced traffic signal control
Separates state value and advantage functions for better learning
"""
def __init__(self, state_size: int, action_size: int, hidden_layers: List[int]):
super(DuelingDQN, self).__init__()
self.state_size = state_size
self.action_size = action_size
# Feature extraction layers
layers = []
input_size = state_size
for hidden_size in hidden_layers:
layers.extend([
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.1)
])
input_size = hidden_size
self.feature_extractor = nn.Sequential(*layers)
# Dueling architecture
self.value_head = nn.Linear(input_size, 1)
self.advantage_head = nn.Linear(input_size, action_size)
# Initialize weights
self._initialize_weights()
def _initialize_weights(self):
"""Initialize network weights using Xavier initialization"""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
nn.init.constant_(m.bias, 0)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward pass through dueling network"""
features = self.feature_extractor(x)
# Compute state value
value = self.value_head(features)
# Compute advantages
advantages = self.advantage_head(features)
# Combine value and advantages
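        # Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)); subtracting the mean keeps V and A identifiable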
q_values = value + (advantages - advantages.mean(dim=1, keepdim=True))
return q_values
class PrioritizedReplayBuffer:
"""
Prioritized Experience Replay Buffer for improved learning efficiency
"""
def __init__(self, capacity: int, alpha: float = 0.6, beta: float = 0.4):
self.capacity = capacity
self.alpha = alpha
self.beta = beta
self.beta_increment = 0.001
self.buffer = []
self.priorities = np.zeros((capacity,), dtype=np.float32)
self.position = 0
self.max_priority = 1.0
def add(self, experience: Experience):
"""Add experience with maximum priority"""
if len(self.buffer) < self.capacity:
self.buffer.append(experience)
else:
self.buffer[self.position] = experience
self.priorities[self.position] = self.max_priority
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size: int) -> Tuple[List[Experience], np.ndarray, np.ndarray]:
"""Sample batch with prioritized sampling"""
if len(self.buffer) < batch_size:
return [], np.array([]), np.array([])
# Calculate sampling probabilities
priorities = self.priorities[:len(self.buffer)]
probs = priorities ** self.alpha
probs /= probs.sum()
# Sample indices
indices = np.random.choice(len(self.buffer), batch_size, p=probs)
# Get experiences
experiences = [self.buffer[idx] for idx in indices]
# Calculate importance sampling weights
weights = (len(self.buffer) * probs[indices]) ** (-self.beta)
weights /= weights.max()
# Increase beta
self.beta = min(1.0, self.beta + self.beta_increment)
return experiences, indices, weights
def update_priorities(self, indices: np.ndarray, priorities: np.ndarray):
"""Update priorities for sampled experiences"""
for idx, priority in zip(indices, priorities):
self.priorities[idx] = priority
self.max_priority = max(self.max_priority, priority)
def __len__(self):
return len(self.buffer)
class AdvancedDQNAgent:
"""
Advanced DQN Agent with multiple improvements for traffic signal control
Features: Dueling DQN, Double DQN, Prioritized Replay, Noisy Networks
"""
def __init__(self, config_path: str):
# Load configuration
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
# Agent parameters
self.state_size = self.config['agent']['state_size']
self.action_size = self.config['agent']['action_size']
self.lr = self.config['agent']['learning_rate']
self.gamma = self.config['agent']['gamma']
self.epsilon = self.config['agent']['epsilon_start']
self.epsilon_min = self.config['agent']['epsilon_end']
self.epsilon_decay = self.config['agent']['epsilon_decay']
self.batch_size = self.config['agent']['batch_size']
self.target_update_freq = self.config['agent']['target_update_freq']
self.hidden_layers = self.config['agent']['hidden_layers']
# Device configuration
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {self.device}")
# Neural networks
self.q_network = DuelingDQN(
self.state_size, self.action_size, self.hidden_layers
).to(self.device)
self.target_network = DuelingDQN(
self.state_size, self.action_size, self.hidden_layers
).to(self.device)
# Optimizer with advanced features
self.optimizer = optim.AdamW(
self.q_network.parameters(),
lr=self.lr,
weight_decay=1e-5
)
# Learning rate scheduler
self.scheduler = optim.lr_scheduler.StepLR(
self.optimizer, step_size=500, gamma=0.95
)
# Prioritized replay buffer
self.memory = PrioritizedReplayBuffer(
capacity=self.config['agent']['memory_size']
)
# Training statistics
self.training_step = 0
self.episode_count = 0
self.losses = []
self.q_values = []
# Update target network
self.update_target_network()
# Setup logging and tensorboard
self.logger = self._setup_logger()
self.writer = SummaryWriter(
log_dir=os.path.join(self.config['paths']['logs'], 'tensorboard')
)
# Model saving
os.makedirs(self.config['paths']['models'], exist_ok=True)
def _setup_logger(self) -> logging.Logger:
"""Setup agent logging"""
logger = logging.getLogger('DQNAgent')
logger.setLevel(logging.INFO)
# Create file handler
os.makedirs(self.config['paths']['logs'], exist_ok=True)
fh = logging.FileHandler(
os.path.join(self.config['paths']['logs'], 'agent.log')
)
fh.setLevel(logging.INFO)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def update_target_network(self):
"""Copy weights from main network to target network"""
self.target_network.load_state_dict(self.q_network.state_dict())
self.logger.info("Target network updated")
def remember(self, state: np.ndarray, action: int, reward: float,
next_state: np.ndarray, done: bool):
"""Store experience in prioritized replay buffer"""
experience = Experience(state, action, reward, next_state, done)
self.memory.add(experience)
def act(self, state: np.ndarray, training: bool = True) -> int:
"""Choose action using epsilon-greedy policy with exploration strategies"""
# Exploration vs exploitation
if training and np.random.random() <= self.epsilon:
# Advanced exploration strategies
if np.random.random() < 0.1: # 10% completely random
return random.randrange(self.action_size)
else: # 90% noisy exploration around best action
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
q_values = self.q_network(state_tensor)
best_action = q_values.argmax().item()
# Add noise to best action
noise_actions = [(best_action + i) % self.action_size for i in [-1, 0, 1]]
return random.choice(noise_actions)
# Exploitation: choose best action
with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
q_values = self.q_network(state_tensor)
action = q_values.argmax().item()
# Log Q-values for analysis
self.q_values.append(q_values.max().item())
return action
def replay(self) -> Optional[float]:
"""Train the network using prioritized experience replay"""
if len(self.memory) < self.batch_size:
return None
# Sample from prioritized replay buffer
experiences, indices, weights = self.memory.sample(self.batch_size)
if not experiences:
return None
# Convert to tensors
states = torch.FloatTensor([e.state for e in experiences]).to(self.device)
actions = torch.LongTensor([e.action for e in experiences]).to(self.device)
rewards = torch.FloatTensor([e.reward for e in experiences]).to(self.device)
next_states = torch.FloatTensor([e.next_state for e in experiences]).to(self.device)
dones = torch.BoolTensor([e.done for e in experiences]).to(self.device)
weights_tensor = torch.FloatTensor(weights).to(self.device)
# Current Q values
current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))
# Double DQN: use main network to select actions, target network to evaluate
with torch.no_grad():
next_actions = self.q_network(next_states).argmax(1, keepdim=True)
next_q_values = self.target_network(next_states).gather(1, next_actions)
target_q_values = rewards.unsqueeze(1) + (self.gamma * next_q_values * ~dones.unsqueeze(1))
# Calculate TD errors for priority updates
td_errors = torch.abs(current_q_values - target_q_values).detach().cpu().numpy()
# Weighted loss (prioritized replay)
loss = F.mse_loss(current_q_values, target_q_values, reduction='none')
weighted_loss = (loss.squeeze() * weights_tensor).mean()
# Optimize
self.optimizer.zero_grad()
weighted_loss.backward()
# Gradient clipping for stability
torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), max_norm=1.0)
self.optimizer.step()
self.scheduler.step()
# Update priorities
new_priorities = td_errors.flatten() + 1e-6 # Small epsilon to avoid zero priorities
self.memory.update_priorities(indices, new_priorities)
# Update training statistics
self.training_step += 1
loss_value = weighted_loss.item()
self.losses.append(loss_value)
# Update target network periodically
if self.training_step % self.target_update_freq == 0:
self.update_target_network()
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
# Log to tensorboard
self.writer.add_scalar('Training/Loss', loss_value, self.training_step)
self.writer.add_scalar('Training/Epsilon', self.epsilon, self.training_step)
self.writer.add_scalar('Training/Learning_Rate',
self.scheduler.get_last_lr()[0], self.training_step)
return loss_value
def save(self, filepath: str, episode: int):
"""Save model with comprehensive state"""
checkpoint = {
'episode': episode,
'q_network_state_dict': self.q_network.state_dict(),
'target_network_state_dict': self.target_network.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
'epsilon': self.epsilon,
'training_step': self.training_step,
'losses': self.losses,
'q_values': self.q_values,
'config': self.config
}
torch.save(checkpoint, filepath)
self.logger.info(f"Model saved to {filepath}")
def load(self, filepath: str):
"""Load model with full state restoration"""
if not os.path.exists(filepath):
self.logger.error(f"Model file not found: {filepath}")
return
checkpoint = torch.load(filepath, map_location=self.device)
self.q_network.load_state_dict(checkpoint['q_network_state_dict'])
self.target_network.load_state_dict(checkpoint['target_network_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
self.epsilon = checkpoint['epsilon']
self.training_step = checkpoint['training_step']
self.losses = checkpoint.get('losses', [])
self.q_values = checkpoint.get('q_values', [])
self.logger.info(f"Model loaded from {filepath}")
def get_training_stats(self) -> dict:
"""Get comprehensive training statistics"""
return {
'training_steps': self.training_step,
'epsilon': self.epsilon,
'average_loss': np.mean(self.losses[-100:]) if self.losses else 0,
'average_q_value': np.mean(self.q_values[-100:]) if self.q_values else 0,
'learning_rate': self.scheduler.get_last_lr()[0],
'memory_size': len(self.memory)
}
def close(self):
"""Close agent and cleanup resources"""
self.writer.close()
self.logger.info("Agent closed successfully")

528
src/environment/traffic_environment.py Normal file
View File

@ -0,0 +1,528 @@
import os
import sys
import gym
import traci
import numpy as np
import pandas as pd
from gym import spaces
from typing import Dict, List, Tuple, Optional
import yaml
import logging
from collections import defaultdict, deque
class AdvancedTrafficEnv(gym.Env):
"""
Advanced Traffic Signal Control Environment for M.Tech Research
Implements sophisticated state representation and reward mechanisms
"""
def __init__(self, config_path: str):
super(AdvancedTrafficEnv, self).__init__()
# Load configuration
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
# Setup logging
self.logger = self._setup_logger()
# Environment parameters
self.simulation_time = 0
self.max_simulation_time = self.config['environment']['simulation_time']
self.step_size = self.config['environment']['step_size']
self.yellow_time = self.config['environment']['yellow_time']
self.min_green_time = self.config['environment']['min_green_time']
self.max_green_time = self.config['environment']['max_green_time']
self.warmup_time = self.config['environment']['warmup_time']
# Traffic light control
self.tls_id = "intersection_1"
self.current_phase = 0
self.phase_time = 0
self.last_action = 0
# State and action spaces
self.state_size = self.config['agent']['state_size']
self.action_size = self.config['agent']['action_size']
self.observation_space = spaces.Box(
low=0, high=1, shape=(self.state_size,), dtype=np.float32
)
self.action_space = spaces.Discrete(self.action_size)
# Traffic phases (8 phases for comprehensive control)
self.phases = self._define_traffic_phases()
# Performance tracking
self.metrics = self._init_metrics()
self.episode_data = []
# Advanced state features
self.state_history = deque(maxlen=10) # For temporal features
self.lane_ids = self._get_lane_ids()
def _setup_logger(self) -> logging.Logger:
"""Setup advanced logging for the environment"""
logger = logging.getLogger('TrafficEnv')
logger.setLevel(logging.INFO)
# Create file handler
os.makedirs(self.config['paths']['logs'], exist_ok=True)
fh = logging.FileHandler(
os.path.join(self.config['paths']['logs'], 'environment.log')
)
fh.setLevel(logging.INFO)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def _define_traffic_phases(self) -> Dict[int, str]:
"""Define comprehensive traffic light phases"""
return {
0: "GGrrrrGGrrrr", # North-South straight
1: "rrGGrrrrGGrr", # East-West straight
2: "GrrrGrrrGrrr", # North-South left turn
3: "rrrGrrrrrGrr", # East-West left turn
4: "GGGrrrrrrrrr", # North only (all movements)
5: "rrrGGGrrrrrr", # East only (all movements)
6: "rrrrrGGGrrrr", # South only (all movements)
7: "rrrrrrrrGGGr" # West only (all movements)
}
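    # Illustrative-only helper documenting the phase-string encoding used above; it is not
    # called anywhere else in this file.
    @staticmethod
    def _count_green_links(phase_state: str) -> int:
        """Count the links shown green in a SUMO state string.

        Each character controls one connection, in the order fixed by the network file:
        'G'/'g' mean green (with/without priority), 'y' yellow, 'r' red.
        Example: _count_green_links("GGrrrrGGrrrr") == 4.
        """
        return sum(1 for c in phase_state if c in "Gg")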
def _get_lane_ids(self) -> List[str]:
"""Get all lane IDs for the intersection"""
return [
"N_to_intersection_0", "N_to_intersection_1",
"E_to_intersection_0", "E_to_intersection_1",
"S_to_intersection_0", "S_to_intersection_1",
"W_to_intersection_0", "W_to_intersection_1"
]
def _init_metrics(self) -> Dict:
"""Initialize comprehensive performance metrics"""
return {
'total_delay': 0,
'total_waiting_time': 0,
'total_vehicles': 0,
'completed_vehicles': 0,
'total_fuel_consumption': 0,
'total_emissions': 0,
'queue_lengths': [],
'throughput': [],
'phase_durations': [],
'average_speed': [],
'stop_counts': []
}
def reset(self) -> np.ndarray:
"""Reset environment with advanced initialization"""
try:
if traci.isLoaded():
traci.close()
# Start SUMO simulation
sumo_config = os.path.join(
self.config['paths']['sumo_configs'],
'intersection.sumocfg'
)
sumo_cmd = [
"sumo-gui" if self.config.get('gui', False) else "sumo",
"-c", sumo_config,
"--no-warnings",
"--no-step-log",
"--random",
"--seed", str(np.random.randint(0, 10000))
]
traci.start(sumo_cmd)
# Initialize environment state
self.simulation_time = 0
self.current_phase = 0
self.phase_time = 0
self.last_action = 0
self.metrics = self._init_metrics()
self.episode_data = []
self.state_history.clear()
# Warmup period
self._warmup_simulation()
# Get initial state
initial_state = self._get_state()
self.state_history.append(initial_state)
self.logger.info("Environment reset successfully")
return initial_state
except Exception as e:
self.logger.error(f"Error in reset: {e}")
raise
def _warmup_simulation(self):
"""Run warmup period to stabilize traffic"""
for _ in range(self.warmup_time):
traci.simulationStep()
def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
"""Execute environment step with advanced reward calculation"""
try:
# Validate action
action = max(0, min(action, self.action_size - 1))
# Store pre-action state
pre_state = self._collect_detailed_metrics()
            # Execute action; keep the per-step rewards accumulated during the signal transition
            step_reward = self._execute_action(action)
# Get next state
next_state = self._get_state()
self.state_history.append(next_state)
# Check if episode is done
done = self._is_episode_done()
# Collect post-action metrics
post_state = self._collect_detailed_metrics()
            # Calculate comprehensive reward: multi-objective terms plus the accumulated step rewards
            reward = step_reward + self._calculate_advanced_reward(pre_state, post_state, action)
            # Remember the executed action so the next step's phase-change penalty sees the previous one
            self.last_action = action
# Update metrics
self._update_metrics(pre_state, post_state)
# Prepare info dictionary
info = self._get_info_dict()
return next_state, reward, done, info
except Exception as e:
self.logger.error(f"Error in step: {e}")
return self._get_state(), -100, True, {}
def _execute_action(self, action: int) -> float:
"""Execute traffic signal action with safety constraints"""
reward = 0
# Check if phase change is needed
if action != self.current_phase:
# Add yellow phase transition
self._set_yellow_phase()
# Simulate yellow phase
for _ in range(self.yellow_time):
traci.simulationStep()
self.simulation_time += 1
reward += self._get_immediate_reward()
# Set new phase
self._set_phase(action)
self.current_phase = action
self.phase_time = 0
# Execute phase for minimum duration
for _ in range(self.min_green_time):
traci.simulationStep()
self.simulation_time += 1
self.phase_time += 1
reward += self._get_immediate_reward()
        # self.last_action is updated in step() after the reward has been computed, so the
        # phase-change penalty in _calculate_advanced_reward still sees the previous action.
        return reward
def _set_yellow_phase(self):
"""Set all signals to yellow for safe transition"""
yellow_phase = "yyyy" * (len(self.phases[0]) // 4)
traci.trafficlight.setRedYellowGreenState(self.tls_id, yellow_phase)
def _set_phase(self, phase: int):
"""Set specific traffic light phase"""
traci.trafficlight.setRedYellowGreenState(
self.tls_id, self.phases[phase]
)
def _get_state(self) -> np.ndarray:
"""Get comprehensive state representation"""
state = np.zeros(self.state_size)
try:
# Lane-based features (8 lanes × 2 features = 16)
for i, lane_id in enumerate(self.lane_ids):
if lane_id in traci.lane.getIDList():
# Normalized queue length
queue_length = traci.lane.getLastStepHaltingNumber(lane_id)
max_capacity = traci.lane.getLength(lane_id) / 7.5 # Assume 7.5m per vehicle
state[i * 2] = min(queue_length / max_capacity, 1.0)
# Normalized average waiting time
vehicles = traci.lane.getLastStepVehicleIDs(lane_id)
if vehicles:
avg_waiting = np.mean([
traci.vehicle.getWaitingTime(v) for v in vehicles
])
state[i * 2 + 1] = min(avg_waiting / 120.0, 1.0) # Normalize by 2 minutes
# Phase information (2 features)
state[16] = self.current_phase / (self.action_size - 1) # Normalized current phase
state[17] = min(self.phase_time / self.max_green_time, 1.0) # Normalized phase time
# Time-based features (2 features)
            state[18] = (self.simulation_time % 3600) / 3600.0  # Normalized position within the current hour
state[19] = min(self.simulation_time / self.max_simulation_time, 1.0) # Progress
except Exception as e:
self.logger.warning(f"Error getting state: {e}")
return state.astype(np.float32)
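    # Illustrative-only helper documenting the state layout produced by _get_state(); it is not
    # called elsewhere and assumes state_size is 20, as the indexing above implies.
    @staticmethod
    def _state_feature_names() -> List[str]:
        """Names for the 20 state entries: 8 lanes x (queue, wait), then phase and time features."""
        names: List[str] = []
        for lane in ["N0", "N1", "E0", "E1", "S0", "S1", "W0", "W1"]:
            names += [f"{lane}_queue_norm", f"{lane}_wait_norm"]
        names += ["current_phase_norm", "phase_time_norm", "hour_progress", "episode_progress"]
        return names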
def _collect_detailed_metrics(self) -> Dict:
"""Collect detailed metrics for reward calculation"""
metrics = {
'queue_lengths': [],
'waiting_times': [],
'vehicle_counts': [],
'average_speeds': [],
'fuel_consumption': 0,
'co2_emissions': 0,
'throughput': 0
}
try:
total_fuel = 0
total_co2 = 0
total_vehicles = 0
# Collect lane-based metrics
for lane_id in self.lane_ids:
if lane_id in traci.lane.getIDList():
# Queue length
queue_length = traci.lane.getLastStepHaltingNumber(lane_id)
metrics['queue_lengths'].append(queue_length)
# Vehicle metrics
vehicles = traci.lane.getLastStepVehicleIDs(lane_id)
vehicle_count = len(vehicles)
metrics['vehicle_counts'].append(vehicle_count)
total_vehicles += vehicle_count
if vehicles:
# Waiting times
waiting_times = [traci.vehicle.getWaitingTime(v) for v in vehicles]
metrics['waiting_times'].extend(waiting_times)
# Speeds
speeds = [traci.vehicle.getSpeed(v) for v in vehicles]
metrics['average_speeds'].extend(speeds)
# Fuel and emissions (simplified model)
for vehicle in vehicles:
speed = traci.vehicle.getSpeed(vehicle)
accel = traci.vehicle.getAcceleration(vehicle)
# Simplified fuel consumption model
if speed < 1: # Idling
fuel_rate = 0.6 # L/h
else:
fuel_rate = 0.05 * speed + 0.001 * abs(accel) * speed
total_fuel += fuel_rate / 3600 # Convert to L/s
total_co2 += fuel_rate * 2.31 / 3600 # kg CO2/s
metrics['fuel_consumption'] = total_fuel
metrics['co2_emissions'] = total_co2
metrics['total_vehicles'] = total_vehicles
# Throughput (vehicles leaving the network)
departed = traci.simulation.getDepartedNumber()
arrived = traci.simulation.getArrivedNumber()
metrics['throughput'] = arrived
except Exception as e:
self.logger.warning(f"Error collecting metrics: {e}")
return metrics
def _calculate_advanced_reward(self, pre_state: Dict, post_state: Dict, action: int) -> float:
"""Calculate sophisticated reward based on multiple objectives"""
        # Weight coefficients for the objectives. Every component below is oriented so that a
        # larger value is better (delay/queue components are reductions, emission/fuel components
        # are negated totals), so all weights are positive except the phase-change penalty.
        w_delay = 0.4
        w_queue = 0.3
        w_throughput = 0.2
        w_emissions = 0.05
        w_fuel = 0.05
        w_phase_change = -0.1 if action != self.last_action else 0
# Calculate individual reward components
delay_reward = self._calculate_delay_reward(pre_state, post_state)
queue_reward = self._calculate_queue_reward(pre_state, post_state)
throughput_reward = self._calculate_throughput_reward(post_state)
emission_reward = self._calculate_emission_reward(post_state)
fuel_reward = self._calculate_fuel_reward(post_state)
# Combined reward
total_reward = (
w_delay * delay_reward +
w_queue * queue_reward +
w_throughput * throughput_reward +
w_emissions * emission_reward +
w_fuel * fuel_reward +
w_phase_change
)
return total_reward
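    # Worked example with illustrative numbers: if a step reduces mean waiting time by 5 s and the
    # total queue by 3 vehicles, 2 vehicles arrive, and the negated emission/fuel components are
    # -0.02 and -0.01, then without a phase change the reward is
    #   0.4*5 + 0.3*3 + 0.2*2 + 0.05*(-0.02) + 0.05*(-0.01) = 3.2985,
    # and a phase change would subtract a further 0.1.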
def _calculate_delay_reward(self, pre_state: Dict, post_state: Dict) -> float:
"""Calculate reward based on delay reduction"""
        pre_delay = np.mean(pre_state.get('waiting_times') or [0])
        post_delay = np.mean(post_state.get('waiting_times') or [0])
return pre_delay - post_delay # Positive if delay reduced
def _calculate_queue_reward(self, pre_state: Dict, post_state: Dict) -> float:
"""Calculate reward based on queue length reduction"""
pre_queue = np.sum(pre_state.get('queue_lengths', [0]))
post_queue = np.sum(post_state.get('queue_lengths', [0]))
return pre_queue - post_queue # Positive if queue reduced
def _calculate_throughput_reward(self, post_state: Dict) -> float:
"""Calculate reward based on throughput"""
return post_state.get('throughput', 0)
def _calculate_emission_reward(self, post_state: Dict) -> float:
"""Calculate reward based on emissions (negative for high emissions)"""
return -post_state.get('co2_emissions', 0)
def _calculate_fuel_reward(self, post_state: Dict) -> float:
"""Calculate reward based on fuel consumption (negative for high consumption)"""
return -post_state.get('fuel_consumption', 0)
def _get_immediate_reward(self) -> float:
"""Get immediate reward for current simulation step"""
reward = 0
try:
# Quick reward calculation based on current traffic state
total_waiting = 0
total_queue = 0
for lane_id in self.lane_ids:
if lane_id in traci.lane.getIDList():
# Queue penalty
queue_length = traci.lane.getLastStepHaltingNumber(lane_id)
total_queue += queue_length
# Waiting time penalty
vehicles = traci.lane.getLastStepVehicleIDs(lane_id)
for vehicle in vehicles:
waiting_time = traci.vehicle.getWaitingTime(vehicle)
total_waiting += waiting_time
# Simple reward calculation
reward = -0.1 * total_queue - 0.01 * total_waiting
except Exception as e:
self.logger.warning(f"Error calculating immediate reward: {e}")
return reward
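    # Worked example with illustrative numbers: 12 halted vehicles and 300 s of accumulated waiting
    # across all approaches give an immediate reward of -0.1*12 - 0.01*300 = -4.2.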
def _update_metrics(self, pre_state: Dict, post_state: Dict):
"""Update comprehensive performance metrics"""
try:
# Update cumulative metrics
self.metrics['total_delay'] += np.sum(post_state.get('waiting_times', []))
self.metrics['total_vehicles'] = post_state.get('total_vehicles', 0)
self.metrics['total_fuel_consumption'] += post_state.get('fuel_consumption', 0)
self.metrics['total_emissions'] += post_state.get('co2_emissions', 0)
# Store timestep data
timestep_data = {
'time': self.simulation_time,
'phase': self.current_phase,
'queue_lengths': post_state.get('queue_lengths', []),
'waiting_times': post_state.get('waiting_times', []),
'throughput': post_state.get('throughput', 0),
'fuel': post_state.get('fuel_consumption', 0),
'emissions': post_state.get('co2_emissions', 0)
}
self.episode_data.append(timestep_data)
except Exception as e:
self.logger.warning(f"Error updating metrics: {e}")
def _is_episode_done(self) -> bool:
"""Check if episode should terminate"""
# Time limit reached
if self.simulation_time >= self.max_simulation_time:
return True
# No more vehicles in simulation (early termination)
if traci.simulation.getMinExpectedNumber() <= 0:
return True
return False
def _get_info_dict(self) -> Dict:
"""Get comprehensive information dictionary"""
try:
current_metrics = self._collect_detailed_metrics()
info = {
'simulation_time': self.simulation_time,
'current_phase': self.current_phase,
'phase_time': self.phase_time,
'total_delay': self.metrics['total_delay'],
'total_vehicles': self.metrics['total_vehicles'],
'current_queue_lengths': current_metrics.get('queue_lengths', []),
'current_waiting_times': current_metrics.get('waiting_times', []),
                'average_queue_length': np.mean(current_metrics.get('queue_lengths') or [0]),
                'average_waiting_time': np.mean(current_metrics.get('waiting_times') or [0]),
'throughput': current_metrics.get('throughput', 0),
'fuel_consumption': self.metrics['total_fuel_consumption'],
'emissions': self.metrics['total_emissions']
}
return info
except Exception as e:
self.logger.warning(f"Error creating info dict: {e}")
return {}
def close(self):
"""Close environment and cleanup"""
try:
if traci.isLoaded():
traci.close()
self.logger.info("Environment closed successfully")
except Exception as e:
self.logger.error(f"Error closing environment: {e}")
def get_episode_summary(self) -> Dict:
"""Get comprehensive episode summary for analysis"""
if not self.episode_data:
return {}
df = pd.DataFrame(self.episode_data)
summary = {
'episode_length': len(self.episode_data),
'total_simulation_time': self.simulation_time,
'average_delay': self.metrics['total_delay'] / max(self.metrics['total_vehicles'], 1),
'total_fuel_consumption': self.metrics['total_fuel_consumption'],
'total_emissions': self.metrics['total_emissions'],
            'average_queue_length': df['queue_lengths'].apply(lambda q: np.mean(q) if len(q) else 0).mean(),
            'max_queue_length': df['queue_lengths'].apply(lambda q: np.max(q) if len(q) else 0).max(),
'total_throughput': df['throughput'].sum(),
            'phase_distribution': {int(k): int(v) for k, v in df['phase'].value_counts().items()},
'fuel_efficiency': self.metrics['total_fuel_consumption'] / max(self.metrics['total_vehicles'], 1),
'emission_rate': self.metrics['total_emissions'] / max(self.simulation_time, 1)
}
return summary
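
# A minimal smoke-test sketch for this environment, assuming SUMO is installed and the SUMO/config
# files referenced above exist; the config path below is an assumption, not something this file
# guarantees.
if __name__ == "__main__":
    env = AdvancedTrafficEnv("config/config.yaml")  # hypothetical config location
    state = env.reset()
    done, total_reward = False, 0.0
    while not done:
        # Random actions are enough to exercise step(), the reward terms and the metrics
        action = int(np.random.randint(env.action_size))
        state, reward, done, _ = env.step(action)
        total_reward += reward
    print("episode reward:", round(total_reward, 2))
    print("episode summary:", env.get_episode_summary())
    env.close()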

430
src/training/trainer.py Normal file
View File

@ -0,0 +1,430 @@
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple
import yaml
import logging
from tqdm import tqdm
import json
class TrafficTrainer:
"""
Advanced training framework for traffic signal optimization
Includes comprehensive logging, evaluation, and analysis
"""
def __init__(self, config_path: str):
# Load configuration
with open(config_path, 'r') as f:
self.config = yaml.safe_load(f)
# Training parameters
self.episodes = self.config['training']['episodes']
self.max_steps = self.config['training']['max_steps_per_episode']
self.save_freq = self.config['training']['save_freq']
self.eval_freq = self.config['training']['eval_freq']
self.log_freq = self.config['training']['log_freq']
# Setup logging and directories
self.logger = self._setup_logger()
self._setup_directories()
# Training statistics
self.training_history = {
'episodes': [],
'rewards': [],
'steps': [],
'epsilon': [],
'loss': [],
'eval_scores': [],
'metrics': []
}
# Best model tracking
self.best_reward = float('-inf')
self.best_eval_score = float('-inf')
def _setup_logger(self) -> logging.Logger:
"""Setup comprehensive logging"""
logger = logging.getLogger('Trainer')
logger.setLevel(logging.INFO)
# Create file handler
os.makedirs(self.config['paths']['logs'], exist_ok=True)
fh = logging.FileHandler(
os.path.join(self.config['paths']['logs'], 'training.log')
)
fh.setLevel(logging.INFO)
# Create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def _setup_directories(self):
"""Create necessary directories"""
for path in self.config['paths'].values():
os.makedirs(path, exist_ok=True)
# Create subdirectories
os.makedirs(os.path.join(self.config['paths']['results'], 'plots'), exist_ok=True)
os.makedirs(os.path.join(self.config['paths']['results'], 'analysis'), exist_ok=True)
os.makedirs(os.path.join(self.config['paths']['models'], 'checkpoints'), exist_ok=True)
def train(self, env, agent) -> Dict:
"""Main training loop with comprehensive monitoring"""
self.logger.info("Starting advanced training...")
self.logger.info(f"Configuration: {self.config['experiment']}")
start_time = time.time()
# Training loop
for episode in tqdm(range(self.episodes), desc="Training Episodes"):
episode_start_time = time.time()
# Run episode
episode_stats = self._run_episode(episode, env, agent)
# Update training history
self._update_training_history(episode, episode_stats)
# Periodic evaluation
if episode % self.eval_freq == 0 and episode > 0:
eval_score = self._evaluate_agent(episode, env, agent)
self.training_history['eval_scores'].append(eval_score)
# Save best model based on evaluation
if eval_score > self.best_eval_score:
self.best_eval_score = eval_score
self._save_best_model(agent, episode, "eval")
# Periodic model saving
if episode % self.save_freq == 0 and episode > 0:
self._save_checkpoint(agent, episode)
# Periodic logging
if episode % self.log_freq == 0:
self._log_progress(episode, episode_stats, time.time() - episode_start_time)
# Early stopping check
if self._should_early_stop(episode):
self.logger.info(f"Early stopping at episode {episode}")
break
total_time = time.time() - start_time
# Final evaluation and analysis
final_stats = self._finalize_training(agent, total_time)
self.logger.info("Training completed successfully!")
return final_stats
def _run_episode(self, episode: int, env, agent) -> Dict:
"""Run a single training episode"""
state = env.reset()
total_reward = 0
steps = 0
losses = []
for step in range(self.max_steps):
# Agent action
action = agent.act(state, training=True)
# Environment step
next_state, reward, done, info = env.step(action)
# Store experience
agent.remember(state, action, reward, next_state, done)
# Train agent
loss = agent.replay()
if loss is not None:
losses.append(loss)
# Update state and metrics
state = next_state
total_reward += reward
steps += 1
if done:
break
# Get episode summary
episode_summary = env.get_episode_summary()
# Compile episode statistics
episode_stats = {
'reward': total_reward,
'steps': steps,
'average_loss': np.mean(losses) if losses else 0,
'epsilon': agent.epsilon,
'episode_summary': episode_summary,
'agent_stats': agent.get_training_stats()
}
return episode_stats
def _update_training_history(self, episode: int, episode_stats: Dict):
"""Update comprehensive training history"""
self.training_history['episodes'].append(episode)
self.training_history['rewards'].append(episode_stats['reward'])
self.training_history['steps'].append(episode_stats['steps'])
self.training_history['epsilon'].append(episode_stats['epsilon'])
self.training_history['loss'].append(episode_stats['average_loss'])
self.training_history['metrics'].append(episode_stats['episode_summary'])
# Update best reward
if episode_stats['reward'] > self.best_reward:
self.best_reward = episode_stats['reward']
def _evaluate_agent(self, episode: int, env, agent) -> float:
"""Evaluate agent performance"""
self.logger.info(f"Evaluating agent at episode {episode}...")
eval_episodes = self.config['evaluation']['test_episodes']
eval_rewards = []
eval_metrics = []
for eval_ep in range(eval_episodes):
state = env.reset()
total_reward = 0
for step in range(self.max_steps):
action = agent.act(state, training=False) # No exploration
next_state, reward, done, info = env.step(action)
state = next_state
total_reward += reward
if done:
break
eval_rewards.append(total_reward)
eval_metrics.append(env.get_episode_summary())
# Calculate evaluation score
avg_reward = np.mean(eval_rewards)
avg_delay = np.mean([m.get('average_delay', 0) for m in eval_metrics])
avg_throughput = np.mean([m.get('total_throughput', 0) for m in eval_metrics])
# Composite evaluation score
eval_score = avg_reward - 0.1 * avg_delay + 0.01 * avg_throughput
self.logger.info(f"Evaluation - Avg Reward: {avg_reward:.2f}, "
f"Avg Delay: {avg_delay:.2f}, Score: {eval_score:.2f}")
return eval_score
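    # Worked example with illustrative numbers: an average evaluation reward of -50, an average
    # delay of 35 s and a total throughput of 800 vehicles give
    #   -50 - 0.1*35 + 0.01*800 = -45.5.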
def _save_checkpoint(self, agent, episode: int):
"""Save training checkpoint"""
checkpoint_path = os.path.join(
self.config['paths']['models'], 'checkpoints',
f'checkpoint_episode_{episode}.pth'
)
agent.save(checkpoint_path, episode)
# Save training history
history_path = os.path.join(
self.config['paths']['results'],
f'training_history_episode_{episode}.json'
)
with open(history_path, 'w') as f:
# Convert numpy arrays to lists for JSON serialization
history_json = {}
for key, value in self.training_history.items():
if key == 'metrics':
history_json[key] = value # Keep as is for now
else:
history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
            json.dump(history_json, f, indent=2, default=str)  # default=str guards against stray numpy types
def _save_best_model(self, agent, episode: int, criteria: str):
"""Save best performing model"""
best_model_path = os.path.join(
self.config['paths']['models'],
f'best_model_{criteria}.pth'
)
agent.save(best_model_path, episode)
self.logger.info(f"New best model saved (criteria: {criteria}) at episode {episode}")
def _log_progress(self, episode: int, episode_stats: Dict, episode_time: float):
"""Log detailed training progress"""
recent_rewards = self.training_history['rewards'][-50:]
avg_reward = np.mean(recent_rewards)
self.logger.info(
f"Episode {episode:4d} | "
f"Reward: {episode_stats['reward']:8.2f} | "
f"Avg(50): {avg_reward:8.2f} | "
f"Steps: {episode_stats['steps']:4d} | "
f"Epsilon: {episode_stats['epsilon']:.3f} | "
f"Loss: {episode_stats['average_loss']:.4f} | "
f"Time: {episode_time:.2f}s"
)
# Log episode summary metrics
summary = episode_stats['episode_summary']
if summary:
self.logger.info(
f" Metrics - Delay: {summary.get('average_delay', 0):.2f}s | "
f"Queue: {summary.get('average_queue_length', 0):.1f} | "
f"Throughput: {summary.get('total_throughput', 0):.0f} | "
f"Fuel: {summary.get('fuel_efficiency', 0):.3f}L/veh"
)
def _should_early_stop(self, episode: int) -> bool:
"""Check if training should stop early"""
if episode < 100: # Minimum episodes before considering early stop
return False
# Check if reward has plateaued
recent_rewards = self.training_history['rewards'][-50:]
if len(recent_rewards) >= 50:
improvement = np.mean(recent_rewards[-25:]) - np.mean(recent_rewards[:25])
if improvement < 1.0: # Less than 1.0 reward improvement
return True
return False
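    # Example: after at least 100 episodes, the last 50 rewards are split into two halves of 25;
    # if the mean of the newer half exceeds the mean of the older half by less than 1.0, training
    # stops (e.g. means of 118.4 vs 118.0 give an improvement of 0.4 and trigger the stop).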
def _finalize_training(self, agent, total_time: float) -> Dict:
"""Finalize training with comprehensive analysis"""
self.logger.info("Finalizing training...")
# Save final model
final_model_path = os.path.join(
self.config['paths']['models'], 'final_model.pth'
)
agent.save(final_model_path, len(self.training_history['episodes']))
# Generate comprehensive plots
self._generate_training_plots()
# Save final training history
final_history_path = os.path.join(
self.config['paths']['results'], 'final_training_history.json'
)
with open(final_history_path, 'w') as f:
history_json = {}
for key, value in self.training_history.items():
if key == 'metrics':
history_json[key] = value
else:
history_json[key] = [float(v) if isinstance(v, (np.integer, np.floating)) else v for v in value]
            json.dump(history_json, f, indent=2, default=str)  # default=str guards against stray numpy types
# Compile final statistics
final_stats = {
'total_episodes': len(self.training_history['episodes']),
'total_training_time': total_time,
'best_reward': self.best_reward,
'best_eval_score': self.best_eval_score,
'final_epsilon': agent.epsilon,
'average_reward_last_100': np.mean(self.training_history['rewards'][-100:]),
'training_efficiency': len(self.training_history['episodes']) / (total_time / 3600) # episodes per hour
}
# Save final stats
stats_path = os.path.join(
self.config['paths']['results'], 'final_training_stats.json'
)
with open(stats_path, 'w') as f:
json.dump(final_stats, f, indent=2, default=str)
return final_stats
def _generate_training_plots(self):
"""Generate comprehensive training visualization"""
plt.style.use('seaborn-v0_8')
# Create subplot layout
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Advanced Traffic Signal RL Training Analysis', fontsize=16)
episodes = self.training_history['episodes']
# 1. Reward progression
axes[0, 0].plot(episodes, self.training_history['rewards'], alpha=0.7, label='Episode Reward')
# Moving average
if len(self.training_history['rewards']) > 50:
moving_avg = pd.Series(self.training_history['rewards']).rolling(50).mean()
axes[0, 0].plot(episodes, moving_avg, 'r-', linewidth=2, label='Moving Average (50)')
axes[0, 0].set_title('Training Reward Progression')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Reward')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
        # 2. Loss progression (pair each non-zero loss with the episode it came from)
        loss_pairs = [(e, l) for e, l in zip(episodes, self.training_history['loss']) if l > 0]
        if loss_pairs:
            loss_episodes, valid_losses = zip(*loss_pairs)
            axes[0, 1].plot(loss_episodes, valid_losses, alpha=0.7)
            if len(valid_losses) > 20:
                loss_avg = pd.Series(valid_losses).rolling(20).mean()
                axes[0, 1].plot(loss_episodes, loss_avg, 'r-', linewidth=2)
axes[0, 1].set_title('Training Loss')
axes[0, 1].set_xlabel('Episode')
axes[0, 1].set_ylabel('Loss')
axes[0, 1].set_yscale('log')
axes[0, 1].grid(True, alpha=0.3)
# 3. Epsilon decay
axes[0, 2].plot(episodes, self.training_history['epsilon'])
axes[0, 2].set_title('Exploration Rate (Epsilon)')
axes[0, 2].set_xlabel('Episode')
axes[0, 2].set_ylabel('Epsilon')
axes[0, 2].grid(True, alpha=0.3)
# 4. Episode length
axes[1, 0].plot(episodes, self.training_history['steps'])
if len(self.training_history['steps']) > 20:
steps_avg = pd.Series(self.training_history['steps']).rolling(20).mean()
axes[1, 0].plot(episodes, steps_avg, 'r-', linewidth=2)
axes[1, 0].set_title('Episode Length')
axes[1, 0].set_xlabel('Episode')
axes[1, 0].set_ylabel('Steps')
axes[1, 0].grid(True, alpha=0.3)
# 5. Evaluation scores
if self.training_history['eval_scores']:
            eval_episodes = [(i + 1) * self.eval_freq for i in range(len(self.training_history['eval_scores']))]
axes[1, 1].plot(eval_episodes, self.training_history['eval_scores'], 'go-')
axes[1, 1].set_title('Evaluation Scores')
axes[1, 1].set_xlabel('Episode')
axes[1, 1].set_ylabel('Eval Score')
axes[1, 1].grid(True, alpha=0.3)
# 6. Performance metrics over time
if self.training_history['metrics']:
delays = [m.get('average_delay', 0) for m in self.training_history['metrics'] if m]
if delays:
axes[1, 2].plot(episodes[:len(delays)], delays)
axes[1, 2].set_title('Average Delay Over Time')
axes[1, 2].set_xlabel('Episode')
axes[1, 2].set_ylabel('Delay (s)')
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
# Save plots
plots_dir = os.path.join(self.config['paths']['results'], 'plots')
plt.savefig(os.path.join(plots_dir, 'training_analysis.png'), dpi=300, bbox_inches='tight')
plt.savefig(os.path.join(plots_dir, 'training_analysis.pdf'), bbox_inches='tight')
plt.close()
self.logger.info("Training plots generated successfully")

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<additional>
<!-- Inductive Loop Detectors -->
<inductionLoop id="det_N_0" lane="N_to_intersection_0" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_N_1" lane="N_to_intersection_1" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_E_0" lane="E_to_intersection_0" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_E_1" lane="E_to_intersection_1" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_S_0" lane="S_to_intersection_0" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_S_1" lane="S_to_intersection_1" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_W_0" lane="W_to_intersection_0" pos="495" freq="1" file="detector_output.xml"/>
<inductionLoop id="det_W_1" lane="W_to_intersection_1" pos="495" freq="1" file="detector_output.xml"/>
<!-- Area Detectors for Queue Length Estimation -->
<laneAreaDetector id="area_N_0" lane="N_to_intersection_0" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_N_1" lane="N_to_intersection_1" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_E_0" lane="E_to_intersection_0" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_E_1" lane="E_to_intersection_1" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_S_0" lane="S_to_intersection_0" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_S_1" lane="S_to_intersection_1" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_W_0" lane="W_to_intersection_0" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
<laneAreaDetector id="area_W_1" lane="W_to_intersection_1" pos="400" endPos="495" freq="1" file="area_detector_output.xml"/>
</additional>

View File

@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<net version="1.16" junctionCornerDetail="5" limitTurnSpeed="5.50">
<location netOffset="0.00,0.00" convBoundary="-500.00,-500.00,500.00,500.00" origBoundary="-500.00,-500.00,500.00,500.00" projParameter="!"/>
<!-- Nodes (Junctions) -->
<junction id="intersection_1" type="traffic_light" x="0.00" y="0.00"
incLanes="N_to_intersection_0 N_to_intersection_1 E_to_intersection_0 E_to_intersection_1 S_to_intersection_0 S_to_intersection_1 W_to_intersection_0 W_to_intersection_1"
intLanes=":intersection_1_0_0 :intersection_1_1_0 :intersection_1_2_0 :intersection_1_3_0"
shape="-8.00,8.00 8.00,8.00 8.00,-8.00 -8.00,-8.00"/>
<junction id="N_junction" type="dead_end" x="0.00" y="500.00" incLanes="" intLanes="" shape="-5.60,500.00 1.60,500.00"/>
<junction id="E_junction" type="dead_end" x="500.00" y="0.00" incLanes="" intLanes="" shape="500.00,-5.60 500.00,1.60"/>
<junction id="S_junction" type="dead_end" x="0.00" y="-500.00" incLanes="" intLanes="" shape="5.60,-500.00 -1.60,-500.00"/>
<junction id="W_junction" type="dead_end" x="-500.00" y="0.00" incLanes="" intLanes="" shape="-500.00,5.60 -500.00,-1.60"/>
<junction id="N_end" type="dead_end" x="0.00" y="500.00" incLanes="intersection_to_N_0 intersection_to_N_1" intLanes="" shape="-0.80,500.00 5.60,500.00"/>
<junction id="E_end" type="dead_end" x="500.00" y="0.00" incLanes="intersection_to_E_0 intersection_to_E_1" intLanes="" shape="500.00,-0.80 500.00,5.60"/>
<junction id="S_end" type="dead_end" x="0.00" y="-500.00" incLanes="intersection_to_S_0 intersection_to_S_1" intLanes="" shape="0.80,-500.00 -5.60,-500.00"/>
<junction id="W_end" type="dead_end" x="-500.00" y="0.00" incLanes="intersection_to_W_0 intersection_to_W_1" intLanes="" shape="-500.00,0.80 -500.00,-5.60"/>
<!-- Edges (Roads) -->
<!-- Incoming edges -->
<edge id="N_to_intersection" from="N_junction" to="intersection_1" priority="1">
<lane id="N_to_intersection_0" index="0" speed="13.89" length="500.00" shape="-4.00,500.00 -4.00,8.00"/>
<lane id="N_to_intersection_1" index="1" speed="13.89" length="500.00" shape="-0.80,500.00 -0.80,8.00"/>
</edge>
<edge id="E_to_intersection" from="E_junction" to="intersection_1" priority="1">
<lane id="E_to_intersection_0" index="0" speed="13.89" length="500.00" shape="500.00,-4.00 8.00,-4.00"/>
<lane id="E_to_intersection_1" index="1" speed="13.89" length="500.00" shape="500.00,-0.80 8.00,-0.80"/>
</edge>
<edge id="S_to_intersection" from="S_junction" to="intersection_1" priority="1">
<lane id="S_to_intersection_0" index="0" speed="13.89" length="500.00" shape="4.00,-500.00 4.00,-8.00"/>
<lane id="S_to_intersection_1" index="1" speed="13.89" length="500.00" shape="0.80,-500.00 0.80,-8.00"/>
</edge>
<edge id="W_to_intersection" from="W_junction" to="intersection_1" priority="1">
<lane id="W_to_intersection_0" index="0" speed="13.89" length="500.00" shape="-500.00,4.00 -8.00,4.00"/>
<lane id="W_to_intersection_1" index="1" speed="13.89" length="500.00" shape="-500.00,0.80 -8.00,0.80"/>
</edge>
<!-- Outgoing Edges -->
<edge id="intersection_to_N" from="intersection_1" to="N_end" priority="1">
<lane id="intersection_to_N_0" index="0" speed="13.89" length="500.00" shape="0.80,8.00 0.80,500.00"/>
<lane id="intersection_to_N_1" index="1" speed="13.89" length="500.00" shape="4.00,8.00 4.00,500.00"/>
</edge>
<edge id="intersection_to_E" from="intersection_1" to="E_end" priority="1">
<lane id="intersection_to_E_0" index="0" speed="13.89" length="500.00" shape="8.00,0.80 500.00,0.80"/>
<lane id="intersection_to_E_1" index="1" speed="13.89" length="500.00" shape="8.00,4.00 500.00,4.00"/>
</edge>
<edge id="intersection_to_S" from="intersection_1" to="S_end" priority="1">
<lane id="intersection_to_S_0" index="0" speed="13.89" length="500.00" shape="-0.80,-8.00 -0.80,-500.00"/>
<lane id="intersection_to_S_1" index="1" speed="13.89" length="500.00" shape="-4.00,-8.00 -4.00,-500.00"/>
</edge>
<edge id="intersection_to_W" from="intersection_1" to="W_end" priority="1">
<lane id="intersection_to_W_0" index="0" speed="13.89" length="500.00" shape="-8.00,-0.80 -500.00,-0.80"/>
<lane id="intersection_to_W_1" index="1" speed="13.89" length="500.00" shape="-8.00,-4.00 -500.00,-4.00"/>
</edge>
<!-- Traffic Light Logic -->
<tlLogic id="intersection_1" type="static" programID="0" offset="0">
<phase duration="30" state="GGrrrrGGrrrr" name="North-South"/>
<phase duration="3" state="yyrrrryyrrrr" name="North-South Yellow"/>
<phase duration="30" state="rrGGrrrrGGrr" name="East-West"/>
<phase duration="3" state="rryyrrrryyrr" name="East-West Yellow"/>
<phase duration="20" state="GrrrGrrrGrrr" name="North-South Left"/>
<phase duration="3" state="yrrryrrryrrr" name="North-South Left Yellow"/>
<phase duration="20" state="rrrGrrrrrGrr" name="East-West Left"/>
<phase duration="3" state="rrryrrrrryrr" name="East-West Left Yellow"/>
</tlLogic>
<!-- Connections -->
<connection from="N_to_intersection" to="intersection_to_S" fromLane="0" toLane="0"/>
<connection from="N_to_intersection" to="intersection_to_S" fromLane="1" toLane="1"/>
<connection from="N_to_intersection" to="intersection_to_E" fromLane="1" toLane="0"/>
<connection from="N_to_intersection" to="intersection_to_W" fromLane="0" toLane="1"/>
<connection from="E_to_intersection" to="intersection_to_W" fromLane="0" toLane="0"/>
<connection from="E_to_intersection" to="intersection_to_W" fromLane="1" toLane="1"/>
<connection from="E_to_intersection" to="intersection_to_S" fromLane="1" toLane="0"/>
<connection from="E_to_intersection" to="intersection_to_N" fromLane="0" toLane="1"/>
<connection from="S_to_intersection" to="intersection_to_N" fromLane="0" toLane="0"/>
<connection from="S_to_intersection" to="intersection_to_N" fromLane="1" toLane="1"/>
<connection from="S_to_intersection" to="intersection_to_W" fromLane="1" toLane="0"/>
<connection from="S_to_intersection" to="intersection_to_E" fromLane="0" toLane="1"/>
<connection from="W_to_intersection" to="intersection_to_E" fromLane="0" toLane="0"/>
<connection from="W_to_intersection" to="intersection_to_E" fromLane="1" toLane="1"/>
<connection from="W_to_intersection" to="intersection_to_N" fromLane="1" toLane="0"/>
<connection from="W_to_intersection" to="intersection_to_S" fromLane="0" toLane="1"/>
</net>

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<input>
<net-file value="intersection.net.xml"/>
<route-files value="traffic_demand.rou.xml"/>
<additional-files value="detectors.add.xml"/>
</input>
<output>
<summary-output value="summary.xml"/>
<tripinfo-output value="tripinfo.xml"/>
</output>
<time>
<begin value="0"/>
<end value="3600"/>
<step-length value="1"/>
</time>
<processing>
<ignore-route-errors value="true"/>
<time-to-teleport value="300"/>
</processing>
<traci_server>
<remote-port value="8813"/>
</traci_server>
<gui_only>
<gui-settings-file value="gui_settings.xml"/>
</gui_only>
</configuration>

View File

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<routes>
<!-- Vehicle Types -->
<vType id="passenger" accel="2.6" decel="4.5" sigma="0.5" length="4.5" minGap="2.5" maxSpeed="55.56" color="yellow"/>
<vType id="bus" accel="1.2" decel="4.0" sigma="0.5" length="12.0" minGap="3.0" maxSpeed="27.78" color="blue"/>
<vType id="truck" accel="1.0" decel="4.0" sigma="0.5" length="8.0" minGap="3.0" maxSpeed="27.78" color="red"/>
<!-- Routes Definition -->
<!-- North to South -->
<route id="N_to_S" edges="N_to_intersection intersection_to_S"/>
<!-- North to East -->
<route id="N_to_E" edges="N_to_intersection intersection_to_E"/>
<!-- North to West -->
<route id="N_to_W" edges="N_to_intersection intersection_to_W"/>
<!-- East to West -->
<route id="E_to_W" edges="E_to_intersection intersection_to_W"/>
<!-- East to North -->
<route id="E_to_N" edges="E_to_intersection intersection_to_N"/>
<!-- East to South -->
<route id="E_to_S" edges="E_to_intersection intersection_to_S"/>
<!-- South to North -->
<route id="S_to_N" edges="S_to_intersection intersection_to_N"/>
<!-- South to East -->
<route id="S_to_E" edges="S_to_intersection intersection_to_E"/>
<!-- South to West -->
<route id="S_to_W" edges="S_to_intersection intersection_to_W"/>
<!-- West to East -->
<route id="W_to_E" edges="W_to_intersection intersection_to_E"/>
<!-- West to North -->
<route id="W_to_N" edges="W_to_intersection intersection_to_N"/>
<!-- West to South -->
<route id="W_to_S" edges="W_to_intersection intersection_to_S"/>
<!-- Traffic Flows with Time-Varying Demand -->
<!-- Morning Rush Hour (0-1800s) - Higher N-S and S-N flow -->
<flow id="flow_N_to_S_morning" route="N_to_S" begin="0" end="1800" vehsPerHour="800" type="passenger"/>
<flow id="flow_S_to_N_morning" route="S_to_N" begin="0" end="1800" vehsPerHour="600" type="passenger"/>
<flow id="flow_E_to_W_morning" route="E_to_W" begin="0" end="1800" vehsPerHour="400" type="passenger"/>
<flow id="flow_W_to_E_morning" route="W_to_E" begin="0" end="1800" vehsPerHour="300" type="passenger"/>
<!-- Midday (1800-2400s) - Balanced flow -->
<flow id="flow_N_to_S_midday" route="N_to_S" begin="1800" end="2400" vehsPerHour="400" type="passenger"/>
<flow id="flow_S_to_N_midday" route="S_to_N" begin="1800" end="2400" vehsPerHour="400" type="passenger"/>
<flow id="flow_E_to_W_midday" route="E_to_W" begin="1800" end="2400" vehsPerHour="450" type="passenger"/>
<flow id="flow_W_to_E_midday" route="W_to_E" begin="1800" end="2400" vehsPerHour="450" type="passenger"/>
<!-- Evening Rush Hour (2400-3600s) - Higher E-W and W-E flow -->
<flow id="flow_N_to_S_evening" route="N_to_S" begin="2400" end="3600" vehsPerHour="300" type="passenger"/>
<flow id="flow_S_to_N_evening" route="S_to_N" begin="2400" end="3600" vehsPerHour="400" type="passenger"/>
<flow id="flow_E_to_W_evening" route="E_to_W" begin="2400" end="3600" vehsPerHour="700" type="passenger"/>
<flow id="flow_W_to_E_evening" route="W_to_E" begin="2400" end="3600" vehsPerHour="750" type="passenger"/>
<!-- Left Turn Flows (Lower Volume) -->
<flow id="flow_N_to_E" route="N_to_E" begin="0" end="3600" vehsPerHour="150" type="passenger"/>
<flow id="flow_N_to_W" route="N_to_W" begin="0" end="3600" vehsPerHour="100" type="passenger"/>
<flow id="flow_E_to_N" route="E_to_N" begin="0" end="3600" vehsPerHour="120" type="passenger"/>
<flow id="flow_E_to_S" route="E_to_S" begin="0" end="3600" vehsPerHour="130" type="passenger"/>
<flow id="flow_S_to_E" route="S_to_E" begin="0" end="3600" vehsPerHour="110" type="passenger"/>
<flow id="flow_S_to_W" route="S_to_W" begin="0" end="3600" vehsPerHour="90" type="passenger"/>
<flow id="flow_W_to_N" route="W_to_N" begin="0" end="3600" vehsPerHour="100" type="passenger"/>
<flow id="flow_W_to_S" route="W_to_S" begin="0" end="3600" vehsPerHour="140" type="passenger"/>
<!-- Heavy Vehicles (Buses and Trucks) -->
<flow id="bus_N_to_S" route="N_to_S" begin="0" end="3600" vehsPerHour="20" type="bus"/>
<flow id="bus_E_to_W" route="E_to_W" begin="0" end="3600" vehsPerHour="15" type="bus"/>
<flow id="truck_mixed" route="N_to_S" begin="0" end="3600" vehsPerHour="50" type="truck"/>
</routes>