feat(schedules): agent run schedules (v1)#335

Open
jromualdez-scale wants to merge 11 commits into main from jerome/scheduled-agents

Conversation

@jromualdez-scale

@jromualdez-scale jromualdez-scale commented Jun 23, 2026

Contributor

Summary

Adds per-agent run schedules: recurring schedules that fire a task and deliver a configured initial input on a cron/interval cadence. Replaces the prior schedules implementation (a bare-workflow scheduler) on the same API path.

Each schedule is a Postgres row (the source of truth) plus a Temporal Schedule that acts purely as the recurring clock (it carries only the row id). On each fire, a thin, deterministic workflow runs a single activity that creates a task and delivers the initial input via the same path as a manual run — message/send for sync agents, event/send for agentic agents — attributed to the schedule's stored creator principal.

Feature flag

The API is gated behind ENABLE_AGENT_RUN_SCHEDULES (matches the existing ENABLE_HEALTH_CHECK_WORKFLOW pattern), disabled by default in every environment — when off, the routes are not registered at all. Enable per-environment when ready to test (e.g. locally ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh). The OpenAPI spec/SDK document the endpoints regardless of the runtime default.
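
A minimal sketch of how a boolean flag like this could gate route registration. The helper names (`env_flag`, `registered_routers`) and the set of accepted truthy strings are assumptions for illustration, not the code in this PR.

```python
import os

# Assumption: these strings count as "true"; the real parser may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean feature flag from the environment."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in _TRUTHY


def registered_routers(base: list[str]) -> list[str]:
    """Return the router names to mount; schedules only when the flag is on."""
    routers = list(base)
    if env_flag("ENABLE_AGENT_RUN_SCHEDULES"):
        routers.append("agent_run_schedules")
    return routers
```

With the flag unset or set to `false`, the schedules router is never added, which matches the "routes are not registered at all" behavior described above.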

Removed / breaking changes

This PR deletes the previous schedules feature (routes, schemas, service, use case, and its tests). The old endpoint scheduled a raw Temporal workflow and stored nothing in Postgres; the new one schedules an agent run and is Postgres-backed. Because the API path /agents/{agent_id}/schedules is reused with new semantics, this is breaking for existing consumers of the old endpoint:

  • POST /agents/{agent_id}/schedules — request/response schema changed (schedules an agent run, not a bare workflow)
  • POST …/{name}/unpause renamed to …/{name}/resume
  • Path param {schedule_name} renamed to {name} (cosmetic)
  • Adds the new agent_run_schedules table (the old scheduler was Temporal-only)

(…/{name}/trigger is preserved — see below.)

Endpoints

/agents/{agent_id}/schedules:

  • POST — create
  • GET — list (served from Postgres; no per-row Temporal call)
  • GET /{name} — get (includes live Temporal state: next/last fire, action count)
  • PATCH /{name} — partial update (cadence, window, input, params, paused; cron/interval stay mutually exclusive)
  • POST /{name}/pause · POST /{name}/resume
  • POST /{name}/trigger — immediate out-of-band run
  • DELETE /{name}
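
As a rough illustration of the "cron/interval stay mutually exclusive" rule on PATCH, the check can run against the merged state rather than the patch alone. The field names (`cron`, `interval_seconds`) and the choice to require the caller to null the other field explicitly are assumptions; the PR's schema may differ.

```python
from typing import Optional


def validate_cadence(cron: Optional[str], interval_seconds: Optional[int]) -> None:
    """Raise ValueError unless exactly one of cron / interval is set."""
    if (cron is None) == (interval_seconds is None):
        raise ValueError("exactly one of cron or interval_seconds must be set")
    if interval_seconds is not None and interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")


def merge_patch(current: dict, patch: dict) -> dict:
    """Apply a partial update, then re-validate the merged cadence."""
    merged = {**current, **patch}
    # Assumption: switching cadence requires nulling the other field in the
    # same PATCH; a patch that leaves both set is rejected.
    validate_cadence(merged.get("cron"), merged.get("interval_seconds"))
    return merged
```

Validating the merged row means a PATCH that only toggles `paused` still passes, while one that adds an interval on top of an existing cron is rejected.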

Implementation notes

  • ScheduledAgentRunWorkflow (thin/deterministic) + launch_scheduled_agent_run activity (all side effects live in the activity).
  • Deterministic per-fire task name makes task/create idempotent on activity retry; a delivered marker guards against duplicate input delivery.
  • Fire-time authorization re-check under the stored creator principal — a revoked creator stops firing cleanly.
  • Scheduled tasks get a task_metadata.display_name (Scheduled Message: <name> · <fire time>), stamped with the nominal fire time (stable across retries) so they render with a label instead of "Unnamed task".
  • delete/pause/resume/update tolerate a missing Temporal schedule so a partial failure can't strand an un-cleanable row.
  • New agent_run_schedules table migration (new-table create; schema-only, non-blocking).
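
The idempotency notes above can be sketched with an in-memory stand-in. The naming scheme in `fire_task_name` and the `FakeTaskStore` class are hypothetical; only the `scheduled_input_delivered` marker name comes from this PR.

```python
import hashlib


def fire_task_name(schedule_id: str, fire_id: str) -> str:
    """Derive a stable task name from the schedule and the fire identity."""
    digest = hashlib.sha256(f"{schedule_id}:{fire_id}".encode()).hexdigest()[:12]
    return f"scheduled-{schedule_id}-{digest}"


class FakeTaskStore:
    """In-memory stand-in for task/create keyed by a unique task name."""

    def __init__(self) -> None:
        self.tasks: dict[str, dict] = {}
        self.deliveries: list[str] = []

    def create_or_get(self, name: str) -> dict:
        # Same name on retry returns the existing task instead of a new one.
        return self.tasks.setdefault(name, {"name": name, "task_metadata": {}})


def launch(store: FakeTaskStore, schedule_id: str, fire_id: str, text: str) -> str:
    task = store.create_or_get(fire_task_name(schedule_id, fire_id))
    if task["task_metadata"].get("scheduled_input_delivered"):
        return "already_delivered"  # retry after a completed delivery: skip
    store.deliveries.append(text)  # stands in for message/send or event/send
    task["task_metadata"]["scheduled_input_delivered"] = True  # marker after send
    return "launched"
```

A retry of the same fire reuses the task and skips delivery; a different fire gets its own task. A crash between the send and the marker write is the one window this guard does not cover.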

Testing

  • 30 unit tests (service, activity, use case, env flag) pass, covering create/list/get/update/pause/resume/trigger/delete, idempotency, validation, and flag parsing.
  • Verified end-to-end locally (flag on): both delivery paths (sync message/send and agentic event/send), plus pause/resume/update/trigger/delete reflected consistently in Postgres and Temporal.
  • Verified on a dev cluster (branch image, flag on): create → Temporal schedule → worker fires on schedule → message/send delivered, with the row persisted and the creator principal captured from real auth.

Deployment dependency (authz provider)

Dev verification surfaced this: on a cluster using the SGP authz provider (AUTH_PROVIDER=sgp), the provider must learn the new schedule resource type before this is usable there. Today its /v1/authz/check returns 422 for a schedule resource, so:

  • Create works (it gates on agent.update, and register of the schedule resource is tolerated).
  • Every op gated on the schedule resource — GET /{name}, pause, resume, trigger, PATCH, DELETE — returns 422 until the provider handles check/grant/revoke/register/deregister/search for schedule (mirroring agent/task/api_key).

This is provider-side work (the schedule type is already part of the documented auth-provider contract); it should land alongside this feature's rollout. Environments with authz disabled or a permissive provider are unaffected.

🤖 Generated with Claude Code

Greptile Summary

This PR replaces the previous bare-workflow scheduler with a Postgres-backed per-agent run schedule feature: each fire creates a fresh task and delivers a configured initial input through the existing message/send / event/send path, attributed to the stored creator principal. The implementation is well-structured, with proper rollback on create failure, TemporalScheduleNotFoundError tolerance on delete/update/pause, and a no-describe-per-row list path.

  • New agent_run_schedules table (Postgres source of truth) + Temporal Schedule as a pure recurring clock; thin deterministic ScheduledAgentRunWorkflow delegates all side-effects to launch_scheduled_agent_run.
  • Full CRUD surface (POST, GET, GET /{name}, PATCH, pause, resume, trigger, DELETE) gated behind ENABLE_AGENT_RUN_SCHEDULES; breaking change to existing /agents/{agent_id}/schedules consumers.
  • One logic defect: the activity's schedule.paused guard does not consult trigger_type, so POST /{name}/trigger on a paused schedule silently produces a skipped workflow result despite the inline comment explicitly stating manual triggers should bypass the guard.

Confidence Score: 4/5

Safe to merge with feature flag off (default); the trigger endpoint on paused schedules should be fixed before enabling in production.

The activity unconditionally skips paused schedules regardless of trigger_type, so any call to the trigger endpoint on a paused schedule produces a silent no-op — the comment in that very block says manual triggers should bypass this check, but the condition is never consulted. Everything else (rollback on create failure, TemporalScheduleNotFoundError tolerance, idempotent task naming, creator-principal replay) is correctly implemented.

agentex/src/temporal/activities/scheduled_agent_run_activities.py — the paused guard at line 192 needs to skip the early return when trigger_type is "manual".

Important Files Changed

| Filename | Overview |
| --- | --- |
| agentex/src/temporal/activities/scheduled_agent_run_activities.py | New Temporal activity for scheduled agent runs; handles task creation, input delivery, idempotency, and fire-time authz — but unconditionally skips paused schedules even on manual trigger, contradicting the inline comment and trigger endpoint semantics. |
| agentex/src/domain/services/agent_run_schedule_service.py | Core service managing Postgres-backed schedules and Temporal clocks; handles create/list/get/update/pause/resume/delete/trigger with correct rollback, TemporalScheduleNotFoundError tolerance, and list-path optimisation (no per-row describe RPC). |
| agentex/src/api/routes/agent_run_schedules.py | FastAPI router wiring all schedule endpoints; authz via _check_schedule_or_collapse_to_404 and DAuthorizedId, creator principal extracted cleanly without credentials. |
| agentex/src/temporal/scheduled_agent_run_factory.py | Worker-side DI factory that wires AgentsACPUseCase bound to the stored creator principal; uses _ScheduledRunRequest stub with no credentials forwarded. |
| agentex/src/temporal/workflows/scheduled_agent_run_workflow.py | Thin deterministic wrapper workflow; delegates all side-effects to the activity with retry policy and 120s start-to-close timeout. |
| agentex/database/migrations/alembic/versions/2026_06_22_1200_add_agent_run_schedules_3b1c9d2e4f6a.py | Non-breaking CREATE TABLE migration for agent_run_schedules with two supporting indexes; downgrade is complete. |
| agentex/src/api/schemas/agent_run_schedules.py | Pydantic schemas for the new scheduling API; cron/interval mutual-exclusivity validated at the use-case layer rather than here via a model validator. |
| agentex/src/domain/use_cases/agent_run_schedules_use_case.py | Use-case layer; validates cron/interval mutual-exclusivity on create and update, then delegates to the service. |
| agentex/src/domain/repositories/agent_run_schedule_repository.py | Postgres CRUD repository with natural-key lookup (agent_id, name) and newest-first list query. |
| agentex/src/adapters/temporal/adapter_temporal.py | Adds update_schedule to TemporalAdapter and timezone/overlap_policy support to create_schedule; error-handling mirrors existing patterns. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant API as FastAPI Routes
    participant SVC as AgentRunScheduleService
    participant PG as Postgres
    participant TMP as Temporal Schedule
    participant WF as ScheduledAgentRunWorkflow
    participant ACT as launch_scheduled_agent_run
    participant ACP as Agent ACP

    Client->>API: "POST /agents/{id}/schedules"
    API->>SVC: create_schedule(agent, request, creator_principal)
    SVC->>PG: INSERT agent_run_schedules row
    SVC->>SVC: register_resource (authz)
    SVC->>TMP: "create_schedule(id=row_id)"
    TMP-->>SVC: ScheduleHandle
    SVC-->>Client: AgentRunScheduleResponse

    Note over TMP,WF: On each cron/interval fire
    TMP->>WF: start ScheduledAgentRunWorkflow(schedule_id)
    WF->>ACT: launch_scheduled_agent_run(schedule_id, fire_id, scheduled)
    ACT->>PG: get schedule row
    ACT->>ACT: authz re-check (creator principal)
    ACT->>ACP: task/create (deterministic name)
    ACT->>ACP: message/send or event/send
    ACT->>PG: mark scheduled_input_delivered
    ACT-->>WF: status launched

    Client->>API: "POST /agents/{id}/schedules/{name}/trigger"
    API->>SVC: trigger_schedule(agent_id, name)
    SVC->>TMP: "start_workflow(manual, args=[row_id, manual])"
    WF->>ACT: launch_scheduled_agent_run(schedule_id, fire_id, manual)

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
agentex/src/temporal/activities/scheduled_agent_run_activities.py:192-199
**Manual trigger silently skips paused schedules**

The comment at line 193 states "a manual trigger can still fire a paused schedule," indicating the clear design intent is for `trigger_type == "manual"` to bypass this guard — but `trigger_type` is never consulted. Any call to the `POST /{name}/trigger` endpoint on a paused schedule starts a workflow that immediately returns `{"status": "skipped", "reason": "schedule_paused"}`. The API caller receives an HTTP 200 with the schedule response and has no observable indication that the run was silently dropped.

```suggestion
        if schedule.paused and trigger_type != "manual":
            # Temporal pauses the schedule too, but a manual trigger can still
            # fire a paused schedule — honor the stored paused state defensively
            # only for cadence-driven fires, not explicit out-of-band triggers.
            return {
                "status": "skipped",
                "reason": "schedule_paused",
                "schedule_id": schedule_id,
            }
```

Reviews (4): Last reviewed commit: "feat(schedules): persist run fire time m..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

jromualdez-scale and others added 3 commits June 23, 2026 14:13
Replace the prior schedules implementation with per-agent "agent run
schedules": recurring schedules backed by a Temporal Schedule that, on
each fire, creates a task and delivers a configured initial input via
the same path as a manual agent run — message/send for sync agents,
event/send for agentic agents — attributed to the schedule's stored
creator principal.

- REST CRUD under /agents/{agent_id}/schedules: create, get, list,
  pause, resume, delete
- Postgres row is the source of truth for the schedule definition;
  the Temporal Schedule is only the recurring clock and carries just
  the row id
- ScheduledAgentRunWorkflow (thin, deterministic) + the
  launch_scheduled_agent_run activity that does all side effects
- deterministic per-fire task name makes task/create idempotent on
  activity retry; a delivered marker guards against re-delivery
- fire-time authz re-check under the creator principal so a revoked
  creator stops firing cleanly
- new agent_run_schedules table migration

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The UI derives a task's display name from task_metadata.display_name
(falling back to params.description), never the task's `name` field, so
scheduled tasks rendered as "Unnamed task".

Set a templated, per-fire display_name on each scheduled task —
"Scheduled Message: {schedule_name} · {fire_time}" — placed first in the
metadata so a caller-supplied display_name in the schedule's task_metadata
still overrides it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…omments

This repository is public. Strip internal ticket IDs and design-decision
shorthand from code comments and docstrings, keeping the descriptive text.
No behavior change.
@jromualdez-scale jromualdez-scale requested a review from a team as a code owner June 23, 2026 18:16
@github-actions

github-actions Bot commented Jun 23, 2026

✱ Stainless preview builds

This PR will update the agentex-sdk SDKs with the following commit messages.

openapi

feat(api): remove trigger endpoint, rename schedules to run_schedules, update types

python

feat(api): remove retrieve/delete/pause/trigger/unpause, update create/list in schedules

typescript

feat(api): remove retrieve/delete/pause/trigger/unpause from schedules, update types

Edit this comment to update them. They will appear in their respective SDK's changelogs.

⚠️ agentex-sdk-openapi studio · code · diff

Your SDK build had at least one "error" diagnostic, which is a regression from the base state.
generate ❗ (prev: generate ✅)

New diagnostics (5 error, 8 note)
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `get /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `delete /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/pause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/unpause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `get /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `patch /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `delete /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/pause`
⚠️ agentex-sdk-python studio · conflict

Your SDK build had at least one new error diagnostic, which is a regression from the base state.

New diagnostics (5 error, 8 note)
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `get /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `delete /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/pause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/unpause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `get /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `patch /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `delete /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/pause`
⚠️ agentex-sdk-typescript studio · code · diff

Your SDK build had at least one "error" diagnostic, which is a regression from the base state.
generate ❗ (prev: generate ⚠️) → build ✅lint ✅test ✅

npm install https://pkg.stainless.com/s/agentex-sdk-typescript/9ef0715c9ebd5f3a5109f183288f0e20b3b18784/dist.tar.gz
New diagnostics (5 error, 8 note)
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `get /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `delete /agents/{agent_id}/schedules/{schedule_name}`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/pause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/unpause`
Endpoint/NotFound: Skipped endpoint because it's not in your OpenAPI spec: `post /agents/{agent_id}/schedules/{schedule_name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `get /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `patch /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `delete /agents/{agent_id}/schedules/{name}`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/trigger`
💡 Endpoint/NotConfigured: Skipped endpoint because it's not in your Stainless config: `post /agents/{agent_id}/schedules/{name}/pause`

This comment is auto-generated by GitHub Actions and is automatically kept up to date as you push.
If you push custom code to the preview branch, re-run this workflow to update the comment.
Last updated: 2026-06-25 20:20:42 UTC

@jromualdez-scale jromualdez-scale marked this pull request as draft June 23, 2026 18:19
Comment thread agentex/src/domain/services/agent_run_schedule_service.py
Comment thread agentex/src/domain/services/agent_run_schedule_service.py
Comment thread agentex/src/temporal/activities/scheduled_agent_run_activities.py
jromualdez-scale and others added 4 commits June 23, 2026 14:46
…igger

- delete/pause/resume tolerate a missing Temporal schedule (treat as
  success / log) so a prior partial delete can't strand an un-cleanable,
  un-toggleable row.
- list no longer fans out a describe RPC per row; live Temporal fields are
  served only on the single-schedule GET (list state comes from the row).
- scheduled task display_name uses the nominal fire time parsed from the
  workflow id (stable across activity retries) instead of wall-clock now().
- add PATCH /agents/{agent_id}/schedules/{name} (partial update of cadence,
  window, input, etc.; cron/interval stay mutually exclusive).
- re-add POST /agents/{agent_id}/schedules/{name}/trigger for an immediate
  out-of-band run (restores parity with the prior scheduler).
- new Temporal adapter update_schedule; regenerated OpenAPI spec; unit tests
  for all of the above.
…_SCHEDULES)

Gate the run schedules router behind a boolean env flag, matching the
existing ENABLE_HEALTH_CHECK_WORKFLOW pattern. Disabled by default in every
environment, so the API surface is absent unless explicitly enabled.

Local dev reads the flag from the shell (defaults false), so you opt in only
when testing: `ENABLE_AGENT_RUN_SCHEDULES=true ./dev.sh`. Deployed envs set the
env var when they want the feature on.

The OpenAPI generator opts the feature on so the endpoints stay documented in
the spec/SDK regardless of the runtime default; live serving remains gated.
…, harden update ordering

Address review follow-ups on agent run schedules:

- ScheduleInitialInput.type is now Literal["text"] (was a free str with a
  "v1 only" comment), so an unsupported content type is rejected at validation
  instead of silently coerced to text.
- Remove the persisted initial_input_method column/entity field. Delivery
  method is always inferred from the agent's ACP type, so the stored value was
  always null and could only go stale relative to the agent's current type. The
  response still exposes the (now always computed) method.
- update_schedule pushes the merged spec to Temporal BEFORE committing the row,
  closing the common divergence: a rejected cron/timezone or transient Temporal
  error now aborts with nothing persisted. A residual window remains (Temporal
  accepts, then the row write fails) since there is no cross-store transaction;
  the row stays the declared source of truth so a later successful update
  re-converges. create holds the analogous invariant via row rollback; update
  has no in-place rollback, so it orders the writes instead.
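
The write ordering described above can be sketched with in-memory stand-ins. `FakeTemporal`, `FakeRepo`, and the "empty cron is invalid" rule are hypothetical and exist only to make the ordering observable.

```python
class TemporalRejected(Exception):
    """Raised by the fake clock when a spec is invalid."""


class FakeTemporal:
    def __init__(self) -> None:
        self.spec: dict = {}

    def update_schedule(self, spec: dict) -> None:
        # Assumption: an empty cron stands in for any spec Temporal rejects.
        if not spec.get("cron"):
            raise TemporalRejected("invalid cron")
        self.spec = dict(spec)


class FakeRepo:
    def __init__(self, row: dict) -> None:
        self.row = dict(row)

    def save(self, row: dict) -> None:
        self.row = dict(row)


def update_schedule(repo: FakeRepo, temporal: FakeTemporal, patch: dict) -> dict:
    merged = {**repo.row, **patch}
    temporal.update_schedule(merged)  # 1) clock first: a rejection aborts here
    repo.save(merged)  # 2) row second: only after Temporal accepted
    return merged
```

If the Temporal call raises, the row is never written, so the two stores cannot diverge on a rejected spec. The residual window (Temporal accepts, row write fails) is not modeled here.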

Regenerate openapi.yaml and add an update-ordering regression test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@jromualdez-scale jromualdez-scale marked this pull request as ready for review June 24, 2026 19:43
# Best-effort delivered marker for the retry guard above. A crash between
# delivery and this update is the only window where a retry could
# re-deliver; deterministic task naming still prevents duplicate tasks.
task.task_metadata = {

The deterministic task name prevents a duplicate task, but not a duplicate delivery, right? event/send has no dedupe. For agentic agents, that second event re-runs the whole turn (double reply, double LLM/tool cost, etc.). We could add an idempotency key on the delivery so a replay is dropped.

@jromualdez-scale jromualdez-scale Jun 25, 2026

Contributor Author

Great point. You are right that the “correct” fix here is delivery-level idempotency in the existing Agentex send paths: add an optional idempotency_key to SendEventRequestEntity / SendMessageRequestEntity, persist it with the event/message, and enforce uniqueness so retries return/drop the existing delivery instead of forwarding again.

That would solve this cleanly, but it would also mean updating the existing Agentex event/message persistence paths, adding partial uniqueness indexes, and defining consistent behavior when an idempotency_key is provided. Existing normal client calls could keep the key optional and preserve current behavior, while scheduled-run retries would opt in with a deterministic key. We could also use this to address normal client retry idempotency more broadly if needed, but that feels like a bigger reliability effort than this v1 scheduling PR.

Another lightweight v1 option is to move the scheduled_input_delivered marker before delivery, so Temporal retries skip after the fire is claimed. But that flips the failure mode: we avoid rare double-delivery, at the cost of a rare missed delivery if the worker crashes after claiming and before sending.

Happy to go with either option here or keep it as is. Any thoughts @NiteshDhanpal? @danielmillerp any preference here?

I'd skip the marker-before option. It trades the double delivery for a silent missed delivery, which is worse for a scheduled job. The current marker-after order gives us a clear at-least-once delivery model, which is the safer default.

Can we just make that explicit in the code comments? i.e. state that delivery is at-least-once by design and the scenario when a double delivery can happen?

Contributor Author

Will do, thanks for the input here!
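
For reference, the delivery-level idempotency option discussed in this thread (deferred for v1) might look roughly like the sketch below. It uses SQLite from the standard library in place of Postgres, and the `events` table, its columns, and `send_event` are hypothetical, not the existing Agentex persistence code.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "id INTEGER PRIMARY KEY, task_id TEXT, body TEXT, idempotency_key TEXT)"
)
# Partial unique index: only keyed deliveries are deduplicated.
conn.execute(
    "CREATE UNIQUE INDEX uq_events_idem ON events (task_id, idempotency_key) "
    "WHERE idempotency_key IS NOT NULL"
)


def send_event(task_id: str, body: str, idempotency_key: Optional[str] = None) -> bool:
    """Persist an event; return False when a keyed replay is dropped."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO events (task_id, body, idempotency_key) VALUES (?, ?, ?)",
                (task_id, body, idempotency_key),
            )
    except sqlite3.IntegrityError:
        return False  # same key already delivered: do not forward again
    return True
```

Calls without a key keep today's behavior (every send is stored), while a scheduled-run retry that passes a deterministic key is dropped on replay.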

run_schedules_use_case: DAgentRunSchedulesUseCase,
authorization: DAuthorizationService,
) -> DeleteResponse:
await _check_schedule_or_collapse_to_404(

I think we should soft delete the schedule record for audit purposes

Contributor Author

Agree this makes sense for audit. I think the shape would be to keep deleting the Temporal schedule so no future fires happen, but soft-delete the Postgres schedule row instead of removing it.

Implementation-wise we’d add a status/deleted_at field, filter deleted schedules out of list/get, and update delete_schedule to mark deleted after the Temporal delete succeeds. The main choice is whether deleted schedule names can be reused. Keeping the current unique (agent_id, name) means no reuse, which is simpler and preserves audit history cleanly. Allowing reuse would require a partial unique index over active schedules only.

Thoughts? It may be simpler in v1 to just assume deleted names cannot be reused

Agree on the soft-delete approach — keep deleting the Temporal schedule so no future fires happen, and soft-delete the Postgres row after that delete succeeds, filtering deleted rows out of list/get
For the name reuse, I'm fine keeping the current (agent_id, name) unique index → no reuse for v1. It's the simplest path, and it actually keeps audit cleanest (one name = one schedule, ever). The only cost is you can't reuse a deleted name. If a real reuse need shows up later, a partial unique index (WHERE deleted_at IS NULL) is a clean, one-line upgrade that allows reuse.

One bigger question to flag (non blocking for v1): the reuse debate only exists because name is our identity (URL + unique key + authz selector). The row already has a stable id; if id were the handle and name a mutable label, reuse would be a non-issue with unambiguous audit
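
A small sketch of the soft-delete shape discussed in this thread, including the later "allow reuse" variant with a partial unique index. It runs on SQLite from the standard library rather than Postgres, and the reduced column set and helper names are assumptions for illustration. The v1 decision above is the plain unique index with no reuse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_run_schedules ("
    "id INTEGER PRIMARY KEY, agent_id TEXT NOT NULL, "
    "name TEXT NOT NULL, deleted_at TEXT)"
)
# Reuse-allowing variant: uniqueness applies to active rows only.
conn.execute(
    "CREATE UNIQUE INDEX uq_active_schedule ON agent_run_schedules (agent_id, name) "
    "WHERE deleted_at IS NULL"
)


def create(agent_id: str, name: str) -> int:
    with conn:
        cur = conn.execute(
            "INSERT INTO agent_run_schedules (agent_id, name) VALUES (?, ?)",
            (agent_id, name),
        )
    return cur.lastrowid


def soft_delete(schedule_id: int) -> None:
    # Mark the row deleted instead of removing it, keeping it for audit.
    with conn:
        conn.execute(
            "UPDATE agent_run_schedules SET deleted_at = datetime('now') "
            "WHERE id = ? AND deleted_at IS NULL",
            (schedule_id,),
        )


def list_active(agent_id: str) -> list[str]:
    rows = conn.execute(
        "SELECT name FROM agent_run_schedules "
        "WHERE agent_id = ? AND deleted_at IS NULL ORDER BY id",
        (agent_id,),
    )
    return [r[0] for r in rows]
```

Dropping the `WHERE deleted_at IS NULL` clause from the index gives the v1 behavior: a deleted name stays reserved forever.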

message=f"Schedule '{schedule_id}' not found",
detail=str(e),
) from e
logger.error(f"Failed to update schedule {schedule_id}: {e}")

can we log a metric?

Contributor Author

I'll add this!

['agent_id', 'name'],
unique=True,
)
op.create_index(

can we add an index on updated_at?

Contributor Author

I think this index would only help a query that sorts or filters on updated_at. Today the schedule list path orders by created_at, and I don’t see anything reading updated_at. It’s also the last-modified time of the schedule definition itself, which should only change on relatively rare patch/pause/resume operations.

Given the expected low row count per agent, I’d hold off until we have a query that actually needs it. Happy to add it if you’re picturing a “recently updated” sort/view down the line. Thoughts?


def upgrade() -> None:
op.create_table(
'agent_run_schedules',

Should we add a record version field to track every schedule update? Currently, patch, pause, and resume look like blind read-modify-write flows. That means a stale patch could accidentally overwrite a newer pause/resume change and silently reactivate a schedule.
If we add a version number, we can make updates conditional on the version the caller last read. That gives us optimistic concurrency control and also gives us a cleaner audit trail of schedule changes over time.

Contributor Author

Agreed that patch/pause/resume are blind read-modify-write flows today, and a version column would give us proper optimistic concurrency plus a cleaner foundation for change history.

My one hesitation is v1 scope: schedules are user-owned, and schedule mutations should be infrequent actions on a single row. In practice I’d expect one user to be editing/pausing/resuming a schedule one operation at a time, so concurrent conflicting edits seem unlikely compared to task/message/event traffic.

Do you think it’s worth addressing in this first pass, or could we defer the version column to a follow-up once we see real edit patterns? Happy to go either way, just want to size it against v1 scope. cc @danielmillerp

My argument for doing this now is migration-cost asymmetry, not how often concurrency happens.
This table is brand new in the PR, so adding version INT NOT NULL DEFAULT 1 is basically free right now: no backfill, no nullable-then-populate migration, no schema change on a live populated table, and the write paths are already being built.
If we retrofit this later, we’ll need to migrate existing rows, backfill versions, and thread version checks through patch/pause/resume after those flows have already shipped. That is strictly more work and higher risk.
It also gives us a useful debugging hook for customer issues. If a customer reports an unexpected patch/pause/resume outcome, having an explicit version on the row makes it much easier to reason about which state was read, which update was attempted, and whether we dropped or overwrote a concurrent change.

Contributor Author

Agreed, will add this in!

@danielmillerp danielmillerp left a comment

Collaborator

overall looks great to me!



@router.post(
    "/{name}/pause",

Collaborator

hell ya was gonna request haha

user_id: str | None = Field(
    None, description="Creator user id, if a user principal."
)
service_account_id: str | None = Field(

Collaborator

this will accommodate the long-term solution, right?

Contributor Author

Yep, and it's intentionally credential-free. We preserve the creator’s account/user/service-account context for ownership and attribution, but we do not store request credentials, headers, JWTs, API keys, etc. Worst case, even if the fire-time authz path evolves, this still gives us useful attribution/debugging context without retaining secrets.



def build_run_schedule_temporal_id(schedule_row_id: str) -> str:
    return f"{RUN_SCHEDULE_TEMPORAL_ID_PREFIX}:{schedule_row_id}"

Collaborator

we're using db to get this right?

Contributor Author

yep!

) from exc

temporal_id = build_run_schedule_temporal_id(created.id)
authz_selector = build_run_schedule_authz_selector(agent.id, created.name)

Collaborator

what is this

Contributor Author

This is the AuthZ resource selector for the schedule. The general pattern in the codebase is AgentexResource(type, selector), and the selector is the stable id AuthZ uses for checks. Since schedule routes address schedules by (agent_id, name), we derive the selector from those path params so we can authorize before the DB lookup. It’s registered under the parent agent so schedule permissions can inherit/cascade from the agent.
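As a rough sketch of the idea (not the actual `build_run_schedule_authz_selector` or `AgentexResource` from the codebase), a selector derived purely from path params might look like this. The `Resource` dataclass, the `"agent_id:name"` selector format, and the in-memory `grants` set are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resource:
    # Hypothetical stand-in for the (type, selector) pattern described above.
    type: str
    selector: str


def schedule_resource(agent_id: str, name: str) -> Resource:
    # Built only from path params, so no DB lookup is needed before the check.
    # The "agent_id:name" format is an assumption, not the real selector format.
    return Resource("schedule", f"{agent_id}:{name}")


def parent_agent_resource(agent_id: str) -> Resource:
    return Resource("agent", agent_id)


def is_allowed(grants: set, agent_id: str, name: str) -> bool:
    # A grant on the schedule itself or on its parent agent is enough,
    # which models permissions cascading from the agent.
    return (
        schedule_resource(agent_id, name) in grants
        or parent_agent_resource(agent_id) in grants
    )
```

Because the selector is a pure function of `(agent_id, name)`, the route can reject an unauthorized caller before touching the schedules table.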

I'll be honest Cursor figured this one out haha ^

@workflow.defn
class ScheduledAgentRunWorkflow:
    @workflow.run
    async def run(self, schedule_id: str) -> dict[str, Any]:

Collaborator

how long does this run, is it ever closed?

Contributor Author

Each workflow execution is per scheduled fire and closes when the launch activity succeeds or exhausts retries. The long-lived thing is the Temporal Schedule, not this workflow.

jromualdez-scale and others added 4 commits June 25, 2026 14:18
Co-authored-by: Cursor <cursoragent@cursor.com>
Comment on lines +192 to +199
if schedule.paused:
    # Temporal pauses the schedule too, but a manual trigger can still
    # fire a paused schedule; honor the stored paused state defensively.
    return {
        "status": "skipped",
        "reason": "schedule_paused",
        "schedule_id": schedule_id,
    }

P1 Manual trigger silently skips paused schedules

The comment at line 193 states "a manual trigger can still fire a paused schedule," indicating the clear design intent is for trigger_type == "manual" to bypass this guard — but trigger_type is never consulted. Any call to the POST /{name}/trigger endpoint on a paused schedule starts a workflow that immediately returns {"status": "skipped", "reason": "schedule_paused"}. The API caller receives an HTTP 200 with the schedule response and has no observable indication that the run was silently dropped.

Suggested change
- if schedule.paused:
-     # Temporal pauses the schedule too, but a manual trigger can still
-     # fire a paused schedule; honor the stored paused state defensively.
-     return {
-         "status": "skipped",
-         "reason": "schedule_paused",
-         "schedule_id": schedule_id,
-     }
+ if schedule.paused and trigger_type != "manual":
+     # Temporal pauses the schedule too, but a manual trigger can still
+     # fire a paused schedule; honor the stored paused state defensively
+     # only for cadence-driven fires, not explicit out-of-band triggers.
+     return {
+         "status": "skipped",
+         "reason": "schedule_paused",
+         "schedule_id": schedule_id,
+     }
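To make the behavioral difference concrete, the suggested guard can be isolated as a small pure function. This is a sketch only: `paused_guard` is a hypothetical helper, and `"scheduled"` as the cadence-driven `trigger_type` value is an assumption (only `"manual"` appears in the review comment).

```python
from typing import Any, Optional


def paused_guard(
    paused: bool, trigger_type: str, schedule_id: str
) -> Optional[dict[str, Any]]:
    """Return a skip result if the fire should be dropped, else None."""
    # Only cadence-driven fires honor the stored paused flag; an explicit
    # manual trigger passes through even when the schedule is paused.
    if paused and trigger_type != "manual":
        return {
            "status": "skipped",
            "reason": "schedule_paused",
            "schedule_id": schedule_id,
        }
    return None
```

Under the original guard, the manual case would also return the skip result; whether that is a bug or the intended defensive behavior is the design question this comment raises.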


3 participants