Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion apps/sim/app/api/auth/oauth/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ async function performCoalescedRefresh({

const lockKey = `oauth:refresh:${accountId}`

return coalesceLocally(lockKey, () =>
const refreshPromise = coalesceLocally(lockKey, () =>
withLeaderLock<string>({
key: lockKey,
onLeader: async () => {
Expand Down Expand Up @@ -429,6 +429,16 @@ async function performCoalescedRefresh({
},
})
)

try {
return await refreshPromise
} catch (error) {
logger.error('Coalesced refresh did not settle', {
...logContext,
error: toError(error).message,
})
return null
}
Comment thread
icecrasher321 marked this conversation as resolved.
}

export async function getOAuthToken(userId: string, providerId: string): Promise<string | null> {
Expand Down
11 changes: 7 additions & 4 deletions apps/sim/app/api/function/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ let typescriptModulePromise: Promise<TypeScriptModule> | null = null

async function loadTypeScriptModule(): Promise<TypeScriptModule> {
if (!typescriptModulePromise) {
typescriptModulePromise = import('typescript').then((mod) => {
const tsModule = (mod?.default ?? mod) as TypeScriptModule
return tsModule
})
typescriptModulePromise = import('typescript').then(
(mod) => (mod?.default ?? mod) as TypeScriptModule,
(error) => {
typescriptModulePromise = null
throw error
}
)
}

return typescriptModulePromise
Expand Down
100 changes: 99 additions & 1 deletion apps/sim/lib/concurrency/__tests__/singleflight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
*/
import { sleep } from '@sim/utils/helpers'
import { afterEach, describe, expect, it, vi } from 'vitest'
import { __resetCoalesceLocallyForTests, coalesceLocally } from '@/lib/concurrency/singleflight'
import {
__resetCoalesceLocallyForTests,
CoalesceSettleTimeoutError,
coalesceLocally,
} from '@/lib/concurrency/singleflight'

afterEach(() => {
__resetCoalesceLocallyForTests()
Expand Down Expand Up @@ -57,9 +61,103 @@ describe('coalesceLocally', () => {
await expect(coalesceLocally('rejection', fn)).rejects.toThrow('fail 2')
})

it('surfaces a synchronously-thrown fn error and evicts the entry', async () => {
const fn = vi.fn((): Promise<string> => {
throw new Error('sync boom')
})

// The real error must surface (not a TDZ ReferenceError from the evict
// closure) and the entry must be evicted so the next call retries.
await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom')
await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom')
expect(fn).toHaveBeenCalledTimes(2)
})

it('does not coalesce across distinct keys', async () => {
const fn = vi.fn(async () => 'value')
await Promise.all([coalesceLocally('a', fn), coalesceLocally('b', fn)])
expect(fn).toHaveBeenCalledTimes(2)
})

it('rejects all awaiters and evicts the entry when the producer misses the settle deadline', async () => {
vi.useFakeTimers()
try {
let resolveHung: (value: string) => void
const hung = vi.fn(
() =>
new Promise<string>((resolve) => {
resolveHung = resolve
})
)

const a = coalesceLocally('wedged', hung)
const b = coalesceLocally('wedged', hung)
const aAssertion = expect(a).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)
const bAssertion = expect(b).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)

await vi.advanceTimersByTimeAsync(30_000)
await aAssertion
await bAssertion
expect(hung).toHaveBeenCalledTimes(1)

const fresh = vi.fn(async () => 'recovered')
await expect(coalesceLocally('wedged', fresh)).resolves.toBe('recovered')
expect(fresh).toHaveBeenCalledTimes(1)

resolveHung!('late')
} finally {
vi.useRealTimers()
}
})

it('a timed-out producer settling late does not evict its successor', async () => {
vi.useFakeTimers()
try {
let resolveOld: (value: string) => void
const old = coalesceLocally(
'late-settle',
() =>
new Promise<string>((resolve) => {
resolveOld = resolve
}),
1_000
)
const oldAssertion = expect(old).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)
await vi.advanceTimersByTimeAsync(1_000)
await oldAssertion

let resolveNew: (value: string) => void
const successor = coalesceLocally(
'late-settle',
() =>
new Promise<string>((resolve) => {
resolveNew = resolve
})
)

resolveOld!('late')
await vi.advanceTimersByTimeAsync(0)

const joined = coalesceLocally('late-settle', async () => 'should-not-run')
expect(joined).toBe(successor)

resolveNew!('new-value')
await expect(successor).resolves.toBe('new-value')
} finally {
vi.useRealTimers()
}
})

it('does not fire the deadline for producers that settle in time', async () => {
vi.useFakeTimers()
try {
const value = await coalesceLocally('prompt', async () => 'ok', 1_000)
expect(value).toBe('ok')

await vi.advanceTimersByTimeAsync(2_000)
await expect(coalesceLocally('prompt', async () => 'again', 1_000)).resolves.toBe('again')
} finally {
vi.useRealTimers()
}
})
})
70 changes: 60 additions & 10 deletions apps/sim/lib/concurrency/singleflight.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,69 @@
const inflight = new Map<string, Promise<unknown>>()

export function coalesceLocally<T>(key: string, fn: () => Promise<T>): Promise<T> {
/**
* Default deadline for a coalesced producer to settle. Joiners share the
* producer's promise, so without a deadline a single hung producer wedges
* every future caller for that key until process restart.
*/
const DEFAULT_SETTLE_TIMEOUT_MS = 30_000

/**
* Thrown to all awaiters when a coalesced producer fails to settle within
* its deadline. The entry is evicted first, so the next caller mints a
* fresh producer instead of joining the wedged one.
*/
export class CoalesceSettleTimeoutError extends Error {
constructor(key: string, timeoutMs: number) {
super(`Coalesced producer for "${key}" did not settle within ${timeoutMs}ms`)
this.name = 'CoalesceSettleTimeoutError'
}
}

/**
* Deduplicates concurrent async work by key within this process: the first
* caller runs `fn`, every concurrent caller for the same key shares its
* promise. The entry is evicted when the producer settles (either way) or
* when the settle deadline fires, whichever comes first. The underlying
* `fn` is not cancelled on timeout — it keeps running detached, but no new
* caller will join it.
*/
export function coalesceLocally<T>(
key: string,
fn: () => Promise<T>,
settleTimeoutMs: number = DEFAULT_SETTLE_TIMEOUT_MS
): Promise<T> {
const existing = inflight.get(key) as Promise<T> | undefined
if (existing) return existing

const promise = (async () => {
try {
return await fn()
} finally {
inflight.delete(key)
}
})()
let timer: ReturnType<typeof setTimeout> | undefined
const evict = () => {
if (inflight.get(key) === guarded) inflight.delete(key)
}

const guarded: Promise<T> = Promise.race([
(async () => {
try {
// Defer fn() to a microtask so a synchronous throw surfaces as a
// rejection after `guarded` and the timer are initialized. Calling it
// inline would run the finally below during construction, touching
// `guarded` in its temporal dead zone and masking fn's real error.
return await Promise.resolve().then(fn)
} finally {
clearTimeout(timer)
evict()
}
})(),
new Promise<never>((_, reject) => {
timer = setTimeout(() => {
evict()
reject(new CoalesceSettleTimeoutError(key, settleTimeoutMs))
}, settleTimeoutMs)
timer.unref?.()
}),
])

inflight.set(key, promise)
return promise
inflight.set(key, guarded)
return guarded
}

export function __resetCoalesceLocallyForTests(): void {
Expand Down
36 changes: 36 additions & 0 deletions apps/sim/lib/mcp/oauth/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,42 @@ describe('withMcpOauthRefreshLock', () => {
}
})

it('bounds the queue wait: callers stalled behind a wedged link reject without running fn', async () => {
vi.useFakeTimers()
try {
mockAcquireLock.mockResolvedValue(true)
let resolveFirst: (value: string) => void
const hungFn = vi.fn(
() =>
new Promise<string>((resolve) => {
resolveFirst = resolve
})
)
const queuedFn = vi.fn(async () => 'second')

const first = withMcpOauthRefreshLock('row-stall', hungFn)
const second = withMcpOauthRefreshLock('row-stall', queuedFn)
const secondAssertion = expect(second).rejects.toThrow(/stalled for/)

await vi.advanceTimersByTimeAsync(90_000)
await secondAssertion
expect(queuedFn).not.toHaveBeenCalled()

// The wedged link is untouched by the queue deadline (its own fn keeps
// the lock, protecting a possibly mid-rotation refresh) and the row
// heals once it settles — including skipping the abandoned link's fn.
resolveFirst!('first')
await expect(first).resolves.toBe('first')

await expect(withMcpOauthRefreshLock('row-stall', async () => 'healed')).resolves.toBe(
'healed'
)
expect(queuedFn).not.toHaveBeenCalled()
} finally {
vi.useRealTimers()
}
})

it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => {
vi.useFakeTimers()
try {
Expand Down
50 changes: 47 additions & 3 deletions apps/sim/lib/mcp/oauth/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ export async function clearState(rowId: string): Promise<void> {
* Two-tier serialization (each caller runs its OWN `fn()` — callers consume
* `McpClient` instances that can't be shared, unlike a scalar access token):
* 1) In-process: per-row Promise chain. Concurrent callers queue; each
* runs `fn()` after the previous settles.
* runs `fn()` after the previous settles. The queue wait is bounded —
* a caller whose turn does not arrive within
* {@link REFRESH_QUEUE_WAIT_TIMEOUT_MS} rejects without ever running
* its `fn()`, so a wedged link cannot accumulate callers indefinitely.
* 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`) with a TTL
* watchdog that periodically extends the lock while `fn()` runs, so
* long-running refreshes don't drop the lock and let another process
Expand All @@ -251,18 +254,59 @@ const REFRESH_LOCK_EXTEND_INTERVAL_MS = 5_000
const REFRESH_POLL_INTERVAL_MS = 100
const REFRESH_MAX_WAIT_MS = 30_000

/**
* Deadline on the in-process QUEUE WAIT only — the time a caller spends
* waiting for its turn behind queued predecessors. Without it, one hung
* link wedges every subsequent caller for that row until process restart.
* Sized to survive one legitimately slow predecessor: up to
* REFRESH_MAX_WAIT_MS of cross-process lock contention plus the MCP SDK's
* 60s initialize timeout. Deliberately NOT applied to the caller's own
* `fn()` run — aborting a running `fn()` would orphan a connected
* `McpClient` and abandon a possibly mid-rotation refresh; `fn()` is
* bounded by its own SDK/HTTP/Redis timeouts instead.
*/
const REFRESH_QUEUE_WAIT_TIMEOUT_MS = 90_000

const inflightChains = new Map<string, Promise<unknown>>()

export async function withMcpOauthRefreshLock<T>(rowId: string, fn: () => Promise<T>): Promise<T> {
const lockKey = `mcp:oauth:refresh:${rowId}`
const prev = inflightChains.get(lockKey) ?? Promise.resolve()
const next = prev.catch(() => undefined).then(() => runWithRedisMutex(lockKey, rowId, fn))
const prevSettled = prev.catch(() => undefined)

let queueTimedOut = false
const next = prevSettled.then(() => {
if (queueTimedOut) {
throw new Error(`MCP OAuth refresh queue for ${rowId} abandoned after timeout`)
}
return runWithRedisMutex(lockKey, rowId, fn)
})
inflightChains.set(lockKey, next)
const cleanup = () => {
if (inflightChains.get(lockKey) === next) inflightChains.delete(lockKey)
}
next.then(cleanup, cleanup)
return next as Promise<T>

let queueTimer: ReturnType<typeof setTimeout> | undefined
const queueDeadline = new Promise<never>((_, reject) => {
queueTimer = setTimeout(() => {
queueTimedOut = true
reject(
new Error(
`MCP OAuth refresh queue for ${rowId} stalled for ${REFRESH_QUEUE_WAIT_TIMEOUT_MS}ms`
)
)
}, REFRESH_QUEUE_WAIT_TIMEOUT_MS)
queueTimer.unref?.()
})

try {
await Promise.race([prevSettled, queueDeadline])
} finally {
clearTimeout(queueTimer)
}

return next
}

async function runWithRedisMutex<T>(
Expand Down
10 changes: 10 additions & 0 deletions apps/sim/lib/oauth/oauth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,15 @@ function extractErrorCode(value: unknown): string | undefined {
return undefined
}

/**
* Hard deadline on the token-endpoint exchange. This function does not coalesce
* on its own; its sole production caller (`performCoalescedRefresh` in the OAuth
* utils) shares one in-flight refresh across concurrent callers for a credential.
* Without this bound a hung endpoint would wedge every joiner on that key until
* the undici socket defaults (~5 min) gave up.
*/
const TOKEN_REFRESH_TIMEOUT_MS = 15_000

export async function refreshOAuthToken(
providerId: string,
refreshToken: string
Expand All @@ -1505,6 +1514,7 @@ export async function refreshOAuthToken(
method: 'POST',
headers,
body: useJsonBody ? JSON.stringify(bodyParams) : new URLSearchParams(bodyParams).toString(),
signal: AbortSignal.timeout(TOKEN_REFRESH_TIMEOUT_MS),
})

if (!response.ok) {
Expand Down
Loading
Loading