"""Unified logger for training and evaluation sessions.
SessionLogger is the main entry point for all logging in Myriad. It composes
three backends to handle different destinations:
1. Memory - Captures metrics for return values
2. Disk - Saves episode trajectories
3. Remote (W&B) - Logs metrics and artifacts
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable
import numpy as np
from myriad.configs.default import Config, EvalConfig
from myriad.platform.types import EvaluationMetrics, TrainingMetrics
from .backends.disk import DiskBackend
from .backends.memory import MemoryBackend
from .backends.wandb import WandbBackend, init_wandb
class SessionLogger:
    """Unified logger for training and evaluation sessions.

    Focuses on logging metrics and episodes during runs. Artifact persistence
    (saving results, checkpoints) is handled by the result objects themselves.

    Every call is fanned out to three composed backends:

    1. Memory - Captures metrics for return values
    2. Disk - Saves episode trajectories
    3. Remote - Logs to W&B (metrics + artifacts)

    Example:
        >>> logger = SessionLogger.for_training(config)
        >>> logger.log_training_step(...)
        >>> logger.log_evaluation(..., save_episodes=True)
        >>> training_metrics, eval_metrics = logger.get_results()
        >>> logger.finalize()
    """

    def __init__(
        self,
        wandb_run: Any | None,
        run_dir: Path | str,
        seed: int = 0,
    ) -> None:
        """Initialize the session logger and its three backends.

        Args:
            wandb_run: W&B run instance (None to disable remote logging)
            run_dir: Base directory for outputs (episode files, etc.). A plain
                string is accepted and converted to a ``Path``.
            seed: Random seed for metadata
        """
        self._wandb_run = wandb_run
        # Coerce so that the `/` join in _get_episode_base_dir also works when
        # a caller hands in a string path instead of a Path.
        self._run_dir = Path(run_dir)
        self._seed = seed

        # Initialize backends
        self._memory = MemoryBackend()
        self._wandb = WandbBackend(wandb_run=wandb_run)

        # Episodes are always written under <run_dir>/episodes; _run_dir must
        # therefore be assigned before this call.
        episode_base_dir = self._get_episode_base_dir()
        self._disk = DiskBackend(base_dir=episode_base_dir, seed=seed)

    @classmethod
    def for_training(cls, config: Config, run_dir: Path | str | None = None) -> "SessionLogger":
        """Create a logger for training sessions.

        Initializes W&B from ``config``, builds the logger, and logs the run
        summary through the W&B backend.

        Args:
            config: Training configuration
            run_dir: Output directory for artifacts (default: current directory)

        Returns:
            Configured SessionLogger instance
        """
        wandb_run = init_wandb(config)
        if run_dir is None:
            run_dir = Path.cwd()  # Hydra sets this to the output dir
        logger = cls(wandb_run=wandb_run, run_dir=run_dir, seed=config.run.seed)
        logger._wandb.log_run_summary(config)
        return logger

    @classmethod
    def for_evaluation(cls, config: EvalConfig, run_dir: Path | str | None = None) -> "SessionLogger":
        """Create a logger for evaluation-only sessions.

        Unlike ``for_training``, no run summary is logged.

        Args:
            config: Evaluation configuration
            run_dir: Output directory for artifacts (default: current directory)

        Returns:
            Configured SessionLogger instance
        """
        wandb_run = init_wandb(config)
        if run_dir is None:
            run_dir = Path.cwd()
        return cls(wandb_run=wandb_run, run_dir=run_dir, seed=config.run.seed)

    def _get_episode_base_dir(self) -> Path:
        """Get the base directory for episode storage.

        Episodes always save to the run directory for consistency.
        W&B syncs from here as artifacts (not the other way around).
        """
        return self._run_dir / "episodes"

    # --- Training API ---

    def log_training_step(
        self,
        global_step: int,
        steps_per_env: int,
        metrics_history: dict[str, Any],
        steps_this_chunk: int,
    ) -> None:
        """Log training metrics.

        Handles memory capture + W&B logging.

        Args:
            global_step: Global environment steps
            steps_per_env: Steps per individual environment
            metrics_history: Raw metrics from the training loop
            steps_this_chunk: Number of steps in this chunk
        """
        # Memory backend processes and returns host metrics
        metrics_host = self._memory.log_training_step(
            global_step, steps_per_env, metrics_history, steps_this_chunk
        )
        # Forward to W&B only when the memory backend returned something truthy
        if metrics_host:
            self._wandb.log_training(metrics_host, global_step, steps_per_env)

    # --- Evaluation API ---

    def log_evaluation(
        self,
        global_step: int,
        steps_per_env: int,
        eval_results: dict[str, Any],
        save_episodes: bool = False,
        episode_save_count: int | None = None,
    ) -> Path | None:
        """Log evaluation results.

        One call handles:

        - Captures metrics to memory
        - Logs metrics to W&B
        - Saves episodes to disk (if save_episodes=True and 'episodes' is present)
        - Uploads episode artifacts to W&B (if a directory was written)

        Args:
            global_step: Global environment steps
            steps_per_env: Steps per individual environment
            eval_results: Dictionary with 'episode_return', 'episode_length', 'dones',
                and optionally 'episodes' (trajectory data)
            save_episodes: If True, save episodes to disk and log to W&B
            episode_save_count: Number of episodes to save (None = all available,
                taken as ``len(eval_results['episode_length'])``)

        Returns:
            Path to saved episodes directory (if saved), else None
        """
        # Memory capture
        self._memory.log_evaluation(global_step, steps_per_env, eval_results)
        # W&B metrics
        self._wandb.log_evaluation(eval_results, global_step, steps_per_env)

        # Episode persistence
        episode_dir = None
        if save_episodes and "episodes" in eval_results:
            # Default to all available episodes; a missing 'episode_length'
            # entry means there is nothing to count, so nothing is saved.
            if episode_save_count is None:
                episode_lengths = eval_results.get("episode_length")
                episode_save_count = len(episode_lengths) if episode_lengths is not None else 0

            if episode_save_count > 0:
                episode_dir = self._disk.save_episodes(
                    eval_results, global_step, steps_per_env, episode_save_count
                )
                # Log artifact to W&B only if the disk backend returned a directory
                if episode_dir is not None:
                    self._wandb.log_episodes(episode_dir, global_step)
        return episode_dir

    # --- Lifecycle ---

    def get_results(self) -> tuple[TrainingMetrics, EvaluationMetrics]:
        """Return captured metrics without closing the session."""
        return self._memory.get_results()

    def finalize(self, exit_code: int = 0) -> None:
        """Close the W&B run.

        Args:
            exit_code: 0 for clean/intentional exit (finished, killed by sweep agent,
                user-stopped), 1 for unexpected failure (OOM, crash).
        """
        self._wandb.finish(exit_code=exit_code)

    # --- Video Rendering ---

    def log_videos(
        self,
        episode_dir: Path,
        render_frame_fn: Callable[[np.ndarray], np.ndarray],
        global_step: int,
        fps: int = 50,
        max_episodes: int | None = None,
        video_dir: Path | None = None,
    ) -> None:
        """Render saved episodes to videos and log to W&B.

        All arguments are forwarded unchanged to the W&B backend.

        Args:
            episode_dir: Path to directory containing .npz episode files
            render_frame_fn: Function that takes observation array and returns RGB frame
            global_step: Global environment steps (for W&B logging step)
            fps: Frames per second for rendered videos
            max_episodes: Maximum number of episodes to render (None = all)
            video_dir: Optional output directory for videos (if None, creates temporary videos)
        """
        self._wandb.log_videos(
            episode_dir=episode_dir,
            render_frame_fn=render_frame_fn,
            global_step=global_step,
            fps=fps,
            max_episodes=max_episodes,
            video_dir=video_dir,
        )

    # --- Properties ---

    @property
    def wandb_run(self) -> Any | None:
        """Get the underlying W&B run instance."""
        return self._wandb_run

    @property
    def episode_base_dir(self) -> Path:
        """Get the base directory for episode storage."""
        return self._disk.base_dir