Useful Snippets — Isaac Sim Documentation

Last updated: 12/12/2025

Useful Snippets# Various examples of Isaac Sim Replicator snippets that can be run as Standalone Applications or from the UI using the Script Editor. Annotator and Custom Writer Data from Multiple Cameras# Example on how to access data from multiple cameras in a scene using annotators or custom writers. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):./python.sh standalone_examples/api/isaacsim.replicator.examples/multi_camera.py Script Editor Annotator and Custom Writer Data from Multiple Cameras 1import asyncio 2import os 3import omni.usd 4import omni.kit 5import omni.replicator.core as rep 6import carb.settings 7from omni.replicator.core import AnnotatorRegistry, Writer 8from PIL import Image 9from pxr import UsdGeom, Sdf 10 11NUM_FRAMES = 5 12 13# Save rgb image to file 14def save_rgb(rgb_data, file_name): 15 rgb_img = Image.fromarray(rgb_data).convert("RGBA") 16 rgb_img.save(file_name + ".png") 17 18 19# Randomize cube color every frame using a replicator randomizer 20def cube_color_randomizer(): 21 cube_prims = rep.get.prims(path_pattern="Cube") 22 with cube_prims: 23 rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1))) 24 return cube_prims.node 25 26 27# Access data through a custom replicator writer 28class MyWriter(Writer): 29 def __init__(self, rgb: bool = True): 30 self._frame_id = 0 31 self.annotators = [] 32 if rgb: 33 self.annotators.append(AnnotatorRegistry.get_annotator("rgb")) 34 # Create writer output directory 35 self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "") 36 print(f"Writing writer data to {self.file_path}") 37 dir = os.path.dirname(self.file_path) 38 os.makedirs(dir, exist_ok=True) 39 40 def write(self, data): 41 for annotator in data.keys(): 42 annotator_split = annotator.split("-") 43 if len(annotator_split) > 1: 44 render_product_name = annotator_split[-1] 45 if annotator.startswith("rgb"): 46 save_rgb(data[annotator], 
f"{self.file_path}/{render_product_name}_frame_{self._frame_id}") 47 self._frame_id += 1 48 49 50rep.WriterRegistry.register(MyWriter) 51 52# Create a new stage with a dome light 53omni.usd.get_context().new_stage() 54 55# Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 56carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 57 58stage = omni.usd.get_context().get_stage() 59dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight") 60dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0) 61 62# Create cube 63cube_prim = stage.DefinePrim("/World/Cube", "Cube") 64UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0)) 65 66# Register cube color randomizer to trigger on every frame 67rep.randomizer.register(cube_color_randomizer) 68with rep.trigger.on_frame(): 69 rep.randomizer.cube_color_randomizer() 70 71# Create cameras 72camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera") 73UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0)) 74UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0)) 75 76camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera") 77UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0)) 78UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0)) 79 80# Create render products 81rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320)) 82rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640)) 83rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024)) 84 85# Access the data through a custom writer 86writer = rep.WriterRegistry.get("MyWriter") 87writer.initialize(rgb=True) 88writer.attach([rp1, rp2, rp3]) 89 90# Access the data through annotators 91rgb_annotators = [] 92for rp in [rp1, rp2, rp3]: 93 rgb = rep.AnnotatorRegistry.get_annotator("rgb") 94 rgb.attach(rp) 95 
rgb_annotators.append(rgb) 96 97# Create annotator output directory 98file_path = os.path.join(os.getcwd(), "_out_mc_annot", "") 99print(f"Writing annotator data to {file_path}") 100dir = os.path.dirname(file_path) 101os.makedirs(dir, exist_ok=True) 102 103# Data will be captured manually using step 104rep.orchestrator.set_capture_on_play(False) 105 106async def run_example_async(): 107 for i in range(NUM_FRAMES): 108 # The step function provides new data to the annotators, triggers the randomizers and the writer 109 await rep.orchestrator.step_async(rt_subframes=4) 110 for j, rgb_annot in enumerate(rgb_annotators): 111 save_rgb(rgb_annot.get_data(), f"{dir}/rp{j}_step_{i}") 112 113 114asyncio.ensure_future(run_example_async()) Standalone Application Annotator and Custom Writer Data from Multiple Cameras 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp(launch_config={"headless": False}) 4 5import os 6import omni.usd 7import omni.kit 8import omni.replicator.core as rep 9import carb.settings 10from omni.replicator.core import AnnotatorRegistry, Writer 11from PIL import Image 12from pxr import UsdGeom, Sdf 13 14NUM_FRAMES = 5 15 16# Save rgb image to file 17def save_rgb(rgb_data, file_name): 18 rgb_img = Image.fromarray(rgb_data).convert("RGBA") 19 rgb_img.save(file_name + ".png") 20 21 22# Randomize cube color every frame using a replicator randomizer 23def cube_color_randomizer(): 24 cube_prims = rep.get.prims(path_pattern="Cube") 25 with cube_prims: 26 rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1))) 27 return cube_prims.node 28 29 30# Access data through a custom replicator writer 31class MyWriter(Writer): 32 def __init__(self, rgb: bool = True): 33 self._frame_id = 0 34 self.annotators = [] 35 if rgb: 36 self.annotators.append(AnnotatorRegistry.get_annotator("rgb")) 37 # Create writer output directory 38 self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "") 39 print(f"Writing writer data to 
{self.file_path}") 40 dir = os.path.dirname(self.file_path) 41 os.makedirs(dir, exist_ok=True) 42 43 def write(self, data): 44 for annotator in data.keys(): 45 annotator_split = annotator.split("-") 46 if len(annotator_split) > 1: 47 render_product_name = annotator_split[-1] 48 if annotator.startswith("rgb"): 49 save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}") 50 self._frame_id += 1 51 52 53rep.WriterRegistry.register(MyWriter) 54 55# Create a new stage with a dome light 56omni.usd.get_context().new_stage() 57 58# Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 59carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 60 61stage = omni.usd.get_context().get_stage() 62dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight") 63dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0) 64 65# Create cube 66cube_prim = stage.DefinePrim("/World/Cube", "Cube") 67UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0)) 68 69# Register cube color randomizer to trigger on every frame 70rep.randomizer.register(cube_color_randomizer) 71with rep.trigger.on_frame(): 72 rep.randomizer.cube_color_randomizer() 73 74# Create cameras 75camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera") 76UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0)) 77UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0)) 78 79camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera") 80UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0)) 81UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0)) 82 83# Create render products 84rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320)) 85rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640)) 86rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024)) 87 
88# Access the data through a custom writer 89writer = rep.WriterRegistry.get("MyWriter") 90writer.initialize(rgb=True) 91writer.attach([rp1, rp2, rp3]) 92 93# Access the data through annotators 94rgb_annotators = [] 95for rp in [rp1, rp2, rp3]: 96 rgb = rep.AnnotatorRegistry.get_annotator("rgb") 97 rgb.attach(rp) 98 rgb_annotators.append(rgb) 99 100# Create annotator output directory 101file_path = os.path.join(os.getcwd(), "_out_mc_annot", "") 102print(f"Writing annotator data to {file_path}") 103dir = os.path.dirname(file_path) 104os.makedirs(dir, exist_ok=True) 105 106# Data will be captured manually using step 107rep.orchestrator.set_capture_on_play(False) 108 109for i in range(NUM_FRAMES): 110 # The step function provides new data to the annotators, triggers the randomizers and the writer 111 rep.orchestrator.step(rt_subframes=4) 112 for j, rgb_annot in enumerate(rgb_annotators): 113 save_rgb(rgb_annot.get_data(), f"{dir}/rp{j}_step_{i}") 114 115simulation_app.close() Synthetic Data Access at Specific Simulation Timepoints# Example of how to access synthetic data (RGB, semantic segmentation) from a camera in a simulation scene at specific simulation events (here, each time a dropped cube comes to rest) using annotators or writers. 
The standalone example can also be run directly (on Windows use python.bat instead of python.sh):./python.sh standalone_examples/api/isaacsim.replicator.examples/simulation_get_data.py Script Editor Synthetic Data Access at Specific Simulation Timepoints 1import asyncio 2import json 3import os 4 5import carb.settings 6import numpy as np 7import omni 8import omni.replicator.core as rep 9from isaacsim.core.api import World 10from isaacsim.core.api.objects import DynamicCuboid 11from isaacsim.core.utils.semantics import add_labels 12from PIL import Image 13 14 15# Util function to save rgb annotator data 16def write_rgb_data(rgb_data, file_path): 17 rgb_img = Image.fromarray(rgb_data).convert("RGBA") 18 rgb_img.save(file_path + ".png") 19 20 21# Util function to save semantic segmentation annotator data 22def write_sem_data(sem_data, file_path): 23 id_to_labels = sem_data["info"]["idToLabels"] 24 with open(file_path + ".json", "w") as f: 25 json.dump(id_to_labels, f) 26 sem_image_data = sem_data["data"] 27 sem_img = Image.fromarray(sem_image_data).convert("RGBA") 28 sem_img.save(file_path + ".png") 29 30 31# Create a new stage with the default ground plane 32omni.usd.get_context().new_stage() 33 34# Setup the simulation world 35world = World() 36world.scene.add_default_ground_plane() 37 38# Setting capture on play to False will prevent the replicator from capturing data each frame 39carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 40 41# Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 42carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 43 44# Create a camera and render product to collect the data from 45cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0)) 46rp = rep.create.render_product(cam, (512, 512)) 47 48# Set the output directory for the data 49out_dir = os.path.join(os.getcwd(), "_out_sim_event") 50os.makedirs(out_dir, exist_ok=True) 
51print(f"Outputting data to {out_dir}..") 52 53# Example of using a writer to save the data 54writer = rep.WriterRegistry.get("BasicWriter") 55writer.initialize( 56 output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True 57) 58writer.attach(rp) 59 60# Run a preview to ensure the replicator graph is initialized 61rep.orchestrator.preview() 62 63# Example of accessing the data directly from annotators 64rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb") 65rgb_annot.attach(rp) 66sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True}) 67sem_annot.attach(rp) 68 69 70async def run_example_async(): 71 await world.initialize_simulation_context_async() 72 await world.reset_async() 73 74 # Spawn and drop a few cubes, capture data when they stop moving 75 for i in range(5): 76 cuboid = world.scene.add( 77 DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i)) 78 ) 79 add_labels(cuboid.prim, labels=["Cuboid"], instance_name="class") 80 81 for s in range(500): 82 await omni.kit.app.get_app().next_update_async() 83 vel = np.linalg.norm(cuboid.get_linear_velocity()) 84 if vel < 0.1: 85 print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..") 86 # Trigger the writer and update the annotators with new data 87 await rep.orchestrator.step_async(rt_subframes=4, delta_time=0.0, pause_timeline=False) 88 write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb") 89 write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem") 90 break 91 92 93asyncio.ensure_future(run_example_async()) Standalone Application Synthetic Data Access at Specific Simulation Timepoints 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp(launch_config={"renderer": "RayTracedLighting", "headless": False}) 4 5import json 6import os 7 8import carb.settings 9import numpy as np 10import omni 11import 
omni.replicator.core as rep 12from isaacsim.core.api import World 13from isaacsim.core.api.objects import DynamicCuboid 14from isaacsim.core.utils.semantics import add_labels 15from PIL import Image 16 17 18# Util function to save rgb annotator data 19def write_rgb_data(rgb_data, file_path): 20 rgb_img = Image.fromarray(rgb_data).convert("RGBA") 21 rgb_img.save(file_path + ".png") 22 23 24# Util function to save semantic segmentation annotator data 25def write_sem_data(sem_data, file_path): 26 id_to_labels = sem_data["info"]["idToLabels"] 27 with open(file_path + ".json", "w") as f: 28 json.dump(id_to_labels, f) 29 sem_image_data = sem_data["data"] 30 sem_img = Image.fromarray(sem_image_data).convert("RGBA") 31 sem_img.save(file_path + ".png") 32 33 34# Create a new stage with the default ground plane 35omni.usd.get_context().new_stage() 36 37# Setup the simulation world 38world = World() 39world.scene.add_default_ground_plane() 40world.reset() 41 42# Setting capture on play to False will prevent the replicator from capturing data each frame 43carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 44 45# Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 46carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 47 48# Create a camera and render product to collect the data from 49cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0)) 50rp = rep.create.render_product(cam, (512, 512)) 51 52# Set the output directory for the data 53out_dir = os.path.join(os.getcwd(), "_out_sim_event") 54os.makedirs(out_dir, exist_ok=True) 55print(f"Outputting data to {out_dir}..") 56 57# Example of using a writer to save the data 58writer = rep.WriterRegistry.get("BasicWriter") 59writer.initialize( 60 output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True 61) 62writer.attach(rp) 63 64# Run a preview to ensure the replicator graph is 
initialized 65rep.orchestrator.preview() 66 67# Example of accessing the data directly from annotators 68rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb") 69rgb_annot.attach(rp) 70sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True}) 71sem_annot.attach(rp) 72 73# Spawn and drop a few cubes, capture data when they stop moving 74for i in range(5): 75 cuboid = world.scene.add(DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i))) 76 add_labels(cuboid.prim, labels=["Cuboid"], instance_name="class") 77 78 for s in range(500): 79 world.step(render=False) 80 vel = np.linalg.norm(cuboid.get_linear_velocity()) 81 if vel < 0.1: 82 print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..") 83 # Trigger the writer and update the annotators with new data 84 rep.orchestrator.step(rt_subframes=4, delta_time=0.0, pause_timeline=False) 85 write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb") 86 write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem") 87 break 88 89simulation_app.close() Custom Event Randomization and Writing# The following example showcases the use of custom events to trigger randomizations and data writing at various times throughout the simulation. 
The standalone example can also be run directly (on Windows, use python.bat instead of python.sh): ./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_event_and_write.py Script Editor Custom Event Randomization and Writing 1import asyncio 2import os 3 4import carb.settings 5import omni.replicator.core as rep 6import omni.usd 7 8omni.usd.get_context().new_stage() 9# Set DLSS to Quality mode (2) for best SDG results, options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 10carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 11distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant") 12 13large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0)) 14small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0)) 15large_cube_prim = large_cube.get_output_prims()["prims"][0] 16small_cube_prim = small_cube.get_output_prims()["prims"][0] 17 18rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512)) 19writer = rep.WriterRegistry.get("BasicWriter") 20out_dir = os.path.join(os.getcwd(), "_out_custom_event") 21print(f"Writing data to {out_dir}") 22writer.initialize(output_dir=out_dir, rgb=True) 23writer.attach(rp) 24 25with rep.trigger.on_custom_event(event_name="randomize_large_cube"): 26 with large_cube: 27 rep.randomizer.rotation() 28 29with rep.trigger.on_custom_event(event_name="randomize_small_cube"): 30 with small_cube: 31 rep.randomizer.rotation() 32 33 34async def run_example_async(): 35 print(f"Randomizing small cube") 36 rep.utils.send_og_event(event_name="randomize_small_cube") 37 print("Capturing frame") 38 await rep.orchestrator.step_async(rt_subframes=8) 39 40 print("Moving small cube") 41 small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0)) 42 print("Capturing frame") 43 await rep.orchestrator.step_async(rt_subframes=8) 44 45 print(f"Randomizing large cube") 46 rep.utils.send_og_event(event_name="randomize_large_cube") 47 print("Capturing frame") 48 await 
rep.orchestrator.step_async(rt_subframes=8) 49 50 print("Moving large cube") 51 large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0)) 52 print("Capturing frame") 53 await rep.orchestrator.step_async(rt_subframes=8) 54 55 # Wait until all the data is saved to disk 56 await rep.orchestrator.wait_until_complete_async() 57 58 59asyncio.ensure_future(run_example_async()) Standalone Application Custom Event Randomization and Writing 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp(launch_config={"headless": False}) 4 5import os 6 7import carb.settings 8import omni.replicator.core as rep 9import omni.usd 10 11omni.usd.get_context().new_stage() 12# Set DLSS to Quality mode (2) for best SDG results, options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 13carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 14distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant") 15 16large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0)) 17small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0)) 18large_cube_prim = large_cube.get_output_prims()["prims"][0] 19small_cube_prim = small_cube.get_output_prims()["prims"][0] 20 21rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512)) 22writer = rep.WriterRegistry.get("BasicWriter") 23out_dir = os.path.join(os.getcwd(), "_out_custom_event") 24print(f"Writing data to {out_dir}") 25writer.initialize(output_dir=out_dir, rgb=True) 26writer.attach(rp) 27 28with rep.trigger.on_custom_event(event_name="randomize_large_cube"): 29 with large_cube: 30 rep.randomizer.rotation() 31 32with rep.trigger.on_custom_event(event_name="randomize_small_cube"): 33 with small_cube: 34 rep.randomizer.rotation() 35 36 37def run_example(): 38 print(f"Randomizing small cube") 39 rep.utils.send_og_event(event_name="randomize_small_cube") 40 print("Capturing frame") 41 rep.orchestrator.step(rt_subframes=8) 42 43 print("Moving small cube") 44 
small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0)) 45 print("Capturing frame") 46 rep.orchestrator.step(rt_subframes=8) 47 48 print(f"Randomizing large cube") 49 rep.utils.send_og_event(event_name="randomize_large_cube") 50 print("Capturing frame") 51 rep.orchestrator.step(rt_subframes=8) 52 53 print("Moving large cube") 54 large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0)) 55 print("Capturing frame") 56 rep.orchestrator.step(rt_subframes=8) 57 58 # Wait until all the data is saved to disk 59 rep.orchestrator.wait_until_complete() 60 61 62run_example() 63 64simulation_app.close() Motion Blur# This example demonstrates how to capture motion blur data using the RTX - Real-Time and RTX - Interactive (Path Tracing) rendering modes. For the RTX - Real-Time mode, refer to the motion blur parameters (the /rtx/post/motionblur/* settings). For the RTX - Interactive (Path Tracing) mode, motion blur is achieved by rendering multiple subframes (set with /omni/replicator/pathTracedMotionBlurSubSamples) and combining them to create the effect. The example uses animated and physics-enabled assets with synchronized motion. Keyframe-animated assets can be advanced at any custom delta time due to their interpolated motion, whereas physics-enabled assets require a custom physics FPS to ensure that motion samples are available at any custom delta time. The example showcases how to compute the target physics FPS, change it if needed, and restore the original physics FPS after capturing the motion blur. 
The standalone example can also be run directly (on Windows use python.bat instead of python.sh):./python.sh standalone_examples/api/isaacsim.replicator.examples/motion_blur.py Script Editor Motion Blur 1import asyncio 2import os 3 4import carb.settings 5import omni.kit.app 6import omni.replicator.core as rep 7import omni.timeline 8import omni.usd 9from isaacsim.storage.native import get_assets_root_path 10from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics 11 12# Paths to the animated and physics-ready assets 13PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd" 14ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd" 15 16# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s) 17ASSET_VELOCITIES = [0, 5, 10] 18ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)] 19 20# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets 21ANIMATION_DURATION = 10 22 23# Create a new stage with animated and physics-enabled assets with synchronized motion 24def setup_stage(): 25 # Create new stage 26 omni.usd.get_context().new_stage() 27 stage = omni.usd.get_context().get_stage() 28 timeline = omni.timeline.get_timeline_interface() 29 timeline.set_end_time(ANIMATION_DURATION) 30 31 # Create lights 32 dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight") 33 dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0) 34 distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight") 35 if not distant_light.GetAttribute("xformOp:rotateXYZ"): 36 UsdGeom.Xformable(distant_light).AddRotateXYZOp() 37 distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0)) 38 distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500) 39 40 # Setup the physics assets with gravity disabled and the requested velocity 41 assets_root_path = 
get_assets_root_path() 42 physics_asset_url = assets_root_path + PHYSICS_ASSET_URL 43 for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES): 44 prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform") 45 prim.GetReferences().AddReference(physics_asset_url) 46 if not prim.GetAttribute("xformOp:translate"): 47 UsdGeom.Xformable(prim).AddTranslateOp() 48 prim.GetAttribute("xformOp:translate").Set(loc) 49 prim.GetAttribute("physxRigidBody:disableGravity").Set(True) 50 prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0) 51 prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0) 52 prim.GetAttribute("physics:velocity").Set((0, 0, -vel)) 53 54 # Setup animated assets maintaining the same velocity as the physics assets 55 anim_asset_url = assets_root_path + ANIM_ASSET_URL 56 for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES): 57 start_loc = (-loc[0], loc[1], loc[2]) 58 prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform") 59 prim.GetReferences().AddReference(anim_asset_url) 60 if not prim.GetAttribute("xformOp:translate"): 61 UsdGeom.Xformable(prim).AddTranslateOp() 62 anim_distance = vel * ANIMATION_DURATION 63 end_loc = (start_loc[0], start_loc[1], start_loc[2] - anim_distance) 64 end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION 65 # Timesampled keyframe (animated) translation 66 prim.GetAttribute("xformOp:translate").Set(start_loc, time=0) 67 prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe) 68 69 70# Capture motion blur frames with the given delta time step and render mode 71async def run_motion_blur_example_async( 72 num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64 73): 74 # Create a new stage with the assets 75 setup_stage() 76 stage = omni.usd.get_context().get_stage() 77 78 # Set replicator settings (capture only on request and enable motion blur) 79 
carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 80 carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True) 81 82 # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 83 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 84 85 # Set motion blur settings based on the render mode 86 if use_path_tracing: 87 print(f"[MotionBlur] Setting PathTracing render mode motion blur settings") 88 carb.settings.get_settings().set("/rtx/rendermode", "PathTracing") 89 # (int): Total number of samples for each rendered pixel, per frame. 90 carb.settings.get_settings().set("/rtx/pathtracing/spp", pt_spp) 91 # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering stops until a scene or setting change is detected, restarting the rendering process. Set to 0 to remove this limit. 92 carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", pt_spp) 93 carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0) 94 # Number of sub samples to render if in PathTracing render mode and motion blur is enabled. 95 carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples) 96 else: 97 print(f"[MotionBlur] Setting RayTracedLighting render mode motion blur settings") 98 carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting") 99 # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA 100 carb.settings.get_settings().set("/rtx/post/aa/op", 2) 101 # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter. 102 carb.settings.get_settings().set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02) 103 # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample. 104 carb.settings.get_settings().set("/rtx/post/motionblur/exposureFraction", 1.0) 105 # (int): Number of samples to use in the filter. 
A higher number improves quality at the cost of performance. 106 carb.settings.get_settings().set("/rtx/post/motionblur/numSamples", 8) 107 108 # Setup camera and writer 109 camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam") 110 render_product = rep.create.render_product(camera, (1280, 720)) 111 basic_writer = rep.WriterRegistry.get("BasicWriter") 112 delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}" 113 render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt" 114 output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}") 115 print(f"[MotionBlur] Output directory: {output_directory}") 116 basic_writer.initialize(output_dir=output_directory, rgb=True) 117 basic_writer.attach(render_product) 118 119 # Run a few updates to make sure all materials are fully loaded for capture 120 for _ in range(50): 121 await omni.kit.app.get_app().next_update_async() 122 123 # Use the physics scene to modify the physics FPS (if needed) to guarantee motion samples at any custom delta time 124 physx_scene = None 125 for prim in stage.Traverse(): 126 if prim.IsA(UsdPhysics.Scene): 127 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) 128 break 129 if physx_scene is None: 130 print(f"[MotionBlur] Creating a new PhysicsScene") 131 physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene") 132 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene")) 133 134 # Check the target physics depending on the custom delta time and the render mode 135 target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time 136 if use_path_tracing: 137 target_physics_fps *= pt_subsamples 138 139 # Check if the physics FPS needs to be increased to match the custom delta time 140 orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get() 141 if target_physics_fps > 
orig_physics_fps: 142 print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}") 143 physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps) 144 145 # Start the timeline for physics updates in the step function 146 timeline = omni.timeline.get_timeline_interface() 147 timeline.play() 148 149 # Capture frames 150 for i in range(num_frames): 151 print(f"[MotionBlur] \tCapturing frame {i}") 152 await rep.orchestrator.step_async(delta_time=custom_delta_time) 153 154 # Restore the original physics FPS 155 if target_physics_fps > orig_physics_fps: 156 print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}") 157 physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps) 158 159 # Switch back to the raytracing render mode 160 if use_path_tracing: 161 print(f"[MotionBlur] Restoring render mode to RayTracedLighting") 162 carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting") 163 164 # Wait until all the data is saved to disk 165 await rep.orchestrator.wait_until_complete_async() 166 167 168async def run_motion_blur_examples_async(): 169 motion_blur_step_duration = [None, 1 / 30, 1 / 60, 1 / 240] 170 for custom_delta_time in motion_blur_step_duration: 171 # RayTracing examples 172 await run_motion_blur_example_async(custom_delta_time=custom_delta_time, use_path_tracing=False) 173 # PathTracing examples 174 spps = [32, 128] 175 motion_blur_sub_samples = [4, 16] 176 for motion_blur_sub_sample in motion_blur_sub_samples: 177 for spp in spps: 178 await run_motion_blur_example_async( 179 custom_delta_time=custom_delta_time, 180 use_path_tracing=True, 181 pt_subsamples=motion_blur_sub_sample, 182 pt_spp=spp, 183 ) 184 185 186asyncio.ensure_future(run_motion_blur_examples_async()) Standalone Application Motion Blur 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp({"headless": False}) 4 5import os 6 7import carb.settings 8import omni.kit.app 9import 
omni.replicator.core as rep 10import omni.timeline 11import omni.usd 12from isaacsim.storage.native import get_assets_root_path 13from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics 14 15# Paths to the animated and physics-ready assets 16PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd" 17ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd" 18 19# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s) 20ASSET_VELOCITIES = [0, 5, 10] 21ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)] 22 23# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets 24ANIMATION_DURATION = 10 25 26# Create a new stage with animated and physics-enabled assets with synchronized motion 27def setup_stage(): 28 # Create new stage 29 omni.usd.get_context().new_stage() 30 stage = omni.usd.get_context().get_stage() 31 timeline = omni.timeline.get_timeline_interface() 32 timeline.set_end_time(ANIMATION_DURATION) 33 34 # Create lights 35 dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight") 36 dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0) 37 distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight") 38 if not distant_light.GetAttribute("xformOp:rotateXYZ"): 39 UsdGeom.Xformable(distant_light).AddRotateXYZOp() 40 distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0)) 41 distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500) 42 43 # Setup the physics assets with gravity disabled and the requested velocity 44 assets_root_path = get_assets_root_path() 45 physics_asset_url = assets_root_path + PHYSICS_ASSET_URL 46 for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES): 47 prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform") 48 prim.GetReferences().AddReference(physics_asset_url) 49 if not 
prim.GetAttribute("xformOp:translate"): 50 UsdGeom.Xformable(prim).AddTranslateOp() 51 prim.GetAttribute("xformOp:translate").Set(loc) 52 prim.GetAttribute("physxRigidBody:disableGravity").Set(True) 53 prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0) 54 prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0) 55 prim.GetAttribute("physics:velocity").Set((0, 0, -vel)) 56 57 # Setup animated assets maintaining the same velocity as the physics assets 58 anim_asset_url = assets_root_path + ANIM_ASSET_URL 59 for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES): 60 start_loc = (-loc[0], loc[1], loc[2]) 61 prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform") 62 prim.GetReferences().AddReference(anim_asset_url) 63 if not prim.GetAttribute("xformOp:translate"): 64 UsdGeom.Xformable(prim).AddTranslateOp() 65 anim_distance = vel * ANIMATION_DURATION 66 end_loc = (start_loc[0], start_loc[1], start_loc[2] - anim_distance) 67 end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION 68 # Timesampled keyframe (animated) translation 69 prim.GetAttribute("xformOp:translate").Set(start_loc, time=0) 70 prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe) 71 72 73# Capture motion blur frames with the given delta time step and render mode 74def run_motion_blur_example(num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64): 75 # Create a new stage with the assets 76 setup_stage() 77 stage = omni.usd.get_context().get_stage() 78 79 # Set replicator settings (capture only on request and enable motion blur) 80 carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 81 carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True) 82 83 # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 84 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 85 86 # Set motion blur settings 
based on the render mode 87 if use_path_tracing: 88 print(f"[MotionBlur] Setting PathTracing render mode motion blur settings") 89 carb.settings.get_settings().set("/rtx/rendermode", "PathTracing") 90 # (int): Total number of samples for each rendered pixel, per frame. 91 carb.settings.get_settings().set("/rtx/pathtracing/spp", pt_spp) 92 # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering stops until a scene or setting change is detected, restarting the rendering process. Set to 0 to remove this limit. 93 carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", pt_spp) 94 carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0) 95 # Number of sub samples to render if in PathTracing render mode and motion blur is enabled. 96 carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples) 97 else: 98 print(f"[MotionBlur] Setting RayTracedLighting render mode motion blur settings") 99 carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting") 100 # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA 101 carb.settings.get_settings().set("/rtx/post/aa/op", 2) 102 # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter. 103 carb.settings.get_settings().set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02) 104 # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample. 105 carb.settings.get_settings().set("/rtx/post/motionblur/exposureFraction", 1.0) 106 # (int): Number of samples to use in the filter. A higher number improves quality at the cost of performance. 
107 carb.settings.get_settings().set("/rtx/post/motionblur/numSamples", 8) 108 109 # Setup camera and writer 110 camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam") 111 render_product = rep.create.render_product(camera, (1280, 720)) 112 basic_writer = rep.WriterRegistry.get("BasicWriter") 113 delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}" 114 render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt" 115 output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}") 116 print(f"[MotionBlur] Output directory: {output_directory}") 117 basic_writer.initialize(output_dir=output_directory, rgb=True) 118 basic_writer.attach(render_product) 119 120 # Run a few updates to make sure all materials are fully loaded for capture 121 for _ in range(50): 122 simulation_app.update() 123 124 # Use the physics scene to modify the physics FPS (if needed) to guarantee motion samples at any custom delta time 125 physx_scene = None 126 for prim in stage.Traverse(): 127 if prim.IsA(UsdPhysics.Scene): 128 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) 129 break 130 if physx_scene is None: 131 print(f"[MotionBlur] Creating a new PhysicsScene") 132 physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene") 133 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene")) 134 135 # Check the target physics depending on the custom delta time and the render mode 136 target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time 137 if use_path_tracing: 138 target_physics_fps *= pt_subsamples 139 140 # Check if the physics FPS needs to be increased to match the custom delta time 141 orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get() 142 if target_physics_fps > orig_physics_fps: 143 print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to 
{target_physics_fps}") 144 physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps) 145 146 # Start the timeline for physics updates in the step function 147 timeline = omni.timeline.get_timeline_interface() 148 timeline.play() 149 150 # Capture frames 151 for i in range(num_frames): 152 print(f"[MotionBlur] \tCapturing frame {i}") 153 rep.orchestrator.step(delta_time=custom_delta_time) 154 155 # Restore the original physics FPS 156 if target_physics_fps > orig_physics_fps: 157 print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}") 158 physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps) 159 160 # Switch back to the raytracing render mode 161 if use_path_tracing: 162 print(f"[MotionBlur] Restoring render mode to RayTracedLighting") 163 carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting") 164 165 # Wait until all the data is saved to disk 166 rep.orchestrator.wait_until_complete() 167 168 169def run_motion_blur_examples(): 170 motion_blur_step_duration = [None, 1 / 30, 1 / 60, 1 / 240] 171 for custom_delta_time in motion_blur_step_duration: 172 # RayTracing examples 173 run_motion_blur_example(custom_delta_time=custom_delta_time, use_path_tracing=False) 174 # PathTracing examples 175 spps = [32, 128] 176 motion_blur_sub_samples = [4, 16] 177 for motion_blur_sub_sample in motion_blur_sub_samples: 178 for spp in spps: 179 run_motion_blur_example( 180 custom_delta_time=custom_delta_time, 181 use_path_tracing=True, 182 pt_subsamples=motion_blur_sub_sample, 183 pt_spp=spp, 184 ) 185 186 187run_motion_blur_examples() 188 189simulation_app.close() Subscribers and Events at Custom FPS# Examples of subscribing to various events (such as stage, physics, and render/app), setting custom update rates, and adjusting various related settings. 
The standalone example can also be run directly (on Windows use python.bat instead of python.sh):./python.sh standalone_examples/api/isaacsim.replicator.examples/subscribers_and_events.py Script Editor Subscribers and Events at Custom FPS 1import asyncio 2import time 3 4import carb.eventdispatcher 5import carb.settings 6import omni.kit.app 7import omni.physx 8import omni.timeline 9import omni.usd 10from pxr import PhysxSchema, UsdPhysics 11 12# TIMELINE / STAGE 13USE_CUSTOM_TIMELINE_SETTINGS = True 14USE_FIXED_TIME_STEPPING = True 15PLAY_EVERY_FRAME = True 16PLAY_DELAY_COMPENSATION = 0.0 17SUBSAMPLE_RATE = 1 18STAGE_FPS = 30.0 19 20# PHYSX 21USE_CUSTOM_PHYSX_FPS = False 22PHYSX_FPS = 60.0 23MIN_SIM_FPS = 30 24 25# Simulations can also be enabled/disabled at runtime 26DISABLE_SIMULATIONS = False 27 28# APP / RENDER 29LIMIT_APP_FPS = False 30APP_FPS = 120 31 32# Number of app updates to run while collecting events 33NUM_APP_UPDATES = 100 34 35# Print the captured events 36VERBOSE = False 37 38async def run_subscribers_and_events_async(): 39 def on_timeline_event(event: omni.timeline.TimelineEventType): 40 nonlocal timeline_events 41 if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value: 42 timeline_events.append(event.payload) 43 if VERBOSE: 44 print(f" [timeline][{len(timeline_events)}] {event.payload}") 45 46 def on_physics_step(dt: float): 47 nonlocal physx_events 48 physx_events.append(dt) 49 if VERBOSE: 50 print(f" [physics][{len(physx_events)}] dt={dt}") 51 52 def on_stage_render_event(event: carb.eventdispatcher.Event): 53 nonlocal stage_render_events 54 stage_render_events.append(event.event_name) 55 if VERBOSE: 56 print(f" [stage render][{len(stage_render_events)}] {event.event_name}") 57 58 def on_app_update(event: carb.eventdispatcher.Event): 59 nonlocal app_update_events 60 app_update_events.append(event.event_name) 61 if VERBOSE: 62 print(f" [app update][{len(app_update_events)}] {event.event_name}") 63 64 stage = 
omni.usd.get_context().get_stage() 65 timeline = omni.timeline.get_timeline_interface() 66 67 if USE_CUSTOM_TIMELINE_SETTINGS: 68 # Ideal to make simulation and animation synchronized. 69 # Default: True in editor, False in standalone. 70 # NOTE: 71 # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time. 72 # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs'). 73 # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate` 74 carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING) 75 76 # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback. 77 # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect. 78 # Default: 0.0 79 # NOTE: 80 # - only effective if `useFixedTimeStepping` is set to True 81 # - setting a large value results in long fast playback after a huge lag spike 82 carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION) 83 84 # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`. 85 # Default: False 86 # NOTE: 87 # - only effective if `useFixedTimeStepping` is set to True 88 # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop 89 # - useful for recording 90 # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)` 91 timeline.set_play_every_frame(PLAY_EVERY_FRAME) 92 93 # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame. 
94 # Default: 1 95 # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)` 96 timeline.set_ticks_per_frame(SUBSAMPLE_RATE) 97 98 # Time codes per second for the stage 99 # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)` 100 timeline.set_time_codes_per_second(STAGE_FPS) 101 102 # Create a PhysX scene to set the physics time step 103 if USE_CUSTOM_PHYSX_FPS: 104 physx_scene = None 105 for prim in stage.Traverse(): 106 if prim.IsA(UsdPhysics.Scene): 107 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) 108 break 109 if physx_scene is None: 110 UsdPhysics.Scene.Define(stage, "/PhysicsScene") 111 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene")) 112 113 # Time step for the physics simulation 114 # Default: 60.0 115 physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS) 116 117 # Minimum simulation frequency to prevent clamping; if the frame rate drops below this, 118 # physics steps are discarded to avoid app slowdown if the overall frame rate is too low. 119 # Default: 30.0 120 # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update. 121 carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS) 122 123 # Throttle Render/UI/Main thread update rate 124 if LIMIT_APP_FPS: 125 # Enable rate limiting of the main run loop (UI, rendering, etc.) 126 # Default: False 127 carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS) 128 129 # FPS limit of the main run loop (UI, rendering, etc.) 
130 # Default: 120 131 # NOTE: disabled if `/app/player/useFixedTimeStepping` is False 132 carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS)) 133 134 # Simulations can be selectively disabled (or toggled at specific times) 135 if DISABLE_SIMULATIONS: 136 carb.settings.get_settings().set("/app/player/playSimulations", False) 137 138 print("Configuration:") 139 print(f" Timeline:") 140 print(f" - Stage FPS: {STAGE_FPS} (/app/stage/timeCodesPerSecond)") 141 print(f" - Fixed time stepping: {USE_FIXED_TIME_STEPPING} (/app/player/useFixedTimeStepping)") 142 print(f" - Play every frame: {PLAY_EVERY_FRAME} (/app/player/useFastMode)") 143 print(f" - Subsample rate: {SUBSAMPLE_RATE} (/app/player/timelineSubsampleRate)") 144 print(f" - Play delay compensation: {PLAY_DELAY_COMPENSATION}s (/app/player/CompensatePlayDelayInSecs)") 145 print(f" Physics:") 146 print(f" - PhysX FPS: {PHYSX_FPS} (physxScene.timeStepsPerSecond)") 147 print(f" - Min simulation FPS: {MIN_SIM_FPS} (/persistent/simulation/minFrameRate)") 148 print(f" - Simulations enabled: {not DISABLE_SIMULATIONS} (/app/player/playSimulations)") 149 print(f" Rendering:") 150 print( 151 f" - App FPS limit: {APP_FPS if LIMIT_APP_FPS else 'unlimited'} (/app/runLoops/main/rateLimitFrequency)" 152 ) 153 154 # Start the timeline 155 print(f"Starting the timeline...") 156 timeline.set_current_time(0) 157 timeline.set_end_time(10000) 158 timeline.set_looping(False) 159 timeline.play() 160 timeline.commit() 161 wall_start_time = time.time() 162 163 # Subscribe to events 164 print(f"Subscribing to events...") 165 timeline_events = [] 166 timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(on_timeline_event) 167 physx_events = [] 168 physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(on_physics_step) 169 stage_render_events = [] 170 stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event( 171 
event_name=omni.usd.get_context().stage_rendering_event_name( 172 omni.usd.StageRenderingEventType.NEW_FRAME, True 173 ), 174 on_event=on_stage_render_event, 175 observer_name="subscribers_and_events.on_stage_render_event", 176 ) 177 app_update_events = [] 178 app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event( 179 event_name=omni.kit.app.GLOBAL_EVENT_UPDATE, 180 on_event=on_app_update, 181 observer_name="subscribers_and_events.on_app_update", 182 ) 183 184 # Run app updates and cache events 185 print(f"Starting running the application for {NUM_APP_UPDATES} updates...") 186 for i in range(NUM_APP_UPDATES): 187 if VERBOSE: 188 print(f"[app update loop][{i+1}/{NUM_APP_UPDATES}]") 189 await omni.kit.app.get_app().next_update_async() 190 elapsed_wall_time = time.time() - wall_start_time 191 print(f"Finished running the application for {NUM_APP_UPDATES} updates...") 192 193 # Stop timeline and unsubscribe from all events 194 print(f"Stopping timeline and unsubscribing from all events...") 195 timeline.stop() 196 if app_sub: 197 app_sub.reset() 198 app_sub = None 199 if stage_render_sub: 200 stage_render_sub.reset() 201 stage_render_sub = None 202 if physx_sub: 203 physx_sub.unsubscribe() 204 physx_sub = None 205 if timeline_sub: 206 timeline_sub.unsubscribe() 207 timeline_sub = None 208 209 # Print summary statistics 210 print("\nStats:") 211 print(f"- App updates: {NUM_APP_UPDATES}") 212 print(f"- Wall time: {elapsed_wall_time:.4f} seconds") 213 print(f"- Timeline events: {len(timeline_events)}") 214 print(f"- Physics events: {len(physx_events)}") 215 print(f"- Stage render events: {len(stage_render_events)}") 216 print(f"- App update events: {len(app_update_events)}") 217 218 # Calculate and display real-time performance factor 219 if len(physx_events) > 0: 220 sim_time = sum(physx_events) 221 realtime_factor = sim_time / elapsed_wall_time if elapsed_wall_time > 0 else 0 222 print(f"- Simulation time: {sim_time:.4f}s") 223 print(f"- Real-time factor: 
{realtime_factor:.2f}x") 224 225asyncio.ensure_future(run_subscribers_and_events_async()) Standalone Application Subscribers and Events at Custom FPS 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp({"headless": False}) 4 5import time 6 7import carb.eventdispatcher 8import carb.settings 9import omni.kit.app 10import omni.physx 11import omni.timeline 12import omni.usd 13from pxr import PhysxSchema, UsdPhysics 14 15# TIMELINE / STAGE 16USE_CUSTOM_TIMELINE_SETTINGS = True 17USE_FIXED_TIME_STEPPING = True 18PLAY_EVERY_FRAME = True 19PLAY_DELAY_COMPENSATION = 0.0 20SUBSAMPLE_RATE = 1 21STAGE_FPS = 30.0 22 23# PHYSX 24USE_CUSTOM_PHYSX_FPS = False 25PHYSX_FPS = 60.0 26MIN_SIM_FPS = 30 27 28# Simulations can also be enabled/disabled at runtime 29DISABLE_SIMULATIONS = False 30 31# APP / RENDER 32LIMIT_APP_FPS = False 33APP_FPS = 120 34 35# Number of app updates to run while collecting events 36NUM_APP_UPDATES = 100 37 38# Print the captured events 39VERBOSE = False 40 41 42def on_timeline_event(event: omni.timeline.TimelineEventType): 43 global timeline_events 44 if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value: 45 timeline_events.append(event.payload) 46 if VERBOSE: 47 print(f" [timeline][{len(timeline_events)}] {event.payload}") 48 49 50def on_physics_step(dt: float): 51 global physx_events 52 physx_events.append(dt) 53 if VERBOSE: 54 print(f" [physics][{len(physx_events)}] dt={dt}") 55 56 57def on_stage_render_event(event: carb.eventdispatcher.Event): 58 global stage_render_events 59 stage_render_events.append(event.event_name) 60 if VERBOSE: 61 print(f" [stage render][{len(stage_render_events)}] {event.event_name}") 62 63 64def on_app_update(event: carb.eventdispatcher.Event): 65 global app_update_events 66 app_update_events.append(event.event_name) 67 if VERBOSE: 68 print(f" [app update][{len(app_update_events)}] {event.event_name}") 69 70 71stage = omni.usd.get_context().get_stage() 72timeline = 
omni.timeline.get_timeline_interface() 73 74 75if USE_CUSTOM_TIMELINE_SETTINGS: 76 # Ideal to make simulation and animation synchronized. 77 # Default: True in editor, False in standalone. 78 # NOTE: 79 # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time. 80 # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs'). 81 # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate` 82 carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING) 83 84 # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback. 85 # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect. 86 # Default: 0.0 87 # NOTE: 88 # - only effective if `useFixedTimeStepping` is set to True 89 # - setting a large value results in long fast playback after a huge lag spike 90 carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION) 91 92 # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`. 93 # Default: False 94 # NOTE: 95 # - only effective if `useFixedTimeStepping` is set to True 96 # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop 97 # - useful for recording 98 # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)` 99 timeline.set_play_every_frame(PLAY_EVERY_FRAME) 100 101 # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame. 
102 # Default: 1 103 # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)` 104 timeline.set_ticks_per_frame(SUBSAMPLE_RATE) 105 106 # Time codes per second for the stage 107 # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)` 108 timeline.set_time_codes_per_second(STAGE_FPS) 109 110 111# Create a PhysX scene to set the physics time step 112if USE_CUSTOM_PHYSX_FPS: 113 physx_scene = None 114 for prim in stage.Traverse(): 115 if prim.IsA(UsdPhysics.Scene): 116 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim) 117 break 118 if physx_scene is None: 119 UsdPhysics.Scene.Define(stage, "/PhysicsScene") 120 physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene")) 121 122 # Time step for the physics simulation 123 # Default: 60.0 124 physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS) 125 126 # Minimum simulation frequency to prevent clamping; if the frame rate drops below this, 127 # physics steps are discarded to avoid app slowdown if the overall frame rate is too low. 128 # Default: 30.0 129 # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update. 130 carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS) 131 132 133# Throttle Render/UI/Main thread update rate 134if LIMIT_APP_FPS: 135 # Enable rate limiting of the main run loop (UI, rendering, etc.) 136 # Default: False 137 carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS) 138 139 # FPS limit of the main run loop (UI, rendering, etc.) 
140 # Default: 120 141 # NOTE: disabled if `/app/player/useFixedTimeStepping` is False 142 carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS)) 143 144 145# Simulations can be selectively disabled (or toggled at specific times) 146if DISABLE_SIMULATIONS: 147 carb.settings.get_settings().set("/app/player/playSimulations", False) 148 149print("Configuration:") 150print(f" Timeline:") 151print(f" - Stage FPS: {STAGE_FPS} (/app/stage/timeCodesPerSecond)") 152print(f" - Fixed time stepping: {USE_FIXED_TIME_STEPPING} (/app/player/useFixedTimeStepping)") 153print(f" - Play every frame: {PLAY_EVERY_FRAME} (/app/player/useFastMode)") 154print(f" - Subsample rate: {SUBSAMPLE_RATE} (/app/player/timelineSubsampleRate)") 155print(f" - Play delay compensation: {PLAY_DELAY_COMPENSATION}s (/app/player/CompensatePlayDelayInSecs)") 156print(f" Physics:") 157print(f" - PhysX FPS: {PHYSX_FPS} (physxScene.timeStepsPerSecond)") 158print(f" - Min simulation FPS: {MIN_SIM_FPS} (/persistent/simulation/minFrameRate)") 159print(f" - Simulations enabled: {not DISABLE_SIMULATIONS} (/app/player/playSimulations)") 160print(f" Rendering:") 161print(f" - App FPS limit: {APP_FPS if LIMIT_APP_FPS else 'unlimited'} (/app/runLoops/main/rateLimitFrequency)") 162 163 164# Start the timeline 165print(f"Starting the timeline...") 166timeline.set_current_time(0) 167timeline.set_end_time(10000) 168timeline.set_looping(False) 169timeline.play() 170timeline.commit() 171wall_start_time = time.time() 172 173# Subscribe to events 174print(f"Subscribing to events...") 175timeline_events = [] 176timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(on_timeline_event) 177physx_events = [] 178physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(on_physics_step) 179stage_render_events = [] 180stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event( 181 
event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True), 182 on_event=on_stage_render_event, 183 observer_name="subscribers_and_events.on_stage_render_event", 184) 185app_update_events = [] 186app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event( 187 event_name=omni.kit.app.GLOBAL_EVENT_UPDATE, 188 on_event=on_app_update, 189 observer_name="subscribers_and_events.on_app_update", 190) 191 192# Run app updates and cache events 193print(f"Starting running the application for {NUM_APP_UPDATES} updates.") 194for i in range(NUM_APP_UPDATES): 195 if VERBOSE: 196 print(f"[app update loop][{i+1}/{NUM_APP_UPDATES}]") 197 simulation_app.update() 198elapsed_wall_time = time.time() - wall_start_time 199print(f"Finished running the application for {NUM_APP_UPDATES} updates...") 200 201# Stop timeline and unsubscribe from all events 202timeline.stop() 203if app_sub: 204 app_sub.reset() 205 app_sub = None 206if stage_render_sub: 207 stage_render_sub.reset() 208 stage_render_sub = None 209if physx_sub: 210 physx_sub.unsubscribe() 211 physx_sub = None 212if timeline_sub: 213 timeline_sub.unsubscribe() 214 timeline_sub = None 215 216 217# Print summary statistics 218print("\nStats:") 219print(f"- App updates: {NUM_APP_UPDATES}") 220print(f"- Wall time: {elapsed_wall_time:.4f} seconds") 221print(f"- Timeline events: {len(timeline_events)}") 222print(f"- Physics events: {len(physx_events)}") 223print(f"- Stage render events: {len(stage_render_events)}") 224print(f"- App update events: {len(app_update_events)}") 225 226# Calculate and display real-time performance factor 227if len(physx_events) > 0: 228 sim_time = sum(physx_events) 229 realtime_factor = sim_time / elapsed_wall_time if elapsed_wall_time > 0 else 0 230 print(f"- Simulation time: {sim_time:.4f}s") 231 print(f"- Real-time factor: {realtime_factor:.2f}x") 232 233simulation_app.close() Accessing Writer and Annotator Data at Custom FPS# Example of how to 
trigger a writer and access annotator data at a custom FPS, with product rendering disabled when the data is not needed. The standalone example can also be run directly (on Windows use python.bat instead of python.sh): ./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_fps_writer_annotator.py Note: It is currently not possible to change the timeline (stage) FPS after the replicator graph has been created, because doing so causes a graph reset. This issue is being addressed. As a workaround, make sure you set the timeline (stage) parameters before creating the replicator graph. Script Editor Accessing Writer and Annotator Data at Custom FPS 1import asyncio 2import os 3 4import carb.settings 5import omni.kit.app 6import omni.replicator.core as rep 7import omni.timeline 8import omni.usd 9 10# Configuration 11NUM_CAPTURES = 6 12VERBOSE = True 13 14# NOTE: To avoid FPS delta misses make sure the timeline framerate is divisible by the sensor framerate 15STAGE_FPS = 100.0 16SENSOR_FPS = 10.0 17SENSOR_DT = 1.0 / SENSOR_FPS 18 19 20async def run_custom_fps_example_async(duration_seconds): 21 # Create a new stage 22 await omni.usd.get_context().new_stage_async() 23 24 # Set DLSS to Quality mode (2) for best SDG results, options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 25 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 26 27 # Disable capture on play (data will only be accessed at custom times) 28 carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 29 30 # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time) 31 carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True) 32 33 # Set the timeline parameters 34 timeline = omni.timeline.get_timeline_interface() 35 timeline.set_looping(False) 36 timeline.set_current_time(0.0) 37 timeline.set_end_time(10) 38 timeline.set_time_codes_per_second(STAGE_FPS) 39 timeline.play() 40 timeline.commit() 41 42 # Create scene with a
semantically annotated cube with physics 43 rep.functional.create.dome_light(intensity=250) 44 cube = rep.functional.create.cube(position=(0, 0, 3), semantics={"class": "cube"}) 45 rep.functional.physics.apply_collider(cube) 46 rep.functional.physics.apply_rigid_body(cube) 47 48 # Create render product (disabled until data capture is needed) 49 rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp") 50 rp.hydra_texture.set_updates_enabled(False) 51 52 # Create a writer and an annotator as examples of different ways of accessing data 53 out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb") 54 print(f"Writer data will be written to: {out_dir_rgb}") 55 writer_rgb = rep.WriterRegistry.get("BasicWriter") 56 writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True) 57 writer_rgb.attach(rp) 58 annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera") 59 annot_depth.attach(rp) 60 61 # Run the simulation for the given number of frames and access the data at the desired framerates 62 print( 63 f"Starting simulation: {duration_seconds:.2f}s duration, {SENSOR_FPS:.0f} FPS sensor, {STAGE_FPS:.0f} FPS timeline" 64 ) 65 66 frame_count = 0 67 previous_time = timeline.get_current_time() 68 elapsed_time = 0.0 69 iteration = 0 70 71 while timeline.get_current_time() < duration_seconds: 72 current_time = timeline.get_current_time() 73 delta_time = current_time - previous_time 74 elapsed_time += delta_time 75 76 # Simulation progress 77 if VERBOSE: 78 print(f"Step {iteration}: timeline time={current_time:.3f}s, elapsed time={elapsed_time:.3f}s") 79 80 # Trigger sensor at desired framerate (use small epsilon for floating point comparison) 81 if elapsed_time >= SENSOR_DT - 1e-9: 82 elapsed_time -= SENSOR_DT # Reset with remainder to maintain accuracy 83 84 rp.hydra_texture.set_updates_enabled(True) 85 await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False, rt_subframes=16) 86 annot_data = annot_depth.get_data() 87 88 print(f"\n >> 
Capturing frame {frame_count} at time={current_time:.3f}s | shape={annot_data.shape}\n") 89 frame_count += 1 90 91 rp.hydra_texture.set_updates_enabled(False) 92 93 previous_time = current_time 94 # Advance the app (timeline) by one frame 95 await omni.kit.app.get_app().next_update_async() 96 iteration += 1 97 98 # Wait for writer to finish 99 await rep.orchestrator.wait_until_complete_async() 100 101 102# Run example with duration for all captures plus a buffer of 5 frames 103duration = (NUM_CAPTURES * SENSOR_DT) + (5.0 / STAGE_FPS) 104asyncio.ensure_future(run_custom_fps_example_async(duration_seconds=duration)) Standalone Application Accessing Writer and Annotator Data at Custom FPS 1from isaacsim import SimulationApp 2 3from isaacsim import SimulationApp 4 5simulation_app = SimulationApp({"headless": False}) 6 7import os 8 9import carb.settings 10import omni.kit.app 11import omni.replicator.core as rep 12import omni.timeline 13import omni.usd 14 15# Configuration 16NUM_CAPTURES = 6 17VERBOSE = True 18 19# NOTE: To avoid FPS delta misses make sure the timeline framerate is divisible by the sensor framerate 20STAGE_FPS = 100.0 21SENSOR_FPS = 10.0 22SENSOR_DT = 1.0 / SENSOR_FPS 23 24 25def run_custom_fps_example(duration_seconds): 26 # Create a new stage 27 omni.usd.get_context().new_stage() 28 29 # Set DLSS to Quality mode (2) for best SDG results, options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto) 30 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 31 32 # Disable capture on play (data will only be accessed at custom times) 33 carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False) 34 35 # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time) 36 carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True) 37 38 # Set the timeline parameters 39 timeline = omni.timeline.get_timeline_interface() 40 timeline.set_looping(False) 41 timeline.set_current_time(0.0) 42
timeline.set_end_time(10) 43 timeline.set_time_codes_per_second(STAGE_FPS) 44 timeline.play() 45 timeline.commit() 46 47 # Create scene with a semantically annotated cube with physics 48 rep.functional.create.dome_light(intensity=250) 49 cube = rep.functional.create.cube(position=(0, 0, 3), semantics={"class": "cube"}) 50 rep.functional.physics.apply_collider(cube) 51 rep.functional.physics.apply_rigid_body(cube) 52 53 # Create render product (disabled until data capture is needed) 54 rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp") 55 rp.hydra_texture.set_updates_enabled(False) 56 57 # Create a writer and an annotator as examples of different ways of accessing data 58 out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb") 59 print(f"Writer data will be written to: {out_dir_rgb}") 60 writer_rgb = rep.WriterRegistry.get("BasicWriter") 61 writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True) 62 writer_rgb.attach(rp) 63 annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera") 64 annot_depth.attach(rp) 65 66 # Run the simulation for the given number of frames and access the data at the desired framerates 67 print( 68 f"Starting simulation: {duration_seconds:.2f}s duration, {SENSOR_FPS:.0f} FPS sensor, {STAGE_FPS:.0f} FPS timeline" 69 ) 70 71 frame_count = 0 72 previous_time = timeline.get_current_time() 73 elapsed_time = 0.0 74 iteration = 0 75 76 while timeline.get_current_time() < duration_seconds: 77 current_time = timeline.get_current_time() 78 delta_time = current_time - previous_time 79 elapsed_time += delta_time 80 81 # Simulation progress 82 if VERBOSE: 83 print(f"Step {iteration}: timeline time={current_time:.3f}s, elapsed time={elapsed_time:.3f}s") 84 85 # Trigger sensor at desired framerate (use small epsilon for floating point comparison) 86 if elapsed_time >= SENSOR_DT - 1e-9: 87 elapsed_time -= SENSOR_DT # Reset with remainder to maintain accuracy 88 89 rp.hydra_texture.set_updates_enabled(True) 90 
rep.orchestrator.step(delta_time=0.0, pause_timeline=False, rt_subframes=16) 91 annot_data = annot_depth.get_data() 92 93 print(f"\n >> Capturing frame {frame_count} at time={current_time:.3f}s | shape={annot_data.shape}\n") 94 frame_count += 1 95 96 rp.hydra_texture.set_updates_enabled(False) 97 98 previous_time = current_time 99 # Advance the app (timeline) by one frame 100 simulation_app.update() 101 iteration += 1 102 103 # Wait for writer to finish 104 rep.orchestrator.wait_until_complete() 105 106 107# Run example with duration for all captures plus a buffer of 5 frames 108duration = (NUM_CAPTURES * SENSOR_DT) + (5.0 / STAGE_FPS) 109run_custom_fps_example(duration_seconds=duration) 110 111simulation_app.close() Cosmos Writer Example# This example demonstrates the CosmosWriter for capturing multi-modal synthetic data compatible with NVIDIA Cosmos world foundation models. It creates a simple scene with a sphere and a cube falling onto a ground plane and captures synchronized RGB, segmentation, depth, and edge data (images and videos) that can be used with Cosmos Transfer to generate photorealistic variations. For a more detailed tutorial, see Cosmos Synthetic Data Generation. 
The standalone example can also be run directly (on Windows use python.bat instead of python.sh): ./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_simple.py Script Editor Cosmos Writer Example 1import asyncio 2import os 3 4import carb.settings 5import omni.replicator.core as rep 6import omni.timeline 7import omni.usd 8 9SEGMENTATION_MAPPING = { 10 "plane": [0, 0, 255, 255], 11 "cube": [255, 0, 0, 255], 12 "sphere": [0, 255, 0, 255], 13} 14NUM_FRAMES = 60 15 16async def run_cosmos_example_async(num_frames, segmentation_mapping=None): 17 # Create a new stage 18 omni.usd.get_context().new_stage() 19 20 # CosmosWriter requires script nodes to be enabled 21 carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True) 22 23 # Disable capture on play, data is captured manually using the step function 24 rep.orchestrator.set_capture_on_play(False) 25 26 # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)) 27 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 28 29 # Set the stage properties 30 rep.settings.set_stage_up_axis("Z") 31 rep.settings.set_stage_meters_per_unit(1.0) 32 rep.functional.create.dome_light(intensity=500) 33 34 # Create the scenario with a ground plane and a falling sphere and cube. 
35 plane = rep.functional.create.plane(position=(0, 0, 0), scale=(10, 10, 1), semantics={"class": "plane"}) 36 rep.functional.physics.apply_collider(plane) 37 38 sphere = rep.functional.create.sphere(position=(0, 0, 3), semantics={"class": "sphere"}) 39 rep.functional.physics.apply_collider(sphere) 40 rep.functional.physics.apply_rigid_body(sphere) 41 42 cube = rep.functional.create.cube(position=(1, 1, 2), scale=0.5, semantics={"class": "cube"}) 43 rep.functional.physics.apply_collider(cube) 44 rep.functional.physics.apply_rigid_body(cube) 45 46 # Set up the writer 47 camera = rep.functional.create.camera(position=(5, 5, 3), look_at=(0, 0, 0)) 48 rp = rep.create.render_product(camera, (1280, 720)) 49 out_dir = os.path.join(os.getcwd(), "_out_cosmos_simple") 50 print(f"Output directory: {out_dir}") 51 cosmos_writer = rep.WriterRegistry.get("CosmosWriter") 52 cosmos_writer.initialize(output_dir=out_dir, segmentation_mapping=segmentation_mapping) 53 cosmos_writer.attach(rp) 54 55 # Start the simulation 56 timeline = omni.timeline.get_timeline_interface() 57 timeline.play() 58 59 # Capture a frame every app update 60 for i in range(num_frames): 61 print(f"Frame {i+1}/{num_frames}") 62 await omni.kit.app.get_app().next_update_async() 63 await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False) 64 timeline.pause() 65 66 # Wait for all data to be written 67 await rep.orchestrator.wait_until_complete_async() 68 print("Data generation complete!") 69 cosmos_writer.detach() 70 rp.destroy() 71 72asyncio.ensure_future(run_cosmos_example_async(num_frames=NUM_FRAMES, segmentation_mapping=SEGMENTATION_MAPPING)) Standalone Application Cosmos Writer Example 1from isaacsim import SimulationApp 2 3simulation_app = SimulationApp(launch_config={"headless": False}) 4 5import os 6 7import carb.settings 8import omni.replicator.core as rep 9import omni.timeline 10import omni.usd 11 12SEGMENTATION_MAPPING = { 13 "plane": [0, 0, 255, 255], 14 "cube": [255, 0, 0, 255], 15 
"sphere": [0, 255, 0, 255], 16} 17NUM_FRAMES = 60 18 19 20def run_cosmos_example(num_frames, segmentation_mapping=None): 21 # Create a new stage 22 omni.usd.get_context().new_stage() 23 24 # CosmosWriter requires script nodes to be enabled 25 carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True) 26 27 # Disable capture on play, data is captured manually using the step function 28 rep.orchestrator.set_capture_on_play(False) 29 30 # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)) 31 carb.settings.get_settings().set("rtx/post/dlss/execMode", 2) 32 33 # Set the stage properties 34 rep.settings.set_stage_up_axis("Z") 35 rep.settings.set_stage_meters_per_unit(1.0) 36 rep.functional.create.dome_light(intensity=500) 37 38 # Create the scenario with a ground plane and a falling sphere and cube. 39 plane = rep.functional.create.plane(position=(0, 0, 0), scale=(10, 10, 1), semantics={"class": "plane"}) 40 rep.functional.physics.apply_collider(plane) 41 42 sphere = rep.functional.create.sphere(position=(0, 0, 3), semantics={"class": "sphere"}) 43 rep.functional.physics.apply_collider(sphere) 44 rep.functional.physics.apply_rigid_body(sphere) 45 46 cube = rep.functional.create.cube(position=(1, 1, 2), scale=0.5, semantics={"class": "cube"}) 47 rep.functional.physics.apply_collider(cube) 48 rep.functional.physics.apply_rigid_body(cube) 49 50 # Set up the writer 51 camera = rep.functional.create.camera(position=(5, 5, 3), look_at=(0, 0, 0)) 52 rp = rep.create.render_product(camera, (1280, 720)) 53 out_dir = os.path.join(os.getcwd(), "_out_cosmos_simple") 54 print(f"Output directory: {out_dir}") 55 cosmos_writer = rep.WriterRegistry.get("CosmosWriter") 56 cosmos_writer.initialize(output_dir=out_dir, segmentation_mapping=segmentation_mapping) 57 cosmos_writer.attach(rp) 58 59 # Start the simulation 60 timeline = omni.timeline.get_timeline_interface() 61 timeline.play() 62 63 # Capture a 
frame every app update 64 for i in range(num_frames): 65 print(f"Frame {i+1}/{num_frames}") 66 simulation_app.update() 67 rep.orchestrator.step(delta_time=0.0, pause_timeline=False) 68 timeline.pause() 69 70 # Wait for all data to be written 71 rep.orchestrator.wait_until_complete() 72 print("Data generation complete!") 73 cosmos_writer.detach() 74 rp.destroy() 75 76 77run_cosmos_example(num_frames=NUM_FRAMES, segmentation_mapping=SEGMENTATION_MAPPING) 78 79simulation_app.close()

Related Articles