Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/0-requirements.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ GitHub --POST--> Cloudflare Worker --> TenantRegistry DO
| F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する |
| F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ |
| F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) |
| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ |

**イベント構造:**

Expand All @@ -100,7 +101,7 @@ WebhookMcpAgent DO が以下のツールセットを提供する。ローカル
| F3.2 | `list_pending_events` | limit (1-100, default 20) | サマリー配列 | 未処理イベントのメタデータ一覧を返す(ペイロード含まず) |
| F3.3 | `get_event` | event_id | 完全イベント or error | UUID 指定で完全なペイロードを返す |
| F3.4 | `get_webhook_events` | limit (1-100, default 20) | 未処理イベント配列 | 未処理イベントをフルペイロード付きで返す |
| F3.5 | `mark_processed` | event_id | success, event_id | イベントを処理済みにマークする |
| F3.5 | `mark_processed` | event_id | success, event_id, purged | イベントを処理済みにマークし、保持期間超過の処理済みイベントを自動削除する。`purged` は今回削除された件数 |

**F3.1 ローカルブリッジ整形:** ローカルブリッジは `get_pending_status` の戻り値を Claude Code UserPromptSubmit hook の decision JSON shape (`hookSpecificOutput.hookEventName="UserPromptSubmit"` + `additionalContext` に pending_count / types / latest_received_at の自然文要約) にラップして返す。これは `type: "mcp_tool"` UserPromptSubmit hook 経由の呼び出しで戻り値が AI 文脈に注入されるための要件であり、手動 tool 呼び出し時も同 shape で返る。リモート (Worker + DO) 側の戻り値構造は変更しない。

Expand Down Expand Up @@ -202,6 +203,7 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える
| N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み |
| N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する |
| N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス |
| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 |

### N3. 制約

Expand Down
4 changes: 3 additions & 1 deletion docs/0-requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ GitHub ──POST──▶ Cloudflare Worker ──▶ TenantRegistry DO
| F2.1 | イベントを UUID 付きで WebhookStore DO の SQLite に保存する |
| F2.2 | 各イベントは id, type, payload, received_at, processed フィールドを持つ |
| F2.3 | trigger_status, last_triggered_at フィールドを保持する(将来の trigger 機能用) |
| F2.4 | `mark_processed` 実行時に、`processed=1` かつ `received_at` が保持期間(`PURGE_AFTER_DAYS` 日、既定 7)より古いイベントを自動削除する。未処理イベントは保持期間に関わらず削除しない。処理済み死蔵行による DO ストレージ肥大を防ぐ |

**イベント構造:**

Expand All @@ -100,7 +101,7 @@ WebhookMcpAgent DO が以下のツールセットを提供する。ローカル
| F3.2 | `list_pending_events` | limit (1-100, default 20) | サマリー配列 | 未処理イベントのメタデータ一覧を返す(ペイロード含まず) |
| F3.3 | `get_event` | event_id | 完全イベント or error | UUID 指定で完全なペイロードを返す |
| F3.4 | `get_webhook_events` | limit (1-100, default 20) | 未処理イベント配列 | 未処理イベントをフルペイロード付きで返す |
| F3.5 | `mark_processed` | event_id | success, event_id | イベントを処理済みにマークする |
| F3.5 | `mark_processed` | event_id | success, event_id, purged | イベントを処理済みにマークし、保持期間超過の処理済みイベントを自動削除する。`purged` は今回削除された件数 |

**F3.1 Local bridge shaping:** The local bridge wraps `get_pending_status` results into the Claude Code UserPromptSubmit hook decision JSON shape (`hookSpecificOutput.hookEventName="UserPromptSubmit"` plus a natural-language summary of pending_count / types / latest_received_at in `additionalContext`). This is required so values returned via `type: "mcp_tool"` UserPromptSubmit hooks reach the AI prompt context; manual tool calls receive the same shape. The remote (Worker + DO) return contract is unchanged.

Expand Down Expand Up @@ -202,6 +203,7 @@ Worker は GitHub の web OAuth flow をホストする独自実装を備える
| N2.4 | カスタムドメイン | `github-webhook.smgjp.com` | Cloudflare Worker のカスタムドメインとして設定済み |
| N2.5 | 認証方式 | Worker 自前認証 | Cloudflare Access は使用しない。Worker が webhook secret + Worker-hosted web OAuth で認証を処理する |
| N2.6 | プレビューインスタンス | `preview` 環境 | 本番と同一構成の検証用インスタンス |
| N2.7 | 処理済みイベント保持期間 | `PURGE_AFTER_DAYS` 環境変数(wrangler.toml `[vars]`) | `7`(日)。`mark_processed` 時に保持期間超過の処理済みイベントを削除。`0` で処理済みを即削除 |

**GitHub Webhook 購読イベント:**

Expand Down
41 changes: 39 additions & 2 deletions worker/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,33 @@ import { DurableObject } from "cloudflare:workers";
import type { WebhookEvent, EventSummary, PendingStatus } from "../../shared/src/types.js";
import { summarizeEvent } from "../../shared/src/summarize.js";

export class WebhookStore extends DurableObject {
/**
* Minimal local Env shape for this DO. Defined here (not imported from
* index.ts's full Env) to avoid a circular import; only the vars this DO
* actually reads are declared.
*/
interface StoreEnv {
PURGE_AFTER_DAYS?: string;
}

/** Default retention window (days) for processed events when PURGE_AFTER_DAYS is unset. */
const DEFAULT_PURGE_DAYS = 7;

/**
* Resolve the retention window from env. Falls back to DEFAULT_PURGE_DAYS when
* unset / non-numeric / negative. A value of 0 means "purge all processed
* events immediately on mark".
*/
function purgeDays(env: StoreEnv): number {
const raw = env.PURGE_AFTER_DAYS;
if (raw !== undefined) {
const n = Number(raw);
if (Number.isFinite(n) && n >= 0) return n;
}
return DEFAULT_PURGE_DAYS;
}

export class WebhookStore extends DurableObject<StoreEnv> {
private initialized = false;
private sseWriters = new Set<WritableStreamDefaultWriter<Uint8Array>>();
private sseEncoder = new TextEncoder();
Expand Down Expand Up @@ -217,7 +243,18 @@ export class WebhookStore extends DurableObject {
this.ctx.storage.sql.exec(
`UPDATE events SET processed = 1 WHERE id = ?`, event_id,
);
return Response.json({ success: true, event_id });

// Auto-purge: delete processed events whose received_at is older than the
// retention window. Unprocessed events are never deleted regardless of age.
// This bounds DO storage growth from dead processed rows (re-port of #29).
const days = purgeDays(this.env);
const cutoff = new Date(Date.now() - days * 86_400_000).toISOString();
const cursor = this.ctx.storage.sql.exec(
`DELETE FROM events WHERE processed = 1 AND received_at < ?`, cutoff,
);
const purged = cursor.rowsWritten;

return Response.json({ success: true, event_id, purged });
}

return new Response("Not found", { status: 404 });
Expand Down
75 changes: 71 additions & 4 deletions worker/test/workers/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,19 @@ describe("WebhookStore: get_event (/event?id=)", () => {
});
});

// received_at relative to the real wall-clock "now". The DO purges processed
// events older than PURGE_AFTER_DAYS (default 7); tests bind received_at relative
// to now so they stay stable regardless of the absolute date.
function isoFromNow(deltaMs: number): string {
return new Date(Date.now() + deltaMs).toISOString();
}
const DAY_MS = 86_400_000;

describe("WebhookStore: mark-processed", () => {
it("flips processed so the event drops out of pending but remains fetchable by id", async () => {
it("flips processed so a within-retention event drops out of pending but remains fetchable by id, returning purged=0", async () => {
const stub = storeFor("mark-processed");
await ingest(stub, makeEvent({ id: "m1", type: "issues", received_at: "2026-01-01T00:00:00Z" }));
// received_at is recent (1 day ago) so it survives the default 7-day purge window
await ingest(stub, makeEvent({ id: "m1", type: "issues", received_at: isoFromNow(-1 * DAY_MS) }));

const mp = await stub.fetch(
new Request(`${BASE}/mark-processed`, {
Expand All @@ -196,20 +205,78 @@ describe("WebhookStore: mark-processed", () => {
}),
);
expect(mp.status).toBe(200);
expect(await mp.json()).toEqual({ success: true, event_id: "m1" });
expect(await mp.json()).toEqual({ success: true, event_id: "m1", purged: 0 });

// no longer pending
const statusRes = await stub.fetch(new Request(`${BASE}/pending-status`));
const status = (await statusRes.json()) as PendingStatus;
expect(status.pending_count).toBe(0);

// but still retrievable by id, with processed = true
// but still retrievable by id, with processed = true (within retention, not purged)
const evRes = await stub.fetch(new Request(`${BASE}/event?id=m1`));
expect(evRes.status).toBe(200);
const ev = (await evRes.json()) as WebhookEvent;
expect(ev.processed).toBe(true);
});
});

describe("WebhookStore: mark-processed auto-purge", () => {
it("deletes processed events older than the retention window and reports the purged count", async () => {
const stub = storeFor("purge-old-processed");
// An old, already-processed event (30 days ago, beyond the 7-day window)
await ingest(
stub,
makeEvent({ id: "old", type: "issues", received_at: isoFromNow(-30 * DAY_MS), processed: true }),
);
// A fresh event we will mark now; marking triggers the purge sweep
await ingest(stub, makeEvent({ id: "fresh", type: "issues", received_at: isoFromNow(-1 * DAY_MS) }));

const mp = await stub.fetch(
new Request(`${BASE}/mark-processed`, {
method: "POST",
body: JSON.stringify({ event_id: "fresh" }),
}),
);
expect(mp.status).toBe(200);
// the stale "old" row is swept; "fresh" was just marked but is within retention
expect(await mp.json()).toEqual({ success: true, event_id: "fresh", purged: 1 });

// old is gone (404), fresh remains
const oldRes = await stub.fetch(new Request(`${BASE}/event?id=old`));
expect(oldRes.status).toBe(404);
const freshRes = await stub.fetch(new Request(`${BASE}/event?id=fresh`));
expect(freshRes.status).toBe(200);
});

it("keeps unprocessed events regardless of age", async () => {
const stub = storeFor("purge-keeps-unprocessed");
// A very old but UNPROCESSED event must never be purged
await ingest(stub, makeEvent({ id: "ancient", type: "issues", received_at: isoFromNow(-365 * DAY_MS) }));
// A fresh event to mark
await ingest(stub, makeEvent({ id: "trigger", type: "issues", received_at: isoFromNow(-1 * DAY_MS) }));

const mp = await stub.fetch(
new Request(`${BASE}/mark-processed`, {
method: "POST",
body: JSON.stringify({ event_id: "trigger" }),
}),
);
expect(mp.status).toBe(200);
// nothing purged: ancient is unprocessed, trigger is within retention
expect(await mp.json()).toEqual({ success: true, event_id: "trigger", purged: 0 });

// ancient (unprocessed) is still present and still pending
const ancientRes = await stub.fetch(new Request(`${BASE}/event?id=ancient`));
expect(ancientRes.status).toBe(200);
const ancient = (await ancientRes.json()) as WebhookEvent;
expect(ancient.processed).toBe(false);

const statusRes = await stub.fetch(new Request(`${BASE}/pending-status`));
const status = (await statusRes.json()) as PendingStatus;
expect(status.pending_count).toBe(1); // only the unprocessed "ancient"
});
});

describe("WebhookStore: unknown route", () => {
it("returns 404 for an unrecognized path", async () => {
const stub = storeFor("unknown-route");
Expand Down
6 changes: 6 additions & 0 deletions worker/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ main = "src/index.ts"
compatibility_date = "2025-03-26"
compatibility_flags = ["nodejs_compat"]

[vars]
# Retention window (days) for processed webhook events. On mark_processed, the
# WebhookStore DO deletes processed events older than this to bound DO storage.
# Unprocessed events are never purged. 0 = purge processed events immediately.
PURGE_AFTER_DAYS = "7"

[durable_objects]
bindings = [
{ name = "MCP_OBJECT", class_name = "WebhookMcpAgent" },
Expand Down
Loading