← Back to Blog
EN中文

gpuq:为独立 ML 研究者打造的轻量 GPU 任务队列

一张 GPU,五个实验要跑。你会不断地 nvidia-smi 检查、手动 kill 进程、忘记上次跑到哪个 checkpoint。我写了 gpuq 来解决这个问题 —— 一个单文件 Python 工具,把 GPU 实验排成队列顺序执行。

最初只是 50 行的 subprocess 封装,后来逐渐长出了进程接管、ETA 估算、checkpoint 抢占恢复、崩溃恢复等功能。每个功能都是被真实痛点逼出来的。

问题背景

我的硬件:RTX 5080 笔记本,16GB 显存。硕士论文有 5+ 个实验:

  • 各自独立的 .venv(不同版本的 PyTorch/trl)
  • 单次运行 30 分钟到 3 小时以上
  • 两个同时跑必 OOM
  • checkpoint 存在不同目录

我需要的东西介于"手动跑实验"和"搭 Kubernetes + Slurm"之间 —— 单个文件、零依赖、马上能用。

v1:队列 + 执行(30 分钟搞定)

核心循环简单到令人不好意思:

while True:
    pending = db.execute(
        "SELECT * FROM jobs WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    if not pending:
        break

    job = find_next_runnable(pending)  # 检查依赖
    run_job(job)  # subprocess.Popen, wait, 更新状态

通过 CLI 添加任务:

# 使用当前目录,自动查找 .venv/bin/python
gpuq add train.py --lr 1e-4 --epochs 10

# 链式依赖
gpuq add eval.py --after 1

# 后台运行队列
gpuq run --daemon

--daemon 会 fork 到后台。自动从工作目录向上查找 .venv/bin/python

设计决策:SQLite 而非 JSON。 最初用 JSON 文件存状态,直到机器在写入过程中崩溃,JSON 损坏了。SQLite 的 WAL 模式天然崩溃安全,支持并发读取(daemon 运行时查看状态),而且在 Python 里零依赖。

db.execute("PRAGMA journal_mode=WAL")

迁移是自动的 —— 首次运行检测到 queue.json 就自动导入:

if not DB_FILE.exists():
    _migrate_json()  # 一次性操作,原文件改名为 .json.bak

接管运行中的进程

第一个"早该有这个"的时刻:一个训练跑了 2 小时,是在 gpuq 之外启动的。我想把它纳入队列管理,这样就能在它后面挂一个 eval 任务。

gpuq adopt/proc 读取所有信息:

def _read_proc_info(pid):
    proc = Path(f"/proc/{pid}")
    info = {}
    # 命令行:/proc/pid/cmdline(null 分隔)
    info["cmdline"] = (proc / "cmdline").read_bytes()
                       .decode().replace("\x00", " ")
    # 工作目录:/proc/pid/cwd(符号链接)
    info["cwd"] = str((proc / "cwd").resolve())
    # 真实启动时间:/proc/pid/stat 第 22 个字段
    stat = (proc / "stat").read_text().split(")")[-1].split()
    starttime_ticks = int(stat[19])
    clk_tck = os.sysconf("SC_CLK_TCK")
    boot_time = time.time() - float(open("/proc/uptime").read().split()[0])
    info["started_at"] = datetime.fromtimestamp(
        boot_time + starttime_ticks / clk_tck
    )
    return info

真实启动时间很关键 —— 没有它,ETA 估算会以为任务刚开始。/proc/pid/stat 的第 22 个字段是进程自系统启动以来的 clock ticks,结合 /proc/uptime 就能算出实际的 wall-clock 启动时间。

使用方式:

$ gpuq adopt 158500 --name "exp02-training"
[+] Adopted PID 158500 as Job #1: exp02-training
    cmd: python -u run_mixed_train.py --max_steps 300 ...
    cwd: /home/yuxu/repo/msc-fast-exp/exp02-grpo-retrieval-agent

现在 gpuq status 能追踪它了,也能用 --after 1 在后面挂任务。

ETA 估算

两种策略,按优先级尝试:

1. 日志解析 —— 扫描任务日志的最后 200 行,匹配进度模式:

_PROGRESS_PATTERNS = [
    re.compile(r'[Ss]tep\s+(\d+)\s*/\s*(\d+)'),   # Step 50/300
    re.compile(r'\[(\d+)/(\d+)\]'),                  # [50/300]
    re.compile(r'[Ee]poch\s+(\d+)\s*/\s*(\d+)'),    # Epoch 2/10
    re.compile(r'(\d+(?:\.\d+)?)\s*%'),              # 50.0%
]

2. Checkpoint 扫描 —— 对于没有日志的接管进程,从命令行参数中解析 --max_steps--output_dir,然后扫描 checkpoint 目录:

# HuggingFace Trainer 保存 checkpoint-{step}/
for d in scan_dir.glob("checkpoint-*"):
    step = int(d.name.split("-")[-1])
    current_step = max(current_step, step)

# TRL 保存 completions_{step:05d}.parquet
for f in scan_dir.glob("completions/completions_*.parquet"):
    step = int(f.stem.split("_")[-1])
    current_step = max(current_step, step)

ETA = 已用时间 × (总步数 - 当前步数) / 当前步数。简单,但需要正确的扫描目录 —— 我踩过坑:rglob 扫描整个项目目录会找到旧 run 的 checkpoint。现在改为只看 --output_dir

效果直接显示在 status 里:

 ID  St  Name                Script              Time  Info
──────────────────────────────────────────────────────────────
  1  🔄  exp02-training     run_mixed_train.py  1h36m  pid=158500 53% ETA 1h24m
  2  ⏳  exp10-eval         run_transfer_eval.py       after #1

抢占与恢复

最复杂的功能。场景:训练跑了 60%,但你急需 GPU 做一个快速评估。不想浪费已有的 60% 进度。

Preempt 发送 SIGTERM(HuggingFace Trainer 会优雅处理 —— 保存 checkpoint 后退出),等待保存完成,然后标记任务:

def cmd_preempt(args):
    os.kill(pid, signal.SIGTERM)
    for i in range(timeout):
        if not _pid_alive(pid):
            break
        time.sleep(1)
    # 找到最新 checkpoint
    checkpoint = _find_latest_checkpoint(job)
    db.execute("UPDATE jobs SET status='preempted' ...")

Resume 创建新任务,注入 --resume_from_checkpoint 参数:

def cmd_resume(args):
    checkpoint = _find_latest_checkpoint(job)
    new_args = original_args + ["--resume_from_checkpoint", checkpoint]
    db.execute("INSERT INTO jobs ...")

这能工作是因为 HuggingFace Trainer、PyTorch Lightning 等主流训练框架都原生支持 --resume_from_checkpoint。训练会从中断处继续 —— optimizer 状态、学习率 schedule、step 计数全部恢复。

$ gpuq preempt 1
Sending SIGTERM to PID 158500...
Job #1 preempted.
  Latest checkpoint: outputs/checkpoint-150
  Use `gpuq resume 1` to continue later.

$ gpuq add urgent_eval.py --name "quick-eval"
$ gpuq run  # 跑 eval

$ gpuq resume 1
[+] Job #3: exp02-training (resumed)
    Resuming from: outputs/checkpoint-150

崩溃恢复

gpuq status 自动检测死进程:

running = db.execute("SELECT id, pid FROM jobs WHERE status='running'")
for r in running:
    if not _pid_alive(r["pid"]):
        db.execute("UPDATE jobs SET status='interrupted' ...")

重启后:

$ gpuq status
  1  ⚡  exp02-training   run_train.py   2h15m  interrupted
  2  ⏳  exp10-eval       run_eval.py           after #1

$ gpuq recover --all    # 或 --jobs 1
Re-queued 1 job(s). Run `gpuq run` to execute.

AI Skill 集成

gpuq 自带一个 skill/gpuq.md 文件,可以教 AI 编程助手(Claude Code、Cursor 等)用自然语言管理队列:

/gpuq 看看队列
/gpuq 跑一下 exp05 的 train.py
/gpuq 还要多久
/gpuq 先暂停当前任务,跑个紧急 eval

skill 文件把意图映射到命令 —— 用户不需要记 CLI 语法。

回头看,哪些可以改进

PID 追踪只支持 Linux。 /proc 文件系统在 macOS 上不存在。可移植版本应该用 psutil,但我刻意避免了外部依赖。

Checkpoint 检测是启发式的。 对 HuggingFace Trainer 的 checkpoint-{step} 模式和 TRL 的 completions_{step}.parquet 有效,但自定义的 torch.save("model_epoch_5.pt") 检测不到。更健壮的方案是回调机制(训练脚本主动报告进度给 gpuq),但那就不够零配置了。

不支持多 GPU。 这是刻意的 —— 我的场景是单张笔记本 GPU。多 GPU 请用 Slurm。

技术栈

  • 语言: Python 3(纯标准库,无 pip 依赖)
  • 存储: SQLite WAL 模式
  • 进程信息: /proc 文件系统(Linux)
  • 代码量: 单文件 ~700 行
  • 仓库: github.com/geyuxu/gpuq
git clone [email protected]:geyuxu/gpuq.git
cd gpuq && chmod +x gpuq.py
ln -sf $(pwd)/gpuq.py ~/.local/bin/gpuq

就这样。不需要 pip install,不需要 Docker,不需要配置文件。一个 Python 脚本搞定 GPU 队列管理。