fix: stop funneling all jobs to one worker (JobsProgress shared state) #517
JobsProgress persisted in-progress jobs to .runpod_jobs.pkl under os.getcwd(); on endpoints with a network volume every worker shared one file, so occupancy accounting cross-contaminated and jobs funneled onto a single worker (#432). Restore the 1.7.10 in-memory set and feed the separate ping process via a per-worker shared-memory mirror instead. Refs SLS-314, fixes #432
The ping process no longer touches JobsProgress; it reads the job-id snapshot from the per-worker PingJobMirror passed in at process start. Refs SLS-314
JobScaler updates the per-worker PingJobMirror after each job is acquired or finished, so the separate ping process always sees the current in-progress job ids without shared-file state. Refs SLS-314
run_worker constructs a single mirror in the main process and passes it to both the ping process and the JobScaler, completing the #432 fix. Refs SLS-314
The .runpod_jobs.pkl state file no longer exists; remove its cleanup from the local_sim Makefile. Refs SLS-314
Pull request overview
This PR fixes a multi-worker starvation/regression by removing cross-worker shared job state (previously persisted via a .runpod_jobs.pkl file) and replacing it with a strictly per-worker in-memory JobsProgress, plus a dedicated cross-process PingJobMirror used only to feed the heartbeat ping process.
Changes:
- Revert `JobsProgress` to an in-memory per-process tracker; remove file-based persistence and drop the `filelock` dependency.
- Introduce `PingJobMirror` (shared-memory buffer) and wire it through `run_worker → JobScaler` and `run_worker → Heartbeat`.
- Add/adjust unit tests to cover non-sharing across workers and mirror round-trips across processes (`fork`/`spawn`).
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `runpod/serverless/modules/worker_state.py` | Removes persisted shared state from `JobsProgress`; adds `PingJobMirror` shared-memory snapshot mechanism. |
| `runpod/serverless/modules/rp_ping.py` | Heartbeat reads job ids from injected mirror instead of `JobsProgress`; ping loop runs in separate process with mirror passed via args. |
| `runpod/serverless/modules/rp_scale.py` | `JobScaler` accepts a mirror and syncs it on acquire/finish. |
| `runpod/serverless/worker.py` | Creates a per-worker mirror and passes it to both ping and scaler. |
| `requirements.txt` | Removes `filelock` dependency. |
| `tests/test_serverless/test_scale_mirror.py` | Adds tests ensuring scaler syncs the mirror and `run_worker` shares one mirror instance. |
| `tests/test_serverless/test_modules/test_ping.py` | Updates ping tests to use mirror injection instead of mocking `JobsProgress`. |
| `tests/test_serverless/test_modules/test_state.py` | Updates state tests to assert job state is not persisted/shared across instances. |
| `tests/test_serverless/test_integration_worker_state.py` | Reworks tests into unit coverage for in-memory `JobsProgress` + multiprocess mirror behavior. |
| `tests/test_serverless/local_sim/Makefile` | Removes cleanup of `.runpod_jobs.pkl` since it should no longer exist. |
PR #517 review (capy-ai): the API/realtime path (rp_fastapi WorkerAPI) started the ping without a mirror while tracking jobs in JobsProgress, so heartbeats sent job_id=None there. Move mirror propagation into JobsProgress.add/remove/clear via an attached mirror, so every writer path (JobScaler and rp_fastapi) stays in sync from a single place. Attach the mirror in run_worker and WorkerAPI; drop JobScaler's now-redundant job_mirror plumbing. Refs SLS-314
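As a rough illustration of the "attached mirror" idea in this commit message, here is a minimal sketch, not the SDK's actual code. `JobsProgressSketch`, `RecordingMirror`, `attach_mirror`, and `write` are hypothetical names chosen for the example; the only behavior taken from the PR is that add/remove/clear propagate the snapshot from one place and that mirror failures are swallowed.

```python
class RecordingMirror:
    """Hypothetical stand-in for the cross-process mirror (in-process only)."""

    def __init__(self):
        self.snapshot = None

    def write(self, job_ids):
        # Comma-separated id list, or None when nothing is in progress.
        self.snapshot = ",".join(sorted(job_ids)) or None


class JobsProgressSketch:
    """In-memory job tracker that pushes every change to an attached mirror."""

    def __init__(self):
        self._jobs = set()
        self._mirror = None

    def attach_mirror(self, mirror):
        # Assumed API: attach once at startup, then sync the current state.
        self._mirror = mirror
        self._sync()

    def add(self, job_id):
        self._jobs.add(job_id)
        self._sync()

    def remove(self, job_id):
        self._jobs.discard(job_id)
        self._sync()

    def clear(self):
        self._jobs.clear()
        self._sync()

    def get_job_count(self):
        return len(self._jobs)

    def _sync(self):
        # Best-effort: a failing mirror must never break job processing.
        if self._mirror is None:
            return
        try:
            self._mirror.write(self._jobs)
        except Exception:
            pass


jobs = JobsProgressSketch()
mirror = RecordingMirror()
jobs.attach_mirror(mirror)
jobs.add("job-a")
jobs.add("job-b")
print(mirror.snapshot)
jobs.clear()
print(mirror.snapshot)
```

Because every writer goes through the same three methods, any code path that tracks jobs in the tracker (the scaler or the API path) keeps the mirror current without its own plumbing.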
    # One per-worker mirror so the separate ping process reports the
    # in-progress job ids tracked here. Attaching to job_list means every
    # add/remove syncs it automatically.
    mirror = PingJobMirror()
I might be thinking about this wrong, but if this happens at the API framework startup level, are we passing access to the same ping mirror and its shared-memory Array to each child worker process? If so, would they technically share memory?
…ress-shared-file-state-funnels-all
Problem
Fixes #432. Since SDK 1.7.11 (still on `main`, v1.9.1+), serverless endpoints with multiple workers funnel nearly all jobs onto a single worker while the rest sit idle. Only 1.7.10 is unaffected. The reporter runs ComfyUI workers with a network volume, and the bug correlates with network storage.

Root cause
Between 1.7.10 and 1.7.11, `JobsProgress` (the per-worker in-flight job tracker) was changed from an in-memory `set` into cross-process shared state so the separated health-ping process could read the in-progress job list. The ping was moved to its own `multiprocessing.Process` for a real reason: a CPU-bound handler holds the GIL, the ping can't recur, the server's liveness check times out, and the worker is killed. The ping process must stay.

The bridge built to feed it regressed:
- `multiprocessing.Manager()` was re-created inside the singleton per process, which is broken under the `spawn` start method (also caused #430, "fix: multiprocess broke local development").
- In-progress jobs were persisted to `os.getcwd()/.runpod_jobs.pkl`, and `get_job_list()` reloads that file into the in-memory set on every job-take.

The scaler decides acquisition from the in-progress count: `jobs_needed = current_concurrency - current_occupancy()` (concurrency defaults to 1). With a network volume, `os.getcwd()` is the shared mount, so every worker reads/writes the same `.runpod_jobs.pkl`. Each worker's count is overwritten with the global in-progress set → `occupancy >> 1` → `jobs_needed <= 0` → workers refuse to pull. Whichever worker pulls while the file is briefly empty wins all the work; the rest starve. This explains every observation: it correlates with network storage, is invisible on a single-worker local repro, manifests only in live serverless, and 1.7.10 is the only safe version.

Fix
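Why per-worker state is enough to undo the starvation can be seen in a toy simulation of the acquisition rule quoted in the root cause. This is a hypothetical model, not SDK code: `simulate`, the tick-based polling order, and the "all in-flight jobs finish at the end of each tick" rule are assumptions made only to keep the example deterministic.

```python
from collections import deque


def simulate(num_workers, num_jobs, shared, concurrency=1):
    """Return how many jobs each worker completed.

    shared=True models every worker reading one in-progress set (the shared
    .runpod_jobs.pkl case); shared=False gives each worker its own set.
    """
    queue = deque(f"job-{i}" for i in range(num_jobs))
    if shared:
        one_set = set()
        in_progress = [one_set] * num_workers  # same object for all workers
    else:
        in_progress = [set() for _ in range(num_workers)]
    owned = [set() for _ in range(num_workers)]  # jobs a worker really runs
    completed = [0] * num_workers

    while queue:
        # Poll phase: each worker applies the scaler rule to the set it sees.
        for w in range(num_workers):
            jobs_needed = concurrency - len(in_progress[w])
            if jobs_needed > 0 and queue:
                job = queue.popleft()
                in_progress[w].add(job)
                owned[w].add(job)
        # Finish phase (assumption): every in-flight job completes this tick.
        for w in range(num_workers):
            for job in owned[w]:
                in_progress[w].discard(job)
                completed[w] += 1
            owned[w].clear()
    return completed


print(simulate(3, 6, shared=True))   # → [6, 0, 0]
print(simulate(3, 6, shared=False))  # → [2, 2, 2]
```

In the shared case the first worker to poll fills the only set everyone counts against, so the others always compute `jobs_needed <= 0`; with separate sets each worker only counts its own job.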
Split the tangled responsibility:
- Revert `JobsProgress` to a plain in-memory `set` (its 1.7.10 form): the main-process authority for occupancy and the `job_in_progress` URL param. No disk I/O, no `Manager`, no `filelock`.
- Introduce `PingJobMirror` (`multiprocessing.Array` + lock) as the only cross-process channel: the main process writes the in-progress job-id snapshot on every acquire/finish; the ping process reads it. It is created once in the main process and passed explicitly through `Process(args=...)`, which is correct under both `fork` and `spawn` and makes it structurally impossible to share across workers or land on a network volume.

The exact ping payload (live comma-separated `job_id` list) and the separate ping process are preserved. The now-unused `filelock` dependency is dropped. All mirror operations are best-effort and never raise into job processing or kill the ping loop.

Test plan
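A minimal sketch of the kind of shared-memory mirror described above, assuming a fixed-size `multiprocessing.Array` of bytes that holds the comma-separated snapshot. The class name `PingJobMirrorSketch`, the `capacity` parameter, and the `write`/`read` method names are hypothetical; the real `PingJobMirror` may differ. The sketch shows the two properties the PR calls out: oversized snapshots are cut at a comma boundary, and no operation raises.

```python
import multiprocessing as mp


class PingJobMirrorSketch:
    """Hypothetical fixed-size shared buffer holding 'id1,id2,...'."""

    def __init__(self, capacity=64):
        self._capacity = capacity
        # 'c' = array of chars; the default lock=True wraps it with an RLock.
        self._buf = mp.Array("c", capacity)

    def write(self, job_ids):
        """Store the snapshot; best-effort, never raises."""
        try:
            snapshot = ",".join(sorted(job_ids)).encode("utf-8")
            limit = self._capacity - 1  # keep room for the NUL terminator
            if len(snapshot) > limit:
                # Keep the longest prefix that ends on a whole job id.
                cut = snapshot.rfind(b",", 0, limit + 1)
                snapshot = snapshot[:cut] if cut > 0 else b""
            with self._buf.get_lock():
                self._buf.value = snapshot
        except Exception:
            pass

    def read(self):
        """Return the snapshot string, or None if empty or unreadable."""
        try:
            with self._buf.get_lock():
                raw = self._buf.value
            return raw.decode("utf-8") or None
        except Exception:
            return None


mirror = PingJobMirrorSketch(capacity=16)
mirror.write({"job-a", "job-b"})
print(mirror.read())
mirror.write({"job-a", "job-b", "job-c"})  # 17 bytes, does not fit in 15
print(mirror.read())
```

The sketch exercises only an in-process round trip. Since the buffer is a `multiprocessing.Array`, an object like this is meant to be handed to the child through `Process(args=(mirror,))` rather than rediscovered through a file path or a global.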
- `JobsProgress` performs zero filesystem writes; two instances sharing a cwd (two workers) do not observe each other's jobs (regression guards for #432).
- `PingJobMirror` round-trips across a real process under both `fork` and `spawn`; oversized snapshots truncate at a comma boundary without raising.
- Heartbeat reads `job_id` from the injected mirror; no mirror => `job_id=None`.
- `JobScaler` syncs the mirror on acquire/finish; at concurrency 1 with one in-flight job, `current_occupancy() == 1` so `jobs_needed == 0` (no over-pull).
- `run_worker` creates one mirror and passes the same instance to both ping and scaler.
- `make quality-check` green.

Notes
This is a behavioral fix to job distribution and warrants a proper minor release, not a patch.