ref_observation_manager
Module: GBC.gyms.isaaclab_45.managers.ref_observation_manager
This module implements reference observation registration and management for imitation learning tasks. The RefObservationManager loads reference motion data from GBC standard format pickle files, processes it, and provides it to the training pipeline. It uses the BufferManager to compute reference variables efficiently during training, and supports observation mirroring, AMP observation extraction, and configurable network inputs.
🎯 Core Functionality
The RefObservationManager provides:
- Reference Data Integration: Loads and manages GBC standard format pickle files containing reference motion data
- BufferManager Integration: Seamless connection with the high-performance buffer system for efficient data access
- Observation Term Registration: Flexible system for defining and configuring reference observation terms
- Real-Time Computation: High-performance computation of reference variables during training episodes
- Advanced Features: Support for observation mirroring, AMP extraction, history buffering, and selective network inclusion
- Multi-Mode Operation: Support for singular, recurrent, and recurrent_strict playback modes
📚 Dependencies
from collections.abc import Sequence
from prettytable import PrettyTable
from isaaclab.envs import ManagerBasedEnv
from isaaclab.managers import ManagerBase, ManagerTermBase
from isaaclab.utils import modifiers
from isaaclab.utils.buffers import CircularBuffer
from .ref_obs_term_cfg import ReferenceObservationCfg, ReferenceObservationTermCfg, ReferenceObservationGroupCfg
from glob import glob
from GBC.utils.buffer.ref_buffer import BufferManager
import torch
import time
import copy
import os
🏭 RefObservationManager Class
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager
Class Definition:
class RefObservationManager(ManagerBase):
    def __init__(self, cfg: ReferenceObservationCfg, env: ManagerBasedEnv):
        assert cfg is not None, "The Configuration for RefObservationManager shouldn't be empty (in order to call _prepare_terms())"
        self._env = env
        self._env_start_time = torch.zeros(self.num_envs, device=self.device)
        self._load_pickles(cfg)
        self.static_delay = cfg.static_delay
        # Debug timing attributes
        self.debug_show_time_between_steps = 100
        self.debug_count_steps = 0
        self.debug_not_compute_obs_time = 0
        self.debug_compute_obs_time = 0
        self.debug_last_done_time = None
        super().__init__(cfg, env)
📥 Initialization Parameters:
- cfg (ReferenceObservationCfg): Configuration object containing data directories, working mode, and observation terms
- env (ManagerBasedEnv): The IsaacLab environment instance
🔧 Core Attributes:
- _env_start_time (torch.Tensor): Environment-specific start times for temporal synchronization [num_envs]
- static_delay (float): Global static delay for reference observation retrieval
- _group_ref_obs_term_buffer_manager (dict[str, BufferManager]): BufferManager instances for each observation group
- _group_ref_obs_term_names (dict[str, list[str]]): Registered observation term names per group
- _group_ref_obs_term_cfgs (dict[str, list[ReferenceObservationTermCfg]]): Configuration objects for each term
🔧 Core Properties and Methods
📊 Essential Properties
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.start_time
Property Definition:
@property
def start_time(self) -> torch.Tensor:
    return self._env_start_time
@start_time.setter
def start_time(self, time: torch.Tensor):
    assert time.shape == (self.num_envs,), "start_time should be of shape (num_envs,)"
    self._env_start_time = time
🎯 Purpose: Manages environment-specific start times for precise temporal synchronization between simulation and reference data.
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.active_terms
Property Definition:
@property
def active_terms(self) -> dict[str, list[str]]:
    return self._group_ref_obs_term_names
🎯 Purpose: Returns dictionary mapping group names to lists of active observation term names.
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.group_ref_obs_dim
Property Definition:
@property
def group_ref_obs_dim(self) -> dict[str, tuple[int, ...] | list[tuple[int, ...]]]:
    return self._group_ref_obs_dim
🎯 Purpose: Provides dimension information for each observation group, essential for network architecture design.
🔄 Environment Reset and Synchronization
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.reset
Method Signature:
def reset(self, env, env_ids: Sequence[int] | None = None) -> dict:
📥 Input Parameters:
- env: Environment instance for accessing robot state
- env_ids (Sequence[int] | None): Environment indices to reset (None for all environments)
🔧 Reset Process:
def reset(self, env, env_ids: Sequence[int] | None = None):
    # 1. Reset observation term configurations
    for group_name, group_cfg in self._group_ref_obs_class_term_cfgs.items():
        for term_cfg in group_cfg:
            if hasattr(term_cfg, "reset"):
                term_cfg.reset(env_ids=env_ids)
    # 2. Reset observation term storage (for every group, not only the last one iterated)
    for group_name, term_names in self._group_ref_obs_term_names.items():
        for term_name in term_names:
            if term_name in self._group_ref_obs_term_tmp_storage[group_name]:
                # Clear temporary storage for fresh computation
                del self._group_ref_obs_term_tmp_storage[group_name][term_name]
    # 3. Reset modifiers
    for mod in self._group_ref_obs_class_modifiers:
        mod.reset(env_ids=env_ids)
    
    # 4. Reset buffer managers with domain randomization
    for buf_manager in self._group_ref_obs_term_buffer_manager.values():
        buf_manager.reset(env, env_ids)
    
    return {}
🎯 Reset Capabilities:
- Term-Level Reset: Individual observation terms can implement custom reset logic
- Storage Cleanup: Temporary storage cleared to ensure fresh computation
- Modifier Reset: All registered modifiers reset their internal state
- Buffer Synchronization: BufferManager instances reset with potential reference reassignment
⚡ Core Computation Engine
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute
Method Signature:
def compute(self, cur_time: torch.Tensor, add_start_time: bool = True, symmetry: bool = False) -> dict[str, tuple[torch.Tensor, torch.Tensor] | None | dict[str, tuple[torch.Tensor, torch.Tensor] | None]]:
📥 Input Parameters:
- cur_time (torch.Tensor): Current simulation time for each environment [num_envs]
- add_start_time (bool): Whether to add environment start time offset
- symmetry (bool): Whether to compute symmetrical observations (for data augmentation)
📤 Return Values:
- dict: Maps each group name to an (observations, masks) tuple, None, or a nested dictionary of per-term tuples
🔧 Computation Logic:
def compute(self, cur_time: torch.Tensor, add_start_time: bool = True, symmetry: bool = False):
    ref_obs = dict()
    
    # Apply temporal offset if requested
    if add_start_time:
        cur_time = cur_time + self._env_start_time
    
    # Compute observations for each registered group
    for group_name in self._group_ref_obs_term_names:
        ref_obs[group_name] = self.compute_group(group_name, cur_time, symmetry)
    
    return ref_obs
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_group
Method Signature:
def compute_group(self, group_name: str, cur_time: torch.Tensor, symmetry: bool = False) -> tuple[torch.Tensor, torch.Tensor] | None | dict[str, tuple[torch.Tensor, torch.Tensor] | None]:
📥 Input Parameters:
- group_name (str): Name of the observation group to compute
- cur_time (torch.Tensor): Current simulation time [num_envs]
- symmetry (bool): Enable symmetrical observation computation
🔧 Advanced Computation Pipeline:
Phase 1: Time Synchronization and Validation
# Apply static delay compensation
cur_time = cur_time - self.static_delay
# Validate group name
if group_name not in self._group_ref_obs_term_names:
    raise ValueError(f"Invalid group name '{group_name}'. Expected one of: {self._group_ref_obs_term_names.keys()}.")
# Initialize computation structures
group_term_names = self._group_ref_obs_term_names[group_name]
group_buffer_manager = self._group_ref_obs_term_buffer_manager[group_name]
group_ref_obs = dict.fromkeys(group_term_names, None)
obs_mask = None
Phase 2: Term-by-Term Computation
for term_index, (term_name, term_cfg) in enumerate(zip(group_term_names, self._group_ref_obs_term_cfgs[group_name])):
    # Apply term-specific delay (term_index selects this term's entry in the delay list)
    term_delay = self._group_ref_obs_init_delay[group_name][term_index]
    cp_cur_time = torch.clamp(cur_time - term_delay, min=0.0)
    
    # Compute observation based on term type
    if term_cfg.is_base_pose:
        # Use cumulative base pose computation for precise integration
        obs = group_buffer_manager.calc_base_pose_cumulative(
            cp_cur_time, "lin_vel", "ang_vel"
        )
        mask = group_buffer_manager.calc_mask(cp_cur_time)
    else:
        # Standard observation from buffer
        obs = group_buffer_manager.calc_obs(term_name, cp_cur_time)
        mask = group_buffer_manager.calc_mask(cp_cur_time)
    
    # Initialize global mask if first valid term
    if obs_mask is None and not term_cfg.make_empty:
        obs_mask = mask
    
    # Apply processing pipeline
    obs = self._apply_processing_pipeline(obs, term_cfg)
    
    # Update storage and history (if not in symmetry mode)
    if not symmetry:
        self._update_term_storage_and_history(term_name, group_name, obs, mask)
    
    # Include in observation tensor if configured
    if term_cfg.in_obs_tensor:
        group_ref_obs[term_name] = (obs, mask)
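The time offsets applied in compute and compute_group can be traced for a single environment with plain floats. The following is a minimal sketch rather than the manager's own code: `effective_lookup_time` is a hypothetical helper, and it assumes the offsets combine exactly as in the snippets above (start time added, static delay and per-term delay subtracted, result clamped at zero).

```python
def effective_lookup_time(
    cur_time: float,
    start_time: float = 0.0,
    static_delay: float = 0.0,
    term_delay: float = 0.0,
    add_start_time: bool = True,
) -> float:
    """Return the time used to look up one term's reference frame (hypothetical helper)."""
    # compute(): optionally shift by the environment's start time
    t = cur_time + start_time if add_start_time else cur_time
    # compute_group(): subtract the global static delay
    t -= static_delay
    # per term: subtract the term delay and never look up a negative time
    return max(t - term_delay, 0.0)


print(effective_lookup_time(1.5, start_time=0.5, static_delay=0.1, term_delay=0.2))
print(effective_lookup_time(0.05, static_delay=0.1))  # clamped at zero
```

Because the clamp is applied last, an environment that has just started reads the first reference frame until the combined delays have elapsed.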
Phase 3: Output Formatting
if self._group_ref_obs_concatenate[group_name]:
    # Concatenated output mode
    term_obs = [group_ref_obs[term_name] for term_name in group_term_names]
    term_obs = filter(lambda x: x is not None, term_obs)
    
    # Concatenate observations along last dimension
    obs = torch.cat([tup[0] for tup in term_obs], dim=-1)
    
    # Handle invalid observations
    if obs_mask is None or not obs_mask.any():
        return None
    
    return (obs, obs_mask)
else:
    # Dictionary output mode
    return group_ref_obs
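The concatenated output mode can be illustrated without torch by using nested lists in place of tensors. This is a sketch under stated assumptions: `concat_group` is a hypothetical stand-in, rows are indexed as [num_envs][term_dim], and, unlike the snippet above, the validity check runs before concatenation so that an empty term list cannot fail.

```python
from typing import Optional

Rows = list[list[float]]  # stand-in for a [num_envs, term_dim] tensor
TermOutput = Optional[tuple[Rows, list[bool]]]


def concat_group(terms: dict[str, TermOutput], mask: Optional[list[bool]]):
    """Join the included terms along the last dimension (hypothetical helper)."""
    # Terms excluded from the observation tensor are stored as None and skipped
    included = [term for term in terms.values() if term is not None]
    # No usable reference in any environment: the whole group yields None
    if mask is None or not any(mask) or not included:
        return None
    obs = [sum((rows[env] for rows, _ in included), []) for env in range(len(mask))]
    return obs, mask


terms = {
    "target_actions": ([[1.0, 2.0], [3.0, 4.0]], [True, False]),
    "target_actions_std": None,  # in_obs_tensor=False
    "target_joint_pos": ([[5.0], [6.0]], [True, False]),
}
print(concat_group(terms, [True, False]))
print(concat_group(terms, [False, False]))
```

Callers therefore need to handle a None group before unpacking it into observations and masks.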
⚡ Processing Pipeline for Each Term:
- Modifier Application: Custom transformation functions applied in sequence
- Noise Injection: Configurable noise models for domain randomization
- Clipping: Value range constraints to prevent outliers
- Scaling: Linear scaling transformations for normalization
- History Buffer Update: Temporal history management for recurrent observations
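The ordering of the first four stages can be sketched on a plain list of floats. The body of `_apply_processing_pipeline` is not shown in this document, so the function below is an assumption that simply follows the order of the list above (modifiers, then noise, then clipping, then scaling); `apply_pipeline` and its parameters are hypothetical names.

```python
from typing import Callable, Optional, Sequence

Transform = Callable[[float], float]


def apply_pipeline(
    values: Sequence[float],
    modifiers: Sequence[Transform] = (),
    noise: Optional[Transform] = None,
    clip: Optional[tuple[float, float]] = None,
    scale: Optional[float] = None,
) -> list[float]:
    """Apply modifiers, noise, clip, and scale in that order (assumed ordering)."""
    out = list(values)
    for modify in modifiers:  # 1. custom transformations, in sequence
        out = [modify(v) for v in out]
    if noise is not None:  # 2. noise model
        out = [noise(v) for v in out]
    if clip is not None:  # 3. clamp to [low, high]
        low, high = clip
        out = [min(max(v, low), high) for v in out]
    if scale is not None:  # 4. linear scaling
        out = [v * scale for v in out]
    return out


print(apply_pipeline([0.5, 4.0, -5.0], clip=(-3.0, 3.0), scale=2.0))
```

With this ordering the clip bounds are expressed in unscaled units, which is worth keeping in mind when choosing `clip` and `scale` together.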
🎭 Advanced Symmetry and AMP Support
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_policy_symmetry
Method Signature:
def compute_policy_symmetry(self, ref_observations_tuple: tuple[torch.Tensor, torch.Tensor]) -> torch.Tensor:
📥 Input Parameters:
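- ref_observations_tuple (tuple[torch.Tensor, torch.Tensor]): Original observations and masks
📤 Return Values:
- tuple[torch.Tensor, torch.Tensor]: Mirrored observations and the unchanged masks. The signature is annotated as returning torch.Tensor, but the code below returns a tuple.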
🔧 Symmetry Computation Logic:
def compute_policy_symmetry(self, ref_observations_tuple: tuple[torch.Tensor, torch.Tensor]):
    assert "policy" in self._group_ref_obs_term_names, "policy group is not defined, cannot compute policy symmetry"
    
    # Extract group configuration
    group_term_names = self._group_ref_obs_term_names["policy"]
    group_term_cfgs = self._group_ref_obs_term_cfgs["policy"]
    group_term_concatenate_last_dim = copy.deepcopy(self._group_ref_obs_term_concatenate_last_dim["policy"])
    
    ref_observations, ref_masks = ref_observations_tuple
    group_term_concatenate_last_dim += [ref_observations.shape[-1]]
    
    # Initialize symmetrical observations
    symmetry_ref_observations = torch.zeros_like(ref_observations)
    
    # Apply symmetry to each term
    for i, (term_name, term_cfg) in enumerate(zip(group_term_names, group_term_cfgs)):
        if not term_cfg.in_obs_tensor:
            continue
            
        assert term_cfg.symmetry is not None, f"Symmetry is not implemented for term {term_name} in group policy"
        
        # Extract term data from concatenated observation
        start_idx = group_term_concatenate_last_dim[i]
        end_idx = group_term_concatenate_last_dim[i+1]
        term_data = ref_observations[..., start_idx:end_idx]
        
        # Apply symmetry transformation
        if term_cfg.symmetry_params and term_cfg.symmetry_params.get("term_name", None) is not None:
            # Custom symmetry with parameters
            symmetry_term_data = term_cfg.symmetry(term_data, **term_cfg.symmetry_params)
        else:
            # Standard symmetry function
            symmetry_term_data = term_cfg.symmetry(term_data)
        
        # Insert symmetrical data
        symmetry_ref_observations[..., start_idx:end_idx] = symmetry_term_data
    
    return (symmetry_ref_observations, ref_masks)
🎯 Symmetry Applications:
- Data Augmentation: Left-right motion mirroring for training robustness
- Sample Efficiency: Doubles effective training data through symmetrical variations
- Domain Adaptation: Improves generalization across different motion patterns
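The per-term mirroring can be sketched on a single observation row. This is an illustration only: `mirror_concatenated`, `swap_left_right`, and `negate_lateral` are hypothetical, and the joint layout (left half followed by right half) and velocity layout ([x, y, z] with y lateral) are assumptions, not the conventions of any particular robot.

```python
from typing import Callable, Sequence

SymmetryFn = Callable[[list[float]], list[float]]


def mirror_concatenated(
    obs_row: list[float], offsets: Sequence[int], symmetry_fns: Sequence[SymmetryFn]
) -> list[float]:
    """Mirror each term's slice of one concatenated observation row."""
    # Append the total width so that term i spans bounds[i]:bounds[i + 1]
    bounds = list(offsets) + [len(obs_row)]
    mirrored = [0.0] * len(obs_row)
    for i, symmetry in enumerate(symmetry_fns):
        start, end = bounds[i], bounds[i + 1]
        mirrored[start:end] = symmetry(obs_row[start:end])
    return mirrored


def swap_left_right(term: list[float]) -> list[float]:
    """Assumed layout: first half left-side joints, second half right-side joints."""
    half = len(term) // 2
    return term[half:] + term[:half]


def negate_lateral(term: list[float]) -> list[float]:
    """Assumed layout: [x, y, z] with y as the lateral axis."""
    return [term[0], -term[1], term[2]]


row = [1.0, 2.0, 3.0, 4.0, 0.5, 0.2, 0.0]  # 4 joint values + 3 velocity values
print(mirror_concatenated(row, offsets=[0, 4], symmetry_fns=[swap_left_right, negate_lateral]))
```

The masks are passed through unchanged because mirroring does not alter which environments hold valid reference data.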
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager.compute_amp_dims
Method Signature:
def compute_amp_dims(self, term_names: list[str]) -> list[tuple[int, int]]:
📥 Input Parameters:
- term_names (list[str]): List of observation term names for AMP extraction
📤 Return Values:
- list[tuple[int, int]]: List of (start_idx, end_idx) pairs for each term in concatenated observation
🔧 AMP Dimension Computation:
def compute_amp_dims(self, term_names: list[str]) -> list[tuple[int, int]]:
    # Caching mechanism for performance
    if hasattr(self, "last_amp_term_names"):
        if term_names == self.last_amp_term_names and hasattr(self, "last_amp_dims"):
            return self.last_amp_dims
    
    # Compute dimension indices for each term
    amp_dims = []
    for term_name in term_names:
        amp_dims.append(self._compute_policy_term_idx(term_name))
    
    # Cache results for future calls
    self.last_amp_term_names = term_names
    self.last_amp_dims = amp_dims
    return amp_dims
🎯 AMP Integration Benefits:
- Selective Extraction: Efficiently extract specific observation components for AMP discriminator
- Performance Optimization: Cached dimension computation for repeated access
- Flexible Configuration: Support for arbitrary term combinations in AMP training
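The (start_idx, end_idx) pairs follow from the widths of the terms in the concatenated policy tensor. The body of `_compute_policy_term_idx` is not shown here, so the sketch below is an assumption: it supposes terms are laid out in registration order and that terms excluded from the tensor contribute zero width. `term_slices`, `amp_dims`, and the widths used are hypothetical.

```python
from itertools import accumulate


def term_slices(term_names: list[str], term_widths: list[int]) -> dict[str, tuple[int, int]]:
    """Map each term to its (start, end) range in the concatenated tensor."""
    ends = list(accumulate(term_widths))  # running total gives each term's end index
    starts = [0] + ends[:-1]              # each term starts where the previous one ends
    return {name: (s, e) for name, s, e in zip(term_names, starts, ends)}


def amp_dims(requested: list[str], slices: dict[str, tuple[int, int]]) -> list[tuple[int, int]]:
    """Look up the ranges of the terms requested for the AMP discriminator."""
    return [slices[name] for name in requested]


names = ["target_actions", "target_actions_std", "target_base_pose", "target_joint_pos"]
widths = [12, 0, 7, 12]  # illustrative widths; the excluded term has width 0
slices = term_slices(names, widths)
print(amp_dims(["target_actions", "target_joint_pos"], slices))
```

Each pair can then slice the last dimension of the concatenated policy observations, for example `policy_obs[:, start_idx:end_idx]`.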
📁 Data Loading and Management
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager._load_pickles
Method Signature:
def _load_pickles(self, cfg: ReferenceObservationCfg) -> None:
🔧 Data Loading Process:
def _load_pickles(self, cfg: ReferenceObservationCfg):
    self.data_dir = cfg.data_dir
    files = []
    
    # Collect all pickle files from specified directories
    for data_dir in self.data_dir:
        if os.path.isdir(data_dir):
            # Directory: collect all .pkl files
            pattern = os.path.join(data_dir, "**/*.pkl")
            files.extend(glob(pattern, recursive=True))
        else:
            # Single file: add directly
            files.append(data_dir)
    
    assert len(files) > 0, "No pickle files found in the data directory"
    
    # Random assignment of files to environments for domain randomization
    self.file_indices = torch.randint(0, len(files), (self.num_envs,), device=self.device)
    self.files = [files[i] for i in self.file_indices]
    
    # Load all pickle files into memory
    self.tmp_storage = [torch.load(file) for file in files]
📊 Data Loading Features:
- Multi-Directory Support: Load from multiple data directories with recursive search
- Random Assignment: Environment-to-file mapping for domain randomization
- Memory Efficiency: Each pickle file is loaded once and shared by all environments assigned to it
- GBC Format Compatibility: Direct loading of GBC standard format pickle files
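File collection and the environment-to-file assignment can be tried in isolation with the standard library. This sketch departs from the method above in three labeled ways: it raises FileNotFoundError instead of asserting, it sorts the matches so the file order is deterministic, and it uses `random.Random` in place of `torch.randint`. `collect_pickle_files` and `assign_files` are hypothetical names.

```python
import os
import random
from glob import glob
from typing import Optional


def collect_pickle_files(paths: list[str]) -> list[str]:
    """Gather .pkl files from directories (recursively) and from explicit file paths."""
    files: list[str] = []
    for path in paths:
        if os.path.isdir(path):
            # "**" with recursive=True also matches files directly inside `path`
            files.extend(sorted(glob(os.path.join(path, "**", "*.pkl"), recursive=True)))
        else:
            files.append(path)  # treated as a single file, as in the method above
    if not files:
        raise FileNotFoundError("No pickle files found in the data directory")
    return files


def assign_files(num_files: int, num_envs: int, seed: Optional[int] = None) -> list[int]:
    """Draw one file index per environment, uniformly at random."""
    rng = random.Random(seed)
    return [rng.randrange(num_files) for _ in range(num_envs)]
```

Passing a fixed `seed` makes the assignment reproducible across runs, which can help when comparing training results.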
Module Name: GBC.gyms.isaaclab_45.managers.ref_observation_manager.RefObservationManager._resolve_reference_term_cfg
Method Signature:
def _resolve_reference_term_cfg(self, term_name: str, term_cfg: ReferenceObservationTermCfg) -> tuple:
🔧 Term Resolution Process:
def _resolve_reference_term_cfg(self, term_name: str, term_cfg: ReferenceObservationTermCfg):
    # Parse group and term names
    if '/' in term_name:
        group_name, term_name = term_name.split('/')
    else:
        raise ValueError(f"Invalid term name {term_name}. Expected format: group_name/term_name")
    
    # Register term in group
    self._group_ref_obs_term_names[group_name].append(term_name)
    self._group_ref_obs_term_cfgs[group_name].append(term_cfg)
    
    # Initialize BufferManager for group if needed
    if self._group_ref_obs_term_buffer_manager[group_name] is None:
        self._group_ref_obs_term_buffer_manager[group_name] = BufferManager(
            self.num_envs, len(self.tmp_storage), self.cfg.working_mode, self.device
        )
        self._group_ref_obs_term_buffer_manager[group_name].set_all_env_ref_id(self.file_indices)
    
    # Configure term delay
    self._group_ref_obs_init_delay[group_name].append(term_cfg.load_seq_delay)
    
    # Determine observation dimensions
    if term_cfg.is_base_pose:
        # Base pose: 7D (position + quaternion) or 0D if not in tensor
        ref_obs_dims = (0,) if not term_cfg.in_obs_tensor else (7,)
    else:
        # Standard observation: load from pickle data
        buffer_manager = self._group_ref_obs_term_buffer_manager[group_name]
        for ref_id, pkl in enumerate(self.tmp_storage):
            # Extract cyclic subsequence if specified
            cyclic_subseq = pkl.get("cyclic_subseq", None)
            
            # Add reference data to buffer
            buffer_manager.add_reference(
                key=term_name,
                ref_id=ref_id,
                buffer_raw=pkl[term_name].to(self.device),
                is_constant=term_cfg.is_constant,
                frame_rate=pkl["fps"],
                cyclic_subseq=cyclic_subseq
            )
        
        # Prepare buffers for efficient access
        buffer_manager.prepare_buffers(term_name)
        
        # Get dimensions from prepared buffer
        ref_obs_dims = (0,) if not term_cfg.in_obs_tensor or term_cfg.make_empty else buffer_manager.get_dim(term_name)
    
    # Handle history buffering
    if term_cfg.history_length > 0:
        old_dims = list(ref_obs_dims)
        old_dims.insert(1, term_cfg.history_length)  # Add history dimension
        ref_obs_dims = tuple(old_dims)
        
        if term_cfg.flatten_history_dim:
            # Flatten history dimension: (N, H, D) -> (N, H*D)
            flat_dim = 1
            for dim in ref_obs_dims[1:]:
                flat_dim *= dim
            ref_obs_dims = (ref_obs_dims[0], flat_dim)
    
    # Store dimensions
    self._group_ref_obs_term_dim[group_name].append(ref_obs_dims)
    return ref_obs_dims
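The history handling at the end of the method is easy to check with concrete shapes. The helper below mirrors that logic on plain tuples; `history_dims` is a hypothetical name, and the example assumes the base dimensions have the form (N, D), as the `(N, H, D) -> (N, H*D)` comment implies.

```python
from math import prod


def history_dims(base_dims: tuple[int, ...], history_length: int, flatten: bool) -> tuple[int, ...]:
    """Return the term dimensions after adding (and optionally flattening) history."""
    if history_length <= 0:
        return tuple(base_dims)  # no history configured: dimensions are unchanged
    dims = list(base_dims)
    dims.insert(1, history_length)  # (N, D) -> (N, H, D)
    if flatten:
        return (dims[0], prod(dims[1:]))  # (N, H, D) -> (N, H * D)
    return tuple(dims)


print(history_dims((4096, 12), history_length=3, flatten=False))
print(history_dims((4096, 12), history_length=3, flatten=True))
```

A flattened term therefore contributes H * D values per environment to the concatenated observation, which affects the network input size.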
📋 Configuration Framework
🏗️ ReferenceObservationTermCfg
Module Name: GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg.ReferenceObservationTermCfg
Key Configuration Options:
Core Settings
- name (str | dict | None): Reference observation term identifier
- func (Callable | None): Processing function for term modification
- params (dict[str, Any] | None): Parameters for processing function
Symmetry and Mirroring
- symmetry (Callable | None): Function for symmetrical observation computation
- symmetry_params (dict[str, Any]): Parameters for symmetry function
Processing Pipeline
- modifiers (list[ModifierCfg] | None): Sequential modification functions
- noise (NoiseCfg | None): Noise model for domain randomization
- clip (tuple[float, float] | None): Value clipping bounds
- scale (float | None): Linear scaling factor
Behavior Control
- make_empty (bool): Generate empty observations (for debugging/testing)
- in_obs_tensor (bool): Include in network input tensor
- is_constant (bool): Time-invariant observation flag
- is_base_pose (bool): Use cumulative base pose computation
Temporal Configuration
- load_seq_delay (float): Sequence loading delay in seconds
- history_length (int): Number of historical observations to store
- flatten_history_dim (bool): Flatten history dimension for network input
💡 Usage Examples
🎯 Basic Reference Action Configuration
from GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg import ReferenceObservationTermCfg
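from isaaclab.managers import SceneEntityCfg
# reference_action_reshape, actions_symmetry, flipper, and DATA_PATHS are project-specific
# and assumed to be defined or imported elsewhere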
# Configure target actions for policy training
target_actions = ReferenceObservationTermCfg(
    func=reference_action_reshape,           # Processing function
    name="actions",                          # Data key in pickle files
    noise=None,                             # No noise for precise imitation
    symmetry=actions_symmetry,              # Symmetry function for data augmentation
    symmetry_params={"flipper": flipper},   # Parameters for symmetry function
    params={
        "urdf_path": DATA_PATHS.urdf_path,
        "asset_cfg": SceneEntityCfg("robot", joint_names=[".*"]),
        "add_default_joint_pos": False,
    }
)
🎯 Configuration Features:
- Function-Based Processing: Custom reshaping for robot-specific joint mappings
- Symmetry Support: Bilateral mirroring for left-right data augmentation
- URDF Integration: Robot-specific configuration through URDF parameters
- Joint Selection: Flexible joint subset selection through regex patterns
📊 Constant Reference Data (Statistics)
# Configure action standard deviations (constant across time)
target_actions_std = ReferenceObservationTermCfg(
    func=reference_action_std,              # Compute standard deviations
    name="actions",                         # Source data key
    noise=None,                            # No noise for statistics
    in_obs_tensor=False,                   # Not included in network input
    is_constant=True,                      # Time-invariant data
    symmetry=actions_symmetry,             # Symmetry for consistency
    symmetry_params={"flipper": flipper},
    params={
        "urdf_path": DATA_PATHS.urdf_path,
        "asset_cfg": SceneEntityCfg("robot", joint_names=[".*"]),
        "add_default_joint_pos": False,
    }
)
📈 Statistical Configuration Features:
- Constant Data: is_constant=True for time-invariant statistics
- Network Exclusion: in_obs_tensor=False prevents inclusion in policy input
- Statistical Functions: Compute standard deviations, means, ranges, etc.
- Consistency: Same symmetry functions ensure statistical consistency
🏗️ Complete Group Configuration
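from isaaclab.utils import configclass
from isaaclab.utils.noise import GaussianNoiseCfg
from GBC.gyms.isaaclab_45.managers.ref_obs_term_cfg import ReferenceObservationGroupCfg, ReferenceObservationCfg, ReferenceObservationTermCfg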
# Configure observation group
@configclass
class PolicyReferenceObservationGroupCfg(ReferenceObservationGroupCfg):
    """Policy-specific reference observations"""
    
    # Core action targets
    target_actions = target_actions
    target_actions_std = target_actions_std
    
    # Base pose tracking
    target_base_pose = ReferenceObservationTermCfg(
        name="base_pose",
        is_base_pose=True,                  # Use cumulative pose computation
        history_length=5,                   # 5-step history
        flatten_history_dim=True,           # Flatten for network
    )
    
    # Joint positions with noise
    target_joint_pos = ReferenceObservationTermCfg(
        name="joint_positions",
        noise=GaussianNoiseCfg(mean=0.0, std=0.02),  # Add realistic noise
        clip=(-3.14, 3.14),                # Joint limits
        history_length=3,                   # Short history
    )
# Main configuration
@configclass
class RefObservationCfg(ReferenceObservationCfg):
    data_dir = ["/path/to/gbc/data/converted_actions"]
    working_mode = "recurrent"              # Cyclic playback
    static_delay = 0.1                      # 100ms static delay
    
    # Register observation groups
    policy = PolicyReferenceObservationGroupCfg()
    critic = PolicyReferenceObservationGroupCfg()  # Same for critic
🔄 Advanced Features Demo
# Environment setup with RefObservationManager
env_cfg.ref_observation = RefObservationCfg()
env = ManagerBasedRefRLEnv(env_cfg)
# Compute reference observations
current_time = torch.tensor([1.5, 2.0, 0.5, 3.2], device="cuda:0")  # [num_envs]
ref_obs = env.ref_observation_manager.compute(current_time)
# Access specific group observations; a concatenated group is None when no reference is valid
policy_ref = ref_obs["policy"]
assert policy_ref is not None, "No valid reference observations for the policy group"
policy_obs, policy_mask = policy_ref  # [num_envs, obs_dim], [num_envs]
print(f"Policy observations shape: {policy_obs.shape}")
print(f"Valid environments: {policy_mask.sum()}/{len(policy_mask)}")
# Compute symmetrical observations for data augmentation
sym_policy_obs, sym_policy_mask = env.ref_observation_manager.compute_policy_symmetry(policy_ref)
print(f"Symmetrical observations computed: {sym_policy_obs.shape}")
# Extract AMP-specific observations
amp_term_names = ["target_actions", "target_joint_pos"]
amp_dims = env.ref_observation_manager.compute_amp_dims(amp_term_names)
print(f"AMP dimensions: {amp_dims}")
# Extract AMP observations from concatenated tensor
amp_observations = []
for start_idx, end_idx in amp_dims:
    amp_obs = policy_obs[:, start_idx:end_idx]
    amp_observations.append(amp_obs)
🚨 Best Practices and Guidelines
✅ Configuration Best Practices
- Group Organization: Organize observations by their intended use (policy, critic, discriminator)
- Network Inclusion: Use in_obs_tensor=False for statistics and debug information
- History Management: Configure appropriate history lengths for temporal dependencies
- Noise Application: Add realistic noise only to observations that benefit from domain randomization
🔧 Performance Optimization
- BufferManager Integration: Leverage high-performance buffer access for real-time training
- Caching Mechanisms: Utilize built-in caching for repeated computations (AMP dims, symmetry)
- Memory Management: Use is_constant=True for time-invariant data to reduce memory usage
- Batch Processing: All operations support batch processing across environments
📊 Data Format Requirements
GBC Standard Pickle Format:
{
    "actions": torch.Tensor,        # [T, num_actions] - Robot joint actions
    "trans": torch.Tensor,          # [T, 3] - Root translations
    "root_orient": torch.Tensor,    # [T, 3] - Root orientations (angle-axis)
    "lin_vel": torch.Tensor,        # [T, 3] - Linear velocities
    "ang_vel": torch.Tensor,        # [T, 3] - Angular velocities
    "fps": float,                   # Frame rate
    "cyclic_subseq": tuple,         # Optional: (start_idx, end_idx) for cyclic sequences
    # Additional custom fields as needed
}
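A loaded reference can be checked against this layout before it is registered. The validator below is a sketch under assumptions the format description does not state: it treats the five sequence fields and "fps" as required, expects all sequences to share the same frame count T, and reads cyclic_subseq as a range with 0 <= start < end <= T. `check_reference` is a hypothetical helper; it accepts any values that expose `.shape`, including torch tensors.

```python
SEQUENCE_KEYS = ("actions", "trans", "root_orient", "lin_vel", "ang_vel")


def check_reference(ref: dict) -> int:
    """Validate one reference dictionary and return its frame count T."""
    missing = [key for key in (*SEQUENCE_KEYS, "fps") if key not in ref]
    if missing:
        raise KeyError(f"Missing fields: {missing}")
    # Every sequence field is expected to be [T, ...] with the same T
    lengths = {key: ref[key].shape[0] for key in SEQUENCE_KEYS}
    if len(set(lengths.values())) != 1:
        raise ValueError(f"Inconsistent frame counts: {lengths}")
    num_frames = lengths["actions"]
    if ref["fps"] <= 0:
        raise ValueError("fps must be positive")
    subseq = ref.get("cyclic_subseq")  # optional field
    if subseq is not None:
        start, end = subseq
        if not 0 <= start < end <= num_frames:  # assumed index convention
            raise ValueError(f"cyclic_subseq {subseq} is outside [0, {num_frames}]")
    return num_frames
```

Running such a check once at load time reports a malformed file before training starts, rather than as an indexing error during a rollout.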
The RefObservationManager gives imitation learning pipelines real-time access to reference motion data, with support for symmetry computation, AMP integration, and configurable network inputs.