From 289bfe30391dea7ffd4910c81ff14f26c97b2a74 Mon Sep 17 00:00:00 2001 From: Claude Lin & Lay Date: Fri, 26 Jun 2026 02:01:04 +0900 Subject: [PATCH] feat(worker): re-port auto-purge of processed events to WebhookStore DO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DO 移行 (#71) で失われた auto-purge 機能 (#29) を WebhookStore DO に再移植する。 mark_processed 実行時に processed=1 かつ received_at が保持期間 (PURGE_AFTER_DAYS 日, 既定 7) より古いイベントを DELETE し, 処理済み死蔵行による DO ストレージ肥大を止める。 未処理イベントは保持期間に関わらず削除しない。戻り値に削除件数 purged を追加した。 env 型は store.ts にローカル最小定義 (StoreEnv) し index.ts の Env への循環 import を避けた。 wrangler.toml の [vars] に PURGE_AFTER_DAYS=7 を追加。docs/0-requirements{,.ja}.md に auto-purge 仕様 (F2.4 / F3.5 purged 戻り値 / N2.7 構成) を同一 PR で復活させた。 既存 store.test.ts の mark-processed テストは絶対日付 received_at が 7 日 purge で即削除 され壊れるため, received_at を now 相対に変更して新挙動へ整合させた。保持期間超過の処理済み 削除 / 未処理は残す / purged 件数の検証テストを追加 (DO テスト 25→28)。 Refs #29 #71 Closes #234 --- docs/0-requirements.ja.md | 4 +- docs/0-requirements.md | 4 +- worker/src/store.ts | 41 ++++++++++++++++- worker/test/workers/store.test.ts | 75 +++++++++++++++++++++++++++++-- worker/wrangler.toml | 6 +++ 5 files changed, 122 insertions(+), 8 deletions(-) diff --git a/docs/0-requirements.ja.md b/docs/0-requirements.ja.md index 892fd3f..62eee78 100644 --- a/docs/0-requirements.ja.md +++ b/docs/0-requirements.ja.md @@ -75,6 +75,7 @@ GitHub --POST--> Cloudflare Worker --> TenantRegistry DO | F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する | | F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ | | F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) | +| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ | **イベント構造:** @@ -100,7 +101,7 @@ WebhookMcpAgent DO が以下のツールセットを提供する。ローカル | F3.2 | `list_pending_events` | limit (1-100, default 20) | サマリー配列 | 未処理イベントのメタデータ一覧を返す(ペイロード含まず) | | 
F3.3 | `get_event` | event_id | 完全イベント or error | UUID 指定で完全なペイロードを返す | | F3.4 | `get_webhook_events` | limit (1-100, default 20) | 未処理イベント配列 | 未処理イベントをフルペイロード付きで返す | -| F3.5 | `mark_processed` | event_id | success, event_id | イベントを処理済みにマークする | +| F3.5 | `mark_processed` | event_id | success, event_id, purged | イベントを処理済みにマークし、保持期間超過の処理済みイベントを自動削除する。`purged` は今回削除された件数 | **F3.1 ローカルブリッジ整形:** ローカルブリッジは `get_pending_status` の戻り値を Claude Code UserPromptSubmit hook の decision JSON shape (`hookSpecificOutput.hookEventName="UserPromptSubmit"` + `additionalContext` に pending_count / types / latest_received_at の自然文要約) にラップして返す。これは `type: "mcp_tool"` UserPromptSubmit hook 経由の呼び出しで戻り値が AI 文脈に注入されるための要件であり、手動 tool 呼び出し時も同 shape で返る。リモート (Worker + DO) 側の戻り値構造は変更しない。 @@ -202,6 +203,7 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える | N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み | | N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する | | N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス | +| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 | ### N3. 
制約 diff --git a/docs/0-requirements.md b/docs/0-requirements.md index 1d92538..20b20aa 100644 --- a/docs/0-requirements.md +++ b/docs/0-requirements.md @@ -75,6 +75,7 @@ GitHub ──POST──▶ Cloudflare Worker ──▶ TenantRegistry DO | F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する | | F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ | | F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) | +| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ | **イベント構造:** @@ -100,7 +101,7 @@ WebhookMcpAgent DO が以下のツールセットを提供する。ローカル | F3.2 | `list_pending_events` | limit (1-100, default 20) | サマリー配列 | 未処理イベントのメタデータ一覧を返す(ペイロード含まず) | | F3.3 | `get_event` | event_id | 完全イベント or error | UUID 指定で完全なペイロードを返す | | F3.4 | `get_webhook_events` | limit (1-100, default 20) | 未処理イベント配列 | 未処理イベントをフルペイロード付きで返す | -| F3.5 | `mark_processed` | event_id | success, event_id | イベントを処理済みにマークする | +| F3.5 | `mark_processed` | event_id | success, event_id, purged | イベントを処理済みにマークし、保持期間超過の処理済みイベントを自動削除する。`purged` は今回削除された件数 | **F3.1 Local bridge shaping:** The local bridge wraps `get_pending_status` results into the Claude Code UserPromptSubmit hook decision JSON shape (`hookSpecificOutput.hookEventName="UserPromptSubmit"` plus a natural-language summary of pending_count / types / latest_received_at in `additionalContext`). This is required so values returned via `type: "mcp_tool"` UserPromptSubmit hooks reach the AI prompt context; manual tool calls receive the same shape. The remote (Worker + DO) return contract is unchanged. 
@@ -202,6 +203,7 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える
 | N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み |
 | N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する |
 | N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス |
+| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 |
 
 **GitHub Webhook 購読イベント:**
 
diff --git a/worker/src/store.ts b/worker/src/store.ts
index 3f7fd91..1df2698 100644
--- a/worker/src/store.ts
+++ b/worker/src/store.ts
@@ -7,7 +7,33 @@ import { DurableObject } from "cloudflare:workers";
 import type { WebhookEvent, EventSummary, PendingStatus } from "../../shared/src/types.js";
 import { summarizeEvent } from "../../shared/src/summarize.js";
 
-export class WebhookStore extends DurableObject {
+/**
+ * Minimal local Env shape for this DO. Defined here (not imported from
+ * index.ts's full Env) to avoid a circular import; only the vars this DO
+ * actually reads are declared.
+ */
+interface StoreEnv {
+  PURGE_AFTER_DAYS?: string;
+}
+
+/** Default retention window (days) for processed events when PURGE_AFTER_DAYS is unset. */
+const DEFAULT_PURGE_DAYS = 7;
+
+/**
+ * Resolve the retention window from env. Falls back to DEFAULT_PURGE_DAYS when
+ * unset / blank / non-numeric / negative. An explicit 0 means "purge all
+ * processed events immediately on mark".
+ */
+function purgeDays(env: StoreEnv): number {
+  const raw = env.PURGE_AFTER_DAYS?.trim();
+  if (raw) { // skip unset/blank: Number("") is 0, which would purge everything at once
+    const n = Number(raw);
+    if (Number.isFinite(n) && n >= 0) return n;
+  }
+  return DEFAULT_PURGE_DAYS;
+}
+
+export class WebhookStore extends DurableObject<StoreEnv> {
   private initialized = false;
   private sseWriters = new Set>();
   private sseEncoder = new TextEncoder();
@@ -217,7 +243,18 @@ export class WebhookStore extends DurableObject {
       this.ctx.storage.sql.exec(
         `UPDATE events SET processed = 1 WHERE id = ?`, event_id,
       );
-      return Response.json({ success: true, event_id });
+
+      // Auto-purge: delete processed events whose received_at is older than the
+      // retention window. Unprocessed events are never deleted regardless of age.
+      // This bounds DO storage growth from dead processed rows (re-port of #29).
+      const days = purgeDays(this.env);
+      const cutoff = new Date(Date.now() - days * 86_400_000).toISOString();
+      const cursor = this.ctx.storage.sql.exec(
+        `DELETE FROM events WHERE processed = 1 AND received_at < ?`, cutoff,
+      );
+      const purged = cursor.rowsWritten;
+
+      return Response.json({ success: true, event_id, purged });
     }
 
     return new Response("Not found", { status: 404 });
diff --git a/worker/test/workers/store.test.ts b/worker/test/workers/store.test.ts
index 68b9f2a..401be4f 100644
--- a/worker/test/workers/store.test.ts
+++ b/worker/test/workers/store.test.ts
@@ -184,10 +184,19 @@ describe("WebhookStore: get_event (/event?id=)", () => {
   });
 });
 
+// received_at relative to the real wall-clock "now". The DO purges processed
+// events older than PURGE_AFTER_DAYS (default 7); tests bind received_at relative
+// to now so they stay stable regardless of the absolute date.
+function isoFromNow(deltaMs: number): string { + return new Date(Date.now() + deltaMs).toISOString(); +} +const DAY_MS = 86_400_000; + describe("WebhookStore: mark-processed", () => { - it("flips processed so the event drops out of pending but remains fetchable by id", async () => { + it("flips processed so a within-retention event drops out of pending but remains fetchable by id, returning purged=0", async () => { const stub = storeFor("mark-processed"); - await ingest(stub, makeEvent({ id: "m1", type: "issues", received_at: "2026-01-01T00:00:00Z" })); + // received_at is recent (1 day ago) so it survives the default 7-day purge window + await ingest(stub, makeEvent({ id: "m1", type: "issues", received_at: isoFromNow(-1 * DAY_MS) })); const mp = await stub.fetch( new Request(`${BASE}/mark-processed`, { @@ -196,20 +205,78 @@ describe("WebhookStore: mark-processed", () => { }), ); expect(mp.status).toBe(200); - expect(await mp.json()).toEqual({ success: true, event_id: "m1" }); + expect(await mp.json()).toEqual({ success: true, event_id: "m1", purged: 0 }); // no longer pending const statusRes = await stub.fetch(new Request(`${BASE}/pending-status`)); const status = (await statusRes.json()) as PendingStatus; expect(status.pending_count).toBe(0); - // but still retrievable by id, with processed = true + // but still retrievable by id, with processed = true (within retention, not purged) const evRes = await stub.fetch(new Request(`${BASE}/event?id=m1`)); + expect(evRes.status).toBe(200); const ev = (await evRes.json()) as WebhookEvent; expect(ev.processed).toBe(true); }); }); +describe("WebhookStore: mark-processed auto-purge", () => { + it("deletes processed events older than the retention window and reports the purged count", async () => { + const stub = storeFor("purge-old-processed"); + // An old, already-processed event (30 days ago, beyond the 7-day window) + await ingest( + stub, + makeEvent({ id: "old", type: "issues", received_at: isoFromNow(-30 * DAY_MS), 
processed: true }), + ); + // A fresh event we will mark now; marking triggers the purge sweep + await ingest(stub, makeEvent({ id: "fresh", type: "issues", received_at: isoFromNow(-1 * DAY_MS) })); + + const mp = await stub.fetch( + new Request(`${BASE}/mark-processed`, { + method: "POST", + body: JSON.stringify({ event_id: "fresh" }), + }), + ); + expect(mp.status).toBe(200); + // the stale "old" row is swept; "fresh" was just marked but is within retention + expect(await mp.json()).toEqual({ success: true, event_id: "fresh", purged: 1 }); + + // old is gone (404), fresh remains + const oldRes = await stub.fetch(new Request(`${BASE}/event?id=old`)); + expect(oldRes.status).toBe(404); + const freshRes = await stub.fetch(new Request(`${BASE}/event?id=fresh`)); + expect(freshRes.status).toBe(200); + }); + + it("keeps unprocessed events regardless of age", async () => { + const stub = storeFor("purge-keeps-unprocessed"); + // A very old but UNPROCESSED event must never be purged + await ingest(stub, makeEvent({ id: "ancient", type: "issues", received_at: isoFromNow(-365 * DAY_MS) })); + // A fresh event to mark + await ingest(stub, makeEvent({ id: "trigger", type: "issues", received_at: isoFromNow(-1 * DAY_MS) })); + + const mp = await stub.fetch( + new Request(`${BASE}/mark-processed`, { + method: "POST", + body: JSON.stringify({ event_id: "trigger" }), + }), + ); + expect(mp.status).toBe(200); + // nothing purged: ancient is unprocessed, trigger is within retention + expect(await mp.json()).toEqual({ success: true, event_id: "trigger", purged: 0 }); + + // ancient (unprocessed) is still present and still pending + const ancientRes = await stub.fetch(new Request(`${BASE}/event?id=ancient`)); + expect(ancientRes.status).toBe(200); + const ancient = (await ancientRes.json()) as WebhookEvent; + expect(ancient.processed).toBe(false); + + const statusRes = await stub.fetch(new Request(`${BASE}/pending-status`)); + const status = (await statusRes.json()) as PendingStatus; 
+ expect(status.pending_count).toBe(1); // only the unprocessed "ancient" + }); +}); + describe("WebhookStore: unknown route", () => { it("returns 404 for an unrecognized path", async () => { const stub = storeFor("unknown-route"); diff --git a/worker/wrangler.toml b/worker/wrangler.toml index 9a1561a..c3eec4c 100644 --- a/worker/wrangler.toml +++ b/worker/wrangler.toml @@ -3,6 +3,12 @@ main = "src/index.ts" compatibility_date = "2025-03-26" compatibility_flags = ["nodejs_compat"] +[vars] +# Retention window (days) for processed webhook events. On mark_processed, the +# WebhookStore DO deletes processed events older than this to bound DO storage. +# Unprocessed events are never purged. 0 = purge processed events immediately. +PURGE_AFTER_DAYS = "7" + [durable_objects] bindings = [ { name = "MCP_OBJECT", class_name = "WebhookMcpAgent" },