gpuq:为独立 ML 研究者打造的轻量 GPU 任务队列
一张 GPU,五个实验要跑。你会不断地 nvidia-smi 检查、手动 kill 进程、忘记上次跑到哪个 checkpoint。我写了 gpuq 来解决这个问题 —— 一个单文件 Python 工具,把 GPU 实验排成队列顺序执行。
最初只是 50 行的 subprocess 封装,后来逐渐长出了进程接管、ETA 估算、checkpoint 抢占恢复、崩溃恢复等功能。每个功能都是被真实痛点逼出来的。
问题背景
我的硬件:RTX 5080 笔记本,16GB 显存。硕士论文有 5+ 个实验:
- 各自独立的
.venv(不同版本的 PyTorch/trl) - 单次运行 30 分钟到 3 小时以上
- 两个同时跑必 OOM
- checkpoint 存在不同目录
我需要的东西介于"手动跑实验"和"搭 Kubernetes + Slurm"之间 —— 单个文件、零依赖、马上能用。
v1:队列 + 执行(30 分钟搞定)
核心循环简单到令人不好意思:
while True:
pending = db.execute(
"SELECT * FROM jobs WHERE status = 'pending' ORDER BY id"
).fetchall()
if not pending:
break
job = find_next_runnable(pending) # 检查依赖
run_job(job) # subprocess.Popen, wait, 更新状态
通过 CLI 添加任务:
# 使用当前目录,自动查找 .venv/bin/python
gpuq add train.py --lr 1e-4 --epochs 10
# 链式依赖
gpuq add eval.py --after 1
# 后台运行队列
gpuq run --daemon
--daemon 会 fork 到后台。自动从工作目录向上查找 .venv/bin/python。
设计决策:SQLite 而非 JSON。 最初用 JSON 文件存状态,直到机器在写入过程中崩溃,JSON 损坏了。SQLite 的 WAL 模式天然崩溃安全,支持并发读取(daemon 运行时查看状态),而且在 Python 里零依赖。
db.execute("PRAGMA journal_mode=WAL")
迁移是自动的 —— 首次运行检测到 queue.json 就自动导入:
if not DB_FILE.exists():
_migrate_json() # 一次性操作,原文件改名为 .json.bak
接管运行中的进程
第一个"早该有这个"的时刻:一个训练跑了 2 小时,是在 gpuq 之外启动的。我想把它纳入队列管理,这样就能在它后面挂一个 eval 任务。
gpuq adopt 从 /proc 读取所有信息:
def _read_proc_info(pid):
proc = Path(f"/proc/{pid}")
info = {}
# 命令行:/proc/pid/cmdline(null 分隔)
info["cmdline"] = (proc / "cmdline").read_bytes()
.decode().replace("\x00", " ")
# 工作目录:/proc/pid/cwd(符号链接)
info["cwd"] = str((proc / "cwd").resolve())
# 真实启动时间:/proc/pid/stat 第 22 个字段
stat = (proc / "stat").read_text().split(")")[-1].split()
starttime_ticks = int(stat[19])
clk_tck = os.sysconf("SC_CLK_TCK")
boot_time = time.time() - float(open("/proc/uptime").read().split()[0])
info["started_at"] = datetime.fromtimestamp(
boot_time + starttime_ticks / clk_tck
)
return info
真实启动时间很关键 —— 没有它,ETA 估算会以为任务刚开始。/proc/pid/stat 的第 22 个字段是进程自系统启动以来的 clock ticks,结合 /proc/uptime 就能算出实际的 wall-clock 启动时间。
使用方式:
$ gpuq adopt 158500 --name "exp02-training"
[+] Adopted PID 158500 as Job #1: exp02-training
cmd: python -u run_mixed_train.py --max_steps 300 ...
cwd: /home/yuxu/repo/msc-fast-exp/exp02-grpo-retrieval-agent
现在 gpuq status 能追踪它了,也能用 --after 1 在后面挂任务。
ETA 估算
两种策略,按优先级尝试:
1. 日志解析 —— 扫描任务日志的最后 200 行,匹配进度模式:
_PROGRESS_PATTERNS = [
re.compile(r'[Ss]tep\s+(\d+)\s*/\s*(\d+)'), # Step 50/300
re.compile(r'\[(\d+)/(\d+)\]'), # [50/300]
re.compile(r'[Ee]poch\s+(\d+)\s*/\s*(\d+)'), # Epoch 2/10
re.compile(r'(\d+(?:\.\d+)?)\s*%'), # 50.0%
]
2. Checkpoint 扫描 —— 对于没有日志的接管进程,从命令行参数中解析 --max_steps 和 --output_dir,然后扫描 checkpoint 目录:
# HuggingFace Trainer 保存 checkpoint-{step}/
for d in scan_dir.glob("checkpoint-*"):
step = int(d.name.split("-")[-1])
current_step = max(current_step, step)
# TRL 保存 completions_{step:05d}.parquet
for f in scan_dir.glob("completions/completions_*.parquet"):
step = int(f.stem.split("_")[-1])
current_step = max(current_step, step)
ETA = 已用时间 × (总步数 - 当前步数) / 当前步数。简单,但需要正确的扫描目录 —— 我踩过坑:rglob 扫描整个项目目录会找到旧 run 的 checkpoint。现在改为只看 --output_dir。
效果直接显示在 status 里:
ID St Name Script Time Info
──────────────────────────────────────────────────────────────
1 🔄 exp02-training run_mixed_train.py 1h36m pid=158500 53% ETA 1h24m
2 ⏳ exp10-eval run_transfer_eval.py after #1
抢占与恢复
最复杂的功能。场景:训练跑了 60%,但你急需 GPU 做一个快速评估。不想浪费已有的 60% 进度。
Preempt 发送 SIGTERM(HuggingFace Trainer 会优雅处理 —— 保存 checkpoint 后退出),等待保存完成,然后标记任务:
def cmd_preempt(args):
os.kill(pid, signal.SIGTERM)
for i in range(timeout):
if not _pid_alive(pid):
break
time.sleep(1)
# 找到最新 checkpoint
checkpoint = _find_latest_checkpoint(job)
db.execute("UPDATE jobs SET status='preempted' ...")
Resume 创建新任务,注入 --resume_from_checkpoint 参数:
def cmd_resume(args):
checkpoint = _find_latest_checkpoint(job)
new_args = original_args + ["--resume_from_checkpoint", checkpoint]
db.execute("INSERT INTO jobs ...")
这能工作是因为 HuggingFace Trainer、PyTorch Lightning 等主流训练框架都原生支持 --resume_from_checkpoint。训练会从中断处继续 —— optimizer 状态、学习率 schedule、step 计数全部恢复。
$ gpuq preempt 1
Sending SIGTERM to PID 158500...
Job #1 preempted.
Latest checkpoint: outputs/checkpoint-150
Use `gpuq resume 1` to continue later.
$ gpuq add urgent_eval.py --name "quick-eval"
$ gpuq run # 跑 eval
$ gpuq resume 1
[+] Job #3: exp02-training (resumed)
Resuming from: outputs/checkpoint-150
崩溃恢复
gpuq status 自动检测死进程:
running = db.execute("SELECT id, pid FROM jobs WHERE status='running'")
for r in running:
if not _pid_alive(r["pid"]):
db.execute("UPDATE jobs SET status='interrupted' ...")
重启后:
$ gpuq status
1 ⚡ exp02-training run_train.py 2h15m interrupted
2 ⏳ exp10-eval run_eval.py after #1
$ gpuq recover --all # 或 --jobs 1
Re-queued 1 job(s). Run `gpuq run` to execute.
AI Skill 集成
gpuq 自带一个 skill/gpuq.md 文件,可以教 AI 编程助手(Claude Code、Cursor 等)用自然语言管理队列:
/gpuq 看看队列
/gpuq 跑一下 exp05 的 train.py
/gpuq 还要多久
/gpuq 先暂停当前任务,跑个紧急 eval
skill 文件把意图映射到命令 —— 用户不需要记 CLI 语法。
回头看,哪些可以改进
PID 追踪只支持 Linux。 /proc 文件系统在 macOS 上不存在。可移植版本应该用 psutil,但我刻意避免了外部依赖。
Checkpoint 检测是启发式的。 对 HuggingFace Trainer 的 checkpoint-{step} 模式和 TRL 的 completions_{step}.parquet 有效,但自定义的 torch.save("model_epoch_5.pt") 检测不到。更健壮的方案是回调机制(训练脚本主动报告进度给 gpuq),但那就不够零配置了。
不支持多 GPU。 这是刻意的 —— 我的场景是单张笔记本 GPU。多 GPU 请用 Slurm。
技术栈
- 语言: Python 3(纯标准库,无 pip 依赖)
- 存储: SQLite WAL 模式
- 进程信息:
/proc文件系统(Linux) - 代码量: 单文件 ~700 行
- 仓库: github.com/geyuxu/gpuq
git clone [email protected]:geyuxu/gpuq.git
cd gpuq && chmod +x gpuq.py
ln -sf $(pwd)/gpuq.py ~/.local/bin/gpuq
就这样。不需要 pip install,不需要 Docker,不需要配置文件。一个 Python 脚本搞定 GPU 队列管理。