Code Docs

Implementing Diffusion Policies in Ark

Introduction

This tutorial walks you through the end‑to‑end workflow of training, validating, and simulating a Diffusion Policy for robotic manipulation in Ark.

After completing it you will know how to:

  1. Gather data using Ark.
  2. Choose task‑appropriate hyper‑parameters.
  3. Train a diffusion‑based action model.
  4. Diagnose model quality with visual tools.
  5. Execute rollouts using the Gym Interface.

(Demo video: Screen Recording 2025-07-25 at 12.55.07 pm.mov)

With these steps, you can train and evaluate diffusion-based imitation-learning policies on your own manipulation tasks.

Prerequisites

Make sure Ark_framework and Ark_types are installed.

Setting Up

Git clone the repository:

https://github.com/Robotics-Ark/ark_diffusion_policies_on_franka

Data Collection Using Ark

  1. Open the project folder

    cd data_collection
    
  2. Start the simulation node

    Follow the steps in PyBullet Simulation → https://www.notion.so/Pybullet-Simulation-22be053d9c6f80a89cfdcda43fc4eee6?pvs=21.

  3. Launch the data-collection environment

    Refer to Ark Gym → https://www.notion.so/Ark-Gym-22be053d9c6f80a4bad8df1e2d55b8f2?pvs=21.

  4. Don’t forget the Ark registry ⚠️

    Make sure an Ark registry is running before you start the gym; otherwise, trajectory files won’t be recorded.

  5. Verify the simulation window

    A pop-up should appear showing the robot grasping the cube and placing it on the target.

  6. Check your trajectories

    After every successful attempt, a new .pkl file is saved automatically in:

    data_collection/trajectories/
    

That’s it—run, watch, and let the trajectories accumulate!
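
Once a few files have accumulated, you can open one to confirm that a demonstration was actually recorded. The snippet below is a minimal sketch: it assumes only that the files are ordinary pickles under data_collection/trajectories/; the exact structure of what Ark stores depends on your data-collection configuration.

# Minimal sanity check on a recorded demonstration (a sketch, not part of the
# repo): the structure of the stored object depends on your setup.
import pickle
from pathlib import Path

traj_dir = Path("data_collection/trajectories")
files = sorted(traj_dir.glob("*.pkl"))
print(f"Found {len(files)} trajectory file(s)")

if files:
    with files[-1].open("rb") as f:          # inspect the most recent file
        trajectory = pickle.load(f)
    print("Loaded", files[-1].name, "->", type(trajectory))
    # If Ark stored a dict of per-timestep arrays, list its keys and lengths.
    if isinstance(trajectory, dict):
        for key, value in trajectory.items():
            length = len(value) if hasattr(value, "__len__") else value
            print(f"  {key}: {length}")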

Our Expert Policy

import numpy as np
from scipy.spatial.transform import Rotation as R


class PickPlacePolicy:
    def __init__(self, steps_per_phase=100):
        self.state_id = 0
        self.step_in_phase = 0
        self.steps_per_phase = steps_per_phase
        self.gripper = 1.0  # Open initially

        self.trajectory = []  # Will be list of [position, quaternion, gripper]

    def plan_trajectory(self, obs):
        cube = np.array(obs["cube"])
        target = np.array(obs["target"])

        down_quat = R.from_euler("xyz", [np.pi, 0, 0]).as_quat()

        # Define key poses
        waypoints = [
            (cube + [0, 0, 0.5], 1.0),  # above cube
            (cube + [0, 0, 0.15], 1.0),  # at cube (open)
            (cube + [0, 0, 0.15], 0.0),  # grasp (close)
            (cube + [0, 0, 0.3], 0.0),  # lift
            (target + [0, 0, 0.4], 0.0),  # above target
            (target + [0, 0, 0.3], 0.0),  # at target (closed)
            (target + [0, 0, 0.3], 1.0),  # release
            (target + [0, 0, 0.3], 1.0),  # hover
        ]

        self.trajectory = []
        for i in range(len(waypoints) - 1):
            start_pos, start_grip = waypoints[i]
            end_pos, end_grip = waypoints[i + 1]
            for s in range(self.steps_per_phase):
                alpha = s / self.steps_per_phase
                pos = (1 - alpha) * start_pos + alpha * end_pos
                grip = (1 - alpha) * start_grip + alpha * end_grip
                self.trajectory.append((pos, down_quat, grip))

    def __call__(self, obs):
        if not self.trajectory:
            self.plan_trajectory(obs)

        if self.state_id >= len(self.trajectory):
            # Hold last pose
            pos, quat, grip = self.trajectory[-1]
        else:
            pos, quat, grip = self.trajectory[self.state_id]
            self.state_id += 1

        return list(pos) + list(quat) + [grip]

Here’s a walkthrough of what the PickPlacePolicy is doing, step by step.


1. Initialization

def __init__(self, steps_per_phase=100):
    self.state_id = 0            # index of the next timestep to output
    self.step_in_phase = 0       # (unused—leftover from an earlier design)
    self.steps_per_phase = steps_per_phase
    self.gripper = 1.0           # 1 = open, 0 = closed

    self.trajectory = []         # filled lazily the first time the policy is called

2. Planning the Trajectory

This happens only the first time __call__ receives an observation; the class has no reset method, so create a fresh PickPlacePolicy instance for each new episode.

cube   = np.array(obs["cube"])     # (x, y, z) of the cube’s center
target = np.array(obs["target"])   # (x, y, z) of the target center
down_quat = R.from_euler("xyz", [np.pi, 0, 0]).as_quat()   # end-effector pointing straight down
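
For this particular rotation the quaternion is easy to verify by hand, which also makes scipy's (x, y, z, w) ordering explicit:

from scipy.spatial.transform import Rotation as R
import numpy as np

# A rotation of pi about the x-axis points the gripper straight down.
print(R.from_euler("xyz", [np.pi, 0, 0]).as_quat())   # approximately [1. 0. 0. 0.]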

Waypoints (key poses)

1  cube + (0,0,0.50)   gripper 1.0   → hover above cube
2  cube + (0,0,0.15)   gripper 1.0   → descend to grasp height, still open
3  cube + (0,0,0.15)   gripper 0.0   → close gripper (grasp)
4  cube + (0,0,0.30)   gripper 0.0   → lift cube
5  target+(0,0,0.40)   gripper 0.0   → hover above target
6  target+(0,0,0.30)   gripper 0.0   → descend to placement height
7  target+(0,0,0.30)   gripper 1.0   → open gripper (release)
8  target+(0,0,0.30)   gripper 1.0   → brief hover before stopping

Between each consecutive pair, linear interpolation is performed for steps_per_phase steps:

alpha = s / steps_per_phase          # 0 → 1
pos   = (1-alpha)·start + alpha·end
grip  = (1-alpha)·start_grip + alpha·end_grip

Every interpolated triple (pos, down_quat, grip) is stored in self.trajectory. With 8 waypoints (7 phases) and the default steps_per_phase=100, the planned trajectory contains 700 timesteps.


3. Producing Actions (__call__)

if not self.trajectory:
    self.plan_trajectory(obs)

if self.state_id >= len(self.trajectory):
    pos, quat, grip = self.trajectory[-1]      # hold final pose
else:
    pos, quat, grip = self.trajectory[self.state_id]
    self.state_id += 1

The output format is:

return list(pos) + list(quat) + [grip]

Component   Length   Meaning
pos         3        x, y, z end-effector position (meters)
quat        4        (x, y, z, w) quaternion for the fixed downward orientation
grip        1        1.0 = open, 0.0 = closed

Total: 8 numbers—exactly what many low-level controllers expect.
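
Before wiring the policy into the Gym Interface, you can sanity-check it offline. The sketch below uses made-up cube and target positions (the keys "cube" and "target" match what plan_trajectory reads from obs) and verifies that every action is the 8-number vector described above.

# Offline sanity check of the expert policy (no simulator needed).
obs = {"cube": [0.5, 0.0, 0.02], "target": [0.3, 0.2, 0.02]}  # made-up positions
policy = PickPlacePolicy(steps_per_phase=100)

actions = [policy(obs) for _ in range(700)]   # 7 phases x 100 steps
print(len(actions), len(actions[0]))          # 700 8
print(actions[0][:3], actions[0][-1])         # hover pose above cube, gripper open (1.0)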


Training

This short guide explains how to prepare data, choose hyper‑parameters, train, and evaluate a Diffusion Policy for robot manipulation using the open‑source code from diffusion‑policy.cs.columbia.edu.

https://diffusion-policy.cs.columbia.edu/

1 What is a Diffusion Policy?

A diffusion policy treats action generation as a denoising diffusion process: it starts from random noise in action space and iteratively refines it into a physically valid, goal‑directed action sequence that moves the robot toward the desired state. Because the model learns the entire distribution of expert actions, it naturally captures multimodal behaviours and stays robust to sensor noise.

Key advantages over conventional behavioural cloning:

If you are new to diffusion policies, try the hands‑on Colab tutorial:

https://colab.research.google.com/drive/1gxdkgRVfM55zihY9TFLja97cSVZOZq2B?usp=sharing
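
To make the "iteratively refine noise into actions" idea concrete, here is a toy, self-contained sketch of DDPM-style action sampling. Every name and number in it (the MLP noise predictor, OBS_DIM, ACT_DIM, HORIZON, the linear beta schedule) is an illustrative assumption, not the API of the diffusion_policy codebase; it only mirrors the reverse-diffusion loop that the real implementation performs with a conditional U-Net or transformer.

import torch
import torch.nn as nn

OBS_DIM, ACT_DIM, HORIZON, T = 10, 8, 16, 100     # illustrative sizes, not repo defaults

class NoisePredictor(nn.Module):
    """Toy stand-in for a conditional noise-prediction network."""
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(OBS_DIM + ACT_DIM * HORIZON + 1, 256), nn.ReLU(),
            nn.Linear(256, ACT_DIM * HORIZON),
        )

    def forward(self, obs, noisy_actions, t):
        # Condition on the observation and the (normalized) diffusion timestep.
        x = torch.cat([obs, noisy_actions.flatten(1), t.float().unsqueeze(1) / T], dim=1)
        return self.net(x).view(-1, HORIZON, ACT_DIM)

betas = torch.linspace(1e-4, 0.02, T)             # DDPM noise schedule
alphas = 1.0 - betas
alpha_bar = torch.cumprod(alphas, dim=0)

@torch.no_grad()
def sample_actions(model, obs):
    """Reverse diffusion: start from Gaussian noise, denoise into an action chunk."""
    a = torch.randn(obs.shape[0], HORIZON, ACT_DIM)
    for t in reversed(range(T)):
        t_batch = torch.full((obs.shape[0],), t)
        eps = model(obs, a, t_batch)
        # Standard DDPM update of the sample from the predicted noise.
        a = (a - betas[t] / torch.sqrt(1.0 - alpha_bar[t]) * eps) / torch.sqrt(alphas[t])
        if t > 0:
            a = a + torch.sqrt(betas[t]) * torch.randn_like(a)
    return a

policy_net = NoisePredictor()
action_chunk = sample_actions(policy_net, torch.randn(1, OBS_DIM))
print(action_chunk.shape)                         # torch.Size([1, 16, 8])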

2 Prepare & Inspect Your Dataset

  1. Collect demonstrations (images + joint states + actions) in the format expected by the repo.
  2. Launch data_inspector.ipynb and point it to your dataset root.
  3. Use the notebook widgets to:

If anything looks suspicious—e.g.


3 Choose Hyper‑parameters

Good defaults are provided in the repo, but every task is different. The Data Inspector prints suggested ranges (based on data statistics) for: