diff --git a/apps/sim/app/api/auth/oauth/utils.ts b/apps/sim/app/api/auth/oauth/utils.ts index bbfdb0135be..732f157d581 100644 --- a/apps/sim/app/api/auth/oauth/utils.ts +++ b/apps/sim/app/api/auth/oauth/utils.ts @@ -358,7 +358,7 @@ async function performCoalescedRefresh({ const lockKey = `oauth:refresh:${accountId}` - return coalesceLocally(lockKey, () => + const refreshPromise = coalesceLocally(lockKey, () => withLeaderLock({ key: lockKey, onLeader: async () => { @@ -429,6 +429,16 @@ async function performCoalescedRefresh({ }, }) ) + + try { + return await refreshPromise + } catch (error) { + logger.error('Coalesced refresh did not settle', { + ...logContext, + error: toError(error).message, + }) + return null + } } export async function getOAuthToken(userId: string, providerId: string): Promise { diff --git a/apps/sim/app/api/function/execute/route.ts b/apps/sim/app/api/function/execute/route.ts index a3035607e52..9ac6ab2da77 100644 --- a/apps/sim/app/api/function/execute/route.ts +++ b/apps/sim/app/api/function/execute/route.ts @@ -115,10 +115,13 @@ let typescriptModulePromise: Promise | null = null async function loadTypeScriptModule(): Promise { if (!typescriptModulePromise) { - typescriptModulePromise = import('typescript').then((mod) => { - const tsModule = (mod?.default ?? mod) as TypeScriptModule - return tsModule - }) + typescriptModulePromise = import('typescript').then( + (mod) => (mod?.default ?? mod) as TypeScriptModule, + (error) => { + typescriptModulePromise = null + throw error + } + ) } return typescriptModulePromise diff --git a/apps/sim/lib/concurrency/__tests__/singleflight.test.ts b/apps/sim/lib/concurrency/__tests__/singleflight.test.ts index f911266be7b..0599127bb5f 100644 --- a/apps/sim/lib/concurrency/__tests__/singleflight.test.ts +++ b/apps/sim/lib/concurrency/__tests__/singleflight.test.ts @@ -3,7 +3,11 @@ */ import { sleep } from '@sim/utils/helpers' import { afterEach, describe, expect, it, vi } from 'vitest' -import { __resetCoalesceLocallyForTests, coalesceLocally } from '@/lib/concurrency/singleflight' +import { + __resetCoalesceLocallyForTests, + CoalesceSettleTimeoutError, + coalesceLocally, +} from '@/lib/concurrency/singleflight' afterEach(() => { __resetCoalesceLocallyForTests() @@ -57,9 +61,103 @@ describe('coalesceLocally', () => { await expect(coalesceLocally('rejection', fn)).rejects.toThrow('fail 2') }) + it('surfaces a synchronously-thrown fn error and evicts the entry', async () => { + const fn = vi.fn((): Promise => { + throw new Error('sync boom') + }) + + // The real error must surface (not a TDZ ReferenceError from the evict + // closure) and the entry must be evicted so the next call retries. + await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom') + await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom') + expect(fn).toHaveBeenCalledTimes(2) + }) + it('does not coalesce across distinct keys', async () => { const fn = vi.fn(async () => 'value') await Promise.all([coalesceLocally('a', fn), coalesceLocally('b', fn)]) expect(fn).toHaveBeenCalledTimes(2) }) + + it('rejects all awaiters and evicts the entry when the producer misses the settle deadline', async () => { + vi.useFakeTimers() + try { + let resolveHung: (value: string) => void + const hung = vi.fn( + () => + new Promise((resolve) => { + resolveHung = resolve + }) + ) + + const a = coalesceLocally('wedged', hung) + const b = coalesceLocally('wedged', hung) + const aAssertion = expect(a).rejects.toBeInstanceOf(CoalesceSettleTimeoutError) + const bAssertion = expect(b).rejects.toBeInstanceOf(CoalesceSettleTimeoutError) + + await vi.advanceTimersByTimeAsync(30_000) + await aAssertion + await bAssertion + expect(hung).toHaveBeenCalledTimes(1) + + const fresh = vi.fn(async () => 'recovered') + await expect(coalesceLocally('wedged', fresh)).resolves.toBe('recovered') + expect(fresh).toHaveBeenCalledTimes(1) + + resolveHung!('late') + } finally { + vi.useRealTimers() + } + }) + + it('a timed-out producer settling late does not evict its successor', async () => { + vi.useFakeTimers() + try { + let resolveOld: (value: string) => void + const old = coalesceLocally( + 'late-settle', + () => + new Promise((resolve) => { + resolveOld = resolve + }), + 1_000 + ) + const oldAssertion = expect(old).rejects.toBeInstanceOf(CoalesceSettleTimeoutError) + await vi.advanceTimersByTimeAsync(1_000) + await oldAssertion + + let resolveNew: (value: string) => void + const successor = coalesceLocally( + 'late-settle', + () => + new Promise((resolve) => { + resolveNew = resolve + }) + ) + + resolveOld!('late') + await vi.advanceTimersByTimeAsync(0) + + const joined = coalesceLocally('late-settle', async () => 'should-not-run') + expect(joined).toBe(successor) + + resolveNew!('new-value') + await expect(successor).resolves.toBe('new-value') + } finally { + vi.useRealTimers() + } + }) + + it('does not fire the deadline for producers that settle in time', async () => { + vi.useFakeTimers() + try { + const value = await coalesceLocally('prompt', async () => 'ok', 1_000) + expect(value).toBe('ok') + + await vi.advanceTimersByTimeAsync(2_000) + await expect(coalesceLocally('prompt', async () => 'again', 1_000)).resolves.toBe('again') + } finally { + vi.useRealTimers() + } + }) }) diff --git a/apps/sim/lib/concurrency/singleflight.ts b/apps/sim/lib/concurrency/singleflight.ts index f15ae06f9da..a41fbeda455 100644 --- a/apps/sim/lib/concurrency/singleflight.ts +++ b/apps/sim/lib/concurrency/singleflight.ts @@ -1,19 +1,69 @@ const inflight = new Map>() -export function coalesceLocally(key: string, fn: () => Promise): Promise { +/** + * Default deadline for a coalesced producer to settle. Joiners share the + * producer's promise, so without a deadline a single hung producer wedges + * every future caller for that key until process restart. + */ +const DEFAULT_SETTLE_TIMEOUT_MS = 30_000 + +/** + * Thrown to all awaiters when a coalesced producer fails to settle within + * its deadline. The entry is evicted first, so the next caller mints a + * fresh producer instead of joining the wedged one. + */ +export class CoalesceSettleTimeoutError extends Error { + constructor(key: string, timeoutMs: number) { + super(`Coalesced producer for "${key}" did not settle within ${timeoutMs}ms`) + this.name = 'CoalesceSettleTimeoutError' + } +} + +/** + * Deduplicates concurrent async work by key within this process: the first + * caller runs `fn`, every concurrent caller for the same key shares its + * promise. The entry is evicted when the producer settles (either way) or + * when the settle deadline fires, whichever comes first. The underlying + * `fn` is not cancelled on timeout — it keeps running detached, but no new + * caller will join it. + */ +export function coalesceLocally( + key: string, + fn: () => Promise, + settleTimeoutMs: number = DEFAULT_SETTLE_TIMEOUT_MS +): Promise { const existing = inflight.get(key) as Promise | undefined if (existing) return existing - const promise = (async () => { - try { - return await fn() - } finally { - inflight.delete(key) - } - })() + let timer: ReturnType | undefined + const evict = () => { + if (inflight.get(key) === guarded) inflight.delete(key) + } + + const guarded: Promise = Promise.race([ + (async () => { + try { + // Defer fn() to a microtask so a synchronous throw surfaces as a + // rejection after `guarded` and the timer are initialized. Calling it + // inline would run the finally below during construction, touching + // `guarded` in its temporal dead zone and masking fn's real error. + return await Promise.resolve().then(fn) + } finally { + clearTimeout(timer) + evict() + } + })(), + new Promise((_, reject) => { + timer = setTimeout(() => { + evict() + reject(new CoalesceSettleTimeoutError(key, settleTimeoutMs)) + }, settleTimeoutMs) + timer.unref?.() + }), + ]) - inflight.set(key, promise) - return promise + inflight.set(key, guarded) + return guarded } export function __resetCoalesceLocallyForTests(): void { diff --git a/apps/sim/lib/mcp/oauth/storage.test.ts b/apps/sim/lib/mcp/oauth/storage.test.ts index 61455b36135..b9d07e779f7 100644 --- a/apps/sim/lib/mcp/oauth/storage.test.ts +++ b/apps/sim/lib/mcp/oauth/storage.test.ts @@ -221,6 +221,42 @@ describe('withMcpOauthRefreshLock', () => { } }) + it('bounds the queue wait: callers stalled behind a wedged link reject without running fn', async () => { + vi.useFakeTimers() + try { + mockAcquireLock.mockResolvedValue(true) + let resolveFirst: (value: string) => void + const hungFn = vi.fn( + () => + new Promise((resolve) => { + resolveFirst = resolve + }) + ) + const queuedFn = vi.fn(async () => 'second') + + const first = withMcpOauthRefreshLock('row-stall', hungFn) + const second = withMcpOauthRefreshLock('row-stall', queuedFn) + const secondAssertion = expect(second).rejects.toThrow(/stalled for/) + + await vi.advanceTimersByTimeAsync(90_000) + await secondAssertion + expect(queuedFn).not.toHaveBeenCalled() + + // The wedged link is untouched by the queue deadline (its own fn keeps + // the lock, protecting a possibly mid-rotation refresh) and the row + // heals once it settles — including skipping the abandoned link's fn. + resolveFirst!('first') + await expect(first).resolves.toBe('first') + + await expect(withMcpOauthRefreshLock('row-stall', async () => 'healed')).resolves.toBe( + 'healed' + ) + expect(queuedFn).not.toHaveBeenCalled() + } finally { + vi.useRealTimers() + } + }) + it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => { vi.useFakeTimers() try { diff --git a/apps/sim/lib/mcp/oauth/storage.ts b/apps/sim/lib/mcp/oauth/storage.ts index aca0fbf5ec6..b2e0a7b13b5 100644 --- a/apps/sim/lib/mcp/oauth/storage.ts +++ b/apps/sim/lib/mcp/oauth/storage.ts @@ -237,7 +237,10 @@ export async function clearState(rowId: string): Promise { * Two-tier serialization (each caller runs its OWN `fn()` — callers consume * `McpClient` instances that can't be shared, unlike a scalar access token): * 1) In-process: per-row Promise chain. Concurrent callers queue; each - * runs `fn()` after the previous settles. + * runs `fn()` after the previous settles. The queue wait is bounded — + * a caller whose turn does not arrive within + * {@link REFRESH_QUEUE_WAIT_TIMEOUT_MS} rejects without ever running + * its `fn()`, so a wedged link cannot accumulate callers indefinitely. * 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`) with a TTL * watchdog that periodically extends the lock while `fn()` runs, so * long-running refreshes don't drop the lock and let another process @@ -251,18 +254,59 @@ const REFRESH_LOCK_EXTEND_INTERVAL_MS = 5_000 const REFRESH_POLL_INTERVAL_MS = 100 const REFRESH_MAX_WAIT_MS = 30_000 +/** + * Deadline on the in-process QUEUE WAIT only — the time a caller spends + * waiting for its turn behind queued predecessors. Without it, one hung + * link wedges every subsequent caller for that row until process restart. + * Sized to survive one legitimately slow predecessor: up to + * REFRESH_MAX_WAIT_MS of cross-process lock contention plus the MCP SDK's + * 60s initialize timeout. Deliberately NOT applied to the caller's own + * `fn()` run — aborting a running `fn()` would orphan a connected + * `McpClient` and abandon a possibly mid-rotation refresh; `fn()` is + * bounded by its own SDK/HTTP/Redis timeouts instead. + */ +const REFRESH_QUEUE_WAIT_TIMEOUT_MS = 90_000 + const inflightChains = new Map>() export async function withMcpOauthRefreshLock(rowId: string, fn: () => Promise): Promise { const lockKey = `mcp:oauth:refresh:${rowId}` const prev = inflightChains.get(lockKey) ?? Promise.resolve() - const next = prev.catch(() => undefined).then(() => runWithRedisMutex(lockKey, rowId, fn)) + const prevSettled = prev.catch(() => undefined) + + let queueTimedOut = false + const next = prevSettled.then(() => { + if (queueTimedOut) { + throw new Error(`MCP OAuth refresh queue for ${rowId} abandoned after timeout`) + } + return runWithRedisMutex(lockKey, rowId, fn) + }) inflightChains.set(lockKey, next) const cleanup = () => { if (inflightChains.get(lockKey) === next) inflightChains.delete(lockKey) } next.then(cleanup, cleanup) - return next as Promise + + let queueTimer: ReturnType | undefined + const queueDeadline = new Promise((_, reject) => { + queueTimer = setTimeout(() => { + queueTimedOut = true + reject( + new Error( + `MCP OAuth refresh queue for ${rowId} stalled for ${REFRESH_QUEUE_WAIT_TIMEOUT_MS}ms` + ) + ) + }, REFRESH_QUEUE_WAIT_TIMEOUT_MS) + queueTimer.unref?.() + }) + + try { + await Promise.race([prevSettled, queueDeadline]) + } finally { + clearTimeout(queueTimer) + } + + return next } async function runWithRedisMutex( diff --git a/apps/sim/lib/oauth/oauth.ts b/apps/sim/lib/oauth/oauth.ts index 9cca712769f..e694876bea2 100644 --- a/apps/sim/lib/oauth/oauth.ts +++ b/apps/sim/lib/oauth/oauth.ts @@ -1490,6 +1490,15 @@ function extractErrorCode(value: unknown): string | undefined { return undefined } +/** + * Hard deadline on the token-endpoint exchange. This function does not coalesce + * on its own; its sole production caller (`performCoalescedRefresh` in the OAuth + * utils) shares one in-flight refresh across concurrent callers for a credential. + * Without this bound a hung endpoint would wedge every joiner on that key until + * the undici socket defaults (~5 min) gave up. + */ +const TOKEN_REFRESH_TIMEOUT_MS = 15_000 + export async function refreshOAuthToken( providerId: string, refreshToken: string @@ -1505,6 +1514,7 @@ export async function refreshOAuthToken( method: 'POST', headers, body: useJsonBody ? JSON.stringify(bodyParams) : new URLSearchParams(bodyParams).toString(), + signal: AbortSignal.timeout(TOKEN_REFRESH_TIMEOUT_MS), }) if (!response.ok) { diff --git a/apps/sim/lib/webhooks/deploy.ts b/apps/sim/lib/webhooks/deploy.ts index 8b204288b2a..686c974df5a 100644 --- a/apps/sim/lib/webhooks/deploy.ts +++ b/apps/sim/lib/webhooks/deploy.ts @@ -932,6 +932,10 @@ async function persistCreatedWebhookRecordAfterCleanupFailure({ * Removes external subscriptions and deletes webhook records from the database. * * @param skipExternalCleanup - If true, skip external subscription cleanup (already done elsewhere) + * @param shouldDeleteWebhook - Best-effort early-exit probe. Its implementations + * query the global pool, so it MUST only be awaited while no transaction is open. + * See {@link deleteWebhookRecordAfterCleanup} for the in-transaction recheck that + * makes this probe non-authoritative. */ export async function cleanupWebhooksForWorkflow( workflowId: string, @@ -1036,6 +1040,16 @@ export async function cleanupWebhooksForWorkflow( ) } +/** + * Deletes a webhook record unless the deployment became active again. + * + * `shouldDeleteWebhook` is awaited BEFORE the transaction opens — its + * implementations query the global pool, so running it inside the + * transaction would nest a second pooled checkout under the held + * connection. The transaction does not need it: the `FOR UPDATE` select + * on the deployment version row is the authoritative recheck, and it + * aborts the delete if the version was reactivated. + */ async function deleteWebhookRecordAfterCleanup(params: { workflowId: string deploymentVersionId?: string | null diff --git a/apps/sim/tools/vanta/utils.ts b/apps/sim/tools/vanta/utils.ts index 356ff7b32dc..4a9c3a00ccc 100644 --- a/apps/sim/tools/vanta/utils.ts +++ b/apps/sim/tools/vanta/utils.ts @@ -153,6 +153,13 @@ const vantaTokenExchanges = new Map>() /** Evict cached tokens well before their one-hour expiry. */ const VANTA_TOKEN_EXPIRY_BUFFER_MS = 10 * 60 * 1000 +/** + * Hard deadline on the token-endpoint exchange. The exchange promise is + * shared across concurrent callers, so a hung endpoint without this bound + * would wedge every joiner until the undici socket defaults (~5 min) gave up. + */ +const VANTA_TOKEN_EXCHANGE_TIMEOUT_MS = 15_000 + /** * Derives the cache key for a credential set. The client secret is included * only as a SHA-256 digest so plaintext secrets never persist in the @@ -183,6 +190,7 @@ async function exchangeVantaToken(params: VantaTokenParams, cacheKey: string): P grant_type: 'client_credentials', }), cache: 'no-store', + signal: AbortSignal.timeout(VANTA_TOKEN_EXCHANGE_TIMEOUT_MS), }) const data: unknown = await response.json().catch(() => null)