# Source code for myriad.platform.logging.session_logger

"""Unified logger for training and evaluation sessions.

SessionLogger is the main entry point for all logging in Myriad. It composes
three backends to handle different destinations:
1. Memory - Captures metrics for return values
2. Disk - Saves episode trajectories
3. Remote (W&B) - Logs metrics and artifacts
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Callable

import numpy as np

from myriad.configs.default import Config, EvalConfig
from myriad.platform.types import EvaluationMetrics, TrainingMetrics

from .backends.disk import DiskBackend
from .backends.memory import MemoryBackend
from .backends.wandb import WandbBackend, init_wandb


class SessionLogger:
    """Unified logger for training and evaluation sessions.

    Focuses on logging metrics and episodes during runs. Artifact persistence
    (saving results, checkpoints) is handled by the result objects themselves.

    Handles three destinations automatically:
    1. Memory - Captures metrics for return values
    2. Disk - Saves episode trajectories
    3. Remote - Logs to W&B (metrics + artifacts)

    Example:
        >>> logger = SessionLogger.for_training(config)
        >>> logger.log_training_step(...)
        >>> logger.log_evaluation(..., save_episodes=True)
        >>> training_metrics, eval_metrics = logger.get_results()
        >>> logger.finalize()
    """

    def __init__(
        self,
        wandb_run: Any | None,
        run_dir: Path,
        seed: int = 0,
    ) -> None:
        """Initialize the session logger and its three backends.

        Args:
            wandb_run: W&B run instance (None to disable remote logging)
            run_dir: Base directory for outputs (episode files, etc.)
            seed: Random seed for metadata
        """
        self._wandb_run = wandb_run
        self._run_dir = run_dir
        self._seed = seed

        # Initialize backends
        self._memory = MemoryBackend()
        self._wandb = WandbBackend(wandb_run=wandb_run)

        # Episodes are always stored under the run directory (see
        # _get_episode_base_dir); this must run after _run_dir is assigned.
        episode_base_dir = self._get_episode_base_dir()
        self._disk = DiskBackend(base_dir=episode_base_dir, seed=seed)

    @classmethod
    def for_training(cls, config: Config, run_dir: Path | None = None) -> SessionLogger:
        """Create a logger for training sessions.

        Initializes W&B from ``config`` and logs the run summary to it.

        Args:
            config: Training configuration
            run_dir: Output directory for artifacts (default: current directory)

        Returns:
            Configured SessionLogger instance
        """
        wandb_run = init_wandb(config)

        if run_dir is None:
            run_dir = Path.cwd()  # Hydra sets this to the output dir

        logger = cls(wandb_run=wandb_run, run_dir=run_dir, seed=config.run.seed)
        logger._wandb.log_run_summary(config)
        return logger

    @classmethod
    def for_evaluation(cls, config: EvalConfig, run_dir: Path | None = None) -> SessionLogger:
        """Create a logger for evaluation-only sessions.

        Unlike ``for_training``, no run summary is logged.

        Args:
            config: Evaluation configuration
            run_dir: Output directory for artifacts (default: current directory)

        Returns:
            Configured SessionLogger instance
        """
        wandb_run = init_wandb(config)

        if run_dir is None:
            run_dir = Path.cwd()

        return cls(wandb_run=wandb_run, run_dir=run_dir, seed=config.run.seed)

    def _get_episode_base_dir(self) -> Path:
        """Get the base directory for episode storage.

        Episodes always save to the run directory for consistency.
        W&B syncs from here as artifacts (not the other way around).

        Returns:
            ``<run_dir>/episodes``
        """
        return self._run_dir / "episodes"

    # --- Training API ---

    def log_training_step(
        self,
        global_step: int,
        steps_per_env: int,
        metrics_history: dict[str, Any],
        steps_this_chunk: int,
    ) -> None:
        """Log training metrics.

        Handles memory capture + W&B logging.

        Args:
            global_step: Global environment steps
            steps_per_env: Steps per individual environment
            metrics_history: Raw metrics from the training loop
            steps_this_chunk: Number of steps in this chunk
        """
        # Memory backend processes and returns host metrics
        metrics_host = self._memory.log_training_step(
            global_step, steps_per_env, metrics_history, steps_this_chunk
        )

        # Forward to W&B only when the memory backend produced something
        if metrics_host:
            self._wandb.log_training(metrics_host, global_step, steps_per_env)

    # --- Evaluation API ---

    def log_evaluation(
        self,
        global_step: int,
        steps_per_env: int,
        eval_results: dict[str, Any],
        save_episodes: bool = False,
        episode_save_count: int | None = None,
    ) -> Path | None:
        """Log evaluation results.

        One call handles:
        - Captures metrics to memory
        - Saves episodes to disk (if save_episodes=True)
        - Logs metrics to W&B
        - Uploads episode artifacts to W&B

        Args:
            global_step: Global environment steps
            steps_per_env: Steps per individual environment
            eval_results: Dictionary with 'episode_return', 'episode_length',
                'dones', and optionally 'episodes' (trajectory data)
            save_episodes: If True, save episodes to disk and log to W&B
            episode_save_count: Number of episodes to save (None = all available)

        Returns:
            Path to saved episodes directory (if saved), else None
        """
        # Memory capture
        self._memory.log_evaluation(global_step, steps_per_env, eval_results)

        # W&B metrics
        self._wandb.log_evaluation(eval_results, global_step, steps_per_env)

        # Episode persistence: only when requested AND trajectories are present
        episode_dir = None
        if save_episodes and "episodes" in eval_results:
            # Default to all available episodes; with no 'episode_length'
            # entry there is nothing to count, so nothing is saved.
            if episode_save_count is None:
                episode_lengths = eval_results.get("episode_length")
                episode_save_count = len(episode_lengths) if episode_lengths is not None else 0

            if episode_save_count > 0:
                episode_dir = self._disk.save_episodes(
                    eval_results, global_step, steps_per_env, episode_save_count
                )

                # Log artifact to W&B
                if episode_dir is not None:
                    self._wandb.log_episodes(episode_dir, global_step)

        return episode_dir

    # --- Lifecycle ---

    def get_results(self) -> tuple[TrainingMetrics, EvaluationMetrics]:
        """Return captured metrics without closing the session."""
        return self._memory.get_results()

    def finalize(self, exit_code: int = 0) -> None:
        """Close the W&B run.

        Args:
            exit_code: 0 for clean/intentional exit (finished, killed by sweep
                agent, user-stopped), 1 for unexpected failure (OOM, crash).
        """
        self._wandb.finish(exit_code=exit_code)

    # --- Video Rendering ---

    def log_videos(
        self,
        episode_dir: Path,
        render_frame_fn: Callable[[np.ndarray], np.ndarray],
        global_step: int,
        fps: int = 50,
        max_episodes: int | None = None,
        video_dir: Path | None = None,
    ) -> None:
        """Render saved episodes to videos and log to W&B.

        Delegates entirely to the W&B backend.

        Args:
            episode_dir: Path to directory containing .npz episode files
            render_frame_fn: Function that takes observation array and returns RGB frame
            global_step: Global environment steps (for W&B logging step)
            fps: Frames per second for rendered videos
            max_episodes: Maximum number of episodes to render (None = all)
            video_dir: Optional output directory for videos (if None, creates temporary videos)
        """
        self._wandb.log_videos(
            episode_dir=episode_dir,
            render_frame_fn=render_frame_fn,
            global_step=global_step,
            fps=fps,
            max_episodes=max_episodes,
            video_dir=video_dir,
        )

    # --- Properties ---

    @property
    def wandb_run(self) -> Any | None:
        """Get the underlying W&B run instance."""
        return self._wandb_run

    @property
    def episode_base_dir(self) -> Path:
        """Get the base directory for episode storage."""
        return self._disk.base_dir