Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise<void> {

await db.transaction(async (tx) => {
const [lock] = await tx.execute<{ acquired: boolean }>(
sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired`
sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired`
)
Comment thread
icecrasher321 marked this conversation as resolved.
if (!lock?.acquired) {
logger.info(
Expand Down Expand Up @@ -527,7 +527,7 @@ async function tryStartDatabaseScheduleJob(jobId: string): Promise<DatabaseSched

return db.transaction(async (tx) => {
const [lock] = await tx.execute<{ acquired: boolean }>(
sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired`
sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired`
)
if (!lock?.acquired) return 'capacity_full'

Expand Down
12 changes: 11 additions & 1 deletion apps/sim/app/api/workspaces/[id]/byok-keys/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/per

const logger = createLogger('WorkspaceBYOKKeysAPI')

/**
* Bounds the per-provider BYOK advisory-lock wait so a stuck holder fails fast
* (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a
* server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`.
*/
const WORKSPACE_BYOK_LOCK_TIMEOUT_MS = 5_000

function maskApiKey(key: string): string {
if (key.length <= 8) {
return '•'.repeat(8)
Expand Down Expand Up @@ -203,7 +210,10 @@ export const POST = withRouteHandler(

const newKey = await db.transaction(async (tx) => {
await tx.execute(
sql`SELECT pg_advisory_xact_lock(hashtext(${`byok:${workspaceId}:${providerId}`}))`
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_BYOK_LOCK_TIMEOUT_MS}ms`}, true)`
)
await tx.execute(
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`byok:${workspaceId}:${providerId}`}, 0))`
)

const [{ keyCount }] = await tx
Expand Down
17 changes: 15 additions & 2 deletions apps/sim/app/api/workspaces/[id]/environment/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ import {

const logger = createLogger('WorkspaceEnvironmentAPI')

/**
* Bounds the workspace-environment advisory-lock wait so a stuck holder fails
* fast (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a
* server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`.
*/
const WORKSPACE_ENV_LOCK_TIMEOUT_MS = 5_000

/**
* Restricts decrypted workspace env values to administrators. Members (including
* read-only) receive the variable names with empty values so editor autocomplete
Expand Down Expand Up @@ -200,7 +207,10 @@ export const PUT = withRouteHandler(
).then((entries) => Object.fromEntries(entries))

const { existingEncrypted, merged } = await db.transaction(async (tx) => {
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`)
await tx.execute(
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)`
)
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`)

const [existingRow] = await tx
.select()
Expand Down Expand Up @@ -328,7 +338,10 @@ export const DELETE = withRouteHandler(
}

const result = await db.transaction(async (tx) => {
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`)
await tx.execute(
sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)`
)
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`)

const [existingRow] = await tx
.select()
Expand Down
97 changes: 97 additions & 0 deletions apps/sim/lib/billing/organizations/lock-order.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* @vitest-environment node
*
* Lock-order regression guard: the paid-org join billing transaction must lock
* the personal Pro subscription BEFORE userStats, matching
* restoreUserProSubscription's subscription → userStats order. Snapshotting
* userStats before locking the subscription inverts that pair and deadlocks a
* concurrent Pro restore for the same user.
*/
import { subscription as subscriptionTable, userStats } from '@sim/db/schema'
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { reapplyPaidOrgJoinBillingForExistingMember } from '@/lib/billing/organizations/membership'

vi.mock('@sim/db', () => dbChainMock)

vi.mock('@/lib/core/outbox/service', () => ({
enqueueOutboxEvent: vi.fn(),
}))

/**
* A superset row that satisfies every read in the join path: a paid org sub, a
* still-active personal Pro to pause, non-zero usage to snapshot, and zero
* storage (so the conditional org storage-transfer write is skipped — the org
* lock under test is the canonical pre-lock, not the storage update).
*/
const GENERIC_ROW = {
id: 'sub-1',
plan: 'team',
referenceId: 'user-1',
status: 'active',
cancelAtPeriodEnd: false,
stripeSubscriptionId: 'stripe-1',
currentPeriodCost: '5',
proPeriodCostSnapshot: '0',
storageUsedBytes: 0,
}

type LockOp = { op: 'lock' | 'update' | 'insert'; table: unknown }

function createRecordingTx() {
const ops: LockOp[] = []
const select = () => {
const ctx: { table: unknown } = { table: undefined }
const chain = {
from: (table: unknown) => {
ctx.table = table
return chain
},
where: () => chain,
for: () => {
ops.push({ op: 'lock', table: ctx.table })
return chain
},
limit: async () => [GENERIC_ROW],
}
return chain
}
const tx = {
select,
update: (table: unknown) => ({
set: () => ({
where: async () => {
ops.push({ op: 'update', table })
},
}),
}),
insert: (table: unknown) => ({
values: async () => {
ops.push({ op: 'insert', table })
},
}),
execute: async () => [],
}
return { tx, ops }
}

describe('paid-org join billing lock ordering', () => {
beforeEach(() => {
vi.clearAllMocks()
resetDbChainMock()
})

it('locks the personal subscription before mutating userStats', async () => {
const { tx, ops } = createRecordingTx()
dbChainMockFns.transaction.mockImplementation(async (cb: (t: unknown) => unknown) => cb(tx))

await reapplyPaidOrgJoinBillingForExistingMember('user-1', 'org-1')

const firstUserStatsUpdate = ops.findIndex((o) => o.op === 'update' && o.table === userStats)
const subscriptionLock = ops.findIndex((o) => o.op === 'lock' && o.table === subscriptionTable)

expect(firstUserStatsUpdate).toBeGreaterThanOrEqual(0)
expect(subscriptionLock).toBeGreaterThanOrEqual(0)
expect(subscriptionLock).toBeLessThan(firstUserStatsUpdate)
})
})
18 changes: 11 additions & 7 deletions apps/sim/lib/billing/organizations/membership.ts
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,17 @@ async function applyPaidOrgJoinBillingTx(
.limit(1)

if (personalPro && !personalPro.cancelAtPeriodEnd) {
// Lock the personal Pro subscription before userStats (snapshotted below) so
// this matches restoreUserProSubscription's subscription → userStats order
// and cannot deadlock against a concurrent Pro restore for the same user.
// The cancel update further down re-locks this row (no-op).
await tx
.select({ id: subscriptionTable.id })
.from(subscriptionTable)
.where(eq(subscriptionTable.id, personalPro.id))
.for('update')
.limit(1)

const [userStatsRow] = await tx
.select({ currentPeriodCost: userStats.currentPeriodCost })
.from(userStats)
Expand Down Expand Up @@ -1025,13 +1036,6 @@ export async function removeUserFromOrganization(
const captureDepartedUsage = async () => {
if (skipBillingLogic) return 0

await tx
.select({ id: organization.id })
.from(organization)
.where(eq(organization.id, organizationId))
.for('update')
.limit(1)

const [departingUserStats] = await tx
.select({ currentPeriodCost: userStats.currentPeriodCost })
.from(userStats)
Expand Down
25 changes: 17 additions & 8 deletions apps/sim/lib/billing/webhooks/invoices.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,12 @@ describe('invoice billing recovery', () => {
expect(mockBlockOrgMembers).not.toHaveBeenCalled()
})

it('coordinates org usage reset with owner tracker and organization locks', async () => {
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] })
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] })
queueSelectResponse({ limitResult: [{ id: 'org-1' }] })
queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] })
it('locks member userStats before the organization row during usage reset', async () => {
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner member row
queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner userStats
queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] }) // member ids
queueSelectResponse({ whereResult: [] }) // all-member userStats FOR UPDATE (pre-org lock)
queueSelectResponse({ limitResult: [{ id: 'org-1' }] }) // organization
queueSelectResponse({
whereResult: [
{ userId: 'owner-1', current: '125', currentCopilot: '10' },
Expand All @@ -248,9 +249,17 @@ describe('invoice billing recovery', () => {

expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1)
expect(dbChainMockFns.update).toHaveBeenCalledTimes(2)
expect(Object.keys(dbChainMockFns.select.mock.calls[0][0] ?? {})).toEqual(['userId'])
expect(Object.keys(dbChainMockFns.select.mock.calls[1][0] ?? {})).toEqual(['userId'])
expect(Object.keys(dbChainMockFns.select.mock.calls[2][0] ?? {})).toEqual(['id'])

const whereArgs = dbChainMockFns.where.mock.calls.map(
(call) => call[0] as { type?: string; column?: string; left?: string }
)
const allMemberStatsLockIndex = whereArgs.findIndex(
(arg) => arg?.type === 'inArray' && arg?.column === 'userId'
)
const orgLockIndex = whereArgs.findIndex((arg) => arg?.type === 'eq' && arg?.left === 'id')
expect(allMemberStatsLockIndex).toBeGreaterThanOrEqual(0)
expect(orgLockIndex).toBeGreaterThanOrEqual(0)
expect(allMemberStatsLockIndex).toBeLessThan(orgLockIndex)

const statsReset = dbChainMockFns.set.mock.calls[0][0] as Record<string, unknown>
expect(statsReset.currentPeriodCost).not.toBe('0')
Comment thread
icecrasher321 marked this conversation as resolved.
Expand Down
27 changes: 20 additions & 7 deletions apps/sim/lib/billing/webhooks/invoices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,19 +457,32 @@ export async function resetUsageForSubscription(sub: {
.limit(1)
}

await tx
.select({ id: organization.id })
.from(organization)
.where(eq(organization.id, sub.referenceId))
.for('update')
.limit(1)

const membersRows = await tx
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, sub.referenceId))

const memberIds = membersRows.map((row) => row.userId)

// Lock every member's userStats before the organization row so this path
// follows the canonical userStats → organization order shared by the
// join, remove, threshold-billing, and storage-transfer paths. Locking
// organization first would invert against them and risk an AB-BA
// deadlock. The per-member UPDATE below re-locks these rows (no-op).
if (memberIds.length > 0) {
await tx
.select({ userId: userStats.userId })
.from(userStats)
.where(inArray(userStats.userId, memberIds))
.for('update')
}

await tx
.select({ id: organization.id })
.from(organization)
.where(eq(organization.id, sub.referenceId))
.for('update')
.limit(1)
if (memberIds.length > 0) {
const memberStatsRows = await tx
.select({
Expand Down
44 changes: 44 additions & 0 deletions apps/sim/lib/knowledge/documents/lock-order.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* @vitest-environment node
*
* Lock-order regression guard: `updateDocument` must lock the document's
* embedding rows BEFORE the document row when cascading tag updates, matching
* the embedding → document order every chunk-mutation path uses
* (chunks/service.ts). The opposite order deadlocks a document tag edit against
* a concurrent chunk edit of the same document.
*/
import { document, embedding } from '@sim/db/schema'
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { updateDocument } from '@/lib/knowledge/documents/service'

vi.mock('@sim/db', () => dbChainMock)

/** invocationCallOrder of the first `tx.update(table)` call. */
function updateOrderForTable(table: unknown): number {
const { calls, invocationCallOrder } = dbChainMockFns.update.mock
for (let i = 0; i < calls.length; i++) {
if (calls[i][0] === table) return invocationCallOrder[i]
}
return -1
}

describe('updateDocument lock ordering', () => {
beforeEach(() => {
vi.clearAllMocks()
resetDbChainMock()
// Post-transaction re-read of the updated document must return a row.
dbChainMockFns.limit.mockResolvedValue([{ id: 'doc-1', knowledgeBaseId: 'kb-1' }])
})

it('updates embeddings before the document row when cascading tag changes', async () => {
await updateDocument('doc-1', { tag1: 'priority' }, 'req-1')

const embeddingWriteOrder = updateOrderForTable(embedding)
const documentWriteOrder = updateOrderForTable(document)

expect(embeddingWriteOrder).toBeGreaterThan(0)
expect(documentWriteOrder).toBeGreaterThan(0)
expect(embeddingWriteOrder).toBeLessThan(documentWriteOrder)
})
})
4 changes: 2 additions & 2 deletions apps/sim/lib/knowledge/documents/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1902,8 +1902,6 @@ export async function updateDocument(
})

await db.transaction(async (tx) => {
await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId))

const hasTagUpdates = ALL_TAG_SLOTS.some((field) => typedUpdateData[field] !== undefined)

if (hasTagUpdates) {
Expand All @@ -1921,6 +1919,8 @@ export async function updateDocument(
.set(embeddingUpdateData)
.where(eq(embedding.documentId, documentId))
}

await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId))
})

const updatedDocument = await db
Expand Down
Loading
Loading