feat: Setup async workflow query execution with SSE

Overview
This PR implements real-time workflow execution monitoring using Server-Sent Events (SSE) as part of our LTS feature support. The implementation allows for tracking long-running workflows without requiring deployment changes.

Technical Changes
1. Server Components
Added WorkflowStreamService to manage persistent SSE streams with automatic cleanup
Implemented @sse endpoint in WorkflowExecutionsController for streaming status updates
Created WorkflowTriggersListener to emit workflow execution events with EventEmitter2
Added workflow execution status constants to identify different states in the execution lifecycle
2. Client-side Components
Implemented AsyncQueryHandler to manage SSE connections and parse event streams
Enhanced queryPanelSlice with methods to create async handlers and trigger workflows
Added support for non-blocking workflow execution with real-time status updates
3. Workflow Integration
Modified workflow triggering to use the SSE-based monitoring approach
Maintained same-server architecture to avoid deployment changes
Added automatic reconnection handling and error recovery for client connections
Architecture Decisions
Selected Same Server Approach: Chose to implement workflows within the same HTTP server to maintain the existing deployment setup for LTS users
Real-time Updates with SSE: Leveraged Server-Sent Events for their simplicity, efficiency, and compatibility with existing infrastructure
Future Extension Path: Implementation can be extended to Worker Threads or Microservice architecture later if needed
This commit is contained in:
Akshay Sasidharan 2025-07-01 19:17:07 +05:30
parent ce5dcb6b0a
commit 131ff6a288
14 changed files with 646 additions and 154 deletions

View file

@ -1,3 +1,5 @@
import { toast } from 'react-hot-toast';
import { AsyncQueryHandler } from '@/AppBuilder/_utils/async-query-handler';
import _, { isEmpty } from 'lodash';
import { resolveReferences, loadPyodide, hasCircularDependency } from '@/_helpers/utils';
import { fetchOAuthToken, fetchOauthTokenForSlackAndGSheet } from '@/AppBuilder/_utils/auth';
@ -7,7 +9,6 @@ import axios from 'axios';
import { validateMultilineCode } from '@/_helpers/utility';
import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel';
import { deepClone } from '@/_helpers/utilities/utils.helpers';
import toast from 'react-hot-toast';
const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {};
const initialState = {
@ -26,6 +27,7 @@ const initialState = {
loadingDataQueries: false,
isPreviewQueryLoading: false,
queryPanelSearchTem: '',
asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances
};
export const createQueryPanelSlice = (set, get) => ({
@ -162,6 +164,19 @@ export const createQueryPanelSlice = (set, get) => ({
'setLoadingDataQueries'
),
setAsyncQueryRuns: (updater) =>
set(
(state) => {
if (typeof updater === 'function') {
state.queryPanel.asyncQueryRuns = updater(state.queryPanel.asyncQueryRuns);
} else {
state.queryPanel.asyncQueryRuns = updater;
}
},
false,
'setAsyncQueryRuns'
),
onQueryConfirmOrCancel: (queryConfirmationData, isConfirm = false, mode = 'edit', moduleId = 'canvas') => {
const { queryPanel, dataQuery, setResolvedQuery } = get();
const { runQuery } = queryPanel;
@ -202,6 +217,70 @@ export const createQueryPanelSlice = (set, get) => ({
);
},
createWorkflowAsyncHandler: ({
executionId,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
}) => {
const asyncHandler = new AsyncQueryHandler({
streamSSE: (jobId) => {
return workflowExecutionsService.streamSSE(jobId);
},
extractJobId: () => executionId,
classifyEventStatus: (eventData) => {
// hardcoded for workflows
if (eventData.type === 'workflow_connection_close') {
return { status: 'CLOSE', data: eventData };
} else if (eventData.type === 'workflow_execution_completed') {
return { status: 'COMPLETE', result: eventData.result, data: eventData };
} else if (eventData.type === 'workflow_execution_error') {
return { status: 'ERROR', data: eventData };
} else {
return { status: 'PROGRESS', data: eventData };
}
},
callbacks: {
onProgress: (progressData) => {
// Update UI with progress information
if (shouldSetPreviewData) {
setPreviewData({ ...progressData });
}
setResolvedQuery(queryId, {
isLoading: true,
progress: progressData.progress,
currentData: progressData.partialData || [],
});
},
onComplete: async (result) => {
await processQueryResults(result);
// Remove the AsyncQueryHandler instance from asyncQueryRuns on completion
get().queryPanel.setAsyncQueryRuns((currentRuns) =>
currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId)
);
},
onError: (e) => {
handleFailure({
status: 'failed',
data: {
message: e.message || 'Error in async processing',
error: e.error,
},
});
// Remove the AsyncQueryHandler instance from asyncQueryRuns on error
get().queryPanel.setAsyncQueryRuns((currentRuns) =>
currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId)
);
},
},
});
return asyncHandler;
},
runQuery: (
queryId,
queryName,
@ -231,7 +310,7 @@ export const createQueryPanelSlice = (set, get) => ({
setPreviewPanelExpanded,
executeRunPycode,
runTransformation,
executeWorkflow,
triggerWorkflow,
executeMultilineJS,
} = queryPanel;
const queryUpdatePromise = dataQuerySlice.queryUpdates[queryId];
@ -332,6 +411,97 @@ export const createQueryPanelSlice = (set, get) => ({
}
}
// Handler for transformation and completion of query results
const processQueryResults = async (data, rawData = null) => {
let finalData = data;
rawData = rawData || data;
if (dataQuery.options.enableTransformation) {
finalData = await runTransformation(
finalData,
query.options.transformation,
query.options.transformationLanguage,
query,
mode,
moduleId
);
if (finalData.status === 'failed') {
handleFailure(finalData);
return finalData;
}
}
if (shouldSetPreviewData) {
setPreviewLoading(false);
setPreviewData(finalData);
}
if (dataQuery.options.showSuccessNotification) {
const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000;
toast.success(dataQuery.options.successMessage, {
duration: notificationDuration,
});
}
get().debugger.log({
logLevel: 'success',
type: 'query',
kind: query.kind,
key: query.name,
message: 'Query executed successfully',
isQuerySuccessLog: true,
errorTarget: 'Queries',
});
setResolvedQuery(
queryId,
{
isLoading: false,
data: finalData,
rawData,
metadata: data?.metadata,
request: data?.metadata?.request,
response: data?.metadata?.response,
},
moduleId
);
onEvent('onDataQuerySuccess', queryEvents, mode);
return { status: 'ok', data: finalData };
};
// Handler for query failures
const handleFailure = (errorData) => {
if (shouldSetPreviewData) {
setPreviewLoading(false);
setPreviewData(errorData);
}
get().debugger.log({
logLevel: 'error',
type: 'query',
kind: query.kind,
key: query.name,
message: errorData?.description || 'Query failed',
errorTarget: 'Queries',
error: errorData,
isQuerySuccessLog: false,
});
setResolvedQuery(
queryId,
{
isLoading: false,
error: errorData,
},
moduleId
);
onEvent('onDataQueryFailure', queryEvents);
return errorData;
};
// eslint-disable-next-line no-unused-vars
return new Promise(function (resolve, reject) {
if (shouldSetPreviewData) {
@ -356,7 +526,7 @@ export const createQueryPanelSlice = (set, get) => ({
} else if (query.kind === 'runpy') {
queryExecutionPromise = executeRunPycode(query.options.code, query, false, mode, queryState, moduleId);
} else if (query.kind === 'workflows') {
queryExecutionPromise = executeWorkflow(
queryExecutionPromise = triggerWorkflow(
moduleId,
query.options.workflowId,
query.options.blocking,
@ -386,6 +556,53 @@ export const createQueryPanelSlice = (set, get) => ({
fetchOAuthToken(url, dataQuery['data_source_id'] || dataQuery['dataSourceId']);
}
// Asynchronous query execution
// Currently async query resolution is applicable only to workflows
// Change this conditional to async query type check for other
// async queries in the future
if (query.kind === 'workflows') {
try {
const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
executionId: data.data.executionId,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
});
// Process initial response and start SSE monitoring
asyncHandler.processInitialResponse(data.data);
// Add the AsyncQueryHandler instance to asyncQueryRuns
get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
// Set initial state with jobId
setResolvedQuery(
queryId,
{
isLoading: true,
jobId: asyncHandler.jobId,
// data: data.data,
},
moduleId
);
// Resolve with async status
resolve({
jobId: asyncHandler.jobId,
// data: data.data,
});
} catch (error) {
toast.error(error.message || 'Failed to start async query');
resolve({ status: 'failed', message: error.message });
}
return;
}
// Handle synchronous queries (original code)
let queryStatusCode = data?.status ?? null;
const promiseStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status;
// Note: Need to move away from statusText -> statusCode
@ -420,114 +637,15 @@ export const createQueryPanelSlice = (set, get) => ({
errorData = data;
break;
}
if (shouldSetPreviewData) {
setPreviewLoading(false);
setPreviewData(errorData);
}
errorData = query.kind === 'runpy' || query.kind === 'runjs' ? data?.data : data;
get().debugger.log({
logLevel: 'error',
type: 'query',
kind: query.kind,
key: query.name,
message: errorData?.description,
errorTarget: 'Queries',
error:
query.kind === 'restapi'
? {
substitutedVariables: options,
request: data?.data?.requestObject,
response: data?.data?.responseObject,
}
: errorData,
isQuerySuccessLog: false,
});
setResolvedQuery(
queryId,
{
isLoading: false,
...(query.kind === 'restapi'
? {
metadata: data.metadata,
request: data.data.requestObject,
response: data.data.responseObject,
responseHeaders: data.data.responseHeaders,
}
: {}),
},
moduleId
);
resolve(data);
onEvent('onDataQueryFailure', queryEvents);
const result = handleFailure(errorData);
resolve(result);
return;
} else {
let rawData = data.data;
let finalData = data.data;
if (dataQuery.options.enableTransformation) {
finalData = await runTransformation(
finalData,
query.options.transformation,
query.options.transformationLanguage,
query,
'edit',
moduleId
);
if (finalData.status === 'failed') {
setResolvedQuery(
queryId,
{
isLoading: false,
},
moduleId
);
resolve(finalData);
onEvent('onDataQueryFailure', queryEvents);
setPreviewLoading(false);
if (shouldSetPreviewData) setPreviewData(finalData);
return;
}
}
if (shouldSetPreviewData) {
setPreviewLoading(false);
setPreviewData(finalData);
}
if (dataQuery.options.showSuccessNotification) {
const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000;
toast.success(dataQuery.options.successMessage, {
duration: notificationDuration,
});
}
get().debugger.log({
logLevel: 'success',
type: 'query',
kind: query.kind,
key: query.name,
message: 'Query executed successfully',
isQuerySuccessLog: true,
errorTarget: 'Queries',
});
setResolvedQuery(
queryId,
{
isLoading: false,
data: finalData,
rawData,
metadata: data?.metadata,
request: data?.metadata?.request,
response: data?.metadata?.response,
},
moduleId
);
resolve({ status: 'ok', data: finalData });
onEvent('onDataQuerySuccess', queryEvents, mode);
const rawData = data.data;
const result = await processQueryResults(data.data, rawData);
resolve(result);
}
})
.catch((e) => {
@ -547,7 +665,7 @@ export const createQueryPanelSlice = (set, get) => ({
setPreviewPanelExpanded,
executeRunPycode,
runTransformation,
executeWorkflow,
triggerWorkflow,
executeMultilineJS,
setIsPreviewQueryLoading,
} = queryPanel;
@ -607,7 +725,7 @@ export const createQueryPanelSlice = (set, get) => ({
} else if (query.kind === 'runpy') {
queryExecutionPromise = executeRunPycode(query.options.code, query, true, 'edit', queryState);
} else if (query.kind === 'workflows') {
queryExecutionPromise = executeWorkflow(
queryExecutionPromise = triggerWorkflow(
moduleId,
query.options.workflowId,
query.options.blocking,
@ -620,11 +738,72 @@ export const createQueryPanelSlice = (set, get) => ({
queryExecutionPromise
.then(async (data) => {
// Asynchronous query execution
// Currently async query resolution is applicable only to workflows
// Change this conditional to async query type check for other
// async queries in the future
if (query.kind === 'workflows') {
const processQueryResultsPreview = async (result) => {
let finalData = result;
if (query.options.enableTransformation) {
finalData = await runTransformation(
finalData,
query.options.transformation,
query.options.transformationLanguage,
query,
'edit',
moduleId
);
if (finalData.status === 'failed') {
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(finalData);
resolve({ status: 'failed', data: finalData });
return finalData;
}
}
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(finalData);
resolve({ status: 'ok', data: finalData });
return { status: 'ok', data: finalData };
};
const handleFailurePreview = (errorData) => {
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(errorData);
resolve({ status: 'failed', data: errorData });
return errorData;
};
try {
const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
executionId: data.data.executionId,
queryId: query.id,
processQueryResults: processQueryResultsPreview,
handleFailure: handleFailurePreview,
shouldSetPreviewData: true,
setPreviewData,
setResolvedQuery: () => {}, // No resolvedQuery for preview
});
// Process initial response and start SSE monitoring
asyncHandler.processInitialResponse(data.data);
get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
// Resolve immediately with jobId for UI tracking
resolve({ jobId: asyncHandler.jobId });
} catch (error) {
toast.error(error.message || 'Failed to start async preview query');
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
resolve({ status: 'failed', message: error.message });
}
return;
}
let finalData = data.data;
let queryStatusCode = data?.status ?? null;
const queryStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status;
switch (true) {
// Note: Need to move away from statusText -> statusCode
case queryStatus === 'Bad Request' ||
queryStatus === 'Not Found' ||
queryStatus === 'Unprocessable Entity' ||
@ -656,9 +835,7 @@ export const createQueryPanelSlice = (set, get) => ({
}
onEvent('onDataQueryFailure', queryEvents);
if (!calledFromQuery) setPreviewData(errorData);
break;
}
case queryStatus === 'needs_oauth': {
@ -721,11 +898,19 @@ export const createQueryPanelSlice = (set, get) => ({
});
},
executeRunPycode: async (code, query, isPreview, mode, currentState) => {
executeRunPycode: async (code, query, isPreview, mode, currentState, _moduleId = 'canvas') => {
const {
queryPanel: { evaluatePythonCode },
} = get();
return { data: await evaluatePythonCode({ code, query, isPreview, mode, currentState }) };
return {
data: await evaluatePythonCode({
code,
query,
isPreview,
mode,
currentState,
}),
};
},
evaluatePythonCode: async (options, moduleId = 'canvas') => {
@ -919,7 +1104,13 @@ export const createQueryPanelSlice = (set, get) => ({
const {
queryPanel: { evaluatePythonCode },
} = get();
return await evaluatePythonCode({ queryResult, code, query, mode, currentState });
return await evaluatePythonCode({
queryResult,
code,
query,
mode,
currentState,
});
},
updateQuerySuggestions: (oldName, newName) => {
@ -940,7 +1131,7 @@ export const createQueryPanelSlice = (set, get) => ({
delete updatedQueries[oldName];
const oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`);
const _oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`);
// useResolveStore.getState().actions.removeAppSuggestions(oldSuggestions);
// useCurrentStateStore.getState().actions.setCurrentState({
@ -961,6 +1152,18 @@ export const createQueryPanelSlice = (set, get) => ({
return { data: undefined, status: 'failed' };
}
},
triggerWorkflow: async (moduleId, workflowAppId, _blocking = false, params = {}, appEnvId) => {
const { getAllExposedValues } = get();
const currentState = getAllExposedValues();
const resolvedParams = get().resolveReferences(moduleId, params, currentState, {}, {});
try {
const executionResponse = await workflowExecutionsService.trigger(workflowAppId, resolvedParams, appEnvId);
return { data: executionResponse.result, status: 'ok' };
} catch (e) {
return { data: e?.message, status: 'failed' };
}
},
createProxy: (obj, path = '') => {
const { queryPanel } = get();

View file

@ -0,0 +1,132 @@
// AsyncQueryHandler manages long-running operations via server-sent events (SSE).
export class AsyncQueryHandler {
/**
* Creates a new AsyncQueryHandler
* @param {Object} options - Configuration options
* @param {Function} options.streamSSE - Function that returns an EventSource for SSE status updates
* @param {Function} options.extractJobId - Function to extract job ID from response
* @param {Function} options.classifyEventStatus - Function to classify SSE events into status categories
* @param {Object} options.callbacks - Event callbacks
* @param {Function} options.callbacks.onProgress - Progress update handler
* @param {Function} options.callbacks.onComplete - Completion handler
* @param {Function} options.callbacks.onError - Error handler
* @param {Function} options.callbacks.onClose - Close handler
*/
constructor(options = {}) {
this.config = {
streamSSE: () => {},
extractJobId: (response) => response.data?.id,
// Default implementation that doesn't make assumptions about specific status/type fields
classifyEventStatus: (data) => {
return {
// Default to treating all messages as progress updates
status: 'PROGRESS',
result: data.result || data,
// Return data for callback handlers
data,
};
},
callbacks: {
onProgress: () => {},
onComplete: () => {},
onError: () => {},
onClose: () => {},
},
...options,
};
this.eventSource = null;
this.jobId = null;
}
/**
* Processes the initial query response and starts SSE monitoring
* @param {Object} response - The initial query response
* @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods
*/
processInitialResponse(response) {
const jobId = this.config.extractJobId(response);
if (!jobId) throw new Error('Could not extract job ID for async query');
this.jobId = jobId;
this.eventSource = this.startSSE(jobId);
return { jobId, cancel: () => this.cancel() };
}
/**
* Opens an SSE connection to receive real-time updates for the given job.
* @private
* @param {string} jobId - Identifier for the async job
* @returns {EventSource} SSE event source for updates
*/
startSSE(jobId) {
const eventSource = this.config.streamSSE(jobId);
eventSource.onmessage = (event) => this.handleMessage(event, eventSource);
eventSource.onerror = (error) => this.handleError(error, eventSource);
return eventSource;
}
/**
* Processes incoming SSE messages and delegates to the appropriate callback.
* @private
* @param {MessageEvent} event - Incoming SSE message
* @param {EventSource} eventSource - EventSource instance for the SSE connection
*/
handleMessage(event, eventSource) {
try {
const payload = JSON.parse(event.data);
const { status, result, data } = this.config.classifyEventStatus(payload);
switch (status) {
case 'PROGRESS':
this.config.callbacks.onProgress(data);
break;
case 'COMPLETE':
eventSource.close();
this.config.callbacks.onComplete(result);
break;
case 'ERROR':
eventSource.close();
this.config.callbacks.onError(data);
break;
case 'CLOSE':
eventSource.close();
this.config.callbacks.onClose(data);
break;
default:
this.config.callbacks.onProgress(data);
}
} catch (err) {
console.error('Error parsing SSE message:', err);
eventSource.close();
this.config.callbacks.onError({ message: 'Invalid server message', error: err });
}
}
/**
* Handles SSE connection errors and notifies onError if closed.
* @private
* @param {any} error - Error event or object
* @param {EventSource} eventSource - EventSource instance for the SSE connection
*/
handleError(error, eventSource) {
if (eventSource.readyState === EventSource.CLOSED) {
this.config.callbacks.onError({ message: 'SSE connection closed', error });
}
}
/**
* Cancels the ongoing async operation and cleans up resources.
*/
cancel() {
if (this.eventSource) {
this.eventSource.close();
}
// Notify backend to cancel the job if jobId exists
// if (this.jobId) {
// fetch(`${this.config.endpoint}/${this.jobId}/cancel`, { method: 'POST' }).catch((e) =>
// console.error('Failed to cancel async job', e)
// );
// }
}
}

View file

@ -10,6 +10,8 @@ export const workflowExecutionsService = {
all,
enableWebhook,
previewQueryNode,
trigger,
streamSSE,
};
function previewQueryNode(queryId, appVersionId, nodeId) {
@ -70,3 +72,22 @@ function enableWebhook(appId, value) {
const requestOptions = { method: 'PATCH', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' };
return fetch(`${config.apiUrl}/v2/webhooks/workflows/${appId}`, requestOptions).then(handleResponse);
}
function trigger(workflowAppId, params, environmentId) {
const currentSession = authenticationService.currentSessionValue;
const body = {
appId: workflowAppId,
userId: currentSession.current_user?.id,
executeUsing: 'app',
params: Object.fromEntries(params.map((param) => [param.key, param.value])),
environmentId,
};
const requestOptions = { method: 'POST', headers: authHeader(), body: JSON.stringify(body), credentials: 'include' };
return fetch(`${config.apiUrl}/workflow_executions/${workflowAppId}/trigger`, requestOptions).then(handleResponse);
}
function streamSSE(workflowExecutionId) {
return new EventSource(`${config.apiUrl}/workflow_executions/${workflowExecutionId}/stream`, {
withCredentials: true,
});
}

@ -1 +1 @@
Subproject commit 71e393543ded7c10d1488455b73e3259d1b4d7b6
Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945

View file

@ -49,8 +49,12 @@ async function handleLicensingInit(app: NestExpressApplication) {
const License = LicenseModule.default;
licenseUtilService.validateHostnameSubpath(License.Instance()?.domains);
const { isValid, licenseType, terms } = License.Instance();
console.log(
`License valid : ${License.Instance().isValid} License Terms : ${JSON.stringify(License.Instance().terms)} 🚀`
`\n📑 License Information:
- Valid: ${isValid}
- Type: ${licenseType}
- Terms:\n${JSON.stringify(terms, null, 2)}\n`
);
}
function replaceSubpathPlaceHoldersInStaticAssets() {

View file

@ -0,0 +1,12 @@
import { Injectable, ExecutionContext } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
@Injectable()
export class WorkflowSseAuthGuard extends AuthGuard('jwt') {
canActivate(context: ExecutionContext) {
const request = context.switchToHttp().getRequest();
request.isUserNotMandatory = true;
return request;
}
}

View file

@ -84,7 +84,7 @@ export const ENTERPRISE_PLAN_TERMS = {
export const WORKFLOW_TEAM_PLAN_TERMS: Partial<Terms> = {
workflows: {
execution_timeout: 60,
execution_timeout: 1800,
instance: {
total: LICENSE_LIMIT.UNLIMITED,
daily_executions: LICENSE_LIMIT.UNLIMITED,

View file

@ -40,38 +40,38 @@ export class WebhookGuard implements CanActivate {
// Workspace Level -
if (workflowsLimit.workspace) {
// Daily Limit
if (
workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED &&
(
await this.manager.query(
`SELECT COUNT(*)
const workspaceDailyExecutionsQuery = `
SELECT COUNT(*)
FROM apps a
INNER JOIN app_versions av on av.app_id = a.id
INNER JOIN workflow_executions we on we.app_version_id = av.id
WHERE a.organization_id = $1
AND DATE(we.created_at) = current_date`,
[workflowApp.organizationId]
)
)[0].count >= workflowsLimit.workspace.daily_executions
AND DATE(we.created_at) = current_date
`;
if (
workflowsLimit.workspace.daily_executions !== LICENSE_LIMIT.UNLIMITED &&
(await this.manager.query(workspaceDailyExecutionsQuery, [workflowApp.organizationId]))[0].count >=
workflowsLimit.workspace.daily_executions
) {
throw new HttpException('Maximum daily limit for workflow execution has reached for this workspace', 451);
}
// Monthly Limit
if (
workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED &&
(
await this.manager.query(
`SELECT COUNT(*)
const workspaceMonthlyExecutionsQuery = `
SELECT COUNT(*)
FROM apps a
INNER JOIN app_versions av on av.app_id = a.id
INNER JOIN workflow_executions we on we.app_version_id = av.id
WHERE a.organization_id = $1
AND extract (year from we.created_at) = extract (year from current_date)
AND extract (month from we.created_at) = extract (month from current_date)`,
[workflowApp.organizationId]
)
)[0].count >= workflowsLimit.workspace.monthly_executions
AND extract (month from we.created_at) = extract (month from current_date)
`;
if (
workflowsLimit.workspace.monthly_executions !== LICENSE_LIMIT.UNLIMITED &&
(await this.manager.query(workspaceMonthlyExecutionsQuery, [workflowApp.organizationId]))[0].count >=
workflowsLimit.workspace.monthly_executions
) {
throw new HttpException('Maximum monthly limit for workflow execution has reached for this workspace', 451);
}
@ -80,30 +80,35 @@ export class WebhookGuard implements CanActivate {
// Instance Level -
if (workflowsLimit.instance) {
// Daily Limit
if (
workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED &&
(
await this.manager.query(`SELECT COUNT(*)
const instanceDailyExecutionsQuery = `
SELECT COUNT(*)
FROM apps a
INNER JOIN app_versions av on av.app_id = a.id
INNER JOIN workflow_executions we on we.app_version_id = av.id
WHERE DATE(we.created_at) = current_date`)
)[0].count >= workflowsLimit.instance.daily_executions
WHERE DATE(we.created_at) = current_date
`;
if (
workflowsLimit.instance.daily_executions !== LICENSE_LIMIT.UNLIMITED &&
(await this.manager.query(instanceDailyExecutionsQuery))[0].count >= workflowsLimit.instance.daily_executions
) {
throw new HttpException('Maximum daily limit for workflow execution has been reached', 451);
}
// Monthly Limit
if (
workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED &&
(
await this.manager.query(`SELECT COUNT(*)
const instanceMonthlyExecutionsQuery = `
SELECT COUNT(*)
FROM apps a
INNER JOIN app_versions av on av.app_id = a.id
INNER JOIN workflow_executions we on we.app_version_id = av.id
WHERE extract (year from we.created_at) = extract (year from current_date)
AND extract (month from we.created_at) = extract (month from current_date)`)
)[0].count >= workflowsLimit.instance.monthly_executions
AND extract (month from we.created_at) = extract (month from current_date)
`;
if (
workflowsLimit.instance.monthly_executions !== LICENSE_LIMIT.UNLIMITED &&
(await this.manager.query(instanceMonthlyExecutionsQuery))[0].count >=
workflowsLimit.instance.monthly_executions
) {
throw new HttpException('Maximum monthly limit for workflow execution has been reached', 451);
}

View file

@ -1,4 +1,4 @@
import { Body, Controller, Get, Param, Post, Res } from '@nestjs/common';
import { Body, Controller, Get, Param, Post, Res, Sse } from '@nestjs/common';
import { Response } from 'express';
import { IWorkflowExecutionController } from '../interfaces/IWorkflowExecutionController';
import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto';
@ -9,6 +9,7 @@ import { InitModule } from '@modules/app/decorators/init-module';
import { MODULES } from '@modules/app/constants/modules';
import { InitFeature } from '@modules/app/decorators/init-feature.decorator';
import { FEATURE_KEY } from '@modules/workflows/constants';
import { Observable } from 'rxjs';
@InitModule(MODULES.WORKFLOWS)
@Controller('workflow_executions')
@ -52,4 +53,21 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle
): Promise<{ result: any }> {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.EXECUTE_WORKFLOW)
@Post(':id/trigger')
async trigger(
@Param('id') id: string,
@Body() createWorkflowExecutionDto: CreateWorkflowExecutionDto,
@User() user,
@Res({ passthrough: true }) response: Response
): Promise<{ result: any }> {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS)
@Sse(':id/stream')
streamWorkflowExecution(@Param('id') id: string): Observable<MessageEvent> {
throw new Error('Method not implemented.');
}
}

View file

@ -1,4 +1,4 @@
import { Controller, Post, Param, Body, Patch, Query, Res } from '@nestjs/common';
import { Controller, Post, Param, Body, Patch, Query, Res, Get, Sse } from '@nestjs/common';
import { Response } from 'express';
import { IWorkflowWebhooksController } from '../interfaces/IWorkflowWebhooksController';
import { InitModule } from '@modules/app/decorators/init-module';
@ -25,6 +25,29 @@ export class WorkflowWebhooksController implements IWorkflowWebhooksController {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW)
@Post('workflows/:idOrName/trigger-async')
async triggerWorkflowAsync(
@Param('app') app: any,
@Param('idOrName') idOrName: string,
@Body() workflowParams: Record<string, unknown>,
@Query('environment') environment: string
): Promise<any> {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW)
@Get('workflows/:idOrName/status/:executionId')
async getExecutionStatus(@Param('executionId') executionId: string): Promise<any> {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.WEBHOOK_TRIGGER_WORKFLOW)
@Sse('workflows/:idOrName/execution/:executionId/stream')
async triggerWorkflowStream(@Param('executionId') executionId: string): Promise<any> {
throw new Error('Method not implemented.');
}
@InitFeature(FEATURE_KEY.UPDATE_WORKFLOW_WEBHOOK_DETAILS)
@Patch('workflows/:id')
async updateWorkflow(@Param('id') id, @Body() workflowValuesToUpdate): Promise<any> {

View file

@ -0,0 +1,37 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { WorkflowExecutionsService } from '../services/workflow-executions.service';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Logger } from 'nestjs-pino';
import { CreateWorkflowExecutionDto } from '@dto/create-workflow-execution.dto';
import { WorkflowExecution } from '@entities/workflow_execution.entity';
import { Response } from 'express';
export const WORKFLOW_EXECUTION_STATUS = {
TRIGGERED: 'workflow_execution_triggered',
RUNNING: 'workflow_execution_running',
COMPLETED: 'workflow_execution_completed',
ERROR: 'workflow_execution_error',
};
@Injectable()
export class WorkflowTriggersListener {
constructor(
protected workflowExecutionsService: WorkflowExecutionsService,
protected readonly logger: Logger,
protected readonly eventEmitter: EventEmitter2
) {}
@OnEvent('triggerWorkflow')
async handleTriggerWorkflow({
createWorkflowExecutionDto,
workflowExecution,
response,
}: {
createWorkflowExecutionDto: CreateWorkflowExecutionDto;
workflowExecution: WorkflowExecution;
response: Response;
}): Promise<void> {
throw new Error('Not implemented.');
}
}

View file

@ -55,7 +55,8 @@ export class WorkflowsModule {
const { PageHelperService } = await import(`${importPath}/apps/services/page.util.service`);
const { WorkflowSchedulesService } = await import(`${importPath}/workflows/services/workflow-schedules.service`);
const { TemporalService } = await import(`${importPath}/workflows/services/temporal.service`);
const { WorkflowWebhooksListener } = await import(`${importPath}/workflows/listeners/workflow-webhooks.listener`);
const { WorkflowStreamService } = await import(`${importPath}/workflows/services/workflow-stream.service`);
const { WorkflowTriggersListener } = await import(`${importPath}/workflows/listeners/workflow-triggers.listener`);
const { FeatureAbilityFactory } = await import(`${importPath}/workflows/ability/app`);
return {
@ -110,7 +111,8 @@ export class WorkflowsModule {
PageService,
EventsService,
WorkflowExecutionsService,
WorkflowWebhooksListener,
WorkflowStreamService,
WorkflowTriggersListener,
WorkflowWebhooksService,
OrganizationConstantsService,
ComponentsService,

View file

@ -40,4 +40,8 @@ export class WorkflowExecutionsService implements IWorkflowExecutionsService {
): Promise<any> {
throw new Error('Method not implemented.');
}
async findOne(id: string, relations?: string[]): Promise<WorkflowExecution> {
throw new Error('Method not implemented.');
}
}

View file

@ -0,0 +1,31 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Observable } from 'rxjs';
import { OnEvent } from '@nestjs/event-emitter';
export const WORKFLOW_CONNECTION_TYPES = {
INITIALIZED: 'workflow_connection_initialized',
STREAMING: 'workflow_connection_streaming',
ERROR: 'workflow_connection_error',
CLOSE: 'workflow_connection_close',
};
// Base WorkflowStreamService class for CE
// This provides the interface but throws "Not implemented" errors for all methods
// EE version will extend this class and provide actual implementations
@Injectable()
export class WorkflowStreamService implements OnModuleInit {
constructor() {}
onModuleInit() {
// CE version - no implementation needed
}
@OnEvent('workflow.status')
handleWorkflowStatus({ executionId, status }: { executionId: string; status: any }) {
throw new Error('Method not implemented.');
}
getStream(executionId: string): Observable<MessageEvent> {
throw new Error('Method not implemented.');
}
}