diff --git a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js index f33f67c66e..fa917e54b6 100644 --- a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js +++ b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js @@ -1,3 +1,5 @@ +import { toast } from 'react-hot-toast'; +import { AsyncQueryHandler } from '@/AppBuilder/_utils/async-query-handler'; import _, { isEmpty } from 'lodash'; import { resolveReferences, loadPyodide, hasCircularDependency } from '@/_helpers/utils'; import { fetchOAuthToken, fetchOauthTokenForSlackAndGSheet } from '@/AppBuilder/_utils/auth'; @@ -7,7 +9,6 @@ import axios from 'axios'; import { validateMultilineCode } from '@/_helpers/utility'; import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel'; import { deepClone } from '@/_helpers/utilities/utils.helpers'; -import toast from 'react-hot-toast'; const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {}; const initialState = { @@ -26,6 +27,7 @@ const initialState = { loadingDataQueries: false, isPreviewQueryLoading: false, queryPanelSearchTem: '', + asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances }; export const createQueryPanelSlice = (set, get) => ({ @@ -162,6 +164,19 @@ export const createQueryPanelSlice = (set, get) => ({ 'setLoadingDataQueries' ), + setAsyncQueryRuns: (updater) => + set( + (state) => { + if (typeof updater === 'function') { + state.queryPanel.asyncQueryRuns = updater(state.queryPanel.asyncQueryRuns); + } else { + state.queryPanel.asyncQueryRuns = updater; + } + }, + false, + 'setAsyncQueryRuns' + ), + onQueryConfirmOrCancel: (queryConfirmationData, isConfirm = false, mode = 'edit', moduleId = 'canvas') => { const { queryPanel, dataQuery, setResolvedQuery } = get(); const { runQuery } = queryPanel; @@ -202,6 +217,70 @@ export const createQueryPanelSlice = (set, get) => ({ ); }, + createWorkflowAsyncHandler: ({ + executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }) => { + const asyncHandler = new AsyncQueryHandler({ + streamSSE: (jobId) => { + return workflowExecutionsService.streamSSE(jobId); + }, + extractJobId: () => executionId, + classifyEventStatus: (eventData) => { + // hardcoded for workflows + if (eventData.type === 'workflow_connection_close') { + return { status: 'CLOSE', data: eventData }; + } else if (eventData.type === 'workflow_execution_completed') { + return { status: 'COMPLETE', result: eventData.result, data: eventData }; + } else if (eventData.type === 'workflow_execution_error') { + return { status: 'ERROR', data: eventData }; + } else { + return { status: 'PROGRESS', data: eventData }; + } + }, + callbacks: { + onProgress: (progressData) => { + // Update UI with progress information + if (shouldSetPreviewData) { + setPreviewData({ ...progressData }); + } + setResolvedQuery(queryId, { + isLoading: true, + progress: progressData.progress, + currentData: progressData.partialData || [], + }); + }, + onComplete: async (result) => { + await processQueryResults(result); + // Remove the AsyncQueryHandler instance from asyncQueryRuns on completion + get().queryPanel.setAsyncQueryRuns((currentRuns) => + currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId) + ); + }, + onError: (e) => { + handleFailure({ + status: 'failed', + data: { + message: e.message || 'Error in async processing', + error: e.error, + }, + }); + // Remove the AsyncQueryHandler instance from asyncQueryRuns on error + get().queryPanel.setAsyncQueryRuns((currentRuns) => + currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId) + ); + }, + }, + }); + + return asyncHandler; + }, + runQuery: ( queryId, queryName, @@ -231,7 +310,7 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewPanelExpanded, executeRunPycode, runTransformation, - executeWorkflow, + triggerWorkflow, executeMultilineJS, } = queryPanel; const queryUpdatePromise = dataQuerySlice.queryUpdates[queryId]; @@ -332,6 +411,97 @@ export const createQueryPanelSlice = (set, get) => ({ } } + // Handler for transformation and completion of query results + const processQueryResults = async (data, rawData = null) => { + let finalData = data; + rawData = rawData || data; + + if (dataQuery.options.enableTransformation) { + finalData = await runTransformation( + finalData, + query.options.transformation, + query.options.transformationLanguage, + query, + mode, + moduleId + ); + + if (finalData.status === 'failed') { + handleFailure(finalData); + return finalData; + } + } + + if (shouldSetPreviewData) { + setPreviewLoading(false); + setPreviewData(finalData); + } + + if (dataQuery.options.showSuccessNotification) { + const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000; + toast.success(dataQuery.options.successMessage, { + duration: notificationDuration, + }); + } + + get().debugger.log({ + logLevel: 'success', + type: 'query', + kind: query.kind, + key: query.name, + message: 'Query executed successfully', + isQuerySuccessLog: true, + errorTarget: 'Queries', + }); + + setResolvedQuery( + queryId, + { + isLoading: false, + data: finalData, + rawData, + metadata: data?.metadata, + request: data?.metadata?.request, + response: data?.metadata?.response, + }, + moduleId + ); + + onEvent('onDataQuerySuccess', queryEvents, mode); + return { status: 'ok', data: finalData }; + }; + + // Handler for query failures + const handleFailure = (errorData) => { + if (shouldSetPreviewData) { + setPreviewLoading(false); + setPreviewData(errorData); + } + + get().debugger.log({ + logLevel: 'error', + type: 'query', + kind: query.kind, + key: query.name, + message: errorData?.description || 'Query failed', + errorTarget: 'Queries', + error: errorData, + isQuerySuccessLog: false, + }); + + setResolvedQuery( + queryId, + { + isLoading: false, + error: errorData, + }, + moduleId + ); + + onEvent('onDataQueryFailure', queryEvents); + return errorData; + }; + // eslint-disable-next-line no-unused-vars return new Promise(function (resolve, reject) { if (shouldSetPreviewData) { @@ -356,7 +526,7 @@ export const createQueryPanelSlice = (set, get) => ({ } else if (query.kind === 'runpy') { queryExecutionPromise = executeRunPycode(query.options.code, query, false, mode, queryState, moduleId); } else if (query.kind === 'workflows') { - queryExecutionPromise = executeWorkflow( + queryExecutionPromise = triggerWorkflow( moduleId, query.options.workflowId, query.options.blocking, @@ -386,6 +556,53 @@ export const createQueryPanelSlice = (set, get) => ({ fetchOAuthToken(url, dataQuery['data_source_id'] || dataQuery['dataSourceId']); } + // Asynchronous query execution + // Currently async query resolution is applicable only to workflows + // Change this conditional to async query type check for other + // async queries in the future + if (query.kind === 'workflows') { + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); + + // Process initial response and start SSE monitoring + asyncHandler.processInitialResponse(data.data); + + // Add the AsyncQueryHandler instance to asyncQueryRuns + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + + // Set initial state with jobId + setResolvedQuery( + queryId, + { + isLoading: true, + jobId: asyncHandler.jobId, + // data: data.data, + }, + moduleId + ); + + // Resolve with async status + resolve({ + jobId: asyncHandler.jobId, + // data: data.data, + }); + } catch (error) { + toast.error(error.message || 'Failed to start async query'); + resolve({ status: 'failed', message: error.message }); + } + return; + } + + // Handle synchronous queries (original code) + let queryStatusCode = data?.status ?? null; const promiseStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status; // Note: Need to move away from statusText -> statusCode @@ -420,114 +637,15 @@ export const createQueryPanelSlice = (set, get) => ({ errorData = data; break; } - if (shouldSetPreviewData) { - setPreviewLoading(false); - setPreviewData(errorData); - } + errorData = query.kind === 'runpy' || query.kind === 'runjs' ? data?.data : data; - get().debugger.log({ - logLevel: 'error', - type: 'query', - kind: query.kind, - key: query.name, - message: errorData?.description, - errorTarget: 'Queries', - error: - query.kind === 'restapi' - ? { - substitutedVariables: options, - request: data?.data?.requestObject, - response: data?.data?.responseObject, - } - : errorData, - isQuerySuccessLog: false, - }); - - setResolvedQuery( - queryId, - { - isLoading: false, - ...(query.kind === 'restapi' - ? { - metadata: data.metadata, - request: data.data.requestObject, - response: data.data.responseObject, - responseHeaders: data.data.responseHeaders, - } - : {}), - }, - moduleId - ); - - resolve(data); - onEvent('onDataQueryFailure', queryEvents); + const result = handleFailure(errorData); + resolve(result); return; } else { - let rawData = data.data; - let finalData = data.data; - if (dataQuery.options.enableTransformation) { - finalData = await runTransformation( - finalData, - query.options.transformation, - query.options.transformationLanguage, - query, - 'edit', - moduleId - ); - if (finalData.status === 'failed') { - setResolvedQuery( - queryId, - { - isLoading: false, - }, - moduleId - ); - - resolve(finalData); - onEvent('onDataQueryFailure', queryEvents); - setPreviewLoading(false); - if (shouldSetPreviewData) setPreviewData(finalData); - return; - } - } - - if (shouldSetPreviewData) { - setPreviewLoading(false); - setPreviewData(finalData); - } - - if (dataQuery.options.showSuccessNotification) { - const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000; - toast.success(dataQuery.options.successMessage, { - duration: notificationDuration, - }); - } - - get().debugger.log({ - logLevel: 'success', - type: 'query', - kind: query.kind, - key: query.name, - message: 'Query executed successfully', - isQuerySuccessLog: true, - errorTarget: 'Queries', - }); - - setResolvedQuery( - queryId, - { - isLoading: false, - data: finalData, - rawData, - metadata: data?.metadata, - request: data?.metadata?.request, - response: data?.metadata?.response, - }, - moduleId - ); - - resolve({ status: 'ok', data: finalData }); - onEvent('onDataQuerySuccess', queryEvents, mode); + const rawData = data.data; + const result = await processQueryResults(data.data, rawData); + resolve(result); } }) .catch((e) => { @@ -547,7 +665,7 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewPanelExpanded, executeRunPycode, runTransformation, - executeWorkflow, + triggerWorkflow, executeMultilineJS, setIsPreviewQueryLoading, } = queryPanel; @@ -607,7 +725,7 @@ export const createQueryPanelSlice = (set, get) => ({ } else if (query.kind === 'runpy') { queryExecutionPromise = executeRunPycode(query.options.code, query, true, 'edit', queryState); } else if (query.kind === 'workflows') { - queryExecutionPromise = executeWorkflow( + queryExecutionPromise = triggerWorkflow( moduleId, query.options.workflowId, query.options.blocking, @@ -620,11 +738,72 @@ export const createQueryPanelSlice = (set, get) => ({ queryExecutionPromise .then(async (data) => { + // Asynchronous query execution + // Currently async query resolution is applicable only to workflows + // Change this conditional to async query type check for other + // async queries in the future + if (query.kind === 'workflows') { + const processQueryResultsPreview = async (result) => { + let finalData = result; + if (query.options.enableTransformation) { + finalData = await runTransformation( + finalData, + query.options.transformation, + query.options.transformationLanguage, + query, + 'edit', + moduleId + ); + if (finalData.status === 'failed') { + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(finalData); + resolve({ status: 'failed', data: finalData }); + return finalData; + } + } + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(finalData); + resolve({ status: 'ok', data: finalData }); + return { status: 'ok', data: finalData }; + }; + const handleFailurePreview = (errorData) => { + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + if (!calledFromQuery) setPreviewData(errorData); + resolve({ status: 'failed', data: errorData }); + return errorData; + }; + + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId: query.id, + processQueryResults: processQueryResultsPreview, + handleFailure: handleFailurePreview, + shouldSetPreviewData: true, + setPreviewData, + setResolvedQuery: () => {}, // No resolvedQuery for preview + }); + // Process initial response and start SSE monitoring + asyncHandler.processInitialResponse(data.data); + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + // Resolve immediately with jobId for UI tracking + resolve({ jobId: asyncHandler.jobId }); + } catch (error) { + toast.error(error.message || 'Failed to start async preview query'); + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + resolve({ status: 'failed', message: error.message }); + } + return; + } + let finalData = data.data; let queryStatusCode = data?.status ?? null; const queryStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status; switch (true) { - // Note: Need to move away from statusText -> statusCode case queryStatus === 'Bad Request' || queryStatus === 'Not Found' || queryStatus === 'Unprocessable Entity' || @@ -656,9 +835,7 @@ export const createQueryPanelSlice = (set, get) => ({ } onEvent('onDataQueryFailure', queryEvents); - if (!calledFromQuery) setPreviewData(errorData); - break; } case queryStatus === 'needs_oauth': { @@ -721,11 +898,19 @@ export const createQueryPanelSlice = (set, get) => ({ }); }, - executeRunPycode: async (code, query, isPreview, mode, currentState) => { + executeRunPycode: async (code, query, isPreview, mode, currentState, _moduleId = 'canvas') => { const { queryPanel: { evaluatePythonCode }, } = get(); - return { data: await evaluatePythonCode({ code, query, isPreview, mode, currentState }) }; + return { + data: await evaluatePythonCode({ + code, + query, + isPreview, + mode, + currentState, + }), + }; }, evaluatePythonCode: async (options, moduleId = 'canvas') => { @@ -919,7 +1104,13 @@ export const createQueryPanelSlice = (set, get) => ({ const { queryPanel: { evaluatePythonCode }, } = get(); - return await evaluatePythonCode({ queryResult, code, query, mode, currentState }); + return await evaluatePythonCode({ + queryResult, + code, + query, + mode, + currentState, + }); }, updateQuerySuggestions: (oldName, newName) => { @@ -940,7 +1131,7 @@ export const createQueryPanelSlice = (set, get) => ({ delete updatedQueries[oldName]; - const oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`); + const _oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`); // useResolveStore.getState().actions.removeAppSuggestions(oldSuggestions); // useCurrentStateStore.getState().actions.setCurrentState({ @@ -961,6 +1152,18 @@ export const createQueryPanelSlice = (set, get) => ({ return { data: undefined, status: 'failed' }; } }, + triggerWorkflow: async (moduleId, workflowAppId, _blocking = false, params = {}, appEnvId) => { + const { getAllExposedValues } = get(); + const currentState = getAllExposedValues(); + const resolvedParams = get().resolveReferences(moduleId, params, currentState, {}, {}); + + try { + const executionResponse = await workflowExecutionsService.trigger(workflowAppId, resolvedParams, appEnvId); + return { data: executionResponse.result, status: 'ok' }; + } catch (e) { + return { data: e?.message, status: 'failed' }; + } + }, createProxy: (obj, path = '') => { const { queryPanel } = get(); diff --git a/frontend/src/AppBuilder/_utils/async-query-handler.js b/frontend/src/AppBuilder/_utils/async-query-handler.js new file mode 100644 index 0000000000..b23590343f --- /dev/null +++ b/frontend/src/AppBuilder/_utils/async-query-handler.js @@ -0,0 +1,132 @@ +// AsyncQueryHandler manages long-running operations via server-sent events (SSE). +export class AsyncQueryHandler { + /** + * Creates a new AsyncQueryHandler + * @param {Object} options - Configuration options + * @param {Function} options.streamSSE - Function that returns an EventSource for SSE status updates + * @param {Function} options.extractJobId - Function to extract job ID from response + * @param {Function} options.classifyEventStatus - Function to classify SSE events into status categories + * @param {Object} options.callbacks - Event callbacks + * @param {Function} options.callbacks.onProgress - Progress update handler + * @param {Function} options.callbacks.onComplete - Completion handler + * @param {Function} options.callbacks.onError - Error handler + * @param {Function} options.callbacks.onClose - Close handler + */ + constructor(options = {}) { + this.config = { + streamSSE: () => {}, + extractJobId: (response) => response.data?.id, + // Default implementation that doesn't make assumptions about specific status/type fields + classifyEventStatus: (data) => { + return { + // Default to treating all messages as progress updates + status: 'PROGRESS', + result: data.result || data, + // Return data for callback handlers + data, + }; + }, + callbacks: { + onProgress: () => {}, + onComplete: () => {}, + onError: () => {}, + onClose: () => {}, + }, + ...options, + }; + this.eventSource = null; + this.jobId = null; + } + + /** + * Processes the initial query response and starts SSE monitoring + * @param {Object} response - The initial query response + * @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods + */ + processInitialResponse(response) { + const jobId = this.config.extractJobId(response); + if (!jobId) throw new Error('Could not extract job ID for async query'); + this.jobId = jobId; + this.eventSource = this.startSSE(jobId); + + return { jobId, cancel: () => this.cancel() }; + } + + /** + * Opens an SSE connection to receive real-time updates for the given job. + * @private + * @param {string} jobId - Identifier for the async job + * @returns {EventSource} SSE event source for updates + */ + startSSE(jobId) { + const eventSource = this.config.streamSSE(jobId); + eventSource.onmessage = (event) => this.handleMessage(event, eventSource); + eventSource.onerror = (error) => this.handleError(error, eventSource); + + return eventSource; + } + + /** + * Processes incoming SSE messages and delegates to the appropriate callback. + * @private + * @param {MessageEvent} event - Incoming SSE message + * @param {EventSource} eventSource - EventSource instance for the SSE connection + */ + handleMessage(event, eventSource) { + try { + const payload = JSON.parse(event.data); + const { status, result, data } = this.config.classifyEventStatus(payload); + + switch (status) { + case 'PROGRESS': + this.config.callbacks.onProgress(data); + break; + case 'COMPLETE': + eventSource.close(); + this.config.callbacks.onComplete(result); + break; + case 'ERROR': + eventSource.close(); + this.config.callbacks.onError(data); + break; + case 'CLOSE': + eventSource.close(); + this.config.callbacks.onClose(data); + break; + default: + this.config.callbacks.onProgress(data); + } + } catch (err) { + console.error('Error parsing SSE message:', err); + eventSource.close(); + this.config.callbacks.onError({ message: 'Invalid server message', error: err }); + } + } + + /** + * Handles SSE connection errors and notifies onError if closed. + * @private + * @param {any} error - Error event or object + * @param {EventSource} eventSource - EventSource instance for the SSE connection + */ + handleError(error, eventSource) { + if (eventSource.readyState === EventSource.CLOSED) { + this.config.callbacks.onError({ message: 'SSE connection closed', error }); + } + } + + /** + * Cancels the ongoing async operation and cleans up resources. + */ + cancel() { + if (this.eventSource) { + this.eventSource.close(); + } + // Notify backend to cancel the job if jobId exists + // if (this.jobId) { + // fetch(`${this.config.endpoint}/${this.jobId}/cancel`, { method: 'POST' }).catch((e) => + // console.error('Failed to cancel async job', e) + // ); + // } + } +} diff --git a/frontend/src/_services/workflow_executions.service.js b/frontend/src/_services/workflow_executions.service.js index 8eac985252..6eadc2e43b 100644 --- a/frontend/src/_services/workflow_executions.service.js +++ b/frontend/src/_services/workflow_executions.service.js @@ -10,6 +10,8 @@ export const workflowExecutionsService = { all, enableWebhook, previewQueryNode, + trigger, + streamSSE, }; function previewQueryNode(queryId, appVersionId, nodeId) { @@ -70,3 +72,22 @@ function enableWebhook(appId, value) { const requestOptions = { method: 'PATCH', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' }; return fetch(`${config.apiUrl}/v2/webhooks/workflows/${appId}`, requestOptions).then(handleResponse); } + +function trigger(workflowAppId, params, environmentId) { + const currentSession = authenticationService.currentSessionValue; + const body = { + appId: workflowAppId, + userId: currentSession.current_user?.id, + executeUsing: 'app', + params: Object.fromEntries(params.map((param) => [param.key, param.value])), + environmentId, + }; + const requestOptions = { method: 'POST', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' }; + return fetch(`${config.apiUrl}/workflow_executions/${workflowAppId}/trigger`, requestOptions).then(handleResponse); +} + +function streamSSE(workflowExecutionId) { + return new EventSource(`${config.apiUrl}/workflow_executions/${workflowExecutionId}/stream`, { + withCredentials: true, + }); +} diff --git a/server/ee b/server/ee index 71e393543d..61ab78568c 160000 --- a/server/ee +++ b/server/ee @@ -1 +1 @@ -Subproject commit 71e393543ded7c10d1488455b73e3259d1b4d7b6 +Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945 diff --git a/server/src/main.ts b/server/src/main.ts index f72400bb94..34098a2fbe 100644 --- a/server/src/main.ts +++ b/server/src/main.ts @@ -49,8 +49,12 @@ async function handleLicensingInit(app: NestExpressApplication) { const License = LicenseModule.default; licenseUtilService.validateHostnameSubpath(License.Instance()?.domains); + const { isValid, licenseType, terms } = License.Instance(); console.log( - `License valid : ${License.Instance().isValid} License Terms : ${JSON.stringify(License.Instance().terms)} šŸš€` + `\nšŸ“‘ License Information: + - Valid: ${isValid} + - Type: ${licenseType} + - Terms:\n${JSON.stringify(terms, null, 2)}\n` ); } function replaceSubpathPlaceHoldersInStaticAssets() { diff --git a/server/src/modules/auth/workflow-sse-auth.guard.ts b/server/src/modules/auth/workflow-sse-auth.guard.ts new file mode 100644 index 0000000000..26fb28092c --- /dev/null +++ b/server/src/modules/auth/workflow-sse-auth.guard.ts @@ -0,0 +1,12 @@ +import { Injectable, ExecutionContext } from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; + +@Injectable() +export class WorkflowSseAuthGuard extends AuthGuard('jwt') { + canActivate(context: ExecutionContext) { + const request = context.switchToHttp().getRequest(); + request.isUserNotMandatory = true; + + return request; + } +} diff --git a/server/src/modules/licensing/constants/PlanTerms.ts b/server/src/modules/licensing/constants/PlanTerms.ts index 509de76918..95a5dcb763 100644 --- a/server/src/modules/licensing/constants/PlanTerms.ts +++ b/server/src/modules/licensing/constants/PlanTerms.ts @@ -84,7 +84,7 @@ export const ENTERPRISE_PLAN_TERMS = { export const WORKFLOW_TEAM_PLAN_TERMS: Partial = { workflows: { - execution_timeout: 60, + execution_timeout: 1800, instance: { total: LICENSE_LIMIT.UNLIMITED, daily_executions: LICENSE_LIMIT.UNLIMITED, diff --git a/server/src/modules/licensing/guards/webhook.guard.ts b/server/src/modules/licensing/guards/webhook.guard.ts index 67dec9fd98..b896d519ee 100644 --- a/server/src/modules/licensing/guards/webhook.guard.ts +++ b/server/src/modules/licensing/guards/webhook.guard.ts @@ -40,38 +40,38 @@ export class WebhookGuard implements CanActivate { // Workspace Level - if (workflowsLimit.workspace) { // Daily Limit - if ( - workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + const workspaceDailyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 - AND DATE(we.created_at) = current_date`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.daily_executions + AND DATE(we.created_at) = current_date + `; + + if ( + workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(workspaceDailyExecutionsQuery, [workflowApp.organizationId]))[0].count >= + workflowsLimit.workspace.daily_executions ) { throw new HttpException('Maximum daily limit for workflow execution has reached for this workspace', 451); } // Monthly Limit - if ( - workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query( - `SELECT COUNT(*) + const workspaceMonthlyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE a.organization_id = $1 AND extract (year from we.created_at) = extract (year from current_date) - AND extract (month from we.created_at) = extract (month from current_date)`, - [workflowApp.organizationId] - ) - )[0].count >= workflowsLimit.workspace.monthly_executions + AND extract (month from we.created_at) = extract (month from current_date) + `; + + if ( + workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(workspaceMonthlyExecutionsQuery, [workflowApp.organizationId]))[0].count >= + workflowsLimit.workspace.monthly_executions ) { throw new HttpException('Maximum monthly limit for workflow execution has reached for this workspace', 451); } @@ -80,30 +80,35 @@ export class WebhookGuard implements CanActivate { // Instance Level - if (workflowsLimit.instance) { // Daily Limit - if ( - workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + const instanceDailyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id - WHERE DATE(we.created_at) = current_date`) - )[0].count >= workflowsLimit.instance.daily_executions + WHERE DATE(we.created_at) = current_date + `; + + if ( + workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(instanceDailyExecutionsQuery))[0].count >= workflowsLimit.instance.daily_executions ) { throw new HttpException('Maximum daily limit for workflow execution has been reached', 451); } // Monthly Limit - if ( - workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && - ( - await this.manager.query(`SELECT COUNT(*) + const instanceMonthlyExecutionsQuery = ` + SELECT COUNT(*) FROM apps a INNER JOIN app_versions av on av.app_id = a.id INNER JOIN workflow_executions we on we.app_version_id = av.id WHERE extract (year from we.created_at) = extract (year from current_date) - AND extract (month from we.created_at) = extract (month from current_date)`) - )[0].count >= workflowsLimit.instance.monthly_executions + AND extract (month from we.created_at) = extract (month from current_date) + `; + + if ( + workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED && + (await this.manager.query(instanceMonthlyExecutionsQuery))[0].count >= + workflowsLimit.instance.monthly_executions ) { throw new HttpException('Maximum monthly limit for workflow execution has been reached', 451); } diff --git a/server/src/modules/workflows/controllers/workflow-executions.controller.ts b/server/src/modules/workflows/controllers/workflow-executions.controller.ts index 6de5ced93f..c6ce798e38 100644 --- a/server/src/modules/workflows/controllers/workflow-executions.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-executions.controller.ts @@ -1,4 +1,4 @@ -import { Body, Controller, Get, Param, Post, Res } from '@nestjs/common'; +import { Body, Controller, Get, Param, Post, Res, Sse } from '@nestjs/common'; import { Response } from 'express'; import { IWorkflowExecutionController } from '../interfaces/IWorkflowExecutionController'; import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto'; @@ -9,6 +9,7 @@ import { InitModule } from '@modules/app/decorators/init-module'; import { MODULES } from '@modules/app/constants/modules'; import { InitFeature } from '@modules/app/decorators/init-feature.decorator'; import { FEATURE_KEY } from '@modules/workflows/constants'; +import { Observable } from 'rxjs'; @InitModule(MODULES.WORKFLOWS) @Controller('workflow_executions') @@ -52,4 +53,21 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle ): Promise<{ result: any }> { throw new Error('Method not implemented.'); } + + @InitFeature(FEATURE_KEY.EXECUTE_WORKFLOW) + @Post(':id/trigger') + async trigger( + @Param('id') id: string, + @Body() createWorkflowExecutionDto: CreateWorkflowExecutionDto, + @User() user, + @Res({ passthrough: true }) response: Response + ): Promise<{ result: any }> { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS) + @Sse(':id/stream') + streamWorkflowExecution(@Param('id') id: string): Observable { + throw new Error('Method not implemented.'); + } } diff --git a/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts b/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts index 4558327f6b..982448aaf2 100644 --- a/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-webhooks.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Post, Param, Body, Patch, Query, Res } from '@nestjs/common'; +import { Controller, Post, Param, Body, Patch, Query, Res, Get, Sse } from '@nestjs/common'; import { Response } from 'express'; import { IWorkflowWebhooksController } from '../interfaces/IWorkflowWebhooksController'; import { InitModule } from '@modules/app/decorators/init-module'; @@ -25,6 +25,29 @@ export class WorkflowWebhooksController implements IWorkflowWebhooksController { throw new Error('Method not implemented.'); } + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Post('workflows/:idOrName/trigger-async') + async triggerWorkflowAsync( + @Param('app') app: any, + @Param('idOrName') idOrName: string, + @Body() workflowParams: Record, + @Query('environment') environment: string + ): Promise { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Get('workflows/:idOrName/status/:executionId') + async getExecutionStatus(@Param('executionId') executionId: string): Promise { + throw new Error('Method not implemented.'); + } + + @InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW) + @Sse('workflows/:idOrName/execution/:executionId/stream') + async triggerWorkflowStream(@Param('executionId') executionId: string): Promise { + throw new Error('Method not implemented.'); + } + @InitFeature(FEATURE_KEY.UPDATE_WORKFLOW_WEBHOOK_DETAILS) @Patch('workflows/:id') async updateWorkflow(@Param('id') id, @Body() workflowValuesToUpdate): Promise { diff --git a/server/src/modules/workflows/listeners/workflow-triggers.listener.ts b/server/src/modules/workflows/listeners/workflow-triggers.listener.ts new file mode 100644 index 0000000000..69683b92e4 --- /dev/null +++ b/server/src/modules/workflows/listeners/workflow-triggers.listener.ts @@ -0,0 +1,37 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { WorkflowExecutionsService } from '../services/workflow-executions.service'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { Logger } from 'nestjs-pino'; +import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto'; +import { WorkflowExecution } from '@entities/workflow_execution.entity'; +import { Response } from 'express'; + +export const WORKFLOW_EXECUTION_STATUS = { + TRIGGERED: 'workflow_execution_triggered', + RUNNING: 'workflow_execution_running', + COMPLETED: 'workflow_execution_completed', + ERROR: 'workflow_execution_error', +}; + +@Injectable() +export class WorkflowTriggersListener { + constructor( + protected workflowExecutionsService: WorkflowExecutionsService, + protected readonly logger: Logger, + protected readonly eventEmitter: EventEmitter2 + ) {} + + @OnEvent('triggerWorkflow') + async handleTriggerWorkflow({ + createWorkflowExecutionDto, + workflowExecution, + response, + }: { + createWorkflowExecutionDto: CreateWorkflowExecutionDto; + workflowExecution: WorkflowExecution; + response: Response; + }): Promise { + throw new Error('Not implemented.'); + } +} diff --git a/server/src/modules/workflows/module.ts b/server/src/modules/workflows/module.ts index a0c3078258..67caead1c0 100644 --- a/server/src/modules/workflows/module.ts +++ b/server/src/modules/workflows/module.ts @@ -55,7 +55,8 @@ export class WorkflowsModule { const { PageHelperService } = await import(`${importPath}/apps/services/page.util.service`); const { WorkflowSchedulesService } = await import(`${importPath}/workflows/services/workflow-schedules.service`); const { TemporalService } = await import(`${importPath}/workflows/services/temporal.service`); - const { WorkflowWebhooksListener } = await import(`${importPath}/workflows/listeners/workflow-webhooks.listener`); + const { WorkflowStreamService } = await import(`${importPath}/workflows/services/workflow-stream.service`); + const { WorkflowTriggersListener } = await import(`${importPath}/workflows/listeners/workflow-triggers.listener`); const { FeatureAbilityFactory } = await import(`${importPath}/workflows/ability/app`); return { @@ -110,7 +111,8 @@ export class WorkflowsModule { PageService, EventsService, WorkflowExecutionsService, - WorkflowWebhooksListener, + WorkflowStreamService, + WorkflowTriggersListener, WorkflowWebhooksService, OrganizationConstantsService, ComponentsService, diff --git a/server/src/modules/workflows/services/workflow-executions.service.ts b/server/src/modules/workflows/services/workflow-executions.service.ts index a81f2db6ff..b63056816d 100644 --- a/server/src/modules/workflows/services/workflow-executions.service.ts +++ b/server/src/modules/workflows/services/workflow-executions.service.ts @@ -40,4 +40,8 @@ export class WorkflowExecutionsService implements IWorkflowExecutionsService { ): Promise { throw new Error('Method not implemented.'); } + + async findOne(id: string, relations?: string[]): Promise { + throw new Error('Method not implemented.'); + } } diff --git a/server/src/modules/workflows/services/workflow-stream.service.ts b/server/src/modules/workflows/services/workflow-stream.service.ts new file mode 100644 index 0000000000..d0901c9a4e --- /dev/null +++ b/server/src/modules/workflows/services/workflow-stream.service.ts @@ -0,0 +1,31 @@ +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { Observable } from 'rxjs'; +import { OnEvent } from '@nestjs/event-emitter'; + +export const WORKFLOW_CONNECTION_TYPES = { + INITIALIZED: 'workflow_connection_initialized', + STREAMING: 'workflow_connection_streaming', + ERROR: 'workflow_connection_error', + CLOSE: 'workflow_connection_close', +}; + +// Base WorkflowStreamService class for CE +// This provides the interface but throws "Not implemented" errors for all methods +// EE version will extend this class and provide actual implementations +@Injectable() +export class WorkflowStreamService implements OnModuleInit { + constructor() {} + + onModuleInit() { + // CE version - no implementation needed + } + + @OnEvent('workflow.status') + handleWorkflowStatus({ executionId, status }: { executionId: string; status: any }) { + throw new Error('Method not implemented.'); + } + + getStream(executionId: string): Observable { + throw new Error('Method not implemented.'); + } +}