ref_observation_manager

Module: GBC.gyms.isaaclab_45.managers.ref_observation_manager

This module implements the core reference observation registration and management system for imitation learning tasks. The RefObservationManager serves as the central hub for loading, processing, and providing reference motion data from GBC standard format pickle files to the training pipeline. It integrates seamlessly with the high-performance BufferManager to enable real-time, efficient computation of reference variables during training, while supporting advanced features like observation mirroring, AMP observation extraction, and flexible network input configuration.

🎯 Core Functionality

The RefObservationManager provides:

  • Reference Data Integration: Loads and manages GBC standard format pickle files containing reference motion data
  • BufferManager Integration: Seamless connection with the high-performance buffer system for efficient data access
  • Observation Term Registration: Flexible system for defining and configuring reference observation terms
  • Real-Time Computation: High-performance computation of reference variables during training episodes
  • Advanced Features: Support for observation mirroring, AMP extraction, history buffering, and selective network inclusion
  • Multi-Mode Operation: Support for singular, recurrent, and recurrent_strict playback modes
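
The playback mode is selected once on the top-level configuration (the full example later on this page uses "recurrent"). A minimal sketch, assuming direct attribute assignment on the configclass and placeholder data paths:

from GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg import ReferenceObservationCfg

# Minimal sketch: pick one of the supported playback modes on the top-level config.
# "recurrent" is shown later on this page as cyclic playback; the precise semantics of
# "singular" and "recurrent_strict" follow the BufferManager's working modes.
ref_obs_cfg = ReferenceObservationCfg()
ref_obs_cfg.data_dir = ["/path/to/gbc/data"]   # placeholder directory of .pkl files
ref_obs_cfg.working_mode = "recurrent"         # or "singular" / "recurrent_strict"
ref_obs_cfg.static_delay = 0.0                 # global retrieval delay in seconds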

📚 Dependencies

from collections.abc import Sequence
from prettytable import PrettyTable
from isaaclab.envs import ManagerBasedEnv
from isaaclab.managers import ManagerBase, ManagerTermBase
from isaaclab.utils import modifiers
from isaaclab.utils.buffers import CircularBuffer
from .ref_obs_term_cfg import ReferenceObservationCfg, ReferenceObservationTermCfg, ReferenceObservationGroupCfg
from glob import glob
from GBC.utils.buffer.ref_buffer import BufferManager
import torch
import time
import copy
import os

🏭 RefObservationManager Class

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager

Class Definition:

class RefObservationManager(ManagerBase):
    def __init__(self, cfg: ReferenceObservationCfg, env: ManagerBasedEnv):
        assert cfg is not None, "The Configuration for RefObservationManager shouldn't be empty (in order to call _prepare_terms())"
        self._env = env
        self._env_start_time = torch.zeros(self.num_envs, device=self.device)
        self._load_pickles(cfg)
        self.static_delay = cfg.static_delay
        # Debug timing attributes
        self.debug_show_time_between_steps = 100
        self.debug_count_steps = 0
        self.debug_not_compute_obs_time = 0
        self.debug_compute_obs_time = 0
        self.debug_last_done_time = None
        super().__init__(cfg, env)

📥 Initialization Parameters:

  • cfg (ReferenceObservationCfg): Configuration object containing data directories, working mode, and observation terms
  • env (ManagerBasedEnv): The IsaacLab environment instance

🔧 Core Attributes:

  • _env_start_time (torch.Tensor): Environment-specific start times for temporal synchronization [num_envs]
  • static_delay (float): Global static delay for reference observation retrieval
  • _group_ref_obs_term_buffer_manager (dict[str, BufferManager]): BufferManager instances for each observation group
  • _group_ref_obs_term_names (dict[str, list[str]]): Registered observation term names per group
  • _group_ref_obs_term_cfgs (dict[str, list[ReferenceObservationTermCfg]]): Configuration objects for each term

🔧 Core Properties and Methods

📊 Essential Properties

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.start_time

Property Definition:

@property
def start_time(self) -> torch.Tensor:
    return self._env_start_time

@start_time.setter
def start_time(self, time: torch.Tensor):
    assert time.shape == (self.num_envs,), "start_time should be of shape (num_envs,)"
    self._env_start_time = time

🎯 Purpose: Manages environment-specific start times for precise temporal synchronization between simulation and reference data.
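
A short usage sketch, accessing the manager through the `env.ref_observation_manager` handle used in the demo later on this page: randomizing per-environment start times to desynchronize reference playback.

import torch

# Hypothetical sketch: give each environment a random start-time offset (in seconds)
# so that reference playback is desynchronized across the batch.
manager = env.ref_observation_manager
random_offsets = torch.rand(env.num_envs, device=env.device) * 2.0  # offsets in [0, 2) s
manager.start_time = random_offsets  # setter asserts shape == (num_envs,)

# The offset is added to the simulation clock inside compute() when add_start_time=True.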


Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.active_terms

Property Definition:

@property
def active_terms(self) -> dict[str, list[str]]:
    return self._group_ref_obs_term_names

🎯 Purpose: Returns dictionary mapping group names to lists of active observation term names.


Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.group_ref_obs_dim

Property Definition:

@property
def group_ref_obs_dim(self) -> dict[str, tuple[int, ...] | list[tuple[int, ...]]]:
    return self._group_ref_obs_dim

🎯 Purpose: Provides dimension information for each observation group, essential for network architecture design.
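
A minimal sketch of how these dimensions might feed network construction. The shape handling below is an assumption about how the group reports its dims (a single tuple when concatenated, a list of tuples otherwise), and `proprio_obs_dim` / `num_actions` are illustrative placeholders:

import torch.nn as nn

# Illustrative sketch: size a policy MLP's reference-observation branch from the
# manager's reported group dimensions.
ref_dims = env.ref_observation_manager.group_ref_obs_dim["policy"]
ref_obs_dim = ref_dims[-1] if isinstance(ref_dims, tuple) else sum(d[-1] for d in ref_dims)

proprio_obs_dim = 48   # placeholder: proprioceptive observation width from the env
num_actions = 29       # placeholder: robot action dimension

policy_net = nn.Sequential(
    nn.Linear(ref_obs_dim + proprio_obs_dim, 256),
    nn.ELU(),
    nn.Linear(256, num_actions),
)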


🔄 Environment Reset and Synchronization

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.reset

Method Signature:

def reset(self, env, env_ids: Sequence[int] | None = None) -> dict:

📥 Input Parameters:

  • env: Environment instance for accessing robot state
  • env_ids (Sequence[int] | None): Environment indices to reset (None for all environments)

🔧 Reset Process:

def reset(self, env, env_ids: Sequence[int] | None = None):
    # 1. Reset observation term configurations
    for group_name, group_cfg in self._group_ref_obs_class_term_cfgs.items():
        for term_cfg in group_cfg:
            if hasattr(term_cfg, "reset"):
                term_cfg.reset(env_ids=env_ids)

        # 2. Reset observation term storage
        for term_name in self._group_ref_obs_term_names[group_name]:
            if term_name in self._group_ref_obs_term_tmp_storage[group_name]:
                # Clear temporary storage for fresh computation
                del self._group_ref_obs_term_tmp_storage[group_name][term_name]

    # 3. Reset modifiers
    for mod in self._group_ref_obs_class_modifiers:
        mod.reset(env_ids=env_ids)

    # 4. Reset buffer managers with domain randomization
    for buf_manager in self._group_ref_obs_term_buffer_manager.values():
        buf_manager.reset(env, env_ids)

    return {}

🎯 Reset Capabilities:

  • Term-Level Reset: Individual observation terms can implement custom reset logic
  • Storage Cleanup: Temporary storage cleared to ensure fresh computation
  • Modifier Reset: All registered modifiers reset their internal state
  • Buffer Synchronization: BufferManager instances reset with potential reference reassignment
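
In practice the environment calls reset() after episode termination; a hedged sketch of invoking it for only the terminated environments (the `dones` tensor is assumed to come from the environment step):

import torch

# Hypothetical sketch: reset reference buffers only for environments that just terminated.
terminated_ids = torch.nonzero(dones, as_tuple=False).squeeze(-1).tolist()  # `dones` from the env step
if len(terminated_ids) > 0:
    env.ref_observation_manager.reset(env, env_ids=terminated_ids)
    # Fresh start times for the reset environments can then be assigned via the
    # start_time setter shown earlier.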

⚙️ Core Computation Engine

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute

Method Signature:

def compute(self, cur_time: torch.Tensor, add_start_time: bool = True, symmetry: bool = False) -> dict[str, tuple[torch.Tensor, torch.Tensor] | None | dict[str, tuple[torch.Tensor, torch.Tensor] | None]]:

📥 Input Parameters:

  • cur_time (torch.Tensor): Current simulation time for each environment [num_envs]
  • add_start_time (bool): Whether to add environment start time offset
  • symmetry (bool): Whether to compute symmetrical observations (for data augmentation)

📤 Return Values:

  • dict: Mapping group names to observation tuples (observations, masks) or nested dictionaries

🔧 Computation Logic:

def compute(self, cur_time: torch.Tensor, add_start_time: bool = True, symmetry: bool = False):
    ref_obs = dict()

    # Apply temporal offset if requested
    if add_start_time:
        cur_time = cur_time + self._env_start_time

    # Compute observations for each registered group
    for group_name in self._group_ref_obs_term_names:
        ref_obs[group_name] = self.compute_group(group_name, cur_time, symmetry)

    return ref_obs

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_group

Method Signature:

def compute_group(self, group_name: str, cur_time: torch.Tensor, symmetry: bool = False) -> tuple[torch.Tensor, torch.Tensor] | None | dict[str, tuple[torch.Tensor, torch.Tensor] | None]:

📥 Input Parameters:

  • group_name (str): Name of the observation group to compute
  • cur_time (torch.Tensor): Current simulation time [num_envs]
  • symmetry (bool): Enable symmetrical observation computation

🔧 Advanced Computation Pipeline:

Phase 1: Time Synchronization and Validation

# Apply static delay compensation
cur_time = cur_time - self.static_delay

# Validate group name
if group_name not in self._group_ref_obs_term_names:
    raise ValueError(f"Invalid group name '{group_name}'. Expected one of: {self._group_ref_obs_term_names.keys()}.")

# Initialize computation structures
group_term_names = self._group_ref_obs_term_names[group_name]
group_buffer_manager = self._group_ref_obs_term_buffer_manager[group_name]
group_ref_obs = dict.fromkeys(group_term_names, None)
obs_mask = None

Phase 2: Term-by-Term Computation

for term_index, (term_name, term_cfg) in enumerate(zip(group_term_names, self._group_ref_obs_term_cfgs[group_name])):
    # Apply term-specific delay
    term_delay = self._group_ref_obs_init_delay[group_name][term_index]
    cp_cur_time = torch.clamp(cur_time - term_delay, min=0.0)

    # Compute observation based on term type
    if term_cfg.is_base_pose:
        # Use cumulative base pose computation for precise integration
        obs = group_buffer_manager.calc_base_pose_cumulative(
            cp_cur_time, "lin_vel", "ang_vel"
        )
        mask = group_buffer_manager.calc_mask(cp_cur_time)
    else:
        # Standard observation from buffer
        obs = group_buffer_manager.calc_obs(term_name, cp_cur_time)
        mask = group_buffer_manager.calc_mask(cp_cur_time)

    # Initialize global mask from the first valid term
    if obs_mask is None and not term_cfg.make_empty:
        obs_mask = mask

    # Apply processing pipeline (modifiers, noise, clip, scale)
    obs = self._apply_processing_pipeline(obs, term_cfg)

    # Update storage and history (skipped in symmetry mode)
    if not symmetry:
        self._update_term_storage_and_history(term_name, group_name, obs, mask)

    # Include in observation tensor if configured
    if term_cfg.in_obs_tensor:
        group_ref_obs[term_name] = (obs, mask)

Phase 3: Output Formatting

if self._group_ref_obs_concatenate[group_name]:
    # Concatenated output mode
    term_obs = [group_ref_obs[term_name] for term_name in group_term_names]
    term_obs = filter(lambda x: x is not None, term_obs)

    # Concatenate observations along the last dimension
    obs = torch.cat([tup[0] for tup in term_obs], dim=-1)

    # Handle invalid observations
    if obs_mask is None or not obs_mask.any():
        return None

    return (obs, obs_mask)
else:
    # Dictionary output mode
    return group_ref_obs

⚡ Processing Pipeline for Each Term:

  1. Modifier Application: Custom transformation functions applied in sequence
  2. Noise Injection: Configurable noise models for domain randomization
  3. Clipping: Value range constraints to prevent outliers
  4. Scaling: Linear scaling transformations for normalization
  5. History Buffer Update: Temporal history management for recurrent observations
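
The helper that applies steps 1-4 is not reproduced on this page. Below is a minimal sketch of what such a per-term pipeline could look like, assuming IsaacLab-style `modifiers`, `noise`, `clip`, and `scale` fields on the term config; the name `_apply_processing_pipeline` matches the Phase 2 listing above, but this body is illustrative, not the actual implementation:

import torch

def _apply_processing_pipeline(self, obs: torch.Tensor, term_cfg) -> torch.Tensor:
    """Illustrative sketch of the per-term processing order described above."""
    # 1. Modifiers: custom transformation functions applied in sequence
    if term_cfg.modifiers is not None:
        for modifier in term_cfg.modifiers:
            obs = modifier.func(obs, **modifier.params)
    # 2. Noise injection for domain randomization (IsaacLab-style noise config)
    if term_cfg.noise is not None:
        obs = term_cfg.noise.func(obs, term_cfg.noise)
    # 3. Clipping to the configured value range
    if term_cfg.clip is not None:
        obs = obs.clip(min=term_cfg.clip[0], max=term_cfg.clip[1])
    # 4. Linear scaling for normalization
    if term_cfg.scale is not None:
        obs = obs * term_cfg.scale
    # Step 5 (history buffer update) is handled separately; see
    # _update_term_storage_and_history in the Phase 2 listing above.
    return obs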

🎭 Advanced Symmetry and AMP Support

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_policy_symmetry

Method Signature:

def compute_policy_symmetry(self, ref_observations_tuple: tuple[torch.Tensor, torch.Tensor]) -> torch.Tensor:

📥 Input Parameters:

  • ref_observations_tuple (tuple[torch.Tensor, torch.Tensor]): Original observations and masks

🔧 Symmetry Computation Logic:

def compute_policy_symmetry(self, ref_observations_tuple: tuple[torch.Tensor, torch.Tensor]):
    assert "policy" in self._group_ref_obs_term_names, "policy group is not defined, cannot compute policy symmetry"

    # Extract group configuration
    group_term_names = self._group_ref_obs_term_names["policy"]
    group_term_cfgs = self._group_ref_obs_term_cfgs["policy"]
    group_term_concatenate_last_dim = copy.deepcopy(self._group_ref_obs_term_concatenate_last_dim["policy"])

    ref_observations, ref_masks = ref_observations_tuple
    group_term_concatenate_last_dim += [ref_observations.shape[-1]]

    # Initialize symmetrical observations
    symmetry_ref_observations = torch.zeros_like(ref_observations)

    # Apply symmetry to each term
    for i, (term_name, term_cfg) in enumerate(zip(group_term_names, group_term_cfgs)):
        if not term_cfg.in_obs_tensor:
            continue

        assert term_cfg.symmetry is not None, f"Symmetry is not implemented for term {term_name} in group policy"

        # Extract term data from the concatenated observation
        start_idx = group_term_concatenate_last_dim[i]
        end_idx = group_term_concatenate_last_dim[i + 1]
        term_data = ref_observations[..., start_idx:end_idx]

        # Apply symmetry transformation
        if term_cfg.symmetry_params and term_cfg.symmetry_params.get("term_name", None) is not None:
            # Custom symmetry with parameters
            symmetry_term_data = term_cfg.symmetry(term_data, **term_cfg.symmetry_params)
        else:
            # Standard symmetry function
            symmetry_term_data = term_cfg.symmetry(term_data)

        # Insert symmetrical data
        symmetry_ref_observations[..., start_idx:end_idx] = symmetry_term_data

    return (symmetry_ref_observations, ref_masks)

🎯 Symmetry Applications:

  • Data Augmentation: Left-right motion mirroring for training robustness
  • Sample Efficiency: Doubles effective training data through symmetrical variations
  • Domain Adaptation: Improves generalization across different motion patterns
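
The symmetry callables themselves are user-supplied. Below is a hedged sketch of a joint-space mirroring function in the spirit of the `actions_symmetry` / `flipper` pair used in the configuration examples later on this page; the index-swapping scheme is illustrative, and the real flipper is robot-specific:

import torch

# Illustrative left-right mirroring for a joint-space term.
# `mirror_index` maps each joint to its contralateral partner and `sign` flips axes
# whose direction reverses under mirroring; both are robot-specific and assumed to be
# built elsewhere (e.g. from the URDF joint ordering).
def example_actions_symmetry(term_data: torch.Tensor,
                             mirror_index: torch.Tensor,
                             sign: torch.Tensor) -> torch.Tensor:
    # term_data: [..., num_joints]
    return term_data[..., mirror_index] * sign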

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_amp_dims

Method Signature:

def compute_amp_dims(self, term_names: list[str]) -> list[tuple[int, int]]:

📥 Input Parameters:

  • term_names (list[str]): List of observation term names for AMP extraction

📤 Return Values:

  • list[tuple[int, int]]: List of (start_idx, end_idx) pairs for each term in concatenated observation

🔧 AMP Dimension Computation:

def compute_amp_dims(self, term_names: list[str]) -> list[tuple[int, int]]:
    # Caching mechanism for performance
    if hasattr(self, "last_amp_term_names"):
        if term_names == self.last_amp_term_names and hasattr(self, "last_amp_dims"):
            return self.last_amp_dims

    # Compute dimension indices for each term
    amp_dims = []
    for term_name in term_names:
        amp_dims.append(self._compute_policy_term_idx(term_name))

    # Cache results for future calls
    self.last_amp_term_names = term_names
    self.last_amp_dims = amp_dims
    return amp_dims

🎯 AMP Integration Benefits:

  • Selective Extraction: Efficiently extract specific observation components for AMP discriminator
  • Performance Optimization: Cached dimension computation for repeated access
  • Flexible Configuration: Support for arbitrary term combinations in AMP training

📁 Data Loading and Management

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager._load_pickles

Method Signature:

def _load_pickles(self, cfg: ReferenceObservationCfg) -> None:

🔧 Data Loading Process:

def _load_pickles(self, cfg: ReferenceObservationCfg):
    self.data_dir = cfg.data_dir
    files = []

    # Collect all pickle files from the specified directories
    for data_dir in self.data_dir:
        if os.path.isdir(data_dir):
            # Directory: collect all .pkl files recursively
            pattern = os.path.join(data_dir, "**/*.pkl")
            files.extend(glob(pattern, recursive=True))
        else:
            # Single file: add directly
            files.append(data_dir)

    assert len(files) > 0, "No pickle files found in the data directory"

    # Randomly assign files to environments for domain randomization
    self.file_indices = torch.randint(0, len(files), (self.num_envs,), device=self.device)
    self.files = [files[i] for i in self.file_indices]

    # Load all pickle files into memory
    self.tmp_storage = [torch.load(file) for file in files]

📊 Data Loading Features:

  • Multi-Directory Support: Load from multiple data directories with recursive search
  • Random Assignment: Environment-to-file mapping for domain randomization
  • Memory Efficiency: Each pickle file is loaded once and shared by all environments assigned to the same reference
  • GBC Format Compatibility: Direct loading of GBC standard format pickle files

Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager._resolve_reference_term_cfg

Method Signature:

def _resolve_reference_term_cfg(self, term_name: str, term_cfg: ReferenceObservationTermCfg) -> tuple:

🔧 Term Resolution Process:

def _resolve_reference_term_cfg(self, term_name: str, term_cfg: ReferenceObservationTermCfg):
    # Parse group and term names
    if '/' in term_name:
        group_name, term_name = term_name.split('/')
    else:
        raise ValueError(f"Invalid term name {term_name}. Expected format: group_name/term_name")

    # Register term in group
    self._group_ref_obs_term_names[group_name].append(term_name)
    self._group_ref_obs_term_cfgs[group_name].append(term_cfg)

    # Initialize BufferManager for the group if needed
    if self._group_ref_obs_term_buffer_manager[group_name] is None:
        self._group_ref_obs_term_buffer_manager[group_name] = BufferManager(
            self.num_envs, len(self.tmp_storage), self.cfg.working_mode, self.device
        )
        self._group_ref_obs_term_buffer_manager[group_name].set_all_env_ref_id(self.file_indices)

    # Configure term delay
    self._group_ref_obs_init_delay[group_name].append(term_cfg.load_seq_delay)

    # Determine observation dimensions
    if term_cfg.is_base_pose:
        # Base pose: 7D (position + quaternion), or 0D if not in the observation tensor
        ref_obs_dims = (0,) if not term_cfg.in_obs_tensor else (7,)
    else:
        # Standard observation: load from pickle data
        buffer_manager = self._group_ref_obs_term_buffer_manager[group_name]
        for ref_id, pkl in enumerate(self.tmp_storage):
            # Extract cyclic subsequence if specified
            cyclic_subseq = pkl.get("cyclic_subseq", None)

            # Add reference data to the buffer
            buffer_manager.add_reference(
                key=term_name,
                ref_id=ref_id,
                buffer_raw=pkl[term_name].to(self.device),
                is_constant=term_cfg.is_constant,
                frame_rate=pkl["fps"],
                cyclic_subseq=cyclic_subseq,
            )

        # Prepare buffers for efficient access
        buffer_manager.prepare_buffers(term_name)

        # Get dimensions from the prepared buffer
        ref_obs_dims = (0,) if not term_cfg.in_obs_tensor or term_cfg.make_empty else buffer_manager.get_dim(term_name)

    # Handle history buffering
    if term_cfg.history_length > 0:
        old_dims = list(ref_obs_dims)
        old_dims.insert(1, term_cfg.history_length)  # Add history dimension
        ref_obs_dims = tuple(old_dims)

        if term_cfg.flatten_history_dim:
            # Flatten the history dimension: (N, H, D) -> (N, H*D)
            flat_dim = 1
            for dim in ref_obs_dims[1:]:
                flat_dim *= dim
            ref_obs_dims = (ref_obs_dims[0], flat_dim)

    # Store dimensions
    self._group_ref_obs_term_dim[group_name].append(ref_obs_dims)
    return ref_obs_dims
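
As a concrete reading of the history handling above (dimension convention taken from the `(N, H, D) -> (N, H*D)` comment in the listing), the same arithmetic in a few lines:

# Worked example of the history bookkeeping above:
# a 12-dim term with history_length=5 becomes (N, 5, 12) and, when
# flatten_history_dim=True, is reported as (N, 60).
num_envs, history_length, term_dim = 4096, 5, 12
ref_obs_dims = (num_envs, history_length, term_dim)   # (N, H, D)

flat_dim = 1
for dim in ref_obs_dims[1:]:
    flat_dim *= dim
ref_obs_dims = (ref_obs_dims[0], flat_dim)
print(ref_obs_dims)   # (4096, 60)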

📋 Configuration Framework

🏗️ ReferenceObservationTermCfg

Module Name: GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg.ReferenceObservationTermCfg

Key Configuration Options:

Core Settings

  • name (str | dict | None): Reference observation term identifier
  • func (Callable | None): Processing function for term modification
  • params (dict[str, Any] | None): Parameters for processing function

Symmetry and Mirroring

  • symmetry (Callable | None): Function for symmetrical observation computation
  • symmetry_params (dict[str, Any]): Parameters for symmetry function

Processing Pipeline

  • modifiers (list[ModifierCfg] | None): Sequential modification functions
  • noise (NoiseCfg | None): Noise model for domain randomization
  • clip (tuple[float, float] | None): Value clipping bounds
  • scale (float | None): Linear scaling factor

Behavior Control

  • make_empty (bool): Generate empty observations (for debugging/testing)
  • in_obs_tensor (bool): Include in network input tensor
  • is_constant (bool): Time-invariant observation flag
  • is_base_pose (bool): Use cumulative base pose computation

Temporal Configuration

  • load_seq_delay (float): Sequence loading delay in seconds
  • history_length (int): Number of historical observations to store
  • flatten_history_dim (bool): Flatten history dimension for network input

💡 Usage Examples

🎯 Basic Reference Action Configuration

from GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg import ReferenceObservationTermCfg
from isaaclab.assets import SceneEntityCfg

# Configure target actions for policy training
target_actions = ReferenceObservationTermCfg(
    func=reference_action_reshape,         # Processing function
    name="actions",                        # Data key in pickle files
    noise=None,                            # No noise for precise imitation
    symmetry=actions_symmetry,             # Symmetry function for data augmentation
    symmetry_params={"flipper": flipper},  # Parameters for the symmetry function
    params={
        "urdf_path": DATA_PATHS.urdf_path,
        "asset_cfg": SceneEntityCfg("robot", joint_names=[".*"]),
        "add_default_joint_pos": False,
    },
)

🎯 Configuration Features:

  • Function-Based Processing: Custom reshaping for robot-specific joint mappings
  • Symmetry Support: Bilateral mirroring for left-right data augmentation
  • URDF Integration: Robot-specific configuration through URDF parameters
  • Joint Selection: Flexible joint subset selection through regex patterns

📊 Constant Reference Data (Statistics)

# Configure action standard deviations (constant across time)
target_actions_std = ReferenceObservationTermCfg(
    func=reference_action_std,             # Compute standard deviations
    name="actions",                        # Source data key
    noise=None,                            # No noise for statistics
    in_obs_tensor=False,                   # Not included in network input
    is_constant=True,                      # Time-invariant data
    symmetry=actions_symmetry,             # Symmetry for consistency
    symmetry_params={"flipper": flipper},
    params={
        "urdf_path": DATA_PATHS.urdf_path,
        "asset_cfg": SceneEntityCfg("robot", joint_names=[".*"]),
        "add_default_joint_pos": False,
    },
)

📈 Statistical Configuration Features:

  • Constant Data: is_constant=True for time-invariant statistics
  • Network Exclusion: in_obs_tensor=False prevents inclusion in policy input
  • Statistical Functions: Compute standard deviations, means, ranges, etc.
  • Consistency: Same symmetry functions ensure statistical consistency

🏗️ Complete Group Configuration

from GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg import ReferenceObservationGroupCfg, ReferenceObservationCfg
from isaaclab.utils import configclass               # needed for the @configclass decorator below
from isaaclab.utils.noise import GaussianNoiseCfg    # needed for the joint-position noise below

# Configure observation group
@configclass
class PolicyReferenceObservationGroupCfg(ReferenceObservationGroupCfg):
    """Policy-specific reference observations."""

    # Core action targets
    target_actions = target_actions
    target_actions_std = target_actions_std

    # Base pose tracking
    target_base_pose = ReferenceObservationTermCfg(
        name="base_pose",
        is_base_pose=True,         # Use cumulative pose computation
        history_length=5,          # 5-step history
        flatten_history_dim=True,  # Flatten for network input
    )

    # Joint positions with noise
    target_joint_pos = ReferenceObservationTermCfg(
        name="joint_positions",
        noise=GaussianNoiseCfg(mean=0.0, std=0.02),  # Add realistic noise
        clip=(-3.14, 3.14),                          # Joint limits
        history_length=3,                            # Short history
    )

# Main configuration
@configclass
class RefObservationCfg(ReferenceObservationCfg):
    data_dir = ["/path/to/gbc/data/converted_actions"]
    working_mode = "recurrent"   # Cyclic playback
    static_delay = 0.1           # 100 ms static delay

    # Register observation groups
    policy = PolicyReferenceObservationGroupCfg()
    critic = PolicyReferenceObservationGroupCfg()  # Same terms for the critic

🔄 Advanced Features Demo

# Environment setup with RefObservationManager
env_cfg.ref_observation = RefObservationCfg()
env = ManagerBasedRefRLEnv(env_cfg)

# Compute reference observations
current_time = torch.tensor([1.5, 2.0, 0.5, 3.2], device="cuda:0")  # [num_envs]
ref_obs = env.ref_observation_manager.compute(current_time)

# Access a specific group's observations (None means no valid reference data)
policy_ref = ref_obs["policy"]
if policy_ref is not None:
    policy_obs, policy_mask = policy_ref  # [num_envs, obs_dim], [num_envs]
    print(f"Policy observations shape: {policy_obs.shape}")
    print(f"Valid environments: {policy_mask.sum()}/{len(policy_mask)}")

    # Compute symmetrical observations for data augmentation
    sym_policy_obs = env.ref_observation_manager.compute_policy_symmetry(policy_ref)
    print(f"Symmetrical observations computed: {sym_policy_obs[0].shape}")

    # Extract AMP-specific observations
    amp_term_names = ["target_actions", "target_joint_pos"]
    amp_dims = env.ref_observation_manager.compute_amp_dims(amp_term_names)
    print(f"AMP dimensions: {amp_dims}")

    # Extract AMP observations from the concatenated tensor
    amp_observations = []
    for start_idx, end_idx in amp_dims:
        amp_obs = policy_obs[:, start_idx:end_idx]
        amp_observations.append(amp_obs)

🚨 Best Practices and Guidelines

Configuration Best Practices

  1. Group Organization: Organize observations by their intended use (policy, critic, discriminator)
  2. Network Inclusion: Use in_obs_tensor=False for statistics and debug information
  3. History Management: Configure appropriate history lengths for temporal dependencies
  4. Noise Application: Add realistic noise only to observations that benefit from domain randomization

🔧 Performance Optimization

  1. BufferManager Integration: Leverage high-performance buffer access for real-time training
  2. Caching Mechanisms: Utilize built-in caching for repeated computations (AMP dims, symmetry)
  3. Memory Management: Use is_constant=True for time-invariant data to reduce memory usage
  4. Batch Processing: All operations support batch processing across environments

📊 Data Format Requirements

GBC Standard Pickle Format:

{
    "actions": torch.Tensor,        # [T, num_actions] - Robot joint actions
    "trans": torch.Tensor,          # [T, 3] - Root translations
    "root_orient": torch.Tensor,    # [T, 3] - Root orientations (angle-axis)
    "lin_vel": torch.Tensor,        # [T, 3] - Linear velocities
    "ang_vel": torch.Tensor,        # [T, 3] - Angular velocities
    "fps": float,                   # Frame rate
    "cyclic_subseq": tuple,         # Optional: (start_idx, end_idx) for cyclic sequences
    # Additional custom fields as needed
}
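
A small sketch of writing a file in this layout with `torch.save` (field values are dummy data and the shapes are placeholders; real files come from the GBC conversion pipeline):

import torch

# Dummy-data sketch of the GBC standard pickle layout shown above.
T, num_actions = 300, 29            # placeholder sequence length and action count
motion = {
    "actions": torch.zeros(T, num_actions),
    "trans": torch.zeros(T, 3),
    "root_orient": torch.zeros(T, 3),
    "lin_vel": torch.zeros(T, 3),
    "ang_vel": torch.zeros(T, 3),
    "fps": 30.0,
    "cyclic_subseq": (50, 250),     # optional cyclic segment (start_idx, end_idx)
}
torch.save(motion, "example_motion.pkl")   # _load_pickles reads such files via torch.load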

This comprehensive reference observation management system provides the foundation for sophisticated imitation learning, enabling efficient real-time access to reference motion data while supporting advanced features like symmetry computation, AMP integration, and flexible network input configuration.