From 80c67c4f09572ba490d61b0b7ae65af2ff6ef71f Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Tue, 1 Jul 2025 13:02:15 +0530 Subject: [PATCH 01/25] feat: trigger workflows with their name or id - Fixes webhook trigger failing on license check - Add backward compatible changes to trigger workflows with their name or id --- server/package-lock.json | 2 +- .../modules/licensing/guards/webhook.guard.ts | 91 ++++++++++--------- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/server/package-lock.json b/server/package-lock.json index fa1e6d68ba..3cef36d54f 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -20645,4 +20645,4 @@ } } } -} \ No newline at end of file +} diff --git a/server/src/modules/licensing/guards/webhook.guard.ts b/server/src/modules/licensing/guards/webhook.guard.ts index 1a5d084643..67dec9fd98 100644 --- a/server/src/modules/licensing/guards/webhook.guard.ts +++ b/server/src/modules/licensing/guards/webhook.guard.ts @@ -5,6 +5,7 @@ import { LicenseTermsService } from '../interfaces/IService'; import { LICENSE_FIELD, LICENSE_LIMIT } from '../constants'; import { AppsRepository } from '@modules/apps/repository'; import { APP_TYPES } from '@modules/apps/constants'; +import { isUUID } from 'class-validator'; @Injectable() export class WebhookGuard implements CanActivate { @@ -16,17 +17,19 @@ export class WebhookGuard implements CanActivate { ) {} async canActivate(context: ExecutionContext): Promise { - const request = context.switchToHttp().getRequest(); + const request = context.switchToHttp().getRequest(); const workflowsLimit = await this.licenseTermsService.getLicenseTerms(LICENSE_FIELD.WORKFLOWS); + const isUuid = isUUID(request?.params?.idOrName); const workflowApp = await this.appsRepository.findOne({ where: { - id: request?.params?.id, + [isUuid ? 'id' : 'name']: request?.params?.idOrName, type: APP_TYPES.WORKFLOW, }, }); if (!workflowApp) throw new HttpException(`Workflow doesn't exists`, 404); + request.tj_app = workflowApp; // Webhook API token validation if (request.headers.authorization.split(' ')[1] !== workflowApp.workflowApiToken) throw new UnauthorizedException(); @@ -35,71 +38,75 @@ export class WebhookGuard implements CanActivate { if (!workflowApp.workflowEnabled) throw new HttpException(`Webhook endpoint disabled or doesn't exists`, 404); // Workspace Level - - // Daily Limit - if ( - workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + if (workflowsLimit.workspace) { + // Daily Limit + if ( + workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && + ( + await this.manager.query( + `SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 AND DATE(we.created_at) = current_date`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.daily_executions - ) { - throw new HttpException('Maximum daily limit for workflow execution has reached for this workspace', 451); - } + [workflowApp.organizationId] + ) + )[0].count >= workflowsLimit.workspace.daily_executions + ) { + throw new HttpException('Maximum daily limit for workflow execution has reached for this workspace', 451); + } - // Monthly Limit - if ( - workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + // Monthly Limit + if ( + workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + ( + await this.manager.query( + `SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 AND extract (year from we.created_at) = extract (year from current_date) AND extract (month from we.created_at) = extract (month from current_date)`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.monthly_executions - ) { - throw new HttpException('Maximum monthly limit for workflow execution has reached for this workspace', 451); + [workflowApp.organizationId] + ) + )[0].count >= workflowsLimit.workspace.monthly_executions + ) { + throw new HttpException('Maximum monthly limit for workflow execution has reached for this workspace', 451); + } } // Instance Level - - // Daily Limit - if ( - workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + if (workflowsLimit.instance) { + // Daily Limit + if ( + workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && + ( + await this.manager.query(`SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE DATE(we.created_at) = current_date`) - )[0].count >= workflowsLimit.instance.daily_executions - ) { - throw new HttpException('Maximum daily limit for workflow execution has been reached', 451); - } + )[0].count >= workflowsLimit.instance.daily_executions + ) { + throw new HttpException('Maximum daily limit for workflow execution has been reached', 451); + } - // Monthly Limit - if ( - workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + // Monthly Limit + if ( + workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + ( + await this.manager.query(`SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE extract (year from we.created_at) = extract (year from current_date) AND extract (month from we.created_at) = extract (month from current_date)`) - )[0].count >= workflowsLimit.instance.monthly_executions - ) { - throw new HttpException('Maximum monthly limit for workflow execution has been reached', 451); + )[0].count >= workflowsLimit.instance.monthly_executions + ) { + throw new HttpException('Maximum monthly limit for workflow execution has been reached', 451); + } } return true; From ce5dcb6b0ad537272de69dacb010bd7e1f4b6abe Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Tue, 1 Jul 2025 13:02:37 +0530 Subject: [PATCH 02/25] chore: update subproject commits for frontend and server --- frontend/ee | 2 +- server/ee | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/frontend/ee b/frontend/ee index aa52120545..b117b06fd7 160000 --- a/frontend/ee +++ b/frontend/ee @@ -1 +1 @@ -Subproject commit aa521205455afd59e85762716a0012c1e44986e1 +Subproject commit b117b06fd7dbc43a45d317084ae27b0aedbf4a35 diff --git a/server/ee b/server/ee index 0d32dede92..71e393543d 160000 --- a/server/ee +++ b/server/ee @@ -1 +1 @@ -Subproject commit 0d32dede925b3c7cf75b09c17007a9b46fae8cef +Subproject commit 71e393543ded7c10d1488455b73e3259d1b4d7b6 From 131ff6a288a7fe7ea040ad6ab2a5f4f7c7311965 Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Tue, 1 Jul 2025 19:17:07 +0530 Subject: [PATCH 03/25] feat: Setup async workflow query execution with SSE Overview This PR implements real-time workflow execution monitoring using Server-Sent Events (SSE) as part of our LTS feature support. The implementation allows for tracking long-running workflows without requiring deployment changes. Technical Changes 1. Server Components Added WorkflowStreamService to manage persistent SSE streams with automatic cleanup Implemented @sse endpoint in WorkflowExecutionsController for streaming status updates Created WorkflowTriggersListener to emit workflow execution events with EventEmitter2 Added workflow execution status constants to identify different states in the execution lifecycle 2. Client-side Components Implemented AsyncQueryHandler to manage SSE connections and parse event streams Enhanced queryPanelSlice with methods to create async handlers and trigger workflows Added support for non-blocking workflow execution with real-time status updates 3. Workflow Integration Modified workflow triggering to use the SSE-based monitoring approach Maintained same-server architecture to avoid deployment changes Added automatic reconnection handling and error recovery for client connections Architecture Decisions Selected Same Server Approach: Chose to implement workflows within the same HTTP server to maintain the existing deployment setup for LTS users Real-time Updates with SSE: Leveraged Server-Sent Events for their simplicity, efficiency, and compatibility with existing infrastructure Future Extension Path: Implementation can be extended to Worker Threads or Microservice architecture later if needed --- .../_stores/slices/queryPanelSlice.js | 437 +++++++++++++----- .../AppBuilder/_utils/async-query-handler.js | 132 ++++++ .../_services/workflow_executions.service.js | 21 + server/ee | 2 +- server/src/main.ts | 6 +- .../modules/auth/workflow-sse-auth.guard.ts | 12 + .../modules/licensing/constants/PlanTerms.ts | 2 +- .../modules/licensing/guards/webhook.guard.ts | 65 +-- .../workflow-executions.controller.ts | 20 +- .../workflow-webhooks.controller.ts | 25 +- .../listeners/workflow-triggers.listener.ts | 37 ++ server/src/modules/workflows/module.ts | 6 +- .../services/workflow-executions.service.ts | 4 + .../services/workflow-stream.service.ts | 31 ++ 14 files changed, 646 insertions(+), 154 deletions(-) create mode 100644 frontend/src/AppBuilder/_utils/async-query-handler.js create mode 100644 server/src/modules/auth/workflow-sse-auth.guard.ts create mode 100644 server/src/modules/workflows/listeners/workflow-triggers.listener.ts create mode 100644 server/src/modules/workflows/services/workflow-stream.service.ts diff --git a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js index f33f67c66e..fa917e54b6 100644 --- a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js +++ b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js @@ -1,3 +1,5 @@ +import { toast } from 'react-hot-toast'; +import { AsyncQueryHandler } from '@/AppBuilder/_utils/async-query-handler'; import _, { isEmpty } from 'lodash'; import { resolveReferences, loadPyodide, hasCircularDependency } from '@/_helpers/utils'; import { fetchOAuthToken, fetchOauthTokenForSlackAndGSheet } from '@/AppBuilder/_utils/auth'; @@ -7,7 +9,6 @@ import axios from 'axios'; import { validateMultilineCode } from '@/_helpers/utility'; import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel'; import { deepClone } from '@/_helpers/utilities/utils.helpers'; -import toast from 'react-hot-toast'; const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {}; const initialState = { @@ -26,6 +27,7 @@ const initialState = { loadingDataQueries: false, isPreviewQueryLoading: false, queryPanelSearchTem: '', + asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances }; export const createQueryPanelSlice = (set, get) => ({ @@ -162,6 +164,19 @@ export const createQueryPanelSlice = (set, get) => ({ 'setLoadingDataQueries' ), + setAsyncQueryRuns: (updater) => + set( + (state) => { + if (typeof updater === 'function') { + state.queryPanel.asyncQueryRuns = updater(state.queryPanel.asyncQueryRuns); + } else { + state.queryPanel.asyncQueryRuns = updater; + } + }, + false, + 'setAsyncQueryRuns' + ), + onQueryConfirmOrCancel: (queryConfirmationData, isConfirm = false, mode = 'edit', moduleId = 'canvas') => { const { queryPanel, dataQuery, setResolvedQuery } = get(); const { runQuery } = queryPanel; @@ -202,6 +217,70 @@ export const createQueryPanelSlice = (set, get) => ({ ); }, + createWorkflowAsyncHandler: ({ + executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }) => { + const asyncHandler = new AsyncQueryHandler({ + streamSSE: (jobId) => { + return workflowExecutionsService.streamSSE(jobId); + }, + extractJobId: () => executionId, + classifyEventStatus: (eventData) => { + // hardcoded for workflows + if (eventData.type === 'workflow_connection_close') { + return { status: 'CLOSE', data: eventData }; + } else if (eventData.type === 'workflow_execution_completed') { + return { status: 'COMPLETE', result: eventData.result, data: eventData }; + } else if (eventData.type === 'workflow_execution_error') { + return { status: 'ERROR', data: eventData }; + } else { + return { status: 'PROGRESS', data: eventData }; + } + }, + callbacks: { + onProgress: (progressData) => { + // Update UI with progress information + if (shouldSetPreviewData) { + setPreviewData({ ...progressData }); + } + setResolvedQuery(queryId, { + isLoading: true, + progress: progressData.progress, + currentData: progressData.partialData || [], + }); + }, + onComplete: async (result) => { + await processQueryResults(result); + // Remove the AsyncQueryHandler instance from asyncQueryRuns on completion + get().queryPanel.setAsyncQueryRuns((currentRuns) => + currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId) + ); + }, + onError: (e) => { + handleFailure({ + status: 'failed', + data: { + message: e.message || 'Error in async processing', + error: e.error, + }, + }); + // Remove the AsyncQueryHandler instance from asyncQueryRuns on error + get().queryPanel.setAsyncQueryRuns((currentRuns) => + currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId) + ); + }, + }, + }); + + return asyncHandler; + }, + runQuery: ( queryId, queryName, @@ -231,7 +310,7 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewPanelExpanded, executeRunPycode, runTransformation, - executeWorkflow, + triggerWorkflow, executeMultilineJS, } = queryPanel; const queryUpdatePromise = dataQuerySlice.queryUpdates[queryId]; @@ -332,6 +411,97 @@ export const createQueryPanelSlice = (set, get) => ({ } } + // Handler for transformation and completion of query results + const processQueryResults = async (data, rawData = null) => { + let finalData = data; + rawData = rawData || data; + + if (dataQuery.options.enableTransformation) { + finalData = await runTransformation( + finalData, + query.options.transformation, + query.options.transformationLanguage, + query, + mode, + moduleId + ); + + if (finalData.status === 'failed') { + handleFailure(finalData); + return finalData; + } + } + + if (shouldSetPreviewData) { + setPreviewLoading(false); + setPreviewData(finalData); + } + + if (dataQuery.options.showSuccessNotification) { + const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000; + toast.success(dataQuery.options.successMessage, { + duration: notificationDuration, + }); + } + + get().debugger.log({ + logLevel: 'success', + type: 'query', + kind: query.kind, + key: query.name, + message: 'Query executed successfully', + isQuerySuccessLog: true, + errorTarget: 'Queries', + }); + + setResolvedQuery( + queryId, + { + isLoading: false, + data: finalData, + rawData, + metadata: data?.metadata, + request: data?.metadata?.request, + response: data?.metadata?.response, + }, + moduleId + ); + + onEvent('onDataQuerySuccess', queryEvents, mode); + return { status: 'ok', data: finalData }; + }; + + // Handler for query failures + const handleFailure = (errorData) => { + if (shouldSetPreviewData) { + setPreviewLoading(false); + setPreviewData(errorData); + } + + get().debugger.log({ + logLevel: 'error', + type: 'query', + kind: query.kind, + key: query.name, + message: errorData?.description || 'Query failed', + errorTarget: 'Queries', + error: errorData, + isQuerySuccessLog: false, + }); + + setResolvedQuery( + queryId, + { + isLoading: false, + error: errorData, + }, + moduleId + ); + + onEvent('onDataQueryFailure', queryEvents); + return errorData; + }; + // eslint-disable-next-line no-unused-vars return new Promise(function (resolve, reject) { if (shouldSetPreviewData) { @@ -356,7 +526,7 @@ export const createQueryPanelSlice = (set, get) => ({ } else if (query.kind === 'runpy') { queryExecutionPromise = executeRunPycode(query.options.code, query, false, mode, queryState, moduleId); } else if (query.kind === 'workflows') { - queryExecutionPromise = executeWorkflow( + queryExecutionPromise = triggerWorkflow( moduleId, query.options.workflowId, query.options.blocking, @@ -386,6 +556,53 @@ export const createQueryPanelSlice = (set, get) => ({ fetchOAuthToken(url, dataQuery['data_source_id'] || dataQuery['dataSourceId']); } + // Asynchronous query execution + // Currently async query resolution is applicable only to workflows + // Change this conditional to async query type check for other + // async queries in the future + if (query.kind === 'workflows') { + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); + + // Process initial response and start SSE monitoring + asyncHandler.processInitialResponse(data.data); + + // Add the AsyncQueryHandler instance to asyncQueryRuns + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + + // Set initial state with jobId + setResolvedQuery( + queryId, + { + isLoading: true, + jobId: asyncHandler.jobId, + // data: data.data, + }, + moduleId + ); + + // Resolve with async status + resolve({ + jobId: asyncHandler.jobId, + // data: data.data, + }); + } catch (error) { + toast.error(error.message || 'Failed to start async query'); + resolve({ status: 'failed', message: error.message }); + } + return; + } + + // Handle synchronous queries (original code) + let queryStatusCode = data?.status ?? null; const promiseStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status; // Note: Need to move away from statusText -> statusCode @@ -420,114 +637,15 @@ export const createQueryPanelSlice = (set, get) => ({ errorData = data; break; } - if (shouldSetPreviewData) { - setPreviewLoading(false); - setPreviewData(errorData); - } + errorData = query.kind === 'runpy' || query.kind === 'runjs' ? data?.data : data; - get().debugger.log({ - logLevel: 'error', - type: 'query', - kind: query.kind, - key: query.name, - message: errorData?.description, - errorTarget: 'Queries', - error: - query.kind === 'restapi' - ? { - substitutedVariables: options, - request: data?.data?.requestObject, - response: data?.data?.responseObject, - } - : errorData, - isQuerySuccessLog: false, - }); - - setResolvedQuery( - queryId, - { - isLoading: false, - ...(query.kind === 'restapi' - ? { - metadata: data.metadata, - request: data.data.requestObject, - response: data.data.responseObject, - responseHeaders: data.data.responseHeaders, - } - : {}), - }, - moduleId - ); - - resolve(data); - onEvent('onDataQueryFailure', queryEvents); + const result = handleFailure(errorData); + resolve(result); return; } else { - let rawData = data.data; - let finalData = data.data; - if (dataQuery.options.enableTransformation) { - finalData = await runTransformation( - finalData, - query.options.transformation, - query.options.transformationLanguage, - query, - 'edit', - moduleId - ); - if (finalData.status === 'failed') { - setResolvedQuery( - queryId, - { - isLoading: false, - }, - moduleId - ); - - resolve(finalData); - onEvent('onDataQueryFailure', queryEvents); - setPreviewLoading(false); - if (shouldSetPreviewData) setPreviewData(finalData); - return; - } - } - - if (shouldSetPreviewData) { - setPreviewLoading(false); - setPreviewData(finalData); - } - - if (dataQuery.options.showSuccessNotification) { - const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000; - toast.success(dataQuery.options.successMessage, { - duration: notificationDuration, - }); - } - - get().debugger.log({ - logLevel: 'success', - type: 'query', - kind: query.kind, - key: query.name, - message: 'Query executed successfully', - isQuerySuccessLog: true, - errorTarget: 'Queries', - }); - - setResolvedQuery( - queryId, - { - isLoading: false, - data: finalData, - rawData, - metadata: data?.metadata, - request: data?.metadata?.request, - response: data?.metadata?.response, - }, - moduleId - ); - - resolve({ status: 'ok', data: finalData }); - onEvent('onDataQuerySuccess', queryEvents, mode); + const rawData = data.data; + const result = await processQueryResults(data.data, rawData); + resolve(result); } }) .catch((e) => { @@ -547,7 +665,7 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewPanelExpanded, executeRunPycode, runTransformation, - executeWorkflow, + triggerWorkflow, executeMultilineJS, setIsPreviewQueryLoading, } = queryPanel; @@ -607,7 +725,7 @@ export const createQueryPanelSlice = (set, get) => ({ } else if (query.kind === 'runpy') { queryExecutionPromise = executeRunPycode(query.options.code, query, true, 'edit', queryState); } else if (query.kind === 'workflows') { - queryExecutionPromise = executeWorkflow( + queryExecutionPromise = triggerWorkflow( moduleId, query.options.workflowId, query.options.blocking, @@ -620,11 +738,72 @@ export const createQueryPanelSlice = (set, get) => ({ queryExecutionPromise .then(async (data) => { + // Asynchronous query execution + // Currently async query resolution is applicable only to workflows + // Change this conditional to async query type check for other + // async queries in the future + if (query.kind === 'workflows') { + const processQueryResultsPreview = async (result) => { + let finalData = result; + if (query.options.enableTransformation) { + finalData = await runTransformation( + finalData, + query.options.transformation, + query.options.transformationLanguage, + query, + 'edit', + moduleId + ); + if (finalData.status === 'failed') { + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(finalData); + resolve({ status: 'failed', data: finalData }); + return finalData; + } + } + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(finalData); + resolve({ status: 'ok', data: finalData }); + return { status: 'ok', data: finalData }; + }; + const handleFailurePreview = (errorData) => { + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(errorData); + resolve({ status: 'failed', data: errorData }); + return errorData; + }; + + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId: query.id, + processQueryResults: processQueryResultsPreview, + handleFailure: handleFailurePreview, + shouldSetPreviewData: true, + setPreviewData, + setResolvedQuery: () => {}, // No resolvedQuery for preview + }); + // Process initial response and start SSE monitoring + asyncHandler.processInitialResponse(data.data); + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + // Resolve immediately with jobId for UI tracking + resolve({ jobId: asyncHandler.jobId }); + } catch (error) { + toast.error(error.message || 'Failed to start async preview query'); + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + resolve({ status: 'failed', message: error.message }); + } + return; + } + let finalData = data.data; let queryStatusCode = data?.status ?? null; const queryStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status; switch (true) { - // Note: Need to move away from statusText -> statusCode case queryStatus === 'Bad Request' || queryStatus === 'Not Found' || queryStatus === 'Unprocessable Entity' || @@ -656,9 +835,7 @@ export const createQueryPanelSlice = (set, get) => ({ } onEvent('onDataQueryFailure', queryEvents); - if (!calledFromQuery) setPreviewData(errorData); - break; } case queryStatus === 'needs_oauth': { @@ -721,11 +898,19 @@ export const createQueryPanelSlice = (set, get) => ({ }); }, - executeRunPycode: async (code, query, isPreview, mode, currentState) => { + executeRunPycode: async (code, query, isPreview, mode, currentState, _moduleId = 'canvas') => { const { queryPanel: { evaluatePythonCode }, } = get(); - return { data: await evaluatePythonCode({ code, query, isPreview, mode, currentState }) }; + return { + data: await evaluatePythonCode({ + code, + query, + isPreview, + mode, + currentState, + }), + }; }, evaluatePythonCode: async (options, moduleId = 'canvas') => { @@ -919,7 +1104,13 @@ export const createQueryPanelSlice = (set, get) => ({ const { queryPanel: { evaluatePythonCode }, } = get(); - return await evaluatePythonCode({ queryResult, code, query, mode, currentState }); + return await evaluatePythonCode({ + queryResult, + code, + query, + mode, + currentState, + }); }, updateQuerySuggestions: (oldName, newName) => { @@ -940,7 +1131,7 @@ export const createQueryPanelSlice = (set, get) => ({ delete updatedQueries[oldName]; - const oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`); + const _oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`); // useResolveStore.getState().actions.removeAppSuggestions(oldSuggestions); // useCurrentStateStore.getState().actions.setCurrentState({ @@ -961,6 +1152,18 @@ export const createQueryPanelSlice = (set, get) => ({ return { data: undefined, status: 'failed' }; } }, + triggerWorkflow: async (moduleId, workflowAppId, _blocking = false, params = {}, appEnvId) => { + const { getAllExposedValues } = get(); + const currentState = getAllExposedValues(); + const resolvedParams = get().resolveReferences(moduleId, params, currentState, {}, {}); + + try { + const executionResponse = await workflowExecutionsService.trigger(workflowAppId, resolvedParams, appEnvId); + return { data: executionResponse.result, status: 'ok' }; + } catch (e) { + return { data: e?.message, status: 'failed' }; + } + }, createProxy: (obj, path = '') => { const { queryPanel } = get(); diff --git a/frontend/src/AppBuilder/_utils/async-query-handler.js b/frontend/src/AppBuilder/_utils/async-query-handler.js new file mode 100644 index 0000000000..b23590343f --- /dev/null +++ b/frontend/src/AppBuilder/_utils/async-query-handler.js @@ -0,0 +1,132 @@ +// AsyncQueryHandler manages long-running operations via server-sent events (SSE). +export class AsyncQueryHandler { + /** + * Creates a new AsyncQueryHandler + * @param {Object} options - Configuration options + * @param {Function} options.streamSSE - Function that returns an EventSource for SSE status updates + * @param {Function} options.extractJobId - Function to extract job ID from response + * @param {Function} options.classifyEventStatus - Function to classify SSE events into status categories + * @param {Object} options.callbacks - Event callbacks + * @param {Function} options.callbacks.onProgress - Progress update handler + * @param {Function} options.callbacks.onComplete - Completion handler + * @param {Function} options.callbacks.onError - Error handler + * @param {Function} options.callbacks.onClose - Close handler + */ + constructor(options = {}) { + this.config = { + streamSSE: () => {}, + extractJobId: (response) => response.data?.id, + // Default implementation that doesn't make assumptions about specific status/type fields + classifyEventStatus: (data) => { + return { + // Default to treating all messages as progress updates + status: 'PROGRESS', + result: data.result || data, + // Return data for callback handlers + data, + }; + }, + callbacks: { + onProgress: () => {}, + onComplete: () => {}, + onError: () => {}, + onClose: () => {}, + }, + ...options, + }; + this.eventSource = null; + this.jobId = null; + } + + /** + * Processes the initial query response and starts SSE monitoring + * @param {Object} response - The initial query response + * @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods + */ + processInitialResponse(response) { + const jobId = this.config.extractJobId(response); + if (!jobId) throw new Error('Could not extract job ID for async query'); + this.jobId = jobId; + this.eventSource = this.startSSE(jobId); + + return { jobId, cancel: () => this.cancel() }; + } + + /** + * Opens an SSE connection to receive real-time updates for the given job. + * @private + * @param {string} jobId - Identifier for the async job + * @returns {EventSource} SSE event source for updates + */ + startSSE(jobId) { + const eventSource = this.config.streamSSE(jobId); + eventSource.onmessage = (event) => this.handleMessage(event, eventSource); + eventSource.onerror = (error) => this.handleError(error, eventSource); + + return eventSource; + } + + /** + * Processes incoming SSE messages and delegates to the appropriate callback. + * @private + * @param {MessageEvent} event - Incoming SSE message + * @param {EventSource} eventSource - EventSource instance for the SSE connection + */ + handleMessage(event, eventSource) { + try { + const payload = JSON.parse(event.data); + const { status, result, data } = this.config.classifyEventStatus(payload); + + switch (status) { + case 'PROGRESS': + this.config.callbacks.onProgress(data); + break; + case 'COMPLETE': + eventSource.close(); + this.config.callbacks.onComplete(result); + break; + case 'ERROR': + eventSource.close(); + this.config.callbacks.onError(data); + break; + case 'CLOSE': + eventSource.close(); + this.config.callbacks.onClose(data); + break; + default: + this.config.callbacks.onProgress(data); + } + } catch (err) { + console.error('Error parsing SSE message:', err); + eventSource.close(); + this.config.callbacks.onError({ message: 'Invalid server message', error: err }); + } + } + + /** + * Handles SSE connection errors and notifies onError if closed. + * @private + * @param {any} error - Error event or object + * @param {EventSource} eventSource - EventSource instance for the SSE connection + */ + handleError(error, eventSource) { + if (eventSource.readyState === EventSource.CLOSED) { + this.config.callbacks.onError({ message: 'SSE connection closed', error }); + } + } + + /** + * Cancels the ongoing async operation and cleans up resources. + */ + cancel() { + if (this.eventSource) { + this.eventSource.close(); + } + // Notify backend to cancel the job if jobId exists + // if (this.jobId) { + // fetch(`${this.config.endpoint}/${this.jobId}/cancel`, { method: 'POST' }).catch((e) => + // console.error('Failed to cancel async job', e) + // ); + // } + } +} diff --git a/frontend/src/_services/workflow_executions.service.js b/frontend/src/_services/workflow_executions.service.js index 8eac985252..6eadc2e43b 100644 --- a/frontend/src/_services/workflow_executions.service.js +++ b/frontend/src/_services/workflow_executions.service.js @@ -10,6 +10,8 @@ export const workflowExecutionsService = { all, enableWebhook, previewQueryNode, + trigger, + streamSSE, }; function previewQueryNode(queryId, appVersionId, nodeId) { @@ -70,3 +72,22 @@ function enableWebhook(appId, value) { const requestOptions = { method: 'PATCH', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' }; return fetch(`${config.apiUrl}/v2/webhooks/workflows/${appId}`, requestOptions).then(handleResponse); } + +function trigger(workflowAppId, params, environmentId) { + const currentSession = authenticationService.currentSessionValue; + const body = { + appId: workflowAppId, + userId: currentSession.current_user?.id, + executeUsing: 'app', + params: Object.fromEntries(params.map((param) => [param.key, param.value])), + environmentId, + }; + const requestOptions = { method: 'POST', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' }; + return fetch(`${config.apiUrl}/workflow_executions/${workflowAppId}/trigger`, requestOptions).then(handleResponse); +} + +function streamSSE(workflowExecutionId) { + return new EventSource(`${config.apiUrl}/workflow_executions/${workflowExecutionId}/stream`, { + withCredentials: true, + }); +} diff --git a/server/ee b/server/ee index 71e393543d..61ab78568c 160000 --- a/server/ee +++ b/server/ee @@ -1 +1 @@ -Subproject commit 71e393543ded7c10d1488455b73e3259d1b4d7b6 +Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945 diff --git a/server/src/main.ts b/server/src/main.ts index f72400bb94..34098a2fbe 100644 --- a/server/src/main.ts +++ b/server/src/main.ts @@ -49,8 +49,12 @@ async function handleLicensingInit(app: NestExpressApplication) { const License = LicenseModule.default; licenseUtilService.validateHostnameSubpath(License.Instance()?.domains); + const { isValid, licenseType, terms } = License.Instance(); console.log( - `License valid : ${License.Instance().isValid} License Terms : ${JSON.stringify(License.Instance().terms)} šŸš€` + `\nšŸ“‘ License Information: + - Valid: ${isValid} + - Type: ${licenseType} + - Terms:\n${JSON.stringify(terms, null, 2)}\n` ); } function replaceSubpathPlaceHoldersInStaticAssets() { diff --git a/server/src/modules/auth/workflow-sse-auth.guard.ts b/server/src/modules/auth/workflow-sse-auth.guard.ts new file mode 100644 index 0000000000..26fb28092c --- /dev/null +++ b/server/src/modules/auth/workflow-sse-auth.guard.ts @@ -0,0 +1,12 @@ +import { Injectable, ExecutionContext } from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; + +@Injectable() +export class WorkflowSseAuthGuard extends AuthGuard('jwt') { + canActivate(context: ExecutionContext) { + const request = context.switchToHttp().getRequest(); + request.isUserNotMandatory = true; + + return request; + } +} diff --git a/server/src/modules/licensing/constants/PlanTerms.ts b/server/src/modules/licensing/constants/PlanTerms.ts index 509de76918..95a5dcb763 100644 --- a/server/src/modules/licensing/constants/PlanTerms.ts +++ b/server/src/modules/licensing/constants/PlanTerms.ts @@ -84,7 +84,7 @@ export const ENTERPRISE_PLAN_TERMS = { export const WORKFLOW_TEAM_PLAN_TERMS: Partial = { workflows: { - execution_timeout: 60, + execution_timeout: 1800, instance: { total: LICENSE_LIMIT.UNLIMITED, daily_executions: LICENSE_LIMIT.UNLIMITED, diff --git a/server/src/modules/licensing/guards/webhook.guard.ts b/server/src/modules/licensing/guards/webhook.guard.ts index 67dec9fd98..b896d519ee 100644 --- a/server/src/modules/licensing/guards/webhook.guard.ts +++ b/server/src/modules/licensing/guards/webhook.guard.ts @@ -40,38 +40,38 @@ export class WebhookGuard implements CanActivate { // Workspace Level - if (workflowsLimit.workspace) { // Daily Limit - if ( - workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + const workspaceDailyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 - AND DATE(we.created_at) = current_date`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.daily_executions + AND DATE(we.created_at) = current_date + `; + + if ( + workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(workspaceDailyExecutionsQuery, [workflowApp.organizationId]))[0].count >= + workflowsLimit.workspace.daily_executions ) { throw new HttpException('Maximum daily limit for workflow execution has reached for this workspace', 451); } // Monthly Limit - if ( - workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + const workspaceMonthlyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 AND extract (year from we.created_at) = extract (year from current_date) - AND extract (month from we.created_at) = extract (month from current_date)`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.monthly_executions + AND extract (month from we.created_at) = extract (month from current_date) + `; + + if ( + workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(workspaceMonthlyExecutionsQuery, [workflowApp.organizationId]))[0].count >= + workflowsLimit.workspace.monthly_executions ) { throw new HttpException('Maximum monthly limit for workflow execution has reached for this workspace', 451); } @@ -80,30 +80,35 @@ export class WebhookGuard implements CanActivate { // Instance Level - if (workflowsLimit.instance) { // Daily Limit - if ( - workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + const instanceDailyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id - WHERE DATE(we.created_at) = current_date`) - )[0].count >= workflowsLimit.instance.daily_executions + WHERE DATE(we.created_at) = current_date + `; + + if ( + workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(instanceDailyExecutionsQuery))[0].count >= workflowsLimit.instance.daily_executions ) { throw new HttpException('Maximum daily limit for workflow execution has been reached', 451); } // Monthly Limit - if ( - workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + const instanceMonthlyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE extract (year from we.created_at) = extract (year from current_date) - AND extract (month from we.created_at) = extract (month from current_date)`) - )[0].count >= workflowsLimit.instance.monthly_executions + AND extract (month from we.created_at) = extract (month from current_date) + `; + + if ( + workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(instanceMonthlyExecutionsQuery))[0].count >= + workflowsLimit.instance.monthly_executions ) { throw new HttpException('Maximum monthly limit for workflow execution has been reached', 451); } diff --git a/server/src/modules/workflows/controllers/workflow-executions.controller.ts b/server/src/modules/workflows/controllers/workflow-executions.controller.ts index 6de5ced93f..c6ce798e38 100644 --- a/server/src/modules/workflows/controllers/workflow-executions.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-executions.controller.ts @@ -1,4 +1,4 @@ -import { Body, Controller, Get, Param, Post, Res } from '@nestjs/common'; +import { Body, Controller, Get, Param, Post, Res, Sse } from '@nestjs/common'; import { Response } from 'express'; import { IWorkflowExecutionController } from '../interfaces/IWorkflowExecutionController'; import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto'; @@ -9,6 +9,7 @@ import { InitModule } from '@modules/app/decorators/init-module'; import { MODULES } from '@modules/app/constants/modules'; import { InitFeature } from '@modules/app/decorators/init-feature.decorator'; import { FEATURE_KEY } from '@modules/workflows/constants'; +import { Observable } from 'rxjs'; @InitModule(MODULES.WORKFLOWS) @Controller('workflow_executions') @@ -52,4 +53,21 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle ): Promise<{ result: any }> { throw new Error('Method not implemented.'); } + + @InitFeature(FEATURE_KEY.EXECUTE_WORKFLOW) + @Post(':id/trigger') + async trigger( + @Param('id') id: string, + @Body() createWorkflowExecutionDto: CreateWorkflowExecutionDto, + @User() user, + @Res({ passthrough: true }) response: Response + ): Promise<{ result: any }> { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS) + @Sse(':id/stream') + streamWorkflowExecution(@Param('id') id: string): Observable { + throw new Error('Method not implemented.'); + } } diff --git a/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts b/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts index 4558327f6b..982448aaf2 100644 --- a/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Post, Param, Body, Patch, Query, Res } from '@nestjs/common'; +import { Controller, Post, Param, Body, Patch, Query, Res, Get, Sse } from '@nestjs/common'; import { Response } from 'express'; import { IWorkflowWebhooksController } from '../interfaces/IWorkflowWebhooksController'; import { InitModule } from '@modules/app/decorators/init-module'; @@ -25,6 +25,29 @@ export class WorkflowWebhooksController implements IWorkflowWebhooksController { throw new Error('Method not implemented.'); } + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Post('workflows/:idOrName/trigger-async') + async triggerWorkflowAsync( + @Param('app') app: any, + @Param('idOrName') idOrName: string, + @Body() workflowParams: Record, + @Query('environment') environment: string + ): Promise { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Get('workflows/:idOrName/status/:executionId') + async getExecutionStatus(@Param('executionId') executionId: string): Promise { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Sse('workflows/:idOrName/execution/:executionId/stream') + async triggerWorkflowStream(@Param('executionId') executionId: string): Promise { + throw new Error('Method not implemented.'); + } + @InitFeature(FEATURE_KEY.UPDATE_WORKFLOW_WEBHOOK_DETAILS) @Patch('workflows/:id') async updateWorkflow(@Param('id') id, @Body() workflowValuesToUpdate): Promise { diff --git a/server/src/modules/workflows/listeners/workflow-triggers.listener.ts b/server/src/modules/workflows/listeners/workflow-triggers.listener.ts new file mode 100644 index 0000000000..69683b92e4 --- /dev/null +++ b/server/src/modules/workflows/listeners/workflow-triggers.listener.ts @@ -0,0 +1,37 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { WorkflowExecutionsService } from '../services/workflow-executions.service'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { Logger } from 'nestjs-pino'; +import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto'; +import { WorkflowExecution } from '@entities/workflow_execution.entity'; +import { Response } from 'express'; + +export const WORKFLOW_EXECUTION_STATUS = { + TRIGGERED: 'workflow_execution_triggered', + RUNNING: 'workflow_execution_running', + COMPLETED: 'workflow_execution_completed', + ERROR: 'workflow_execution_error', +}; + +@Injectable() +export class WorkflowTriggersListener { + constructor( + protected workflowExecutionsService: WorkflowExecutionsService, + protected readonly logger: Logger, + protected readonly eventEmitter: EventEmitter2 + ) {} + + @OnEvent('triggerWorkflow') + async handleTriggerWorkflow({ + createWorkflowExecutionDto, + workflowExecution, + response, + }: { + createWorkflowExecutionDto: CreateWorkflowExecutionDto; + workflowExecution: WorkflowExecution; + response: Response; + }): Promise { + throw new Error('Not implemented.'); + } +} diff --git a/server/src/modules/workflows/module.ts b/server/src/modules/workflows/module.ts index a0c3078258..67caead1c0 100644 --- a/server/src/modules/workflows/module.ts +++ b/server/src/modules/workflows/module.ts @@ -55,7 +55,8 @@ export class WorkflowsModule { const { PageHelperService } = await import(`${importPath}/apps/services/page.util.service`); const { WorkflowSchedulesService } = await import(`${importPath}/workflows/services/workflow-schedules.service`); const { TemporalService } = await import(`${importPath}/workflows/services/temporal.service`); - const { WorkflowWebhooksListener } = await import(`${importPath}/workflows/listeners/workflow-webhooks.listener`); + const { WorkflowStreamService } = await import(`${importPath}/workflows/services/workflow-stream.service`); + const { WorkflowTriggersListener } = await import(`${importPath}/workflows/listeners/workflow-triggers.listener`); const { FeatureAbilityFactory } = await import(`${importPath}/workflows/ability/app`); return { @@ -110,7 +111,8 @@ export class WorkflowsModule { PageService, EventsService, WorkflowExecutionsService, - WorkflowWebhooksListener, + WorkflowStreamService, + WorkflowTriggersListener, WorkflowWebhooksService, OrganizationConstantsService, ComponentsService, diff --git a/server/src/modules/workflows/services/workflow-executions.service.ts b/server/src/modules/workflows/services/workflow-executions.service.ts index a81f2db6ff..b63056816d 100644 --- a/server/src/modules/workflows/services/workflow-executions.service.ts +++ b/server/src/modules/workflows/services/workflow-executions.service.ts @@ -40,4 +40,8 @@ export class WorkflowExecutionsService implements IWorkflowExecutionsService { ): Promise { throw new Error('Method not implemented.'); } + + async findOne(id: string, relations?: string[]): Promise { + throw new Error('Method not implemented.'); + } } diff --git a/server/src/modules/workflows/services/workflow-stream.service.ts b/server/src/modules/workflows/services/workflow-stream.service.ts new file mode 100644 index 0000000000..d0901c9a4e --- /dev/null +++ b/server/src/modules/workflows/services/workflow-stream.service.ts @@ -0,0 +1,31 @@ +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { Observable } from 'rxjs'; +import { OnEvent } from '@nestjs/event-emitter'; + +export const WORKFLOW_CONNECTION_TYPES = { + INITIALIZED: 'workflow_connection_initialized', + STREAMING: 'workflow_connection_streaming', + ERROR: 'workflow_connection_error', + CLOSE: 'workflow_connection_close', +}; + +// Base WorkflowStreamService class for CE +// This provides the interface but throws "Not implemented" errors for all methods +// EE version will extend this class and provide actual implementations +@Injectable() +export class WorkflowStreamService implements OnModuleInit { + constructor() {} + + onModuleInit() { + // CE version - no implementation needed + } + + @OnEvent('workflow.status') + handleWorkflowStatus({ executionId, status }: { executionId: string; status: any }) { + throw new Error('Method not implemented.'); + } + + getStream(executionId: string): Observable { + throw new Error('Method not implemented.'); + } +} From 1cfdec70d75b46da429a9eb81554b7eb195a24c4 Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Wed, 2 Jul 2025 13:21:18 +0530 Subject: [PATCH 04/25] fix: Async workflow in runjs --- frontend/ee | 2 +- .../_stores/slices/queryPanelSlice.js | 148 +++++++++++------- .../AppBuilder/_utils/async-query-handler.js | 13 +- server/ee | 2 +- .../apps/controllers/workflow.controller.ts | 3 + server/src/modules/apps/module.ts | 5 +- .../ability/workflow-version.ability.ts | 3 + .../workflow-executions.controller.ts | 2 +- 8 files changed, 112 insertions(+), 66 deletions(-) diff --git a/frontend/ee b/frontend/ee index b117b06fd7..820a61e329 160000 --- a/frontend/ee +++ b/frontend/ee @@ -1 +1 @@ -Subproject commit b117b06fd7dbc43a45d317084ae27b0aedbf4a35 +Subproject commit 820a61e32907263b62fcb9a2fb5ba1389627614c diff --git a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js index fa917e54b6..776542d831 100644 --- a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js +++ b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js @@ -9,6 +9,7 @@ import axios from 'axios'; import { validateMultilineCode } from '@/_helpers/utility'; import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel'; import { deepClone } from '@/_helpers/utilities/utils.helpers'; + const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {}; const initialState = { @@ -27,7 +28,6 @@ const initialState = { loadingDataQueries: false, isPreviewQueryLoading: false, queryPanelSearchTem: '', - asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances }; export const createQueryPanelSlice = (set, get) => ({ @@ -561,42 +561,27 @@ export const createQueryPanelSlice = (set, get) => ({ // Change this conditional to async query type check for other // async queries in the future if (query.kind === 'workflows') { - try { - const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ - executionId: data.data.executionId, - queryId, - processQueryResults, - handleFailure, - shouldSetPreviewData, - setPreviewData, - setResolvedQuery, - }); + const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({ + data, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); - // Process initial response and start SSE monitoring - asyncHandler.processInitialResponse(data.data); + if (error) { + resolve({ status: 'failed', message: error }); + return; + } - // Add the AsyncQueryHandler instance to asyncQueryRuns - get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); - - // Set initial state with jobId - setResolvedQuery( - queryId, - { - isLoading: true, - jobId: asyncHandler.jobId, - // data: data.data, - }, - moduleId - ); - - // Resolve with async status - resolve({ - jobId: asyncHandler.jobId, - // data: data.data, - }); - } catch (error) { - toast.error(error.message || 'Failed to start async query'); - resolve({ status: 'failed', message: error.message }); + if (!error && completionPromise) { + // This early resolution pattern is temporary - once the UI fully supports + // tracking individual async queries through their lifecycle, we can refactor + // this to rely on the completion promise concurrently + const result = await completionPromise; + resolve(result); } return; } @@ -758,44 +743,45 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(finalData); - resolve({ status: 'failed', data: finalData }); - return finalData; + return { status: 'failed', data: finalData }; } } setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(finalData); - resolve({ status: 'ok', data: finalData }); return { status: 'ok', data: finalData }; }; const handleFailurePreview = (errorData) => { setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(errorData); - resolve({ status: 'failed', data: errorData }); - return errorData; + return { status: 'failed', data: errorData }; }; - try { - const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ - executionId: data.data.executionId, - queryId: query.id, - processQueryResults: processQueryResultsPreview, - handleFailure: handleFailurePreview, - shouldSetPreviewData: true, - setPreviewData, - setResolvedQuery: () => {}, // No resolvedQuery for preview - }); - // Process initial response and start SSE monitoring - asyncHandler.processInitialResponse(data.data); - get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); - // Resolve immediately with jobId for UI tracking - resolve({ jobId: asyncHandler.jobId }); - } catch (error) { - toast.error(error.message || 'Failed to start async preview query'); - setPreviewLoading(false); - setIsPreviewQueryLoading(false); - resolve({ status: 'failed', message: error.message }); + const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({ + data, + queryId: query.id, + processQueryResults: processQueryResultsPreview, + handleFailure: handleFailurePreview, + shouldSetPreviewData: true, + setPreviewData, + setResolvedQuery: () => {}, // No resolvedQuery for preview + resolve, + }); + + if (!error && completionPromise) { + try { + // This early resolution pattern is temporary - once the UI fully supports + // tracking individual async queries through their lifecycle, we can refactor + // this to rely on the completion promise concurrently + const result = await completionPromise; + resolve(result); + } catch (error) { + toast.error('Async operation failed:', error); + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + resolve({ status: 'failed', message: error?.message || 'Unknown error' }); + } } return; } @@ -1339,6 +1325,48 @@ export const createQueryPanelSlice = (set, get) => ({ isQuerySelected: (queryId) => { return get().queryPanel.selectedQuery?.id === queryId; }, + + setupAsyncWorkflowHandler: ({ + data, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }) => { + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); + + // Process initial response and start SSE monitoring + const { __asyncCompletionPromise } = asyncHandler.processInitialResponse(data.data); + + // Add the AsyncQueryHandler instance to asyncQueryRuns + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + + if (setResolvedQuery) { + setResolvedQuery(queryId, { + isLoading: true, + jobId: asyncHandler.jobId, + }); + } + + return { + handler: asyncHandler, + completionPromise: __asyncCompletionPromise, + }; + } catch (error) { + return { error }; + } + }, runQueryOnShortcut: () => { const { queryPanel } = get(); const { runQuery, selectedQuery } = queryPanel; diff --git a/frontend/src/AppBuilder/_utils/async-query-handler.js b/frontend/src/AppBuilder/_utils/async-query-handler.js index b23590343f..ea2202797e 100644 --- a/frontend/src/AppBuilder/_utils/async-query-handler.js +++ b/frontend/src/AppBuilder/_utils/async-query-handler.js @@ -41,7 +41,7 @@ export class AsyncQueryHandler { /** * Processes the initial query response and starts SSE monitoring * @param {Object} response - The initial query response - * @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods + * @returns {{ __jobId: string, __cancel: Function, __asyncCompletionPromise: Promise }} Status object with jobId, control methods, and completion promise */ processInitialResponse(response) { const jobId = this.config.extractJobId(response); @@ -49,7 +49,14 @@ export class AsyncQueryHandler { this.jobId = jobId; this.eventSource = this.startSSE(jobId); - return { jobId, cancel: () => this.cancel() }; + // Return the reserved async completion promise for consumers + this.__asyncCompletionPromise = + this.__asyncCompletionPromise || + new Promise((resolve, reject) => { + this.resolveCompletion = resolve; + this.rejectCompletion = reject; + }); + return { __jobId: jobId, __cancel: () => this.cancel(), __asyncCompletionPromise: this.__asyncCompletionPromise }; } /** @@ -84,10 +91,12 @@ export class AsyncQueryHandler { case 'COMPLETE': eventSource.close(); this.config.callbacks.onComplete(result); + this.resolveCompletion(result); break; case 'ERROR': eventSource.close(); this.config.callbacks.onError(data); + this.rejectCompletion(data); break; case 'CLOSE': eventSource.close(); diff --git a/server/ee b/server/ee index 61ab78568c..cde2ebcce6 160000 --- a/server/ee +++ b/server/ee @@ -1 +1 @@ -Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945 +Subproject commit cde2ebcce6c552ba5d9ba49099cd4aae82ca5e1c diff --git a/server/src/modules/apps/controllers/workflow.controller.ts b/server/src/modules/apps/controllers/workflow.controller.ts index cc91daf852..32f724cfd0 100644 --- a/server/src/modules/apps/controllers/workflow.controller.ts +++ b/server/src/modules/apps/controllers/workflow.controller.ts @@ -9,12 +9,15 @@ import { ValidAppGuard } from '../guards/valid-app.guard'; import { AppDecorator as App } from '@modules/app/decorators/app.decorator'; import { WorkflowService } from '../services/workflow.service'; import { IWorkflowController } from '../interfaces/IControllerWorkflow'; +import { InitFeature } from '@modules/app/decorators/init-feature.decorator'; +import { FEATURE_KEY } from '../constants'; @InitModule(MODULES.APP) @Controller('apps') export class WorkflowController implements IWorkflowController { constructor(protected readonly workflowService: WorkflowService) {} + @InitFeature(FEATURE_KEY.GET) @UseGuards(JwtAuthGuard, ValidAppGuard, FeatureAbilityGuard) @Get(':id/workflows') async fetchWorkflows(@App() app: AppEntity) { diff --git a/server/src/modules/apps/module.ts b/server/src/modules/apps/module.ts index af6c809b6e..4bbaa2c6fe 100644 --- a/server/src/modules/apps/module.ts +++ b/server/src/modules/apps/module.ts @@ -30,7 +30,9 @@ export class AppsModule { static async register(configs: { IS_GET_CONTEXT: boolean }): Promise { const importPath = await getImportPath(configs.IS_GET_CONTEXT); const { AppsController } = await import(`${importPath}/apps/controller`); + const { WorkflowController } = await import(`${importPath}/apps/controllers/workflow.controller`); const { AppsService } = await import(`${importPath}/apps/service`); + const { WorkflowService } = await import(`${importPath}/apps/services/workflow.service`); const { AppsUtilService } = await import(`${importPath}/apps/util.service`); const { AppEnvironmentUtilService } = await import(`${importPath}/app-environments/util.service`); const { PageService } = await import(`${importPath}/apps/services/page.service`); @@ -53,9 +55,10 @@ export class AppsModule { await AppPermissionsModule.register(configs), await UsersModule.register(configs), ], - controllers: [AppsController], + controllers: [AppsController, WorkflowController], providers: [ AppsService, + WorkflowService, VersionRepository, AppsRepository, AppGitRepository, diff --git a/server/src/modules/versions/ability/workflow-version.ability.ts b/server/src/modules/versions/ability/workflow-version.ability.ts index 16c54f8e92..e3733a5b3a 100644 --- a/server/src/modules/versions/ability/workflow-version.ability.ts +++ b/server/src/modules/versions/ability/workflow-version.ability.ts @@ -22,6 +22,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); @@ -41,6 +42,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); @@ -58,6 +60,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); diff --git a/server/src/modules/workflows/controllers/workflow-executions.controller.ts b/server/src/modules/workflows/controllers/workflow-executions.controller.ts index c6ce798e38..0917173d7f 100644 --- a/server/src/modules/workflows/controllers/workflow-executions.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-executions.controller.ts @@ -67,7 +67,7 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle @InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS) @Sse(':id/stream') - streamWorkflowExecution(@Param('id') id: string): Observable { + async streamWorkflowExecution(@Param('id') id: string): Promise> { throw new Error('Method not implemented.'); } } From dfd18fc59cc075cb027c3d59594cb8aeca0ed888 Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Wed, 2 Jul 2025 14:20:33 +0530 Subject: [PATCH 05/25] Release: Workflows sprint 7 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit What's Changed? šŸ› ļø Fixes Fix: Codehinter modal input crashing on scroll in #3576 by @manishkushare Fix: Unable to configure long string as params because it causes WF to halt execution in #3483 by @manishkushare --- frontend/ee | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/ee b/frontend/ee index 820a61e329..9139824449 160000 --- a/frontend/ee +++ b/frontend/ee @@ -1 +1 @@ -Subproject commit 820a61e32907263b62fcb9a2fb5ba1389627614c +Subproject commit 9139824449c6b82e5c85caba75dc25d6032c6efb From 8fca46f7cb2da14e8271aa1f53c0999f75534a96 Mon Sep 17 00:00:00 2001 From: Akshay Sasidharan Date: Wed, 2 Jul 2025 16:15:24 +0530 Subject: [PATCH 06/25] feat: Workflows import export support --- frontend/src/HomePage/AppMenu.jsx | 17 ++- frontend/src/HomePage/BlankPage.jsx | 58 ++++---- frontend/src/HomePage/HomePage.jsx | 140 ++++++++++++++++-- .../BaseImportAppMenu/BaseImportAppMenu.jsx | 17 ++- server/ee | 2 +- .../services/app-import-export.service.ts | 63 +++++++- 6 files changed, 247 insertions(+), 50 deletions(-) diff --git a/frontend/src/HomePage/AppMenu.jsx b/frontend/src/HomePage/AppMenu.jsx index 05281de413..2cd51be364 100644 --- a/frontend/src/HomePage/AppMenu.jsx +++ b/frontend/src/HomePage/AppMenu.jsx @@ -82,13 +82,22 @@ export const AppMenu = function AppMenu({ )} )} - {canUpdateApp && canCreateApp && appType !== 'workflow' && !isModuleApp && ( + {canUpdateApp && canCreateApp && !isModuleApp && ( <> + {appType !== 'workflow' && ( + openAppActionModal('clone-app')} + /> + )} openAppActionModal('clone-app')} + text={ + appType === 'workflow' + ? t('homePage.appCard.exportWorkflow', 'Export workflow') + : t('homePage.appCard.exportApp', 'Export app') + } + onClick={exportApp} /> - )} {canDeleteApp && ( diff --git a/frontend/src/HomePage/BlankPage.jsx b/frontend/src/HomePage/BlankPage.jsx index 631608dfef..050eaf093d 100644 --- a/frontend/src/HomePage/BlankPage.jsx +++ b/frontend/src/HomePage/BlankPage.jsx @@ -147,34 +147,38 @@ export const BlankPage = function BlankPage({ Create new {appType !== 'workflow' ? 'application' : 'workflow'} - {appType !== 'workflow' && ( -
- + + -
- )} +   + {appType !== 'workflow' + ? t('blankPage.importApplication', 'Import an app') + : t('blankPage.importWorkflow', 'Import a workflow')} + + + +
diff --git a/frontend/src/HomePage/HomePage.jsx b/frontend/src/HomePage/HomePage.jsx index deeada63a4..94612f8358 100644 --- a/frontend/src/HomePage/HomePage.jsx +++ b/frontend/src/HomePage/HomePage.jsx @@ -12,7 +12,7 @@ import { } from '@/_services'; import { ConfirmDialog, AppModal } from '@/_components'; import Select from '@/_ui/Select'; -import _, { sample, isEmpty } from 'lodash'; +import _, { sample, isEmpty, capitalize } from 'lodash'; import { Folders } from './Folders'; import { BlankPage } from './BlankPage'; import { toast } from 'react-hot-toast'; @@ -252,7 +252,11 @@ class HomePageComponent extends React.Component { }; getAppType = () => { - return this.props.appType === 'module' ? 'Module' : this.props.appType === 'workflow' ? 'Workflow' : 'App'; + const { appType } = this.props; + if (appType === 'front-end') return 'App'; + if (appType === 'workflow') return 'Workflow'; + if (appType === 'module') return 'Module'; + return 'app'; }; createApp = async (appName) => { @@ -330,6 +334,66 @@ class HomePageComponent extends React.Component { this.setState({ isExportingApp: true, app: app }); }; + exportAppDirectly = async (app) => { + try { + const fetchVersions = await appsService.getVersions(app.id); + const { versions } = fetchVersions; + + const currentEditingVersion = versions?.filter((version) => version?.isCurrentEditingVersion)[0]; + if (!currentEditingVersion) { + toast.error('Could not find current editing version.', { + position: 'top-center', + }); + return; + } + + // Export all TJDB tables used by default + const fetchTables = await appsService.getTables(app.id); + const { tables: allTables } = fetchTables; + + const versionId = currentEditingVersion.id; + const exportTjDb = true; + const exportTables = allTables; + + const appOpts = { + app: [ + { + id: app.id, + search_params: { version_id: versionId }, + }, + ], + }; + + const requestBody = { + ...appOpts, + ...(exportTjDb && { tooljet_database: exportTables }), + organization_id: app.organization_id, + }; + + const data = await appsService.exportResource(requestBody); + + const appName = app.name.replace(/\s+/g, '-').toLowerCase(); + const fileName = `${appName}-export-${new Date().getTime()}`; + const json = JSON.stringify(data, null, 2); + const blob = new Blob([json], { type: 'application/json' }); + const href = URL.createObjectURL(blob); + const link = document.createElement('a'); + link.href = href; + link.download = fileName + '.json'; + document.body.appendChild(link); + link.click(); + document.body.removeChild(link); + + toast.success('Workflow exported successfully!', { + position: 'top-center', + }); + } catch (error) { + toast.error(`Could not export workflow: ${error?.data?.message || error.message}`, { + position: 'top-center', + }); + } + }; + readAndImport = (event) => { try { const file = event.target.files[0]; @@ -411,7 +475,7 @@ class HomePageComponent extends React.Component { } const data = await appsService.importResource(requestBody); - toast.success('App imported successfully.'); + toast.success(`${capitalize(this.getAppType())} imported successfully.`); this.setState({ isImportingApp: false }); if (!isEmpty(data.imports.app)) { @@ -433,7 +497,7 @@ class HomePageComponent extends React.Component { this.setState({ isImportingApp: false }); if (error.statusCode === 409) return false; - toast.error(error?.error || error?.message || 'App import failed'); + toast.error(error?.error || error?.message || `${capitalize(this.getAppType())} import failed`); } }; @@ -935,6 +999,53 @@ class HomePageComponent extends React.Component { importingGitAppOperations: validationMessage, }); }; + + // Helper functions for workflow limit checks + hasWorkflowLimitReached = () => { + const { workflowInstanceLevelLimit, workflowWorkspaceLevelLimit } = this.state; + + const instanceLimitReached = + workflowInstanceLevelLimit.total === 0 || workflowInstanceLevelLimit.current >= workflowInstanceLevelLimit.total; + const workspaceLimitReached = + workflowWorkspaceLevelLimit.total === 0 || + workflowWorkspaceLevelLimit.current >= workflowWorkspaceLevelLimit.total; + + return instanceLimitReached || workspaceLimitReached; + }; + + hasWorkflowLimitWarning = () => { + const { workflowInstanceLevelLimit, workflowWorkspaceLevelLimit } = this.state; + return this.hasInstanceLimitWarning() || this.hasWorkspaceLimitWarning(); + }; + + hasInstanceLimitWarning = () => { + const { workflowInstanceLevelLimit } = this.state; + const percentage = workflowInstanceLevelLimit.percentage; + + return ( + workflowInstanceLevelLimit.current >= workflowInstanceLevelLimit.total || + (percentage >= 90 && percentage < 100) || + workflowInstanceLevelLimit.current === workflowInstanceLevelLimit.total - 1 + ); + }; + + hasWorkspaceLimitWarning = () => { + const { workflowWorkspaceLevelLimit } = this.state; + const percentage = workflowWorkspaceLevelLimit.percentage; + + return ( + workflowWorkspaceLevelLimit.current >= workflowWorkspaceLevelLimit.total || + (percentage >= 90 && percentage < 100) || + workflowWorkspaceLevelLimit.current === workflowWorkspaceLevelLimit.total - 1 + ); + }; + + getWorkflowLimit = () => { + return this.hasInstanceLimitWarning() + ? this.state.workflowInstanceLevelLimit + : this.state.workflowWorkspaceLevelLimit; + }; + render() { const { apps, @@ -1436,16 +1547,24 @@ class HomePageComponent extends React.Component { 'Create new app' )} - - {this.props.appType !== 'workflow' && this.props.appType !== 'module' && ( + {this.props.appType === 'workflow' ? ( = 100 || (this.props.appType === 'module' && invalidLicense) - } + disabled={this.hasWorkflowLimitReached()} split className="d-inline" data-cy="import-dropdown-menu" /> + ) : ( + this.props.appType !== 'module' && ( + = 100 || (this.props.appType === 'module' && invalidLicense) + } + split + className="d-inline" + data-cy="import-dropdown-menu" + /> + ) )}
@@ -1621,7 +1741,7 @@ class HomePageComponent extends React.Component { canUpdateApp={this.canUpdateApp} deleteApp={this.deleteApp} cloneApp={this.cloneApp} - exportApp={this.exportApp} + exportApp={this.props.appType === 'workflow' ? this.exportAppDirectly : this.exportApp} meta={meta} currentFolder={currentFolder} isLoading={isLoading || !featuresLoaded} diff --git a/frontend/src/modules/common/components/BaseImportAppMenu/BaseImportAppMenu.jsx b/frontend/src/modules/common/components/BaseImportAppMenu/BaseImportAppMenu.jsx index ddf12aacab..624fda5e2c 100644 --- a/frontend/src/modules/common/components/BaseImportAppMenu/BaseImportAppMenu.jsx +++ b/frontend/src/modules/common/components/BaseImportAppMenu/BaseImportAppMenu.jsx @@ -9,19 +9,22 @@ const BaseImportAppMenu = ({ showCloudMenuItems = false, CloudMenuComponent = () => null, darkMode = false, + appType = 'front-end', ...props }) => { const fileInput = React.createRef(); const { t } = useTranslation(); return ( - - {t('homePage.header.chooseFromTemplate', 'Choose from template')} - + {appType !== 'workflow' && ( + + {t('homePage.header.chooseFromTemplate', 'Choose from template')} + + )}