diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 414ae81047b..52f7f3e5412 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -426,7 +426,7 @@ async function recoverStaleDatabaseScheduleJobs(now: Date): Promise { await db.transaction(async (tx) => { const [lock] = await tx.execute<{ acquired: boolean }>( - sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired` + sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired` ) if (!lock?.acquired) { logger.info( @@ -527,7 +527,7 @@ async function tryStartDatabaseScheduleJob(jobId: string): Promise { const [lock] = await tx.execute<{ acquired: boolean }>( - sql`SELECT pg_try_advisory_xact_lock(hashtext(${SCHEDULE_EXECUTION_QUEUE_NAME})) AS acquired` + sql`SELECT pg_try_advisory_xact_lock(hashtextextended(${SCHEDULE_EXECUTION_QUEUE_NAME}, 0)) AS acquired` ) if (!lock?.acquired) return 'capacity_full' diff --git a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts index 8235f727b2e..d7dca1499db 100644 --- a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts +++ b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts @@ -21,6 +21,13 @@ import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/per const logger = createLogger('WorkspaceBYOKKeysAPI') +/** + * Bounds the per-provider BYOK advisory-lock wait so a stuck holder fails fast + * (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a + * server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`. + */ +const WORKSPACE_BYOK_LOCK_TIMEOUT_MS = 5_000 + function maskApiKey(key: string): string { if (key.length <= 8) { return '•'.repeat(8) @@ -203,7 +210,10 @@ export const POST = withRouteHandler( const newKey = await db.transaction(async (tx) => { await tx.execute( - sql`SELECT pg_advisory_xact_lock(hashtext(${`byok:${workspaceId}:${providerId}`}))` + sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_BYOK_LOCK_TIMEOUT_MS}ms`}, true)` + ) + await tx.execute( + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`byok:${workspaceId}:${providerId}`}, 0))` ) const [{ keyCount }] = await tx diff --git a/apps/sim/app/api/workspaces/[id]/environment/route.ts b/apps/sim/app/api/workspaces/[id]/environment/route.ts index c32065c49d3..e2a87fdfbbc 100644 --- a/apps/sim/app/api/workspaces/[id]/environment/route.ts +++ b/apps/sim/app/api/workspaces/[id]/environment/route.ts @@ -33,6 +33,13 @@ import { const logger = createLogger('WorkspaceEnvironmentAPI') +/** + * Bounds the workspace-environment advisory-lock wait so a stuck holder fails + * fast (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a + * server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`. + */ +const WORKSPACE_ENV_LOCK_TIMEOUT_MS = 5_000 + /** * Restricts decrypted workspace env values to administrators. Members (including * read-only) receive the variable names with empty values so editor autocomplete @@ -200,7 +207,10 @@ export const PUT = withRouteHandler( ).then((entries) => Object.fromEntries(entries)) const { existingEncrypted, merged } = await db.transaction(async (tx) => { - await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`) + await tx.execute( + sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)` + ) + await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`) const [existingRow] = await tx .select() @@ -328,7 +338,10 @@ export const DELETE = withRouteHandler( } const result = await db.transaction(async (tx) => { - await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${workspaceId}))`) + await tx.execute( + sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_ENV_LOCK_TIMEOUT_MS}ms`}, true)` + ) + await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${workspaceId}, 0))`) const [existingRow] = await tx .select() diff --git a/apps/sim/lib/billing/organizations/lock-order.test.ts b/apps/sim/lib/billing/organizations/lock-order.test.ts new file mode 100644 index 00000000000..fdcf8f8aacf --- /dev/null +++ b/apps/sim/lib/billing/organizations/lock-order.test.ts @@ -0,0 +1,97 @@ +/** + * @vitest-environment node + * + * Lock-order regression guard: the paid-org join billing transaction must lock + * the personal Pro subscription BEFORE userStats, matching + * restoreUserProSubscription's subscription → userStats order. Snapshotting + * userStats before locking the subscription inverts that pair and deadlocks a + * concurrent Pro restore for the same user. + */ +import { subscription as subscriptionTable, userStats } from '@sim/db/schema' +import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { reapplyPaidOrgJoinBillingForExistingMember } from '@/lib/billing/organizations/membership' + +vi.mock('@sim/db', () => dbChainMock) + +vi.mock('@/lib/core/outbox/service', () => ({ + enqueueOutboxEvent: vi.fn(), +})) + +/** + * A superset row that satisfies every read in the join path: a paid org sub, a + * still-active personal Pro to pause, non-zero usage to snapshot, and zero + * storage (so the conditional org storage-transfer write is skipped — the org + * lock under test is the canonical pre-lock, not the storage update). + */ +const GENERIC_ROW = { + id: 'sub-1', + plan: 'team', + referenceId: 'user-1', + status: 'active', + cancelAtPeriodEnd: false, + stripeSubscriptionId: 'stripe-1', + currentPeriodCost: '5', + proPeriodCostSnapshot: '0', + storageUsedBytes: 0, +} + +type LockOp = { op: 'lock' | 'update' | 'insert'; table: unknown } + +function createRecordingTx() { + const ops: LockOp[] = [] + const select = () => { + const ctx: { table: unknown } = { table: undefined } + const chain = { + from: (table: unknown) => { + ctx.table = table + return chain + }, + where: () => chain, + for: () => { + ops.push({ op: 'lock', table: ctx.table }) + return chain + }, + limit: async () => [GENERIC_ROW], + } + return chain + } + const tx = { + select, + update: (table: unknown) => ({ + set: () => ({ + where: async () => { + ops.push({ op: 'update', table }) + }, + }), + }), + insert: (table: unknown) => ({ + values: async () => { + ops.push({ op: 'insert', table }) + }, + }), + execute: async () => [], + } + return { tx, ops } +} + +describe('paid-org join billing lock ordering', () => { + beforeEach(() => { + vi.clearAllMocks() + resetDbChainMock() + }) + + it('locks the personal subscription before mutating userStats', async () => { + const { tx, ops } = createRecordingTx() + dbChainMockFns.transaction.mockImplementation(async (cb: (t: unknown) => unknown) => cb(tx)) + + await reapplyPaidOrgJoinBillingForExistingMember('user-1', 'org-1') + + const firstUserStatsUpdate = ops.findIndex((o) => o.op === 'update' && o.table === userStats) + const subscriptionLock = ops.findIndex((o) => o.op === 'lock' && o.table === subscriptionTable) + + expect(firstUserStatsUpdate).toBeGreaterThanOrEqual(0) + expect(subscriptionLock).toBeGreaterThanOrEqual(0) + expect(subscriptionLock).toBeLessThan(firstUserStatsUpdate) + }) +}) diff --git a/apps/sim/lib/billing/organizations/membership.ts b/apps/sim/lib/billing/organizations/membership.ts index cd57cb28d07..52775881e10 100644 --- a/apps/sim/lib/billing/organizations/membership.ts +++ b/apps/sim/lib/billing/organizations/membership.ts @@ -676,6 +676,17 @@ async function applyPaidOrgJoinBillingTx( .limit(1) if (personalPro && !personalPro.cancelAtPeriodEnd) { + // Lock the personal Pro subscription before userStats (snapshotted below) so + // this matches restoreUserProSubscription's subscription → userStats order + // and cannot deadlock against a concurrent Pro restore for the same user. + // The cancel update further down re-locks this row (no-op). + await tx + .select({ id: subscriptionTable.id }) + .from(subscriptionTable) + .where(eq(subscriptionTable.id, personalPro.id)) + .for('update') + .limit(1) + const [userStatsRow] = await tx .select({ currentPeriodCost: userStats.currentPeriodCost }) .from(userStats) @@ -1025,13 +1036,6 @@ export async function removeUserFromOrganization( const captureDepartedUsage = async () => { if (skipBillingLogic) return 0 - await tx - .select({ id: organization.id }) - .from(organization) - .where(eq(organization.id, organizationId)) - .for('update') - .limit(1) - const [departingUserStats] = await tx .select({ currentPeriodCost: userStats.currentPeriodCost }) .from(userStats) diff --git a/apps/sim/lib/billing/webhooks/invoices.test.ts b/apps/sim/lib/billing/webhooks/invoices.test.ts index 697a8049bff..da58b9fdbde 100644 --- a/apps/sim/lib/billing/webhooks/invoices.test.ts +++ b/apps/sim/lib/billing/webhooks/invoices.test.ts @@ -230,11 +230,12 @@ describe('invoice billing recovery', () => { expect(mockBlockOrgMembers).not.toHaveBeenCalled() }) - it('coordinates org usage reset with owner tracker and organization locks', async () => { - queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) - queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) - queueSelectResponse({ limitResult: [{ id: 'org-1' }] }) - queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] }) + it('locks member userStats before the organization row during usage reset', async () => { + queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner member row + queueSelectResponse({ limitResult: [{ userId: 'owner-1' }] }) // owner userStats + queueSelectResponse({ whereResult: [{ userId: 'owner-1' }, { userId: 'member-1' }] }) // member ids + queueSelectResponse({ whereResult: [] }) // all-member userStats FOR UPDATE (pre-org lock) + queueSelectResponse({ limitResult: [{ id: 'org-1' }] }) // organization queueSelectResponse({ whereResult: [ { userId: 'owner-1', current: '125', currentCopilot: '10' }, @@ -248,9 +249,17 @@ describe('invoice billing recovery', () => { expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1) expect(dbChainMockFns.update).toHaveBeenCalledTimes(2) - expect(Object.keys(dbChainMockFns.select.mock.calls[0][0] ?? {})).toEqual(['userId']) - expect(Object.keys(dbChainMockFns.select.mock.calls[1][0] ?? {})).toEqual(['userId']) - expect(Object.keys(dbChainMockFns.select.mock.calls[2][0] ?? {})).toEqual(['id']) + + const whereArgs = dbChainMockFns.where.mock.calls.map( + (call) => call[0] as { type?: string; column?: string; left?: string } + ) + const allMemberStatsLockIndex = whereArgs.findIndex( + (arg) => arg?.type === 'inArray' && arg?.column === 'userId' + ) + const orgLockIndex = whereArgs.findIndex((arg) => arg?.type === 'eq' && arg?.left === 'id') + expect(allMemberStatsLockIndex).toBeGreaterThanOrEqual(0) + expect(orgLockIndex).toBeGreaterThanOrEqual(0) + expect(allMemberStatsLockIndex).toBeLessThan(orgLockIndex) const statsReset = dbChainMockFns.set.mock.calls[0][0] as Record expect(statsReset.currentPeriodCost).not.toBe('0') diff --git a/apps/sim/lib/billing/webhooks/invoices.ts b/apps/sim/lib/billing/webhooks/invoices.ts index 9460f521623..ff7c2294df4 100644 --- a/apps/sim/lib/billing/webhooks/invoices.ts +++ b/apps/sim/lib/billing/webhooks/invoices.ts @@ -457,19 +457,32 @@ export async function resetUsageForSubscription(sub: { .limit(1) } - await tx - .select({ id: organization.id }) - .from(organization) - .where(eq(organization.id, sub.referenceId)) - .for('update') - .limit(1) - const membersRows = await tx .select({ userId: member.userId }) .from(member) .where(eq(member.organizationId, sub.referenceId)) const memberIds = membersRows.map((row) => row.userId) + + // Lock every member's userStats before the organization row so this path + // follows the canonical userStats → organization order shared by the + // join, remove, threshold-billing, and storage-transfer paths. Locking + // organization first would invert against them and risk an AB-BA + // deadlock. The per-member UPDATE below re-locks these rows (no-op). + if (memberIds.length > 0) { + await tx + .select({ userId: userStats.userId }) + .from(userStats) + .where(inArray(userStats.userId, memberIds)) + .for('update') + } + + await tx + .select({ id: organization.id }) + .from(organization) + .where(eq(organization.id, sub.referenceId)) + .for('update') + .limit(1) if (memberIds.length > 0) { const memberStatsRows = await tx .select({ diff --git a/apps/sim/lib/knowledge/documents/lock-order.test.ts b/apps/sim/lib/knowledge/documents/lock-order.test.ts new file mode 100644 index 00000000000..92ecb99c566 --- /dev/null +++ b/apps/sim/lib/knowledge/documents/lock-order.test.ts @@ -0,0 +1,44 @@ +/** + * @vitest-environment node + * + * Lock-order regression guard: `updateDocument` must lock the document's + * embedding rows BEFORE the document row when cascading tag updates, matching + * the embedding → document order every chunk-mutation path uses + * (chunks/service.ts). The opposite order deadlocks a document tag edit against + * a concurrent chunk edit of the same document. + */ +import { document, embedding } from '@sim/db/schema' +import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { updateDocument } from '@/lib/knowledge/documents/service' + +vi.mock('@sim/db', () => dbChainMock) + +/** invocationCallOrder of the first `tx.update(table)` call. */ +function updateOrderForTable(table: unknown): number { + const { calls, invocationCallOrder } = dbChainMockFns.update.mock + for (let i = 0; i < calls.length; i++) { + if (calls[i][0] === table) return invocationCallOrder[i] + } + return -1 +} + +describe('updateDocument lock ordering', () => { + beforeEach(() => { + vi.clearAllMocks() + resetDbChainMock() + // Post-transaction re-read of the updated document must return a row. + dbChainMockFns.limit.mockResolvedValue([{ id: 'doc-1', knowledgeBaseId: 'kb-1' }]) + }) + + it('updates embeddings before the document row when cascading tag changes', async () => { + await updateDocument('doc-1', { tag1: 'priority' }, 'req-1') + + const embeddingWriteOrder = updateOrderForTable(embedding) + const documentWriteOrder = updateOrderForTable(document) + + expect(embeddingWriteOrder).toBeGreaterThan(0) + expect(documentWriteOrder).toBeGreaterThan(0) + expect(embeddingWriteOrder).toBeLessThan(documentWriteOrder) + }) +}) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 6e906c3bdd4..307db69bc7c 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -1902,8 +1902,6 @@ export async function updateDocument( }) await db.transaction(async (tx) => { - await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId)) - const hasTagUpdates = ALL_TAG_SLOTS.some((field) => typedUpdateData[field] !== undefined) if (hasTagUpdates) { @@ -1921,6 +1919,8 @@ export async function updateDocument( .set(embeddingUpdateData) .where(eq(embedding.documentId, documentId)) } + + await tx.update(document).set(dbUpdateData).where(eq(document.id, documentId)) }) const updatedDocument = await db diff --git a/apps/sim/lib/table/__tests__/lock-order.test.ts b/apps/sim/lib/table/__tests__/lock-order.test.ts new file mode 100644 index 00000000000..cc7f4ec9726 --- /dev/null +++ b/apps/sim/lib/table/__tests__/lock-order.test.ts @@ -0,0 +1,107 @@ +/** + * @vitest-environment node + * + * Lock-order regression guard: a column-creating CSV import must acquire the + * per-table row-order advisory lock (`user_table_rows_pos`) BEFORE writing + * `user_table_definitions`, matching the rows_pos → definitions order that plain + * inserts take via the row-count trigger. The opposite order deadlocks + * concurrent inserts on the same table. + */ +import { userTableDefinitions } from '@sim/db/schema' +import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { importAppendRows } from '@/lib/table/service' +import type { TableDefinition } from '@/lib/table/types' + +vi.mock('@sim/db', () => dbChainMock) + +vi.mock('@/lib/core/config/feature-flags', () => ({ + isTablesFractionalOrderingEnabled: false, +})) + +vi.mock('@/lib/table/validation', () => ({ + validateRowSize: vi.fn(() => ({ valid: true, errors: [] })), + validateRowAgainstSchema: vi.fn(() => ({ valid: true, errors: [] })), + coerceRowToSchema: vi.fn(() => ({ valid: true, errors: [] })), + coerceRowValues: vi.fn((row) => row), + validateTableName: vi.fn(() => ({ valid: true, errors: [] })), + validateTableSchema: vi.fn(() => ({ valid: true, errors: [] })), + getUniqueColumns: vi.fn(() => []), + checkUniqueConstraintsDb: vi.fn(async () => ({ valid: true, errors: [] })), + checkBatchUniqueConstraintsDb: vi.fn(async () => ({ valid: true, errors: [] })), +})) + +const TABLE: TableDefinition = { + id: 'tbl-1', + name: 'People', + description: null, + schema: { columns: [{ name: 'name', type: 'string' }] }, + metadata: null, + rowCount: 0, + maxRows: 1000, + workspaceId: 'ws-1', + createdBy: 'user-1', + archivedAt: null, + createdAt: new Date('2024-01-01'), + updatedAt: new Date('2024-01-01'), +} + +/** + * invocationCallOrder of the first `tx.execute(...)` whose SQL contains + * `substring` — checks the template strings, the interpolated values (the + * advisory key is passed as a value), and `sql.raw` output. + */ +function executeOrderContaining(substring: string): number { + const { calls, invocationCallOrder } = dbChainMockFns.execute.mock + for (let i = 0; i < calls.length; i++) { + const arg = calls[i][0] as { strings?: unknown; values?: unknown; rawSql?: unknown } | undefined + const haystacks: string[] = [] + if (Array.isArray(arg?.strings)) { + haystacks.push( + ...(arg.strings as unknown[]).filter((s): s is string => typeof s === 'string') + ) + } + if (Array.isArray(arg?.values)) { + haystacks.push(...(arg.values as unknown[]).filter((v): v is string => typeof v === 'string')) + } + if (typeof arg?.rawSql === 'string') haystacks.push(arg.rawSql) + if (haystacks.some((s) => s.includes(substring))) return invocationCallOrder[i] + } + return -1 +} + +/** invocationCallOrder of the first `tx.update(table)` call. */ +function updateOrderForTable(table: unknown): number { + const { calls, invocationCallOrder } = dbChainMockFns.update.mock + for (let i = 0; i < calls.length; i++) { + if (calls[i][0] === table) return invocationCallOrder[i] + } + return -1 +} + +describe('table import lock ordering', () => { + beforeEach(() => { + vi.clearAllMocks() + resetDbChainMock() + }) + + it('acquires the rows_pos advisory before the user_table_definitions write when adding columns', async () => { + // The lock pre-acquire and the column-creating write both run at the top of + // the import, before the batch-insert loop. The loop's order-key aggregates + // aren't fully wired in this unit mock, so tolerate a downstream error after + // the locks under test have been recorded. + await importAppendRows( + TABLE, + [{ name: 'new_col', type: 'string' }], + [{ name: 'Alice', new_col: 'x' }], + { workspaceId: 'ws-1', userId: 'user-1', requestId: 'req-1' } + ).catch(() => {}) + + const rowsPosLockOrder = executeOrderContaining('user_table_rows_pos') + const definitionsWriteOrder = updateOrderForTable(userTableDefinitions) + + expect(rowsPosLockOrder).toBeGreaterThan(0) + expect(definitionsWriteOrder).toBeGreaterThan(0) + expect(rowsPosLockOrder).toBeLessThan(definitionsWriteOrder) + }) +}) diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 2c526fa9b2d..1037f6c6526 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -2434,6 +2434,12 @@ export async function importAppendRows( return db.transaction(async (trx) => { let working = table if (additions.length > 0) { + // Take the row-order lock before creating columns so this path uses the + // same rows_pos → user_table_definitions order as plain inserts. Creating + // columns first would lock the definition row before rows_pos, inverting + // the order and deadlocking concurrent inserts on this table. The lock is + // re-entrant, so the per-batch acquire below is a no-op. + await acquireRowOrderLock(trx, table.id) working = await addTableColumnsWithTx(trx, table, additions, ctx.requestId) } const inserted: TableRow[] = [] @@ -2464,6 +2470,7 @@ export async function importReplaceRows( return db.transaction(async (trx) => { let working = table if (additions.length > 0) { + await acquireRowOrderLock(trx, table.id) working = await addTableColumnsWithTx(trx, table, additions, requestId) } return replaceTableRowsWithTx( diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-folder-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-folder-manager.ts index b524a094a73..5a8e47f72b9 100644 --- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-folder-manager.ts +++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-folder-manager.ts @@ -9,6 +9,13 @@ import { getWorkspaceWithOwner } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceFileFolders') +/** + * Bounds the workspace-file-folder advisory-lock wait so a stuck holder fails + * fast (SQLSTATE 55P03) rather than hanging, even if the deployment lacks a + * server-side `lock_timeout`. Transaction-scoped via `set_config(..., true)`. + */ +const WORKSPACE_FILE_FOLDER_LOCK_TIMEOUT_MS = 5_000 + export type WorkspaceFileFolderScope = 'active' | 'archived' | 'all' export class WorkspaceFileFolderConflictError extends Error { @@ -109,6 +116,9 @@ async function acquireWorkspaceFileFolderMutationLock( tx: WorkspaceFileFolderLockTx, workspaceId: string ) { + await tx.execute( + sql`SELECT set_config('lock_timeout', ${`${WORKSPACE_FILE_FOLDER_LOCK_TIMEOUT_MS}ms`}, true)` + ) await tx.execute( sql`SELECT pg_advisory_xact_lock(hashtextextended(${`workspace_file_folders:${workspaceId}`}, 0))` )