Merged
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -125,6 +125,30 @@ All checks passed.
| `get_webhook_events` | Full payloads for all pending events |
| `mark_processed` | Mark an event as processed |

## Event Retention

Stored events are purged automatically to bound Durable Object storage. The Worker
runs a time-based sweep on a Durable Object Alarm (daily), so cleanup happens even
for tenants that never call `mark_processed`:

| Event class | Retention window | Env var | Default |
|-------------|------------------|---------|---------|
| Processed (`mark_processed` called) | events older than the window are deleted | `PURGE_AFTER_DAYS` | `7` days |
| Unprocessed (never marked) | events older than the window are deleted | `UNPROCESSED_PURGE_AFTER_DAYS` | `90` days |

- The longer window for unprocessed events is intentional: an unprocessed event
is one the user has not seen yet, so the safety margin before dropping it is
wide.
- The sweep runs via a Durable Object Alarm on a daily cadence and reschedules
itself, so it fires independently of consumption. Processed events that have
aged past their window are also purged whenever `mark_processed` is called, so
active tenants do not wait for the next sweep; the Alarm sweep is the
guarantee that covers abandoned tenants.
- Both windows are configurable in `worker/wrangler.toml` (`[vars]`). Setting a
value to `0` removes the retention period for that class: every matching event
is deleted the next time a purge runs.
- Known limitation: the windows bound event *age*, not *volume*. A high-rate,
never-consumed tenant can still reach Cloudflare's 1 GB-per-DO ceiling before
the 90-day window applies. A volume-based hard cap is tracked separately.
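
The retention rule in the table above can be sketched as a small predicate. This is a minimal illustration, not the Worker's code: `StoredEvent`, `RetentionWindows`, and `isPurgeable` are hypothetical names, and the sketch assumes `received_at` is stored as an ISO 8601 UTC string.

```typescript
// Hypothetical shapes for illustration; the real row schema lives in worker/src/store.ts.
interface StoredEvent {
  received_at: string; // assumed ISO 8601 UTC, e.g. "2025-01-01T00:00:00.000Z"
  processed: 0 | 1;
}

interface RetentionWindows {
  processedDays: number; // PURGE_AFTER_DAYS (default 7)
  unprocessedDays: number; // UNPROCESSED_PURGE_AFTER_DAYS (default 90)
}

const MS_PER_DAY = 86_400_000;

/** True when a sweep running at `nowMs` would delete this event. */
function isPurgeable(event: StoredEvent, nowMs: number, windows: RetentionWindows): boolean {
  const days = event.processed === 1 ? windows.processedDays : windows.unprocessedDays;
  const cutoff = new Date(nowMs - days * MS_PER_DAY).toISOString();
  // Same-shape ISO 8601 UTC strings sort lexicographically in time order.
  return event.received_at < cutoff;
}

const defaults: RetentionWindows = { processedDays: 7, unprocessedDays: 90 };
const now = Date.parse("2025-04-01T00:00:00.000Z");
const tenDaysOld = "2025-03-22T00:00:00.000Z";

// A 10-day-old event is past the 7-day processed window but well inside the
// 90-day unprocessed window.
const processedOld = isPurgeable({ received_at: tenDaysOld, processed: 1 }, now, defaults);
const unprocessedOld = isPurgeable({ received_at: tenDaysOld, processed: 0 }, now, defaults);
```

With a window of `0`, the cutoff equals the sweep time, so every event of that class received before the sweep qualifies.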

## Monorepo Structure

```
6 changes: 4 additions & 2 deletions docs/0-requirements.ja.md
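@@ -75,7 +75,8 @@ GitHub --POST--> Cloudflare Worker --> TenantRegistry DO
| F2.1 | Store each event, tagged with a UUID, in the WebhookStore DO's SQLite database |
| F2.2 | Each event has the fields id, type, payload, received_at, and processed |
| F2.3 | Keep the trigger_status and last_triggered_at fields (reserved for a future trigger feature) |
| F2.4 | On `mark_processed`, automatically delete events with `processed=1` whose `received_at` is older than the retention window (`PURGE_AFTER_DAYS` days, default 7). Unprocessed events are never deleted, regardless of the retention window. This prevents DO storage bloat from stale processed rows |
| F2.4 | On `mark_processed`, automatically delete events with `processed=1` whose `received_at` is older than the retention window (`PURGE_AFTER_DAYS` days, default 7). This prevents DO storage bloat from stale processed rows (a secondary path for promptness; the actual guarantee is the Alarm sweep in F2.5) |
| F2.5 | Provide a time-driven retention sweep using a DO Alarm (`ctx.storage.setAlarm` + the `alarm()` handler). The sweep deletes, in one pass, (a) processed events with `processed=1` whose `received_at` is older than `PURGE_AFTER_DAYS` days (default 7) and (b) unprocessed events with `processed=0` whose `received_at` is older than `UNPROCESSED_PURGE_AFTER_DAYS` days (default 90). After running, alarm() reschedules the next sweep, so the sweep runs periodically (daily) even with zero consumption (abandoned tenants that never call `mark_processed`). The unprocessed window is longer than the processed one to keep a safety margin against losing unread data (the 7-day / 90-day asymmetry is intentional). Known limitation (out of scope): the time windows bound age, not volume, so a high-rate, never-consuming tenant can still reach Cloudflare's 1 GB-per-DO ceiling before the 90-day window applies. A volume-based hard cap is a candidate for a separate issue |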

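**Event structure:**

@@ -203,7 +204,8 @@ The Worker ships its own implementation that hosts GitHub's web OAuth flow
| N2.4 | Custom domain | `github-webhook.smgjp.com` | Already configured as the Cloudflare Worker's custom domain |
| N2.5 | Authentication method | Worker's own authentication | Cloudflare Access is not used. The Worker handles authentication with the webhook secret + Worker-hosted web OAuth |
| N2.6 | Preview instance | `preview` environment | Verification instance with the same configuration as production |
| N2.7 | Processed event retention window | `PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `7` (days). Processed events past the retention window are deleted on `mark_processed`. `0` deletes processed events immediately |
| N2.7 | Processed event retention window | `PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `7` (days). Processed events past the retention window are deleted on `mark_processed` and on the DO Alarm sweep. `0` deletes processed events immediately |
| N2.8 | Unprocessed event retention window | `UNPROCESSED_PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `90` (days). Unprocessed events past the retention window are deleted on the DO Alarm sweep. `0` deletes unprocessed events immediately. The window is longer than the processed one (7 days) as a safety margin against losing unread data (the asymmetry is intentional) |

### N3. Constraints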

6 changes: 4 additions & 2 deletions docs/0-requirements.md
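@@ -75,7 +75,8 @@ GitHub ──POST──▶ Cloudflare Worker ──▶ TenantRegistry DO
| F2.1 | Store each event, tagged with a UUID, in the WebhookStore DO's SQLite database |
| F2.2 | Each event has the fields id, type, payload, received_at, and processed |
| F2.3 | Keep the trigger_status and last_triggered_at fields (reserved for a future trigger feature) |
| F2.4 | On `mark_processed`, automatically delete events with `processed=1` whose `received_at` is older than the retention window (`PURGE_AFTER_DAYS` days, default 7). Unprocessed events are never deleted, regardless of the retention window. This prevents DO storage bloat from stale processed rows |
| F2.4 | On `mark_processed`, automatically delete events with `processed=1` whose `received_at` is older than the retention window (`PURGE_AFTER_DAYS` days, default 7). This prevents DO storage bloat from stale processed rows (a secondary path for promptness; the actual guarantee is the Alarm sweep in F2.5) |
| F2.5 | Provide a time-driven retention sweep using a DO Alarm (`ctx.storage.setAlarm` + the `alarm()` handler). The sweep deletes, in one pass, (a) processed events with `processed=1` whose `received_at` is older than `PURGE_AFTER_DAYS` days (default 7) and (b) unprocessed events with `processed=0` whose `received_at` is older than `UNPROCESSED_PURGE_AFTER_DAYS` days (default 90). After running, alarm() reschedules the next sweep, so the sweep runs periodically (daily) even with zero consumption (abandoned tenants that never call `mark_processed`). The unprocessed window is longer than the processed one to keep a safety margin against losing unread data (the 7-day / 90-day asymmetry is intentional). Known limitation (out of scope): the time windows bound age, not volume, so a high-rate, never-consuming tenant can still reach Cloudflare's 1 GB-per-DO ceiling before the 90-day window applies. A volume-based hard cap is a candidate for a separate issue |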
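
The arm-once, reschedule-on-fire cycle described in F2.5 can be sketched against an in-memory stand-in. This is a simplified model under stated assumptions: `FakeAlarmStorage`, `SweepScheduler`, and `simulate` are hypothetical names, and only `getAlarm` / `setAlarm` mirror the Durable Object storage calls the Worker uses.

```typescript
const SWEEP_INTERVAL_MS = 86_400_000; // 24h, matching the daily cadence

// Hypothetical in-memory stand-in for the alarm part of Durable Object storage.
class FakeAlarmStorage {
  private alarmAt: number | null = null;

  async getAlarm(): Promise<number | null> {
    return this.alarmAt;
  }

  async setAlarm(at: number): Promise<void> {
    this.alarmAt = at;
  }

  /** Plays the runtime's role: clears a due alarm and reports whether it fired. */
  takeDueAlarm(nowMs: number): boolean {
    if (this.alarmAt !== null && this.alarmAt <= nowMs) {
      this.alarmAt = null;
      return true;
    }
    return false;
  }
}

class SweepScheduler {
  sweeps = 0;
  private storage: FakeAlarmStorage;

  constructor(storage: FakeAlarmStorage) {
    this.storage = storage;
  }

  /** Request path: arm the alarm only if none is pending. */
  async ensureAlarm(nowMs: number): Promise<void> {
    if ((await this.storage.getAlarm()) === null) {
      await this.storage.setAlarm(nowMs + SWEEP_INTERVAL_MS);
    }
  }

  /** Alarm path: run the sweep (counted here), then re-arm for the next day. */
  async alarm(nowMs: number): Promise<void> {
    this.sweeps += 1;
    await this.storage.setAlarm(nowMs + SWEEP_INTERVAL_MS);
  }
}

/** One request at time zero, then `days` daily ticks with no further requests. */
async function simulate(days: number): Promise<number> {
  const storage = new FakeAlarmStorage();
  const scheduler = new SweepScheduler(storage);
  await scheduler.ensureAlarm(0);
  for (let d = 1; d <= days; d++) {
    const now = d * SWEEP_INTERVAL_MS;
    if (storage.takeDueAlarm(now)) await scheduler.alarm(now);
  }
  return scheduler.sweeps;
}
```

In this model a single request is enough to start the cycle: the tenant makes no further calls, yet each daily tick fires one sweep because the handler re-arms itself.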

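**Event structure:**

@@ -203,7 +204,8 @@ The Worker ships its own implementation that hosts GitHub's web OAuth flow
| N2.4 | Custom domain | `github-webhook.smgjp.com` | Already configured as the Cloudflare Worker's custom domain |
| N2.5 | Authentication method | Worker's own authentication | Cloudflare Access is not used. The Worker handles authentication with the webhook secret + Worker-hosted web OAuth |
| N2.6 | Preview instance | `preview` environment | Verification instance with the same configuration as production |
| N2.7 | Processed event retention window | `PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `7` (days). Processed events past the retention window are deleted on `mark_processed`. `0` deletes processed events immediately |
| N2.7 | Processed event retention window | `PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `7` (days). Processed events past the retention window are deleted on `mark_processed` and on the DO Alarm sweep. `0` deletes processed events immediately |
| N2.8 | Unprocessed event retention window | `UNPROCESSED_PURGE_AFTER_DAYS` environment variable (wrangler.toml `[vars]`) | `90` (days). Unprocessed events past the retention window are deleted on the DO Alarm sweep. `0` deletes unprocessed events immediately. The window is longer than the processed one (7 days) as a safety margin against losing unread data (the asymmetry is intentional) |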
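
How the two variables resolve to a number of days can be sketched as follows. The function follows the shape of `resolveDays` in `worker/src/store.ts`; the blank-string guard is an assumption added in this sketch, because `Number("")` evaluates to `0` and would otherwise read an empty variable as "no retention".

```typescript
/** Resolve a retention window in days, falling back when the value is unusable. */
function resolveDays(raw: string | undefined, fallback: number): number {
  // Assumption of this sketch: blank strings count as unset (Number("") is 0).
  if (raw !== undefined && raw.trim() !== "") {
    const n = Number(raw);
    if (Number.isFinite(n) && n >= 0) return n;
  }
  return fallback;
}

// Inputs an operator might plausibly put in [vars], resolved against a default of 7.
const inputs: Array<string | undefined> = [undefined, "14", "0", "-1", "abc", "", "1.5"];
const results = inputs.map((raw) => resolveDays(raw, 7));
// results: [7, 14, 0, 7, 7, 7, 1.5]
```

Fractional values pass the check, so `"1.5"` yields a 36-hour window; only an explicit `"0"` disables retention.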

**GitHub Webhook subscribed events:**

14 changes: 14 additions & 0 deletions mcp-server/README.md
@@ -149,6 +149,20 @@ All tools are read-only except `mark_processed`.

If real-time channel notifications are enabled (Claude Code), step 1 can be skipped — the proxy will push event summaries as soon as the Worker receives them. You still need to call `mark_processed` to clear the queue.

## Event retention

The Worker purges stored events automatically so Durable Object storage stays bounded. A time-based sweep runs on a Durable Object Alarm (daily) and reschedules itself, so cleanup happens even when `mark_processed` is never called.

| Event class | Retention window | Worker env var | Default |
|---|---|---|---|
| Processed (`mark_processed` called) | events older than the window are deleted | `PURGE_AFTER_DAYS` | `7` days |
| Unprocessed (never marked) | events older than the window are deleted | `UNPROCESSED_PURGE_AFTER_DAYS` | `90` days |

- The longer window for unprocessed events is intentional: an unprocessed event is one the user has not seen yet, so the margin before dropping it is wide.
- Processed events that have aged past their window are also purged whenever `mark_processed` is called, so active tenants do not wait for the next sweep; the Alarm sweep is the guarantee that covers tenants that stop consuming.
- Both windows are Worker-side configuration (`worker/wrangler.toml` `[vars]`); set a value to `0` to remove the retention period for that class, so every matching event is deleted the next time a purge runs. These env vars live on the Worker, not in this proxy.
- Known limitation: the windows bound event *age*, not *volume*. A high-rate, never-consumed tenant can still hit Cloudflare's 1 GB-per-DO ceiling before the 90-day window applies.
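
A back-of-the-envelope check makes the age-versus-volume limitation concrete. This is a rough sketch: the event rates and payload sizes are hypothetical inputs, the ceiling is taken as 1 GB (decimal) as stated above, and SQLite row and index overhead is ignored.

```typescript
const CEILING_BYTES = 1_000_000_000; // 1 GB per Durable Object, decimal GB assumed

/** Days of unconsumed traffic needed to fill the ceiling at a steady rate. */
function daysUntilCeiling(
  eventsPerDay: number,
  avgBytesPerEvent: number,
  ceilingBytes: number = CEILING_BYTES,
): number {
  const bytesPerDay = eventsPerDay * avgBytesPerEvent;
  if (bytesPerDay <= 0) return Infinity;
  return ceilingBytes / bytesPerDay;
}

/** True when storage would fill before the unprocessed window starts deleting. */
function ceilingBeforeWindow(
  eventsPerDay: number,
  avgBytesPerEvent: number,
  windowDays: number = 90,
): boolean {
  return daysUntilCeiling(eventsPerDay, avgBytesPerEvent) < windowDays;
}

// Hypothetical tenants, both with 20 kB payloads and no consumer.
const busyTenant = ceilingBeforeWindow(1000, 20_000); // 20 MB/day, full in 50 days
const quietTenant = ceilingBeforeWindow(100, 20_000); // 2 MB/day, full in 500 days
```

Under these assumptions the busy tenant reaches the ceiling 40 days before the first unprocessed event ages out, while the quiet tenant is bounded by the window alone.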

## Authentication flow

1. On first tool call (or on startup if cached tokens exist), the proxy discovers OAuth metadata at `${WEBHOOK_WORKER_URL}/.well-known/oauth-authorization-server`.
110 changes: 102 additions & 8 deletions worker/src/store.ts
@@ -14,31 +14,52 @@ import { summarizeEvent } from "../../shared/src/summarize.js";
*/
interface StoreEnv {
PURGE_AFTER_DAYS?: string;
UNPROCESSED_PURGE_AFTER_DAYS?: string;
}

/** Default retention window (days) for processed events when PURGE_AFTER_DAYS is unset. */
const DEFAULT_PURGE_DAYS = 7;

/**
* Resolve the retention window from env. Falls back to DEFAULT_PURGE_DAYS when
* unset / non-numeric / negative. A value of 0 means "purge all processed
* events immediately on mark".
* Default retention window (days) for UNPROCESSED events when
* UNPROCESSED_PURGE_AFTER_DAYS is unset. Deliberately much longer than the
* processed window: unprocessed = user-unseen, so the safety margin before
* silently dropping is wide (the 7d-vs-90d asymmetry is intentional).
*/
function purgeDays(env: StoreEnv): number {
const raw = env.PURGE_AFTER_DAYS;
const DEFAULT_UNPROCESSED_PURGE_DAYS = 90;

/** Sweep cadence: how often the DO Alarm fires to run the time-based purge. */
const SWEEP_INTERVAL_MS = 86_400_000; // 24h (daily)

/**
* Resolve a retention window (days) from an env value. Falls back to
* `fallback` when unset / non-numeric / negative. A value of 0 means "purge
* matching events immediately" (no retention).
*/
function resolveDays(raw: string | undefined, fallback: number): number {
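// Number("") is 0, so treat a blank value as unset rather than as "no retention".
if (raw !== undefined && raw.trim() !== "") {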
const n = Number(raw);
if (Number.isFinite(n) && n >= 0) return n;
}
return DEFAULT_PURGE_DAYS;
return fallback;
}

/** Retention window (days) for processed events. */
function purgeDays(env: StoreEnv): number {
return resolveDays(env.PURGE_AFTER_DAYS, DEFAULT_PURGE_DAYS);
}

/** Retention window (days) for unprocessed events. */
function unprocessedPurgeDays(env: StoreEnv): number {
return resolveDays(env.UNPROCESSED_PURGE_AFTER_DAYS, DEFAULT_UNPROCESSED_PURGE_DAYS);
}

export class WebhookStore extends DurableObject<StoreEnv> {
private initialized = false;
private sseWriters = new Set<WritableStreamDefaultWriter<Uint8Array>>();
private sseEncoder = new TextEncoder();

private ensureTable() {
private ensureSchema() {
if (this.initialized) return;
this.ctx.storage.sql.exec(`
CREATE TABLE IF NOT EXISTS events (
@@ -54,6 +75,69 @@ export class WebhookStore extends DurableObject<StoreEnv> {
this.initialized = true;
}

/**
* Initialize on the request path: create the schema and ensure the recurring
* sweep alarm is armed. Arming is kept out of ensureSchema() because alarm()
* also calls ensureSchema() and then reschedules itself directly.
*/
private async ensureTable() {
this.ensureSchema();
await this.ensureAlarm();
}

/**
* Arm the recurring sweep alarm if one is not already scheduled. This is the
* consumption-independent path: abandoned tenants never call mark_processed,
* so a mark-only trigger would never reach the very tenants whose unprocessed
* events accumulate. The DO Alarm fires even with zero consumption (#236).
*/
private async ensureAlarm() {
const existing = await this.ctx.storage.getAlarm();
if (existing === null) {
await this.ctx.storage.setAlarm(Date.now() + SWEEP_INTERVAL_MS);
}
}

/**
* Time-based retention sweep. Deletes:
* - processed events older than PURGE_AFTER_DAYS (default 7), and
* - UNPROCESSED events older than UNPROCESSED_PURGE_AFTER_DAYS (default 90).
* Returns the per-class purged counts. Puts a time bound on the one path that
* previously never expired (unprocessed rows on abandoned tenants, #236).
*/
private sweep(): { processed: number; unprocessed: number } {
const now = Date.now();

const processedCutoff = new Date(now - purgeDays(this.env) * 86_400_000).toISOString();
const processedCursor = this.ctx.storage.sql.exec(
`DELETE FROM events WHERE processed = 1 AND received_at < ?`,
processedCutoff,
);
const processed = processedCursor.rowsWritten;

const unprocessedCutoff = new Date(now - unprocessedPurgeDays(this.env) * 86_400_000).toISOString();
const unprocessedCursor = this.ctx.storage.sql.exec(
`DELETE FROM events WHERE processed = 0 AND received_at < ?`,
unprocessedCutoff,
);
const unprocessed = unprocessedCursor.rowsWritten;

return { processed, unprocessed };
}

/**
* DO Alarm handler — the consumption-independent retention guarantee. Runs the
* full sweep (processed + unprocessed) and reschedules the next sweep so the
* cycle keeps running on a hibernating, never-consumed tenant.
*/
async alarm() {
this.ensureSchema();
this.sweep();
// Reschedule the next periodic sweep. alarm() is invoked after the prior
// alarm has been cleared, so this re-arms the recurring cycle.
await this.ctx.storage.setAlarm(Date.now() + SWEEP_INTERVAL_MS);
}

/** Broadcast to all connected WebSocket and SSE clients */
private broadcast(data: unknown) {
const msg = JSON.stringify(data);
@@ -78,7 +162,7 @@ export class WebhookStore extends DurableObject<StoreEnv> {
}

async fetch(request: Request): Promise<Response> {
this.ensureTable();
await this.ensureTable();
const url = new URL(request.url);

// ── WebSocket upgrade ──
@@ -257,6 +341,16 @@ export class WebhookStore extends DurableObject<StoreEnv> {
return Response.json({ success: true, event_id, purged });
}

// ── sweep (time-based retention purge) ──
// Runs the same sweep the DO Alarm runs: purge processed > PURGE_AFTER_DAYS
// and unprocessed > UNPROCESSED_PURGE_AFTER_DAYS. Exposed as a route so the
// sweep can be driven on demand (and asserted in tests) independently of the
// alarm scheduler.
if (url.pathname === "/sweep" && request.method === "POST") {
const purged = this.sweep();
return Response.json({ success: true, ...purged });
}

return new Response("Not found", { status: 404 });
}
