gpuq: A Lightweight GPU Job Queue Built for Solo ML Research
When you have one GPU and five experiments to run, you inevitably end up with a terminal full of nvidia-smi checks, manually killing processes, and forgetting which checkpoint you left off at. I built gpuq to solve this — a single-file Python tool that queues GPU experiments and runs them sequentially.
What started as a 50-line wrapper around subprocess grew into something genuinely useful: process adoption, ETA estimation, checkpoint-based preemption, and crash recovery. Here's the story of how each feature earned its way in.
The Problem
My setup: an RTX 5080 laptop with 16GB VRAM, running 5+ experiments for my MSc thesis. Each experiment:
- Has its own `.venv` (different PyTorch/trl versions)
- Takes 30 min to 3+ hours
- Will OOM if two run simultaneously
- Saves checkpoints to different directories
I needed something between "run experiments manually" and "set up Kubernetes + Slurm." Something that's a single file, zero dependencies, and works right now.
v1: Queue + Run (30 minutes to build)
The core loop is almost embarrassingly simple:
while True:
    pending = db.execute(
        "SELECT * FROM jobs WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    if not pending:
        break
    job = find_next_runnable(pending)  # check dependencies
    run_job(job)  # subprocess.Popen, wait, update status
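The dependency check inside `find_next_runnable` is the one non-obvious part of the loop. A minimal sketch of the idea — the column name `after_id` is illustrative, and the connection is passed explicitly here for clarity, whereas the real function presumably uses a module-level handle:

```python
import sqlite3

def find_next_runnable(pending, db):
    """Return the first pending job whose dependency (if any) is done."""
    for job in pending:
        dep = job["after_id"]  # None when the job has no --after dependency
        if dep is None:
            return job
        row = db.execute(
            "SELECT status FROM jobs WHERE id = ?", (dep,)
        ).fetchone()
        if row and row[0] == "done":
            return job
    return None  # every pending job is blocked on an unfinished dependency
```

Returning `None` when everything is blocked lets the outer loop decide whether to wait or exit.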
Jobs are added via CLI:
# Uses current directory, auto-finds .venv/bin/python
gpuq add train.py --lr 1e-4 --epochs 10
# Chain dependencies
gpuq add eval.py --after 1
# Run the queue
gpuq run --daemon
The --daemon flag forks the runner into the background, and gpuq auto-discovers .venv/bin/python by walking up from the working directory.
Design decision: SQLite, not JSON. I started with a JSON file for state. It worked fine until my machine crashed mid-write and the JSON was corrupted. SQLite with WAL mode is crash-safe, handles concurrent reads (status checks while daemon runs), and is still zero-dependency in Python.
db.execute("PRAGMA journal_mode=WAL")
The migration was automatic — first run detects queue.json and imports it:
if not DB_FILE.exists():
    _migrate_json()  # one-time, renames to .json.bak
Adopting Running Processes
The first "I wish I had this" moment: I had a training run going for 2 hours, started outside gpuq. I wanted it in the queue so I could chain an eval job after it.
gpuq adopt reads everything from /proc:
def _read_proc_info(pid):
    proc = Path(f"/proc/{pid}")
    info = {}
    # Command line: /proc/pid/cmdline (null-separated)
    info["cmdline"] = (
        (proc / "cmdline").read_bytes().decode().replace("\x00", " ")
    )
    # Working directory: /proc/pid/cwd (symlink)
    info["cwd"] = str((proc / "cwd").resolve())
    # Real start time: /proc/pid/stat field 22. Split after the ")" so
    # spaces in the comm field can't shift indices; field 22 then lands
    # at index 19 of the remainder.
    stat = (proc / "stat").read_text().split(")")[-1].split()
    starttime_ticks = int(stat[19])
    clk_tck = os.sysconf("SC_CLK_TCK")
    boot_time = time.time() - float(open("/proc/uptime").read().split()[0])
    info["started_at"] = datetime.fromtimestamp(
        boot_time + starttime_ticks / clk_tck
    )
    return info
The real start time matters — without it, ETA estimation thinks the job just started. Field 22 of /proc/pid/stat is the start time in clock ticks since boot. Combined with /proc/uptime, you get the actual wall-clock start time.
Usage:
$ gpuq adopt 158500 --name "exp02-training"
[+] Adopted PID 158500 as Job #1: exp02-training
cmd: python -u run_mixed_train.py --max_steps 300 ...
cwd: /home/yuxu/repo/msc-fast-exp/exp02-grpo-retrieval-agent
Now gpuq status tracks it, and I can chain --after 1 jobs.
ETA Estimation
Two strategies, tried in order:
1. Log parsing — scan the last 200 lines of the job's log for progress patterns:
_PROGRESS_PATTERNS = [
    re.compile(r'[Ss]tep\s+(\d+)\s*/\s*(\d+)'),   # Step 50/300
    re.compile(r'\[(\d+)/(\d+)\]'),               # [50/300]
    re.compile(r'[Ee]poch\s+(\d+)\s*/\s*(\d+)'),  # Epoch 2/10
    re.compile(r'(\d+(?:\.\d+)?)\s*%'),           # 50.0%
]
</re.compile>
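Applying the patterns to the log tail is a matter of scanning last-to-first so the most recent progress line wins. A sketch covering the fraction-style patterns (the percentage pattern captures a single group and would need its own branch):

```python
import re

_STEP_PATTERNS = [
    re.compile(r'[Ss]tep\s+(\d+)\s*/\s*(\d+)'),   # Step 50/300
    re.compile(r'\[(\d+)/(\d+)\]'),               # [50/300]
    re.compile(r'[Ee]poch\s+(\d+)\s*/\s*(\d+)'),  # Epoch 2/10
]

def parse_progress(log_tail: str):
    """Return (current, total) from the newest matching line, else None."""
    for line in reversed(log_tail.splitlines()):
        for pat in _STEP_PATTERNS:
            m = pat.search(line)
            if m:
                return int(m.group(1)), int(m.group(2))
    return None
```

Reading only the last ~200 lines keeps this cheap even for multi-hour logs.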
2. Checkpoint scanning — for adopted processes without logs, parse --max_steps and --output_dir from the command args, then scan for checkpoint directories:
current_step = 0
# HuggingFace Trainer saves checkpoint-{step}/
for d in scan_dir.glob("checkpoint-*"):
    step = int(d.name.split("-")[-1])
    current_step = max(current_step, step)
# TRL saves completions_{step:05d}.parquet
for f in scan_dir.glob("completions/completions_*.parquet"):
    step = int(f.stem.split("_")[-1])
    current_step = max(current_step, step)
ETA = elapsed × (total - current) / current. Simple, but needs the right scan_dir — I learned the hard way that rglob across the entire project directory picks up stale checkpoints from old runs. Now it respects --output_dir.
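The formula is a one-liner once you have `current` and `total`; guarding against zero completed steps is the only edge case worth handling. A minimal sketch:

```python
def estimate_eta(elapsed_s: float, current: int, total: int) -> float:
    """Remaining seconds, assuming constant time per step."""
    if current <= 0:
        raise ValueError("need at least one completed step to extrapolate")
    return elapsed_s * (total - current) / current
```

For example, a job at step 150 of 300 after one hour has another hour to go.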
The result shows inline in status:
ID St Name Script Time Info
──────────────────────────────────────────────────────────────
1 🔄 exp02-training run_mixed_train.py 1h36m pid=158500 53% ETA 1h24m
2 ⏳ exp10-eval run_transfer_eval.py after #1
Preempt and Resume
The most complex feature. Scenario: training is 60% done, but you need the GPU for a quick evaluation. You don't want to lose 60% progress.
Preempt sends SIGTERM (which HuggingFace Trainer handles gracefully — it saves a checkpoint before exiting), waits for the save, then marks the job:
def cmd_preempt(args):
    os.kill(pid, signal.SIGTERM)
    for _ in range(timeout):
        if not _pid_alive(pid):
            break
        time.sleep(1)
    # Find latest checkpoint
    checkpoint = _find_latest_checkpoint(job)
    db.execute("UPDATE jobs SET status='preempted' ...")
Resume creates a new job with --resume_from_checkpoint injected into the args:
def cmd_resume(args):
    checkpoint = _find_latest_checkpoint(job)
    new_args = original_args + ["--resume_from_checkpoint", checkpoint]
    # Create new job entry
    db.execute("INSERT INTO jobs ...")
This works because HuggingFace Trainer accepts --resume_from_checkpoint natively, and most training frameworks (PyTorch Lightning among them) expose an equivalent resume mechanism. The training continues from where it left off — optimizer state, learning rate schedule, step count, everything.
$ gpuq preempt 1
Sending SIGTERM to PID 158500...
Waiting up to 60s for graceful shutdown (checkpoint save)...
Job #1 preempted.
Latest checkpoint: outputs/checkpoint-150
Use `gpuq resume 1` to continue later.
$ gpuq add urgent_eval.py --name "quick-eval"
$ gpuq run # runs the eval
$ gpuq resume 1
[+] Job #3: exp02-training (resumed)
Resuming from: outputs/checkpoint-150
Crash Recovery
gpuq status auto-detects stale processes:
running = db.execute("SELECT id, pid FROM jobs WHERE status='running'")
for r in running:
    if not _pid_alive(r["pid"]):
        db.execute("UPDATE jobs SET status='interrupted' ...")
After a reboot:
$ gpuq status
1 ⚡ exp02-training run_train.py 2h15m interrupted
2 ⏳ exp10-eval run_eval.py after #1
$ gpuq recover --all # or --jobs 1
Re-queued 1 job(s). Run `gpuq run` to execute.
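Recovery is essentially one UPDATE flipping interrupted jobs back to pending. A sketch of the `--all` path, assuming a `pid` column that should be cleared on re-queue (column names are illustrative):

```python
import sqlite3

def recover_all(db: sqlite3.Connection) -> int:
    """Re-queue every interrupted job; returns how many were re-queued."""
    cur = db.execute(
        "UPDATE jobs SET status = 'pending', pid = NULL "
        "WHERE status = 'interrupted'"
    )
    db.commit()
    return cur.rowcount
```

Clearing the stale PID prevents a later status check from probing a recycled process ID.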
AI Skill Integration
gpuq ships with a skill/gpuq.md file that teaches AI coding assistants (Claude Code, Cursor, etc.) to manage the queue via natural language:
/gpuq check the queue
/gpuq run train.py in exp05
/gpuq how long until it finishes
/gpuq pause current job, run this eval first
The skill file maps intents to commands — the user doesn't need to remember CLI syntax.
What I'd Do Differently
PID tracking is Linux-only. The /proc filesystem doesn't exist on macOS. A portable version would use psutil, but I deliberately avoided external dependencies.
Checkpoint detection is heuristic. It works for HuggingFace Trainer's checkpoint-{step} pattern and TRL's completions_{step}.parquet, but a custom training loop with torch.save("model_epoch_5.pt") won't be detected. A callback mechanism (the training script signals progress to gpuq) would be more robust but less zero-config.
No multi-GPU support. This is intentional — my use case is a single laptop GPU. For multi-GPU, use Slurm.
The Stack
- Language: Python 3 (stdlib only, no pip dependencies)
- Storage: SQLite with WAL mode
- Process info: `/proc` filesystem (Linux)
- Lines of code: ~700 in one file
- Repository: github.com/geyuxu/gpuq
git clone [email protected]:geyuxu/gpuq.git
cd gpuq && chmod +x gpuq.py
ln -sf $(pwd)/gpuq.py ~/.local/bin/gpuq
That's it. No pip install, no Docker, no config files. Just a Python script that manages your GPU queue.