diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 9c86fc1c4f6..7891fcd01b1 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -21,9 +21,9 @@ export const dynamic = 'force-dynamic' export const maxDuration = 3600 const logger = createLogger('ScheduledExecuteAPI') -const MAX_CRON_CLAIMS = 200 -const RESERVED_WORKFLOW_CLAIMS = 100 -const RESERVED_JOB_CLAIMS = MAX_CRON_CLAIMS - RESERVED_WORKFLOW_CLAIMS +const WORKFLOW_CHUNK_SIZE = 100 +const JOB_CHUNK_SIZE = 100 +const MAX_TICK_DURATION_MS = 3 * 60 * 1000 const STALE_SCHEDULE_CLAIM_MS = getMaxExecutionTimeout() const dueFilter = (queuedAt: Date) => @@ -143,203 +143,240 @@ async function claimJobSchedules(queuedAt: Date, limit: number) { }) } -export const GET = withRouteHandler(async (request: NextRequest) => { - const requestId = generateRequestId() - logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`) - - const authError = verifyCronAuth(request, 'Schedule execution') - if (authError) { - return authError +type ClaimedSchedule = Awaited>[number] +type ClaimedJob = Awaited>[number] +type WorkflowUtils = typeof import('@/lib/workflows/utils') +type JobQueue = Awaited> + +async function processScheduleItem( + schedule: ClaimedSchedule, + queuedAt: Date, + requestId: string, + jobQueue: JobQueue, + workflowUtils: WorkflowUtils +) { + const queueTime = schedule.lastQueuedAt ?? queuedAt + const executionId = generateId() + const correlation = { + executionId, + requestId, + source: 'schedule' as const, + workflowId: schedule.workflowId!, + scheduleId: schedule.id, + triggerType: 'schedule', + scheduledFor: schedule.nextRunAt?.toISOString(), } - const queuedAt = new Date() + const payload = { + scheduleId: schedule.id, + workflowId: schedule.workflowId!, + executionId, + requestId, + correlation, + blockId: schedule.blockId || undefined, + deploymentVersionId: schedule.deploymentVersionId || undefined, + cronExpression: schedule.cronExpression || undefined, + lastRanAt: schedule.lastRanAt?.toISOString(), + failedCount: schedule.failedCount || 0, + now: queueTime.toISOString(), + scheduledFor: schedule.nextRunAt?.toISOString(), + } try { - const dueSchedules = await claimWorkflowSchedules(queuedAt, RESERVED_WORKFLOW_CLAIMS) - const dueJobs = await claimJobSchedules(queuedAt, RESERVED_JOB_CLAIMS) - const remainingClaimBudget = Math.max(0, MAX_CRON_CLAIMS - dueSchedules.length - dueJobs.length) - - if (remainingClaimBudget > 0 && dueSchedules.length === RESERVED_WORKFLOW_CLAIMS) { - dueSchedules.push(...(await claimWorkflowSchedules(queuedAt, remainingClaimBudget))) - } else if (remainingClaimBudget > 0 && dueJobs.length === RESERVED_JOB_CLAIMS) { - dueJobs.push(...(await claimJobSchedules(queuedAt, remainingClaimBudget))) + const scheduleJobId = buildScheduleExecutionJobId(schedule) + const existingJob = await jobQueue.getJob(scheduleJobId) + if (existingJob && ['pending', 'processing'].includes(existingJob.status)) { + logger.info(`[${requestId}] Schedule execution job already exists`, { + scheduleId: schedule.id, + jobId: scheduleJobId, + status: existingJob.status, + }) + return + } + if (existingJob) { + logger.info(`[${requestId}] Releasing stale schedule claim for finished job`, { + scheduleId: schedule.id, + jobId: scheduleJobId, + status: existingJob.status, + }) + await releaseScheduleLock( + schedule.id, + requestId, + queuedAt, + `Released stale schedule ${schedule.id} for finished job ${scheduleJobId}`, + getNextRunFromCronExpression(schedule.cronExpression) + ) + return } - const totalCount = dueSchedules.length + dueJobs.length + const resolvedWorkflow = schedule.workflowId + ? await workflowUtils.getWorkflowById(schedule.workflowId) + : null + const resolvedWorkspaceId = resolvedWorkflow?.workspaceId + + const jobId = await jobQueue.enqueue('schedule-execution', payload, { + jobId: scheduleJobId, + concurrencyKey: scheduleJobId, + metadata: { + workflowId: schedule.workflowId ?? undefined, + workspaceId: resolvedWorkspaceId ?? undefined, + correlation, + }, + }) logger.info( - `[${requestId}] Processing ${totalCount} due items (${dueSchedules.length} schedules, ${dueJobs.length} jobs)` + `[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}` ) - const jobQueue = await getJobQueue() - - const workflowUtils = - dueSchedules.length > 0 ? await import('@/lib/workflows/utils') : undefined - - const schedulePromises = dueSchedules.map(async (schedule) => { - const queueTime = schedule.lastQueuedAt ?? queuedAt - const executionId = generateId() - const correlation = { - executionId, - requestId, - source: 'schedule' as const, - workflowId: schedule.workflowId!, - scheduleId: schedule.id, - triggerType: 'schedule', - scheduledFor: schedule.nextRunAt?.toISOString(), - } - - const payload = { + const queuedJob = await jobQueue.getJob(jobId) + if (queuedJob && !['pending', 'processing'].includes(queuedJob.status)) { + logger.info(`[${requestId}] Schedule execution job already finished`, { scheduleId: schedule.id, - workflowId: schedule.workflowId!, - executionId, + jobId, + status: queuedJob.status, + }) + await releaseScheduleLock( + schedule.id, requestId, - correlation, - blockId: schedule.blockId || undefined, - deploymentVersionId: schedule.deploymentVersionId || undefined, - cronExpression: schedule.cronExpression || undefined, - lastRanAt: schedule.lastRanAt?.toISOString(), - failedCount: schedule.failedCount || 0, - now: queueTime.toISOString(), - scheduledFor: schedule.nextRunAt?.toISOString(), - } + queuedAt, + `Released stale schedule ${schedule.id} for finished job ${jobId}`, + getNextRunFromCronExpression(schedule.cronExpression) + ) + return + } + if (shouldExecuteInline()) { try { - const scheduleJobId = buildScheduleExecutionJobId(schedule) - const existingJob = await jobQueue.getJob(scheduleJobId) - if (existingJob && ['pending', 'processing'].includes(existingJob.status)) { - logger.info(`[${requestId}] Schedule execution job already exists`, { - scheduleId: schedule.id, - jobId: scheduleJobId, - status: existingJob.status, - }) - return - } - if (existingJob) { - logger.info(`[${requestId}] Releasing stale schedule claim for finished job`, { - scheduleId: schedule.id, - jobId: scheduleJobId, - status: existingJob.status, - }) - await releaseScheduleLock( - schedule.id, - requestId, - queuedAt, - `Released stale schedule ${schedule.id} for finished job ${scheduleJobId}`, - getNextRunFromCronExpression(schedule.cronExpression) - ) - return - } - - const resolvedWorkflow = schedule.workflowId - ? await workflowUtils?.getWorkflowById(schedule.workflowId) - : null - const resolvedWorkspaceId = resolvedWorkflow?.workspaceId - - const jobId = await jobQueue.enqueue('schedule-execution', payload, { - jobId: scheduleJobId, - concurrencyKey: scheduleJobId, - metadata: { - workflowId: schedule.workflowId ?? undefined, - workspaceId: resolvedWorkspaceId ?? undefined, - correlation, - }, - }) - logger.info( - `[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}` + await jobQueue.startJob(jobId) + const output = await executeScheduleJob(payload) + await jobQueue.completeJob(jobId, output) + } catch (error) { + const errorMessage = toError(error).message + logger.error( + `[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`, + { + jobId, + error: errorMessage, + } ) - - const queuedJob = await jobQueue.getJob(jobId) - if (queuedJob && !['pending', 'processing'].includes(queuedJob.status)) { - logger.info(`[${requestId}] Schedule execution job already finished`, { - scheduleId: schedule.id, + try { + await jobQueue.markJobFailed(jobId, errorMessage) + } catch (markFailedError) { + logger.error(`[${requestId}] Failed to mark job as failed`, { jobId, - status: queuedJob.status, + error: toError(markFailedError).message, }) - await releaseScheduleLock( - schedule.id, - requestId, - queuedAt, - `Released stale schedule ${schedule.id} for finished job ${jobId}`, - getNextRunFromCronExpression(schedule.cronExpression) - ) - return } - - if (shouldExecuteInline()) { - try { - await jobQueue.startJob(jobId) - const output = await executeScheduleJob(payload) - await jobQueue.completeJob(jobId, output) - } catch (error) { - const errorMessage = toError(error).message - logger.error( - `[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`, - { - jobId, - error: errorMessage, - } - ) - try { - await jobQueue.markJobFailed(jobId, errorMessage) - } catch (markFailedError) { - logger.error(`[${requestId}] Failed to mark job as failed`, { - jobId, - error: - markFailedError instanceof Error - ? markFailedError.message - : String(markFailedError), - }) - } - await releaseScheduleLock( - schedule.id, - requestId, - queuedAt, - `Failed to release lock for schedule ${schedule.id} after inline execution failure` - ) - } - } - } catch (error) { - logger.error( - `[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`, - error - ) await releaseScheduleLock( schedule.id, requestId, queuedAt, - `Failed to release lock for schedule ${schedule.id} after queue failure` + `Failed to release lock for schedule ${schedule.id} after inline execution failure` ) } + } + } catch (error) { + logger.error( + `[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`, + error + ) + await releaseScheduleLock( + schedule.id, + requestId, + queuedAt, + `Failed to release lock for schedule ${schedule.id} after queue failure` + ) + } +} + +async function processJobItem(job: ClaimedJob, queuedAt: Date, requestId: string) { + const queueTime = job.lastQueuedAt ?? queuedAt + const payload = { + scheduleId: job.id, + cronExpression: job.cronExpression || undefined, + failedCount: job.failedCount || 0, + now: queueTime.toISOString(), + } + + try { + await executeJobInline(payload) + } catch (error) { + logger.error(`[${requestId}] Job execution failed for ${job.id}`, { + error: toError(error).message, }) + await releaseScheduleLock( + job.id, + requestId, + queuedAt, + `Failed to release lock for job ${job.id}` + ) + } +} - // Mothership jobs are executed inline directly. - const jobPromises = dueJobs.map(async (job) => { - const queueTime = job.lastQueuedAt ?? queuedAt - const payload = { - scheduleId: job.id, - cronExpression: job.cronExpression || undefined, - failedCount: job.failedCount || 0, - now: queueTime.toISOString(), - } +export const GET = withRouteHandler(async (request: NextRequest) => { + const requestId = generateRequestId() + const tickStart = Date.now() + logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`) - try { - await executeJobInline(payload) - } catch (error) { - logger.error(`[${requestId}] Job execution failed for ${job.id}`, { - error: toError(error).message, - }) - await releaseScheduleLock( - job.id, - requestId, - queuedAt, - `Failed to release lock for job ${job.id}` - ) + const authError = verifyCronAuth(request, 'Schedule execution') + if (authError) { + return authError + } + + try { + const jobQueue = await getJobQueue() + let workflowUtils: WorkflowUtils | undefined + + let totalSchedules = 0 + let totalJobs = 0 + let iterations = 0 + let schedulesExhausted = false + let jobsExhausted = false + + while (Date.now() - tickStart < MAX_TICK_DURATION_MS) { + if (schedulesExhausted && jobsExhausted) break + const queuedAt = new Date() + + const [dueSchedules, dueJobs] = await Promise.all([ + schedulesExhausted ? [] : claimWorkflowSchedules(queuedAt, WORKFLOW_CHUNK_SIZE), + jobsExhausted ? [] : claimJobSchedules(queuedAt, JOB_CHUNK_SIZE), + ]) + + if (dueSchedules.length < WORKFLOW_CHUNK_SIZE) schedulesExhausted = true + if (dueJobs.length < JOB_CHUNK_SIZE) jobsExhausted = true + + if (dueSchedules.length === 0 && dueJobs.length === 0) break + + iterations += 1 + totalSchedules += dueSchedules.length + totalJobs += dueJobs.length + + logger.info( + `[${requestId}] Iteration ${iterations}: claimed ${dueSchedules.length} schedules, ${dueJobs.length} jobs` + ) + + if (dueSchedules.length > 0 && !workflowUtils) { + workflowUtils = await import('@/lib/workflows/utils') } - }) - await Promise.allSettled([...schedulePromises, ...jobPromises]) + const loadedWorkflowUtils = workflowUtils + const schedulePromises = + loadedWorkflowUtils && dueSchedules.length > 0 + ? dueSchedules.map((schedule) => + processScheduleItem(schedule, queuedAt, requestId, jobQueue, loadedWorkflowUtils) + ) + : [] + + await Promise.allSettled([ + ...schedulePromises, + ...dueJobs.map((job) => processJobItem(job, queuedAt, requestId)), + ]) + } - logger.info(`[${requestId}] Processed ${totalCount} items`) + const totalCount = totalSchedules + totalJobs + const durationMs = Date.now() - tickStart + logger.info( + `[${requestId}] Processed ${totalCount} items across ${iterations} iteration(s) in ${durationMs}ms (${totalSchedules} schedules, ${totalJobs} jobs)` + ) return NextResponse.json({ message: 'Scheduled workflow executions processed',