Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion .github/workflows/migrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ jobs:

if [ "${ENVIRONMENT}" = "dev" ]; then
echo "Dev environment — pushing schema directly (db:push)"
bun run db:push --force
# `--force` only suppresses the data-loss confirm, not drizzle's
# rename-vs-drop prompt, which fires (and crashes, no TTY) when a
# diff both adds and drops tables/columns at once. Turn that opaque
# crash into an actionable failure instead of a bare stack trace.
push_output="$(bun run db:push --force 2>&1)" && push_status=0 || push_status=$?
echo "$push_output"
if [ "$push_status" -ne 0 ]; then
if printf '%s' "$push_output" | grep -q 'Interactive prompts require a TTY'; then
echo "::error title=Dev schema push needs manual reconciliation::drizzle-kit push hit an interactive rename/drop prompt that CI cannot answer. The dev DB has drifted from schema.ts: it still holds table(s)/column(s) the schema no longer declares while the schema also adds new ones, so drizzle cannot tell a rename from a drop+create. Fix: drop the stale objects on the dev DB to match schema.ts — the same DROPs the latest versioned migration already applied to staging/prod (grep packages/db/migrations for the most recent DROP TABLE / DROP COLUMN) — then re-run this workflow. --force cannot bypass this prompt."
fi
exit "$push_status"
fi
else
echo "Applying versioned migrations (db:migrate)"
bun run ./scripts/migrate.ts
Expand Down
60 changes: 43 additions & 17 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { asyncJobs, db } from '@sim/db'
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
import { tableJobs, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
Expand All @@ -8,12 +8,15 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { deleteFile } from '@/lib/uploads/core/storage-service'

const logger = createLogger('CleanupStaleExecutions')

const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const MAX_INT32 = 2_147_483_647
/** Terminal table-jobs older than this are pruned; only the latest job per table is ever read. */
const TABLE_JOB_RETENTION_HOURS = 24

export const GET = withRouteHandler(async (request: NextRequest) => {
try {
Expand Down Expand Up @@ -110,33 +113,56 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table imports as failed. Imports run detached on the web container and
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
// an `importing` table with no recent update has stalled (not merely slow). Rows are
// left in place (no rollback); the user re-imports.
// Mark stale table jobs (import or delete) as failed. Jobs run detached on the web container
// and are lost if the pod is killed mid-run. `updated_at` is bumped by progress updates, so a
// `running` job with no recent update has stalled (not merely slow). Committed work is left in
// place (no rollback); the user retries. Also prune long-settled terminal jobs so the table
// doesn't grow unbounded (the latest job per table is what list/detail reads surface).
let staleImportsMarkedFailed = 0
try {
const now = new Date()
const staleImports = await db
.update(userTableDefinitions)
.update(tableJobs)
.set({
importStatus: 'failed',
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
updatedAt: new Date(),
status: 'failed',
error: `Job terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
completedAt: now,
updatedAt: now,
})
.where(and(eq(tableJobs.status, 'running'), lt(tableJobs.updatedAt, staleThreshold)))
.returning({ id: tableJobs.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table jobs as failed`)
}

const terminalRetention = new Date(Date.now() - TABLE_JOB_RETENTION_HOURS * 60 * 60 * 1000)
const pruned = await db
.delete(tableJobs)
.where(
and(
eq(userTableDefinitions.importStatus, 'importing'),
lt(userTableDefinitions.updatedAt, staleThreshold)
inArray(tableJobs.status, ['ready', 'failed', 'canceled']),
lt(tableJobs.updatedAt, terminalRetention)
)
)
.returning({ id: userTableDefinitions.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
.returning({ type: tableJobs.type, payload: tableJobs.payload })

// Pruned export jobs carry the generated file's storage key — delete the file with the job
// so the exports prefix doesn't accumulate. Best-effort: a miss just orphans one object.
for (const job of pruned) {
if (job.type !== 'export') continue
const resultKey = (job.payload as { resultKey?: string } | null)?.resultKey
if (!resultKey) continue
await deleteFile({ key: resultKey, context: 'workspace' }).catch((err) => {
logger.warn('Failed to delete pruned export file', {
resultKey,
error: toError(err).message,
})
})
}
} catch (error) {
logger.error('Failed to clean up stale table imports:', {
logger.error('Failed to clean up stale table jobs:', {
error: toError(error).message,
})
}
Expand Down
12 changes: 9 additions & 3 deletions apps/sim/app/api/table/[tableId]/cancel-runs/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { cancelWorkflowGroupRuns } from '@/lib/table/workflow-columns'
import { accessError, checkAccess } from '@/app/api/table/utils'
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'

const logger = createLogger('TableCancelRunsAPI')

Expand All @@ -32,7 +32,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(cancelTableRunsContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, scope, rowId } = parsed.data.body
const { workspaceId, scope, rowId, filter, excludeRowIds } = parsed.data.body

const result = await checkAccess(tableId, authResult.userId, 'write')
if (!result.ok) return accessError(result, requestId, tableId)
Expand All @@ -42,7 +42,13 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
}

const cancelled = await cancelWorkflowGroupRuns(tableId, scope === 'row' ? rowId : undefined)
const filterError = tableFilterError(filter, table.schema.columns)
if (filterError) return filterError

const cancelled = await cancelWorkflowGroupRuns(tableId, scope === 'row' ? rowId : undefined, {
filter,
excludeRowIds,
})
logger.info(
`[${requestId}] cancel-runs: tableId=${tableId} scope=${scope}${
rowId ? ` rowId=${rowId}` : ''
Expand Down
64 changes: 33 additions & 31 deletions apps/sim/app/api/table/[tableId]/columns/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
updateColumnConstraints,
updateColumnType,
} from '@/lib/table'
import { accessError, checkAccess, normalizeColumn } from '@/app/api/table/utils'
import { accessError, checkAccess, normalizeColumn, rootErrorMessage } from '@/app/api/table/utils'

const logger = createLogger('TableColumnsAPI')

Expand Down Expand Up @@ -63,13 +63,17 @@ export const POST = withRouteHandler(async (request: NextRequest, context: Colum
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
if (error.message.includes('already exists') || error.message.includes('maximum column')) {
return NextResponse.json({ error: error.message }, { status: 400 })
}
if (error.message === 'Table not found') {
return NextResponse.json({ error: error.message }, { status: 404 })
}
const msg = rootErrorMessage(error)
if (
msg.includes('already exists') ||
msg.includes('maximum column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}
if (msg === 'Table not found') {
return NextResponse.json({ error: msg }, { status: 404 })
}

logger.error(`[${requestId}] Error adding column to table ${tableId}:`, error)
Expand Down Expand Up @@ -146,22 +150,21 @@ export const PATCH = withRouteHandler(async (request: NextRequest, context: Colu
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
const msg = error.message
if (msg.includes('not found') || msg.includes('Table not found')) {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (
msg.includes('already exists') ||
msg.includes('Cannot delete the last column') ||
msg.includes('Cannot set column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum') ||
msg.includes('incompatible') ||
msg.includes('duplicate')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}
const msg = rootErrorMessage(error)
if (msg.includes('not found') || msg.includes('Table not found')) {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (
msg.includes('already exists') ||
msg.includes('Cannot delete the last column') ||
msg.includes('Cannot set column') ||
msg.includes('Cannot set unique column') ||
msg.includes('Invalid column') ||
msg.includes('exceeds maximum') ||
msg.includes('incompatible') ||
msg.includes('duplicate')
) {
return NextResponse.json({ error: msg }, { status: 400 })
}

logger.error(`[${requestId}] Error updating column in table ${tableId}:`, error)
Expand Down Expand Up @@ -211,13 +214,12 @@ export const DELETE = withRouteHandler(
return validationErrorResponse(error, 'Invalid request data')
}

if (error instanceof Error) {
if (error.message.includes('not found') || error.message === 'Table not found') {
return NextResponse.json({ error: error.message }, { status: 404 })
}
if (error.message.includes('Cannot delete') || error.message.includes('last column')) {
return NextResponse.json({ error: error.message }, { status: 400 })
}
const msg = rootErrorMessage(error)
if (msg.includes('not found') || msg === 'Table not found') {
return NextResponse.json({ error: msg }, { status: 404 })
}
if (msg.includes('Cannot delete') || msg.includes('last column')) {
return NextResponse.json({ error: msg }, { status: 400 })
}

logger.error(`[${requestId}] Error deleting column from table ${tableId}:`, error)
Expand Down
11 changes: 9 additions & 2 deletions apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { runWorkflowColumn } from '@/lib/table/workflow-columns'
import { accessError, checkAccess } from '@/app/api/table/utils'
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'

const logger = createLogger('TableRunColumnAPI')

Expand All @@ -25,16 +25,23 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(runColumnContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
const { workspaceId, groupIds, runMode, rowIds, filter, excludeRowIds, limit } =
parsed.data.body
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

// Validate the filter up front (the dispatcher reuses it) so a bad field fails fast.
const filterError = tableFilterError(filter, access.table.schema.columns)
if (filterError) return filterError

const { dispatchId } = await runWorkflowColumn({
tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
filter,
excludeRowIds,
limit,
requestId,
triggeredByUserId: auth.userId,
Expand Down
Loading
Loading