From 1beae2b4d691c4e910bd924980bf5b8ac399703f Mon Sep 17 00:00:00 2001 From: Claude Lin & Lay Date: Fri, 26 Jun 2026 02:30:53 +0900 Subject: [PATCH] feat(worker): add DO-alarm time-based retention sweep for unprocessed events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 未処理イベントの保持上限を導入し, DO ストレージ肥大の唯一残った無限増加経路を塞ぐ. WebhookStore DO に時間駆動の retention sweep を追加する. sweep は処理済み (>PURGE_AFTER_DAYS, 既定 7 日) と未処理 (>UNPROCESSED_PURGE_AFTER_DAYS, 既定 90 日) の両方をまとめて削除する. トリガは DO Alarm (ctx.storage.setAlarm + alarm() ハンドラ) で, alarm() は実行後に次回 sweep を再スケジュールするため, mark_processed を一度も呼ばない放置テナントでも消費ゼロで日次実行される. 既存の mark_processed 即時 purge は即時性のため残した. 未処理保持期間を処理済みより長く取るのは未読データ消失の安全域確保のため (7 日 / 90 日の非対称は意図的). 新規 env UNPROCESSED_PURGE_AFTER_DAYS を wrangler.toml [vars] に追加した. docs/0-requirements (.ja 含む) に F2.5 と N2.8 を追記し, root README と mcp-server/README に retention 仕様を明記した. テストは未処理 >90 日削除 / <90 日残存 / 処理済み >7 日削除 / 混在 sweep / alarm 再スケジュール / 放置テナント alarm 掃除 を検証する. Closes #236 --- README.md | 24 +++++++ docs/0-requirements.ja.md | 6 +- docs/0-requirements.md | 6 +- mcp-server/README.md | 14 ++++ worker/src/store.ts | 110 +++++++++++++++++++++++++--- worker/test/workers/store.test.ts | 115 +++++++++++++++++++++++++++++- worker/wrangler.toml | 13 +++- 7 files changed, 272 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 79a492e..b9d5214 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,30 @@ All checks passed. | `get_webhook_events` | Full payloads for all pending events | | `mark_processed` | Mark an event as processed | +## Event Retention + +Stored events are purged automatically to bound Durable Object storage. The Worker +runs a time-based sweep on a Durable Object Alarm (daily), so cleanup happens even +for tenants that never call `mark_processed`: + +| Event class | Retention window | Env var | Default | +|-------------|------------------|---------|---------| +| Processed (`mark_processed` called) | older than the window is deleted | `PURGE_AFTER_DAYS` | `7` days | +| Unprocessed (never marked) | older than the window is deleted | `UNPROCESSED_PURGE_AFTER_DAYS` | `90` days | + +- The longer window for unprocessed events is intentional: unprocessed means + user-unseen, so the safety margin before dropping is wide (the 7-day vs 90-day + asymmetry is by design). +- The sweep runs via a Durable Object Alarm on a daily cadence and reschedules + itself, so it fires independently of consumption. Processed events are also + purged immediately on `mark_processed` for promptness; the Alarm sweep is the + guarantee that covers abandoned tenants. +- Both windows are configurable in `worker/wrangler.toml` (`[vars]`). Setting a + value to `0` purges that class immediately on sweep. +- Known limitation: the windows bound event *age*, not *volume*. A high-rate, + never-consumed tenant can still reach Cloudflare's 1 GB-per-DO ceiling before + the 90-day window applies. A volume-based hard cap is tracked separately. + ## Monorepo Structure ``` diff --git a/docs/0-requirements.ja.md b/docs/0-requirements.ja.md index 62eee78..e7771ea 100644 --- a/docs/0-requirements.ja.md +++ b/docs/0-requirements.ja.md @@ -75,7 +75,8 @@ GitHub --POST--> Cloudflare Worker --> TenantRegistry DO | F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する | | F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ | | F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) | -| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ | +| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。処理済み死蔵行による DO ストレージ肥大を防ぐ(即時性のための補助経路。本質的な保証は F2.5 の Alarm sweep が担う) | +| F2.5 | DO Alarm(`ctx.storage.setAlarm` + `alarm()` ハンドラ)による時間駆動の retention sweep を提供する。sweep は (a) `processed=1` かつ `received_at` が `PURGE_AFTER_DAYS` 日(既定 7)より古い処理済みイベントと、(b) `processed=0` かつ `received_at` が `UNPROCESSED_PURGE_AFTER_DAYS` 日(既定 90)より古い未処理イベントを、まとめて削除する。alarm() は実行後に次回 sweep を再スケジュールし、消費ゼロ(`mark_processed` が一度も呼ばれない放置テナント)でも周期実行(日次)される。未処理保持期間を処理済みより長く取るのは未読データ消失の安全域確保のため(7 日 / 90 日の非対称は意図的)。既知の限界(スコープ外): 時間窓は「古さ」を縛るが「量」は縛らないため、高頻度 × 無消費テナントは 90 日窓があっても Cloudflare の 1GB/DO 壁に先に到達しうる。量ベース hard cap は別 issue 候補 | **イベント構造:** @@ -203,7 +204,8 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える | N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み | | N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する | | N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス | -| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 | +| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時および DO Alarm sweep 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 | +| N2.8 | 未処理イベント保持期間 | `UNPROCESSED_PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `90`(日)。DO Alarm sweep 時に保持期間超過の未処理イベントを削除。`0` で未処理を即削除。処理済み(7 日)より長いのは未読データ消失の安全域(非対称は意図的) | ### N3. 制約 diff --git a/docs/0-requirements.md b/docs/0-requirements.md index 20b20aa..eaa4572 100644 --- a/docs/0-requirements.md +++ b/docs/0-requirements.md @@ -75,7 +75,8 @@ GitHub ──POST──▶ Cloudflare Worker ──▶ TenantRegistry DO | F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する | | F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ | | F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) | -| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ | +| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。処理済み死蔵行による DO ストレージ肥大を防ぐ(即時性のための補助経路。本質的な保証は F2.5 の Alarm sweep が担う) | +| F2.5 | DO Alarm(`ctx.storage.setAlarm` + `alarm()` ハンドラ)による時間駆動の retention sweep を提供する。sweep は (a) `processed=1` かつ `received_at` が `PURGE_AFTER_DAYS` 日(既定 7)より古い処理済みイベントと、(b) `processed=0` かつ `received_at` が `UNPROCESSED_PURGE_AFTER_DAYS` 日(既定 90)より古い未処理イベントを、まとめて削除する。alarm() は実行後に次回 sweep を再スケジュールし、消費ゼロ(`mark_processed` が一度も呼ばれない放置テナント)でも周期実行(日次)される。未処理保持期間を処理済みより長く取るのは未読データ消失の安全域確保のため(7 日 / 90 日の非対称は意図的)。既知の限界(スコープ外): 時間窓は「古さ」を縛るが「量」は縛らないため、高頻度 × 無消費テナントは 90 日窓があっても Cloudflare の 1GB/DO 壁に先に到達しうる。量ベース hard cap は別 issue 候補 | **イベント構造:** @@ -203,7 +204,8 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える | N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み | | N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する | | N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス | -| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 | +| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時および DO Alarm sweep 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 | +| N2.8 | 未処理イベント保持期間 | `UNPROCESSED_PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `90`(日)。DO Alarm sweep 時に保持期間超過の未処理イベントを削除。`0` で未処理を即削除。処理済み(7 日)より長いのは未読データ消失の安全域(非対称は意図的) | **GitHub Webhook 購読イベント:** diff --git a/mcp-server/README.md b/mcp-server/README.md index c714ef5..46d7d56 100644 --- a/mcp-server/README.md +++ b/mcp-server/README.md @@ -149,6 +149,20 @@ All tools are read-only except `mark_processed`. If real-time channel notifications are enabled (Claude Code), step 1 can be skipped — the proxy will push event summaries as soon as the Worker receives them. You still need to call `mark_processed` to clear the queue. +## Event retention + +The Worker purges stored events automatically so Durable Object storage stays bounded. A time-based sweep runs on a Durable Object Alarm (daily) and reschedules itself, so cleanup happens even when `mark_processed` is never called. + +| Event class | Retention window | Worker env var | Default | +|---|---|---|---| +| Processed (`mark_processed` called) | events older than the window are deleted | `PURGE_AFTER_DAYS` | `7` days | +| Unprocessed (never marked) | events older than the window are deleted | `UNPROCESSED_PURGE_AFTER_DAYS` | `90` days | + +- The longer window for unprocessed events is intentional — unprocessed means user-unseen, so the margin before dropping is wide (the 7-day vs 90-day asymmetry is by design). +- Processed events are also purged immediately on `mark_processed` for promptness; the Alarm sweep is the guarantee that covers tenants that stop consuming. +- Both windows are Worker-side configuration (`worker/wrangler.toml` `[vars]`); set a value to `0` to purge that class immediately on sweep. These env vars live on the Worker, not in this proxy. +- Known limitation: the windows bound event *age*, not *volume*. A high-rate, never-consumed tenant can still hit Cloudflare's 1 GB-per-DO ceiling before the 90-day window applies. + ## Authentication flow 1. On first tool call (or on startup if cached tokens exist), the proxy discovers OAuth metadata at `${WEBHOOK_WORKER_URL}/.well-known/oauth-authorization-server`. diff --git a/worker/src/store.ts b/worker/src/store.ts index 1df2698..e03a9b5 100644 --- a/worker/src/store.ts +++ b/worker/src/store.ts @@ -14,23 +14,44 @@ import { summarizeEvent } from "../../shared/src/summarize.js"; */ interface StoreEnv { PURGE_AFTER_DAYS?: string; + UNPROCESSED_PURGE_AFTER_DAYS?: string; } /** Default retention window (days) for processed events when PURGE_AFTER_DAYS is unset. */ const DEFAULT_PURGE_DAYS = 7; /** - * Resolve the retention window from env. Falls back to DEFAULT_PURGE_DAYS when - * unset / non-numeric / negative. A value of 0 means "purge all processed - * events immediately on mark". + * Default retention window (days) for UNPROCESSED events when + * UNPROCESSED_PURGE_AFTER_DAYS is unset. Deliberately much longer than the + * processed window: unprocessed = user-unseen, so the safety margin before + * silently dropping is wide (the 7d-vs-90d asymmetry is intentional). */ -function purgeDays(env: StoreEnv): number { - const raw = env.PURGE_AFTER_DAYS; +const DEFAULT_UNPROCESSED_PURGE_DAYS = 90; + +/** Sweep cadence: how often the DO Alarm fires to run the time-based purge. */ +const SWEEP_INTERVAL_MS = 86_400_000; // 24h (daily) + +/** + * Resolve a retention window (days) from an env value. Falls back to + * `fallback` when unset / non-numeric / negative. A value of 0 means "purge + * matching events immediately" (no retention). + */ +function resolveDays(raw: string | undefined, fallback: number): number { if (raw !== undefined) { const n = Number(raw); if (Number.isFinite(n) && n >= 0) return n; } - return DEFAULT_PURGE_DAYS; + return fallback; +} + +/** Retention window (days) for processed events. */ +function purgeDays(env: StoreEnv): number { + return resolveDays(env.PURGE_AFTER_DAYS, DEFAULT_PURGE_DAYS); +} + +/** Retention window (days) for unprocessed events. */ +function unprocessedPurgeDays(env: StoreEnv): number { + return resolveDays(env.UNPROCESSED_PURGE_AFTER_DAYS, DEFAULT_UNPROCESSED_PURGE_DAYS); } export class WebhookStore extends DurableObject { @@ -38,7 +59,7 @@ export class WebhookStore extends DurableObject { private sseWriters = new Set>(); private sseEncoder = new TextEncoder(); - private ensureTable() { + private ensureSchema() { if (this.initialized) return; this.ctx.storage.sql.exec(` CREATE TABLE IF NOT EXISTS events ( @@ -54,6 +75,69 @@ export class WebhookStore extends DurableObject { this.initialized = true; } + /** + * Initialize on the request path: create the schema and ensure the recurring + * sweep alarm is armed. Alarm arming is split out of the alarm() handler, + * which reschedules itself directly. + */ + private async ensureTable() { + this.ensureSchema(); + await this.ensureAlarm(); + } + + /** + * Arm the recurring sweep alarm if one is not already scheduled. This is the + * consumption-independent path: abandoned tenants never call mark_processed, + * so a mark-only trigger would never reach the very tenants whose unprocessed + * events accumulate. The DO Alarm fires even with zero consumption (#236). + */ + private async ensureAlarm() { + const existing = await this.ctx.storage.getAlarm(); + if (existing === null) { + await this.ctx.storage.setAlarm(Date.now() + SWEEP_INTERVAL_MS); + } + } + + /** + * Time-based retention sweep. Deletes: + * - processed events older than PURGE_AFTER_DAYS (default 7), and + * - UNPROCESSED events older than UNPROCESSED_PURGE_AFTER_DAYS (default 90). + * Returns the per-class purged counts. Bounds DO storage growth from the only + * remaining unbounded path (unprocessed rows on abandoned tenants, #236). + */ + private sweep(): { processed: number; unprocessed: number } { + const now = Date.now(); + + const processedCutoff = new Date(now - purgeDays(this.env) * 86_400_000).toISOString(); + const processedCursor = this.ctx.storage.sql.exec( + `DELETE FROM events WHERE processed = 1 AND received_at < ?`, + processedCutoff, + ); + const processed = processedCursor.rowsWritten; + + const unprocessedCutoff = new Date(now - unprocessedPurgeDays(this.env) * 86_400_000).toISOString(); + const unprocessedCursor = this.ctx.storage.sql.exec( + `DELETE FROM events WHERE processed = 0 AND received_at < ?`, + unprocessedCutoff, + ); + const unprocessed = unprocessedCursor.rowsWritten; + + return { processed, unprocessed }; + } + + /** + * DO Alarm handler — the consumption-independent retention guarantee. Runs the + * full sweep (processed + unprocessed) and reschedules the next sweep so the + * cycle keeps running on a hibernating, never-consumed tenant. + */ + async alarm() { + this.ensureSchema(); + this.sweep(); + // Reschedule the next periodic sweep. alarm() is invoked after the prior + // alarm has been cleared, so this re-arms the recurring cycle. + await this.ctx.storage.setAlarm(Date.now() + SWEEP_INTERVAL_MS); + } + /** Broadcast to all connected WebSocket and SSE clients */ private broadcast(data: unknown) { const msg = JSON.stringify(data); @@ -78,7 +162,7 @@ export class WebhookStore extends DurableObject { } async fetch(request: Request): Promise { - this.ensureTable(); + await this.ensureTable(); const url = new URL(request.url); // ── WebSocket upgrade ── @@ -257,6 +341,16 @@ export class WebhookStore extends DurableObject { return Response.json({ success: true, event_id, purged }); } + // ── sweep (time-based retention purge) ── + // Runs the same sweep the DO Alarm runs: purge processed > PURGE_AFTER_DAYS + // and unprocessed > UNPROCESSED_PURGE_AFTER_DAYS. Exposed as a route so the + // sweep can be driven on demand (and asserted in tests) independently of the + // alarm scheduler. + if (url.pathname === "/sweep" && request.method === "POST") { + const purged = this.sweep(); + return Response.json({ success: true, ...purged }); + } + return new Response("Not found", { status: 404 }); } diff --git a/worker/test/workers/store.test.ts b/worker/test/workers/store.test.ts index 401be4f..0b52f63 100644 --- a/worker/test/workers/store.test.ts +++ b/worker/test/workers/store.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { env } from "cloudflare:test"; +import { env, runInDurableObject, runDurableObjectAlarm } from "cloudflare:test"; import type { WebhookEvent, EventSummary, PendingStatus } from "../../../shared/src/types.js"; // Each test uses a uniquely-named DO instance for storage isolation. WebhookStore @@ -277,6 +277,119 @@ describe("WebhookStore: mark-processed auto-purge", () => { }); }); +// Drive the time-based retention sweep on demand (same sweep the DO Alarm runs). +async function sweep(stub: DurableObjectStub) { + const res = await stub.fetch(new Request(`${BASE}/sweep`, { method: "POST" })); + expect(res.status).toBe(200); + return res.json() as Promise<{ success: boolean; processed: number; unprocessed: number }>; +} + +describe("WebhookStore: time-based sweep (DO Alarm body)", () => { + it("purges UNPROCESSED events older than UNPROCESSED_PURGE_AFTER_DAYS (default 90)", async () => { + const stub = storeFor("sweep-old-unprocessed"); + // 100 days old, unprocessed → beyond the 90-day window → must be swept + await ingest(stub, makeEvent({ id: "stale", type: "issues", received_at: isoFromNow(-100 * DAY_MS) })); + + const result = await sweep(stub); + expect(result.unprocessed).toBe(1); + + // gone + const res = await stub.fetch(new Request(`${BASE}/event?id=stale`)); + expect(res.status).toBe(404); + }); + + it("keeps UNPROCESSED events within the 90-day window", async () => { + const stub = storeFor("sweep-keeps-recent-unprocessed"); + // 89 days old, unprocessed → inside the window → must survive + await ingest(stub, makeEvent({ id: "recent", type: "issues", received_at: isoFromNow(-89 * DAY_MS) })); + + const result = await sweep(stub); + expect(result.unprocessed).toBe(0); + + // still present and still pending + const res = await stub.fetch(new Request(`${BASE}/event?id=recent`)); + expect(res.status).toBe(200); + const statusRes = await stub.fetch(new Request(`${BASE}/pending-status`)); + const status = (await statusRes.json()) as PendingStatus; + expect(status.pending_count).toBe(1); + }); + + it("purges PROCESSED events older than PURGE_AFTER_DAYS (default 7) in the same sweep — no mark_processed call required", async () => { + const stub = storeFor("sweep-old-processed"); + // 30 days old, already processed → beyond the 7-day window → swept by the + // time-based sweep without any mark_processed trigger + await ingest(stub, makeEvent({ id: "doneold", type: "issues", received_at: isoFromNow(-30 * DAY_MS), processed: true })); + // fresh processed event stays + await ingest(stub, makeEvent({ id: "donefresh", type: "issues", received_at: isoFromNow(-1 * DAY_MS), processed: true })); + + const result = await sweep(stub); + expect(result.processed).toBe(1); + expect(result.unprocessed).toBe(0); + + const goneRes = await stub.fetch(new Request(`${BASE}/event?id=doneold`)); + expect(goneRes.status).toBe(404); + const keptRes = await stub.fetch(new Request(`${BASE}/event?id=donefresh`)); + expect(keptRes.status).toBe(200); + }); + + it("sweeps processed (>7d) and unprocessed (>90d) together while keeping in-window rows of both classes", async () => { + const stub = storeFor("sweep-mixed"); + await ingest(stub, makeEvent({ id: "p-old", received_at: isoFromNow(-30 * DAY_MS), processed: true })); // swept + await ingest(stub, makeEvent({ id: "p-new", received_at: isoFromNow(-2 * DAY_MS), processed: true })); // kept + await ingest(stub, makeEvent({ id: "u-old", received_at: isoFromNow(-120 * DAY_MS) })); // swept + await ingest(stub, makeEvent({ id: "u-new", received_at: isoFromNow(-30 * DAY_MS) })); // kept + + const result = await sweep(stub); + expect(result).toEqual({ success: true, processed: 1, unprocessed: 1 }); + + expect((await stub.fetch(new Request(`${BASE}/event?id=p-old`))).status).toBe(404); + expect((await stub.fetch(new Request(`${BASE}/event?id=u-old`))).status).toBe(404); + expect((await stub.fetch(new Request(`${BASE}/event?id=p-new`))).status).toBe(200); + expect((await stub.fetch(new Request(`${BASE}/event?id=u-new`))).status).toBe(200); + }); + + it("reports the swept unprocessed count for a clearly-overdue row under the default window", async () => { + const stub = storeFor("sweep-default-window"); + await ingest(stub, makeEvent({ id: "year-old", received_at: isoFromNow(-365 * DAY_MS) })); + const result = await sweep(stub); + expect(result.unprocessed).toBe(1); + }); +}); + +describe("WebhookStore: DO Alarm scheduling", () => { + it("arms a sweep alarm on first request and reschedules after the alarm fires", async () => { + const stub = storeFor("alarm-reschedule"); + // First request initializes the DO and arms the recurring sweep alarm. + await ingest(stub, makeEvent({ id: "x", received_at: isoFromNow(-1 * DAY_MS) })); + + // An alarm should now be scheduled (consumption-independent sweep is armed). + const armed = await runInDurableObject(stub, (_instance, state) => state.storage.getAlarm()); + expect(armed).not.toBeNull(); + + // Fire the scheduled alarm; the handler runs the sweep and re-arms the next one. + const ran = await runDurableObjectAlarm(stub); + expect(ran).toBe(true); + + // After firing, a fresh alarm must be scheduled again (recurring cycle). + const rearmed = await runInDurableObject(stub, (_instance, state) => state.storage.getAlarm()); + expect(rearmed).not.toBeNull(); + }); + + it("alarm() sweeps overdue rows when no consumer ever calls mark_processed", async () => { + const stub = storeFor("alarm-sweeps"); + // Abandoned-tenant shape: an ancient unprocessed event, never consumed. + await ingest(stub, makeEvent({ id: "abandoned", received_at: isoFromNow(-200 * DAY_MS) })); + + // Drive the alarm handler directly (simulates the scheduled fire). + await runInDurableObject(stub, async (instance: any) => { + await instance.alarm(); + }); + + const res = await stub.fetch(new Request(`${BASE}/event?id=abandoned`)); + expect(res.status).toBe(404); + }); +}); + describe("WebhookStore: unknown route", () => { it("returns 404 for an unrecognized path", async () => { const stub = storeFor("unknown-route"); diff --git a/worker/wrangler.toml b/worker/wrangler.toml index c3eec4c..200f4da 100644 --- a/worker/wrangler.toml +++ b/worker/wrangler.toml @@ -4,10 +4,17 @@ compatibility_date = "2025-03-26" compatibility_flags = ["nodejs_compat"] [vars] -# Retention window (days) for processed webhook events. On mark_processed, the -# WebhookStore DO deletes processed events older than this to bound DO storage. -# Unprocessed events are never purged. 0 = purge processed events immediately. +# Retention window (days) for processed webhook events. On mark_processed and on +# the periodic DO Alarm sweep, the WebhookStore DO deletes processed events older +# than this to bound DO storage. 0 = purge processed events immediately. PURGE_AFTER_DAYS = "7" +# Retention window (days) for UNPROCESSED webhook events. The DO Alarm sweep +# deletes unprocessed events older than this. Much longer than the processed +# window because unprocessed = user-unseen (the asymmetry is intentional). This +# closes the last unbounded storage-growth path: abandoned tenants that never +# call mark_processed. The sweep runs on a time-based DO Alarm, independent of +# consumption. 0 = purge unprocessed events immediately on sweep. +UNPROCESSED_PURGE_AFTER_DAYS = "90" [durable_objects] bindings = [