Shallow-π Profiling

Shallow-$\pi$가 잘 구현되었다는 것은 확인했으니, 다음으로 이 모델의 inference 과정에서 주요 bottleneck이 어디인지를 알아보기 위해 profiling을 수행한다. 이 서버는 여러 사용자가 함께 쓰는 공용 GPU 서버이므로, 전체 시스템을 독점한 dedicated benchmark 환경은 아니다. 따라서 이번 profiling에서는 실행 GPU를 1개의 L40S로 고정하고, 해당 GPU는 실험 중 단독으로 사용하기로 합의했다. 다만 CPU, memory, storage I/O, OS background load는 다른 사용자의 작업 영향을 받을 수 있으므로, 이후 latency 수치는 절대적인 서버 최대 성능이라기보다 shared-server 환경에서의 병목 분석용 측정값으로 해석한다.

1. baseline latency script

Profiler를 켜면 overhead가 추가적으로 생기므로, 먼저 profiler 없는 순수 latency, 즉 model-only latency baseline를 알아야 나중에 profiler 결과를 해석하기에 유리하다. 따라서 아래와 같은 파이썬 코드를 만들었다:

profile_shallow_pi_latency.py
#!/usr/bin/env python3

import argparse
import json
import pathlib
import statistics
import time
from typing import Any

import jax
import numpy as np
import torch

from openpi.models import model as _model
from openpi.policies import libero_policy
from openpi.policies import policy_config
from openpi.training import config as _config


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()

    parser.add_argument("--config", type=str, default="pi0_libero_l06")
    parser.add_argument("--ckpt", type=str, required=True)
    parser.add_argument("--device", type=str, default="cuda:0")

    parser.add_argument("--num-steps", type=int, default=10)
    parser.add_argument("--mode", type=str, choices=["model", "policy"], default="model")
    parser.add_argument("--fixed-noise", action="store_true")

    parser.add_argument("--warmup", type=int, default=30)
    parser.add_argument("--iters", type=int, default=100)
    parser.add_argument("--seed", type=int, default=0)

    parser.add_argument("--out-json", type=str, default="profiles/latency/shallow_pi_latency.json")

    return parser.parse_args()


def summarize(values: list[float]) -> dict[str, float]:
    values = sorted(values)
    n = len(values)

    def percentile(p: float) -> float:
        idx = min(n - 1, int(round((p / 100.0) * (n - 1))))
        return values[idx]

    return {
        "count": n,
        "mean_ms": statistics.mean(values),
        "median_ms": statistics.median(values),
        "p90_ms": percentile(90),
        "p95_ms": percentile(95),
        "p99_ms": percentile(99),
        "min_ms": min(values),
        "max_ms": max(values),
    }


def make_policy(args: argparse.Namespace):
    train_config = _config.get_config(args.config)

    # create_trained_policy() detects PyTorch checkpoint by model.safetensors.
    # sample_kwargs is passed to model.sample_actions().
    policy = policy_config.create_trained_policy(
        train_config,
        args.ckpt,
        pytorch_device=args.device,
        sample_kwargs={"num_steps": args.num_steps},
    )

    return policy


def make_example() -> dict[str, Any]:
    # Random LIBERO-style dummy observation.
    # This checks model inference path, not task success.
    return libero_policy.make_libero_example()


def prepare_model_only_observation(policy, example: dict[str, Any], device: str):
    """
    Reproduce the PyTorch branch of Policy.infer(), but only once.

    Purpose:
      - input transform once
      - CPU -> GPU copy once
      - Observation object once
      - repeated benchmark measures only sample_actions()
    """
    inputs = jax.tree.map(lambda x: x, example)
    inputs = policy._input_transform(inputs)

    inputs = jax.tree.map(
        lambda x: torch.from_numpy(np.array(x)).to(device)[None, ...],
        inputs,
    )

    observation = _model.Observation.from_dict(inputs)
    return observation, inputs


def make_fixed_noise(policy, observation, device: str, seed: int):
    bsize = observation.state.shape[0]
    action_horizon = policy._model.config.action_horizon
    action_dim = policy._model.config.action_dim

    generator = torch.Generator(device=device)
    generator.manual_seed(seed)

    return torch.randn(
        (bsize, action_horizon, action_dim),
        device=device,
        dtype=torch.float32,
        generator=generator,
    )


@torch.inference_mode()
def run_model_only(policy, device: str, observation, noise, num_steps: int):
    if noise is None:
        return policy._sample_actions(device, observation, num_steps=num_steps)

    return policy._sample_actions(device, observation, noise=noise, num_steps=num_steps)


@torch.inference_mode()
def run_policy(policy, example: dict[str, Any]):
    return policy.infer(example)


def measure_cuda_event_ms(fn) -> float:
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)

    torch.cuda.synchronize()
    start.record()

    _ = fn()

    end.record()
    end.synchronize()

    return float(start.elapsed_time(end))


def measure_sync_wall_ms(fn) -> float:
    torch.cuda.synchronize()
    t0 = time.perf_counter()

    _ = fn()

    torch.cuda.synchronize()
    t1 = time.perf_counter()

    return (t1 - t0) * 1000.0


def main() -> None:
    args = parse_args()

    pathlib.Path(args.out_json).parent.mkdir(parents=True, exist_ok=True)

    torch.manual_seed(args.seed)
    np.random.seed(args.seed)

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available. This profiling script expects a CUDA GPU.")

    print("[INFO] config:", args.config)
    print("[INFO] ckpt:", args.ckpt)
    print("[INFO] device:", args.device)
    print("[INFO] num_steps:", args.num_steps)
    print("[INFO] mode:", args.mode)
    print("[INFO] fixed_noise:", args.fixed_noise)
    print("[INFO] torch:", torch.__version__)
    print("[INFO] torch cuda:", torch.version.cuda)
    print("[INFO] visible devices:", torch.cuda.device_count())
    print("[INFO] device name:", torch.cuda.get_device_name(0))

    policy = make_policy(args)
    example = make_example()

    if args.mode == "model":
        observation, _ = prepare_model_only_observation(policy, example, args.device)
        noise = make_fixed_noise(policy, observation, args.device, args.seed) if args.fixed_noise else None

        def fn():
            return run_model_only(policy, args.device, observation, noise, args.num_steps)

    else:
        def fn():
            return run_policy(policy, example)

    print(f"[INFO] warmup start: {args.warmup}")
    for _ in range(args.warmup):
        _ = fn()
    torch.cuda.synchronize()
    print("[INFO] warmup done")

    # One output shape sanity check.
    out = fn()
    torch.cuda.synchronize()

    if isinstance(out, dict):
        print("[INFO] output keys:", list(out.keys()))
        if "actions" in out:
            print("[INFO] output actions shape:", np.asarray(out["actions"]).shape)
    else:
        print("[INFO] output tensor shape:", tuple(out.shape))
        print("[INFO] output dtype:", out.dtype)

    cuda_event_ms = []
    sync_wall_ms = []

    print(f"[INFO] measurement start: {args.iters}")
    for i in range(args.iters):
        cuda_event_ms.append(measure_cuda_event_ms(fn))
        sync_wall_ms.append(measure_sync_wall_ms(fn))

    result = {
        "config": args.config,
        "ckpt": args.ckpt,
        "device": args.device,
        "num_steps": args.num_steps,
        "mode": args.mode,
        "fixed_noise": args.fixed_noise,
        "warmup": args.warmup,
        "iters": args.iters,
        "cuda_event": summarize(cuda_event_ms),
        "sync_wall": summarize(sync_wall_ms),
    }

    print(json.dumps(result, indent=2))

    with open(args.out_json, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)


if __name__ == "__main__":
    main()

위 코드는 실제 LIBERO observation 대신 shape만 같은 랜덤한 observation과 “do something”이라는 prompt를 줘서 inference path에만 집중한다. 두 가지 mode로 돌아가는데

Comments