Skip to content

Commit d97f27e

Browse files
committed
fix trigger dev, bullmq, inline prioritization
1 parent 6b9a1bc commit d97f27e

3 files changed

Lines changed: 15 additions & 13 deletions

File tree

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { v4 as uuidv4 } from 'uuid'
66
import { verifyCronAuth } from '@/lib/auth/internal'
77
import { getJobQueue } from '@/lib/core/async-jobs'
88
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
9+
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
910
import { generateRequestId } from '@/lib/core/utils/request'
1011
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
1112
import {
@@ -153,7 +154,7 @@ export async function GET(request: NextRequest) {
153154
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
154155
)
155156

156-
if (!isBullMQEnabled()) {
157+
if (!isBullMQEnabled() && !isTriggerDevEnabled) {
157158
try {
158159
await jobQueue.startJob(jobId)
159160
const output = await executeScheduleJob(payload)

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth
66
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
77
import { getJobQueue } from '@/lib/core/async-jobs'
88
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
9+
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
910
import {
1011
createTimeoutAbortController,
1112
getTimeoutErrorMessage,
@@ -238,7 +239,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
238239
jobId,
239240
})
240241

241-
if (!isBullMQEnabled() && jobQueue) {
242+
if (!isBullMQEnabled() && !isTriggerDevEnabled && jobQueue) {
242243
const inlineJobQueue = jobQueue
243244
void (async () => {
244245
try {

apps/sim/lib/webhooks/processor.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscri
88
import { getInlineJobQueue, getJobQueue } from '@/lib/core/async-jobs'
99
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
1010
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
11-
import { isProd } from '@/lib/core/config/feature-flags'
11+
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
1212
import { safeCompare } from '@/lib/core/security/encryption'
1313
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
1414
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -1265,9 +1265,16 @@ export async function queueWebhookExecution(
12651265

12661266
const isPolling = isPollingWebhookProvider(payload.provider)
12671267

1268-
if (isPolling && isBullMQEnabled()) {
1269-
const jobId = isBullMQEnabled()
1270-
? await enqueueWorkspaceDispatch({
1268+
if (isPolling && (isTriggerDevEnabled || isBullMQEnabled())) {
1269+
const jobId = isTriggerDevEnabled
1270+
? await (await getJobQueue()).enqueue('webhook-execution', payload, {
1271+
metadata: {
1272+
workflowId: foundWorkflow.id,
1273+
userId: actorUserId,
1274+
correlation,
1275+
},
1276+
})
1277+
: await enqueueWorkspaceDispatch({
12711278
id: executionId,
12721279
workspaceId: foundWorkflow.workspaceId,
12731280
lane: 'runtime',
@@ -1284,13 +1291,6 @@ export async function queueWebhookExecution(
12841291
correlation,
12851292
},
12861293
})
1287-
: await (await getJobQueue()).enqueue('webhook-execution', payload, {
1288-
metadata: {
1289-
workflowId: foundWorkflow.id,
1290-
userId: actorUserId,
1291-
correlation,
1292-
},
1293-
})
12941294
logger.info(
12951295
`[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue`
12961296
)

0 commit comments

Comments
 (0)