
fix: stop funneling all jobs to one worker (JobsProgress shared state) #517

Open
deanq wants to merge 7 commits into main from deanquinanola/sls-314-runpod-python-jobsprogress-shared-file-state-funnels-all

Conversation

deanq (Member) commented Jun 23, 2026

Problem

Fixes #432. Since SDK 1.7.11 (still on main, v1.9.1+), serverless endpoints with multiple workers funnel nearly all jobs onto a single worker while the rest sit idle. Only 1.7.10 is unaffected. The reporter runs ComfyUI workers with a network volume, and the bug correlates with network storage.

Root cause

Between 1.7.10 and 1.7.11, JobsProgress (the per-worker in-flight job tracker) was changed from an in-memory set into cross-process shared state so the separated health-ping process could read the in-progress job list. The ping was moved to its own multiprocessing.Process for a real reason: a CPU-bound handler holds the GIL, the ping can't recur, the server's liveness check times out, and the worker is killed. The ping process must stay.

The bridge built to feed it regressed:

  • 1.7.11 used multiprocessing.Manager(), re-created inside the singleton in every process, which is broken under the spawn start method (this also caused #430, "fix: multiprocess broke local development").
  • 1.7.12 → current persists state to a pickle file at os.getcwd()/.runpod_jobs.pkl, and get_job_list() reloads that file into the in-memory set on every job-take.

The scaler decides acquisition from the in-progress count: jobs_needed = current_concurrency - current_occupancy() (concurrency defaults to 1). With a network volume, os.getcwd() is the shared mount, so every worker reads/writes the same .runpod_jobs.pkl. Each worker's count is overwritten with the global in-progress set → occupancy >> 1 → jobs_needed <= 0 → workers refuse to pull. Whichever worker pulls while the file is briefly empty wins all the work; the rest starve. This explains every observation: correlates with network storage, invisible on single-worker local repro, manifests only in live serverless, and 1.7.10 is the only safe version.
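The arithmetic above can be sketched in a few lines. This is a minimal illustration, not SDK code: the worker names, job ids, and the `jobs_needed` helper are hypothetical, and only the formula itself comes from the description.

```python
def jobs_needed(current_concurrency: int, current_occupancy: int) -> int:
    """Scaler arithmetic as quoted in the description above."""
    return current_concurrency - current_occupancy


# Hypothetical snapshot: three workers, two jobs in flight across the endpoint.
in_flight = {
    "worker-a": {"job-1"},
    "worker-b": {"job-2"},
    "worker-c": set(),
}
CONCURRENCY = 1  # the default concurrency mentioned above

# In-memory tracking: each worker counts only its own jobs.
per_worker = {
    worker: jobs_needed(CONCURRENCY, len(jobs))
    for worker, jobs in in_flight.items()
}

# Shared pickle file on a network volume: every worker reloads the union
# of all in-progress jobs, so every worker sees the same inflated occupancy.
shared_set = set().union(*in_flight.values())
shared_file = {
    worker: jobs_needed(CONCURRENCY, len(shared_set))
    for worker in in_flight
}

print(per_worker)
print(shared_file)
```

With per-worker tracking the idle worker still wants one job; with the shared set every worker computes a non-positive value and stops pulling.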

Fix

Split the tangled responsibility:

  1. Revert JobsProgress to a plain in-memory set (its 1.7.10 form) — the main-process authority for occupancy and the job_in_progress URL param. No disk I/O, no Manager, no filelock.
  2. Add PingJobMirror (multiprocessing.Array + lock) as the only cross-process channel: the main process writes the in-progress job-id snapshot on every acquire/finish; the ping process reads it. Created once in the main process and passed explicitly through Process(args=...) — correct under both fork and spawn, and structurally impossible to share across workers or land on a network volume.
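A rough sketch of what a mirror of this shape could look like, assuming a fixed-size shared char buffer. The class name `PingJobMirrorSketch`, the `write`/`read` method names, the default capacity, and the sorted ordering are all assumptions for illustration; the actual PingJobMirror in this PR may differ.

```python
import multiprocessing


class PingJobMirrorSketch:
    """Illustrative stand-in for a per-worker job-id mirror (not the SDK class)."""

    def __init__(self, capacity: int = 4096, ctx=None):
        ctx = ctx or multiprocessing.get_context()
        # Array("c", n) is a shared char buffer that carries its own lock.
        self._buf = ctx.Array("c", capacity)
        self._capacity = capacity

    def write(self, job_ids) -> None:
        """Main process: publish the current in-progress ids (best-effort)."""
        try:
            data = ",".join(sorted(job_ids)).encode("utf-8")
            limit = self._capacity - 1  # keep room for the NUL terminator
            if len(data) > limit:
                # Truncate at a comma so no partial job id is published.
                cut = data.rfind(b",", 0, limit + 1)
                data = data[:cut] if cut > 0 else b""
            with self._buf.get_lock():
                self._buf.value = data
        except Exception:
            pass  # never raise into job processing

    def read(self):
        """Ping process: return the comma-separated ids, or None if empty."""
        try:
            with self._buf.get_lock():
                raw = self._buf.value
            return raw.decode("utf-8") or None
        except Exception:
            return None


mirror = PingJobMirrorSketch(capacity=16)
mirror.write({"job-aaa", "job-bbb", "job-ccc"})
print(mirror.read())
```

In this sketch the instance would be created once in the main process and handed to the ping process through `Process(args=(mirror,))`. The optional `ctx` parameter is there so the buffer can be created from the same multiprocessing context that starts the process.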

The exact ping payload (live comma-separated job_id list) and the separate ping process are preserved. The now-unused filelock dependency is dropped. All mirror operations are best-effort and never raise into job processing or kill the ping loop.

Test plan

  • JobsProgress performs zero filesystem writes; two instances sharing a cwd (two workers) do not observe each other's jobs (regression guards for #432, "Runpod SDK versions > 1.7.10 send all requests to the same worker even with 10 active workers, and 50 max workers set").
  • PingJobMirror round-trips across a real process under both fork and spawn; oversized snapshots truncate at a comma boundary without raising.
  • Ping sources job_id from the injected mirror; no mirror => job_id=None.
  • JobScaler syncs the mirror on acquire/finish; at concurrency 1 with one in-flight job, current_occupancy() == 1 so jobs_needed == 0 (no over-pull).
  • run_worker creates one mirror and passes the same instance to both ping and scaler.
  • Full suite: 487 passed, coverage 94% (make quality-check green).
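The first regression guard in the list could look roughly like the check below. It is a sketch under assumptions: `InMemoryJobs` is a hypothetical stand-in for the reverted tracker (the real JobsProgress has its own API), and the shared directory is a temporary one standing in for a network volume.

```python
import os
import tempfile


class InMemoryJobs:
    """Hypothetical stand-in for a plain in-memory in-progress tracker."""

    def __init__(self):
        self._jobs = set()

    def add(self, job_id: str) -> None:
        self._jobs.add(job_id)

    def remove(self, job_id: str) -> None:
        self._jobs.discard(job_id)

    def get_job_count(self) -> int:
        return len(self._jobs)


def check_isolation():
    """Two trackers sharing one cwd must not see each other or touch disk."""
    previous = os.getcwd()
    with tempfile.TemporaryDirectory() as shared_cwd:
        os.chdir(shared_cwd)
        try:
            worker_a, worker_b = InMemoryJobs(), InMemoryJobs()
            worker_a.add("job-1")
            files_written = os.listdir(shared_cwd)
            return worker_a.get_job_count(), worker_b.get_job_count(), files_written
        finally:
            # Leave the directory before it is deleted.
            os.chdir(previous)


print(check_isolation())
```

The returned tuple carries the two counts and the directory listing, so a test can assert that only the first tracker holds a job and that nothing was written under the shared cwd.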

Notes

This is a behavioral fix to job distribution and warrants a proper minor release, not a patch.

deanq added 5 commits June 22, 2026 21:54

JobsProgress persisted in-progress jobs to .runpod_jobs.pkl under
os.getcwd(); on endpoints with a network volume every worker shared one
file, so occupancy accounting cross-contaminated and jobs funneled onto
a single worker (#432). Restore the 1.7.10 in-memory set and feed the
separate ping process via a per-worker shared-memory mirror instead.

Refs SLS-314, fixes #432

The ping process no longer touches JobsProgress; it reads the job-id
snapshot from the per-worker PingJobMirror passed in at process start.

Refs SLS-314

JobScaler updates the per-worker PingJobMirror after each job is
acquired or finished, so the separate ping process always sees the
current in-progress job ids without shared-file state.

Refs SLS-314

run_worker constructs a single mirror in the main process and passes it
to both the ping process and the JobScaler, completing the #432 fix.

Refs SLS-314

The .runpod_jobs.pkl state file no longer exists; remove its cleanup
from the local_sim Makefile.

Refs SLS-314

@capy-ai capy-ai Bot left a comment


Added 1 comment

Comment thread runpod/serverless/modules/rp_ping.py

Copilot AI left a comment


Pull request overview

This PR fixes a multi-worker starvation/regression by removing cross-worker shared job state (previously persisted via a .runpod_jobs.pkl file) and replacing it with a strictly per-worker in-memory JobsProgress, plus a dedicated cross-process PingJobMirror used only to feed the heartbeat ping process.

Changes:

  • Revert JobsProgress to an in-memory per-process tracker; remove file-based persistence and drop the filelock dependency.
  • Introduce PingJobMirror (shared-memory buffer) and wire it through run_worker → JobScaler and run_worker → Heartbeat.
  • Add/adjust unit tests to cover non-sharing across workers and mirror round-trips across processes (fork/spawn).

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 2 comments.

Summary per file:

| File | Description |
| --- | --- |
| runpod/serverless/modules/worker_state.py | Removes persisted shared state from JobsProgress; adds PingJobMirror shared-memory snapshot mechanism. |
| runpod/serverless/modules/rp_ping.py | Heartbeat reads job ids from injected mirror instead of JobsProgress; ping loop runs in separate process with mirror passed via args. |
| runpod/serverless/modules/rp_scale.py | JobScaler accepts a mirror and syncs it on acquire/finish. |
| runpod/serverless/worker.py | Creates a per-worker mirror and passes it to both ping and scaler. |
| requirements.txt | Removes filelock dependency. |
| tests/test_serverless/test_scale_mirror.py | Adds tests ensuring scaler syncs the mirror and run_worker shares one mirror instance. |
| tests/test_serverless/test_modules/test_ping.py | Updates ping tests to use mirror injection instead of mocking JobsProgress. |
| tests/test_serverless/test_modules/test_state.py | Updates state tests to assert job state is not persisted/shared across instances. |
| tests/test_serverless/test_integration_worker_state.py | Reworks tests into unit coverage for in-memory JobsProgress + multiprocess mirror behavior. |
| tests/test_serverless/local_sim/Makefile | Removes cleanup of .runpod_jobs.pkl since it should no longer exist. |


Comment thread runpod/serverless/modules/rp_ping.py
Comment thread runpod/serverless/modules/rp_scale.py
PR #517 review (capy-ai): the API/realtime path (rp_fastapi WorkerAPI)
started the ping without a mirror while tracking jobs in JobsProgress,
so heartbeats sent job_id=None there. Move mirror propagation into
JobsProgress.add/remove/clear via an attached mirror, so every writer
path (JobScaler and rp_fastapi) stays in sync from a single place.
Attach the mirror in run_worker and WorkerAPI; drop JobScaler's
now-redundant job_mirror plumbing.

Refs SLS-314
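The "attached mirror" idea in this commit message can be sketched as follows. All names here (`JobsProgressSketch`, `attach_mirror`, `RecordingMirror`) are hypothetical and only illustrate the pattern of syncing from a single place; they are not the SDK's actual signatures.

```python
class RecordingMirror:
    """Test double that records every snapshot it is asked to publish."""

    def __init__(self):
        self.snapshots = []

    def write(self, job_ids) -> None:
        self.snapshots.append(sorted(job_ids))


class JobsProgressSketch:
    """In-memory tracker that pushes each change to an optional mirror."""

    def __init__(self):
        self._jobs = set()
        self._mirror = None

    def attach_mirror(self, mirror) -> None:
        self._mirror = mirror
        self._sync()  # publish the current state right away

    def _sync(self) -> None:
        if self._mirror is None:
            return
        try:
            self._mirror.write(self._jobs)
        except Exception:
            pass  # best-effort: mirror failures never reach job handling

    def add(self, job_id: str) -> None:
        self._jobs.add(job_id)
        self._sync()

    def remove(self, job_id: str) -> None:
        self._jobs.discard(job_id)
        self._sync()

    def clear(self) -> None:
        self._jobs.clear()
        self._sync()


jobs = JobsProgressSketch()
recorder = RecordingMirror()
jobs.attach_mirror(recorder)
jobs.add("job-1")
jobs.remove("job-1")
print(recorder.snapshots)
```

Because every mutation goes through `_sync`, any writer path that uses the tracker keeps the mirror current without separate plumbing.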
# One per-worker mirror so the separate ping process reports the
# in-progress job ids tracked here. Attaching to job_list means every
# add/remove syncs it automatically.
mirror = PingJobMirror()

Contributor


I might be thinking about this wrong, but if this happens at the API framework startup level, are we passing access to the same ping mirror and its shared-memory Array to each child worker process? If so, they would technically share memory.

@capy-ai

capy-ai Bot commented Jun 23, 2026


Capy auto-review is paused for this organization because the monthly auto-review limit has been reached. Increase the limit or turn it off in billing settings to resume automatic reviews.


Development

Successfully merging this pull request may close these issues.

Runpod SDK versions > 1.7.10 send all requests to the same worker even with 10 active workers, and 50 max workers set

4 participants