+
+
URL
@@ -371,7 +376,7 @@ class Restapi extends React.Component {
-
diff --git a/frontend/src/AppBuilder/QueryManager/QueryEditors/Restapi/styles.css b/frontend/src/AppBuilder/QueryManager/QueryEditors/Restapi/styles.css
new file mode 100644
index 0000000000..39ab33a758
--- /dev/null
+++ b/frontend/src/AppBuilder/QueryManager/QueryEditors/Restapi/styles.css
@@ -0,0 +1,45 @@
+/* Specific styling for workflow modal */
+.workflow-rest-api {
+ display: flex;
+ flex-direction: column;
+}
+
+/* Ensure method and URL fields have full width in workflow node */
+.workflow-rest-api .me-2 {
+ width: 100% !important;
+ margin-bottom: 16px; /* Increased spacing to avoid label overlap */
+}
+
+/* Ensure URL label doesn't overlap with Method dropdown */
+.workflow-rest-api .field .font-weight-medium {
+ margin-bottom: 4px;
+ display: block;
+ padding-top: 4px; /* Add space above URL label */
+}
+
+/* Fix the method dropdown width and height for workflow */
+.workflow-rest-api .me-2 {
+ width: 150px !important; /* Wider to accommodate "DELETE" and other long options */
+ height: auto !important;
+ min-height: 32px;
+}
+
+/* Fix Add more button to fit text properly */
+.add-params-btn {
+ width: 100px !important;
+ padding: 4px 8px;
+}
+
+.add-params-btn p {
+ display: flex;
+ align-items: center;
+ white-space: nowrap;
+ overflow: hidden;
+ text-overflow: ellipsis;
+}
+
+/* Button fix for workflow */
+.workflow-rest-api ~ .query-pane-restapi-tabs .add-params-btn {
+ width: auto !important;
+ min-width: 100px;
+}
diff --git a/frontend/src/AppBuilder/QueryManager/QueryEditors/Workflows.jsx b/frontend/src/AppBuilder/QueryManager/QueryEditors/Workflows.jsx
index 60210d4d07..7e49cd3c27 100644
--- a/frontend/src/AppBuilder/QueryManager/QueryEditors/Workflows.jsx
+++ b/frontend/src/AppBuilder/QueryManager/QueryEditors/Workflows.jsx
@@ -7,6 +7,7 @@ import { v4 as uuidv4 } from 'uuid';
import useStore from '@/AppBuilder/_stores/store';
import { useModuleContext } from '@/AppBuilder/_contexts/ModuleContext';
import usePopoverObserver from '@/AppBuilder/_hooks/usePopoverObserver';
+import useWorkflowStore from '@/_stores/workflowStore';
export function Workflows({ options, optionsChanged, currentState }) {
const { moduleId } = useModuleContext();
@@ -15,7 +16,9 @@ export function Workflows({ options, optionsChanged, currentState }) {
const [_selectedWorkflowId, setSelectedWorkflowId] = useState(undefined);
const [params, setParams] = useState([...(options.params ?? [{ key: '', value: '' }])]);
- const appId = useStore((state) => state.appStore.modules[moduleId].app.appId);
+ const workflowIdFromStore = useWorkflowStore((state) => state.workflowId);
+ const appIdFromStore = useStore((state) => state.appStore.modules[moduleId].app.appId);
+ const appId = workflowIdFromStore || appIdFromStore;
usePopoverObserver(
document.getElementsByClassName('query-details')[0],
diff --git a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js
index 903b548ca2..84dbd4b1a1 100644
--- a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js
+++ b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js
@@ -1,3 +1,5 @@
+import { toast } from 'react-hot-toast';
+import { AsyncQueryHandler } from '@/AppBuilder/_utils/async-query-handler';
import _, { isEmpty } from 'lodash';
import { resolveReferences, loadPyodide, hasCircularDependency } from '@/_helpers/utils';
import { fetchOAuthToken, fetchOauthTokenForSlackAndGSheet } from '@/AppBuilder/_utils/auth';
@@ -7,7 +9,7 @@ import axios from 'axios';
import { validateMultilineCode } from '@/_helpers/utility';
import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel';
import { deepClone } from '@/_helpers/utilities/utils.helpers';
-import toast from 'react-hot-toast';
+
const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {};
const initialState = {
@@ -168,6 +170,19 @@ export const createQueryPanelSlice = (set, get) => ({
'setLoadingDataQueries'
),
+ setAsyncQueryRuns: (updater) =>
+ set(
+ (state) => {
+ if (typeof updater === 'function') {
+ state.queryPanel.asyncQueryRuns = updater(state.queryPanel.asyncQueryRuns);
+ } else {
+ state.queryPanel.asyncQueryRuns = updater;
+ }
+ },
+ false,
+ 'setAsyncQueryRuns'
+ ),
+
onQueryConfirmOrCancel: (queryConfirmationData, isConfirm = false, mode = 'edit', moduleId = 'canvas') => {
const { queryPanel, dataQuery, setResolvedQuery } = get();
const { runQuery } = queryPanel;
@@ -208,6 +223,69 @@ export const createQueryPanelSlice = (set, get) => ({
);
},
+ createWorkflowAsyncHandler: ({
+ executionId,
+ queryId,
+ processQueryResults,
+ handleFailure,
+ shouldSetPreviewData,
+ setPreviewData,
+ setResolvedQuery,
+ }) => {
+ const asyncHandler = new AsyncQueryHandler({
+ streamSSE: (jobId) => {
+ return workflowExecutionsService.streamSSE(jobId);
+ },
+ extractJobId: () => executionId,
+ classifyEventStatus: (eventData) => {
+ // hardcoded for workflows
+ if (eventData.type === 'workflow_connection_close') {
+ return { status: 'CLOSE', data: eventData };
+ } else if (eventData.type === 'workflow_execution_completed') {
+ return { status: 'COMPLETE', result: eventData.result, data: eventData };
+ } else if (eventData.type === 'workflow_execution_error') {
+ return { status: 'ERROR', data: eventData };
+ } else {
+ return { status: 'PROGRESS', data: eventData };
+ }
+ },
+ callbacks: {
+ onProgress: (progressData) => {
+ // Update UI with progress information
+ if (shouldSetPreviewData) {
+ setPreviewData({ ...progressData });
+ }
+ setResolvedQuery(queryId, {
+ isLoading: true,
+ progress: progressData.progress,
+ currentData: progressData.partialData || [],
+ });
+ },
+ onComplete: async (result) => {
+ await processQueryResults(result);
+ // Remove the AsyncQueryHandler instance from asyncQueryRuns on completion
+ get().queryPanel.setAsyncQueryRuns((currentRuns) =>
+ currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId)
+ );
+ },
+ onError: (e) => {
+ handleFailure({
+ status: 'failed',
+ message: e?.error?.message || 'Error running workflow',
+ description: e?.error?.description || null,
+ data: typeof e?.error === 'object' ? { ...e.error } : e?.error,
+ });
+ // Remove the AsyncQueryHandler instance from asyncQueryRuns on error
+ get().queryPanel.setAsyncQueryRuns((currentRuns) =>
+ currentRuns.filter((handler) => handler.jobId !== asyncHandler.jobId)
+ );
+ },
+ },
+ });
+
+ return asyncHandler;
+ },
+
runQuery: (
queryId,
queryName,
@@ -238,7 +316,7 @@ export const createQueryPanelSlice = (set, get) => ({
setPreviewPanelExpanded,
executeRunPycode,
runTransformation,
- executeWorkflow,
+ triggerWorkflow,
executeMultilineJS,
} = queryPanel;
const queryUpdatePromise = dataQuerySlice.queryUpdates[queryId];
@@ -339,6 +417,120 @@ export const createQueryPanelSlice = (set, get) => ({
}
}
+ // Handler for transformation and completion of query results
+ const processQueryResults = async (data, rawData = null) => {
+ let finalData = data;
+ rawData = rawData || data;
+
+ if (dataQuery.options.enableTransformation) {
+ finalData = await runTransformation(
+ finalData,
+ query.options.transformation,
+ query.options.transformationLanguage,
+ query,
+ mode,
+ moduleId
+ );
+
+ if (finalData.status === 'failed') {
+ handleFailure(finalData);
+ return finalData;
+ }
+ }
+
+ if (shouldSetPreviewData) {
+ setPreviewLoading(false);
+ setPreviewData(finalData);
+ }
+
+ if (dataQuery.options.showSuccessNotification) {
+ const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000;
+ toast.success(dataQuery.options.successMessage, {
+ duration: notificationDuration,
+ });
+ }
+
+ get().debugger.log({
+ logLevel: 'success',
+ type: 'query',
+ kind: query.kind,
+ key: query.name,
+ message: 'Query executed successfully',
+ isQuerySuccessLog: true,
+ errorTarget: 'Queries',
+ });
+
+ setResolvedQuery(
+ queryId,
+ {
+ isLoading: false,
+ data: finalData,
+ rawData,
+ metadata: data?.metadata,
+ request: data?.metadata?.request,
+ response: data?.metadata?.response,
+ },
+ moduleId
+ );
+
+ onEvent('onDataQuerySuccess', queryEvents, mode);
+ return { status: 'ok', data: finalData };
+ };
+
+ // Handler for query failures
+ const handleFailure = (errorData) => {
+ if (shouldSetPreviewData) {
+ setPreviewLoading(false);
+ setPreviewData(errorData);
+ }
+
+ get().debugger.log({
+ logLevel: 'error',
+ type: 'query',
+ kind: query.kind,
+ key: query.name,
+ message: errorData?.description,
+ errorTarget: 'Queries',
+ error:
+ query.kind === 'restapi'
+ ? {
+ substitutedVariables: options,
+ request: errorData?.requestObject,
+ response: errorData?.responseObject,
+ }
+ : errorData,
+ isQuerySuccessLog: false,
+ });
+
+ setResolvedQuery(
+ queryId,
+ {
+ isLoading: false,
+ ...(query.kind === 'restapi' || errorData?.type === 'tj-401'
+ ? {
+ metadata: errorData?.metadata,
+ request: errorData?.requestObject,
+ response: errorData?.responseObject,
+ responseHeaders: errorData?.responseHeaders,
+ }
+ : {}),
+ },
+ moduleId
+ );
+
+ setResolvedQuery(
+ queryId,
+ {
+ isLoading: false,
+ error: errorData,
+ },
+ moduleId
+ );
+
+ onEvent('onDataQueryFailure', queryEvents);
+ return errorData;
+ };
+
// eslint-disable-next-line no-unused-vars
return new Promise(function (resolve, reject) {
if (shouldSetPreviewData) {
@@ -363,9 +555,8 @@ export const createQueryPanelSlice = (set, get) => ({
} else if (query.kind === 'runpy') {
queryExecutionPromise = executeRunPycode(query.options?.code, query, false, mode, queryState, moduleId);
} else if (query.kind === 'workflows') {
- queryExecutionPromise = executeWorkflow(
+ queryExecutionPromise = triggerWorkflow(
moduleId,
- query,
query.options?.workflowId,
query.options?.blocking,
query.options?.params,
@@ -395,6 +586,38 @@ export const createQueryPanelSlice = (set, get) => ({
fetchOAuthToken(url, dataQuery['data_source_id'] || dataQuery['dataSourceId']);
}
+ // Asynchronous query execution
+ // Currently async query resolution is applicable only to workflows
+ // Change this conditional to async query type check for other
+ // async queries in the future
+ if (query.kind === 'workflows') {
+ const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({
+ data,
+ queryId,
+ processQueryResults,
+ handleFailure,
+ shouldSetPreviewData,
+ setPreviewData,
+ setResolvedQuery,
+ });
+
+ if (error) {
+ resolve({ status: 'failed', message: error });
+ return;
+ }
+
+ if (!error && completionPromise) {
+ // This early resolution pattern is temporary - once the UI fully supports
+ // tracking individual async queries through their lifecycle, we can refactor
+ // this to rely on the completion promise concurrently
+ const result = await completionPromise;
+ resolve(result);
+ }
+ return;
+ }
+
+ // Handle synchronous queries (original code)
+
let queryStatusCode = data?.status ?? null;
const promiseStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status;
// Note: Need to move away from statusText -> statusCode
@@ -429,120 +652,22 @@ export const createQueryPanelSlice = (set, get) => ({
errorData = data;
break;
}
- if (shouldSetPreviewData) {
- setPreviewLoading(false);
- setPreviewData(errorData);
- }
+
errorData = query.kind === 'runpy' || query.kind === 'runjs' ? data?.data : data;
- get().debugger.log({
- logLevel: 'error',
- type: 'query',
- kind: query.kind,
- key: query.name,
- message: errorData?.description,
- errorTarget: 'Queries',
- error:
- query.kind === 'restapi'
- ? {
- substitutedVariables: options,
- request: data?.data?.requestObject,
- response: data?.data?.responseObject,
- }
- : errorData,
- isQuerySuccessLog: false,
- });
-
- setResolvedQuery(
- queryId,
- {
- isLoading: false,
- ...(query.kind === 'restapi' || data.data.type === 'tj-401'
- ? {
- metadata: data.metadata,
- request: data.data.requestObject,
- response: data.data.responseObject,
- responseHeaders: data.data.responseHeaders,
- }
- : {}),
- },
- moduleId
- );
-
- resolve(data);
- onEvent('onDataQueryFailure', queryEvents);
+ const result = handleFailure(errorData);
+ resolve(result);
return;
} else {
- let rawData = data.data;
- let finalData = data.data;
- if (dataQuery.options.enableTransformation) {
- finalData = await runTransformation(
- finalData,
- query.options.transformation,
- query.options.transformationLanguage,
- query,
- 'edit',
- moduleId
- );
- if (finalData.status === 'failed') {
- setResolvedQuery(
- queryId,
- {
- isLoading: false,
- },
- moduleId
- );
-
- resolve(finalData);
- onEvent('onDataQueryFailure', queryEvents);
- setPreviewLoading(false);
- if (shouldSetPreviewData) setPreviewData(finalData);
- return;
- }
- }
-
- if (shouldSetPreviewData) {
- setPreviewLoading(false);
- setPreviewData(finalData);
- }
-
- if (dataQuery.options.showSuccessNotification) {
- const notificationDuration = dataQuery.options.notificationDuration * 1000 || 5000;
- toast.success(dataQuery.options.successMessage, {
- duration: notificationDuration,
- });
- }
-
- get().debugger.log({
- logLevel: 'success',
- type: 'query',
- kind: query.kind,
- key: query.name,
- message: 'Query executed successfully',
- isQuerySuccessLog: true,
- errorTarget: 'Queries',
- });
-
- setResolvedQuery(
- queryId,
- {
- isLoading: false,
- data: finalData,
- rawData,
- metadata: data?.metadata,
- request: data?.metadata?.request,
- response: data?.metadata?.response,
- },
- moduleId
- );
-
- resolve({ status: 'ok', data: finalData });
- onEvent('onDataQuerySuccess', queryEvents, mode);
+ const rawData = data.data;
+ const result = await processQueryResults(data.data, rawData);
+ resolve(result);
}
})
.catch((e) => {
const { error } = e;
- if (mode !== 'view') toast.error(error ?? 'Unknown error');
- resolve({ status: 'failed', message: error });
+ const errorMessage = typeof error === 'string' ? error : error?.message || 'Unknown error';
+ if (mode !== 'view') toast.error(errorMessage);
+ resolve({ status: 'failed', message: errorMessage });
});
});
},
@@ -556,7 +681,7 @@ export const createQueryPanelSlice = (set, get) => ({
setPreviewPanelExpanded,
executeRunPycode,
runTransformation,
- executeWorkflow,
+ triggerWorkflow,
executeMultilineJS,
setIsPreviewQueryLoading,
} = queryPanel;
@@ -616,7 +741,7 @@ export const createQueryPanelSlice = (set, get) => ({
} else if (query.kind === 'runpy') {
queryExecutionPromise = executeRunPycode(query.options.code, query, true, 'edit', queryState);
} else if (query.kind === 'workflows') {
- queryExecutionPromise = executeWorkflow(
+ queryExecutionPromise = triggerWorkflow(
moduleId,
query.options.workflowId,
query.options.blocking,
@@ -629,11 +754,73 @@ export const createQueryPanelSlice = (set, get) => ({
queryExecutionPromise
.then(async (data) => {
+ // Asynchronous query execution
+ // Currently async query resolution is applicable only to workflows
+ // Change this conditional to async query type check for other
+ // async queries in the future
+ if (query.kind === 'workflows') {
+ const processQueryResultsPreview = async (result) => {
+ let finalData = result;
+ if (query.options.enableTransformation) {
+ finalData = await runTransformation(
+ finalData,
+ query.options.transformation,
+ query.options.transformationLanguage,
+ query,
+ 'edit',
+ moduleId
+ );
+ if (finalData.status === 'failed') {
+ setPreviewLoading(false);
+ setIsPreviewQueryLoading(false);
+ if (!calledFromQuery) setPreviewData(finalData);
+ return { status: 'failed', data: finalData };
+ }
+ }
+ setPreviewLoading(false);
+ setIsPreviewQueryLoading(false);
+ if (!calledFromQuery) setPreviewData(finalData);
+ return { status: 'ok', data: finalData };
+ };
+ const handleFailurePreview = (errorData) => {
+ setPreviewLoading(false);
+ setIsPreviewQueryLoading(false);
+ if (!calledFromQuery) setPreviewData(errorData);
+ return { status: 'failed', data: errorData };
+ };
+
+ const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({
+ data,
+ queryId: query.id,
+ processQueryResults: processQueryResultsPreview,
+ handleFailure: handleFailurePreview,
+ shouldSetPreviewData: true,
+ setPreviewData,
+ setResolvedQuery: () => {}, // No resolvedQuery for preview
+ resolve,
+ });
+
+ if (!error && completionPromise) {
+ try {
+ // This early resolution pattern is temporary - once the UI fully supports
+ // tracking individual async queries through their lifecycle, we can refactor
+ // this to rely on the completion promise concurrently
+ const result = await completionPromise;
+ resolve(result);
+ } catch (error) {
+ toast.error('Async operation failed:', error);
+ setPreviewLoading(false);
+ setIsPreviewQueryLoading(false);
+ resolve({ status: 'failed', message: error?.message || 'Unknown error' });
+ }
+ }
+ return;
+ }
+
let finalData = data.data;
let queryStatusCode = data?.status ?? null;
const queryStatus = query.kind === 'runpy' ? data?.data?.status ?? 'ok' : data.status;
switch (true) {
- // Note: Need to move away from statusText -> statusCode
case queryStatus === 'Bad Request' ||
queryStatus === 'Not Found' ||
queryStatus === 'Unprocessable Entity' ||
@@ -665,9 +852,7 @@ export const createQueryPanelSlice = (set, get) => ({
}
onEvent('onDataQueryFailure', queryEvents);
-
if (!calledFromQuery) setPreviewData(errorData);
-
break;
}
case queryStatus === 'needs_oauth': {
@@ -730,7 +915,7 @@ export const createQueryPanelSlice = (set, get) => ({
});
},
- executeRunPycode: async (code, query, isPreview, mode, currentState) => {
+ executeRunPycode: async (code, query, isPreview, mode, currentState, _moduleId = 'canvas') => {
const {
queryPanel: { evaluatePythonCode },
} = get();
@@ -950,7 +1135,13 @@ export const createQueryPanelSlice = (set, get) => ({
const {
queryPanel: { evaluatePythonCode },
} = get();
- return await evaluatePythonCode({ queryResult, code, query, mode, currentState });
+ return await evaluatePythonCode({
+ queryResult,
+ code,
+ query,
+ mode,
+ currentState,
+ });
},
updateQuerySuggestions: (oldName, newName) => {
@@ -971,7 +1162,7 @@ export const createQueryPanelSlice = (set, get) => ({
delete updatedQueries[oldName];
- const oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`);
+ const _oldSuggestions = Object.keys(queries[oldName]).map((key) => `queries.${oldName}.${key}`);
// useResolveStore.getState().actions.removeAppSuggestions(oldSuggestions);
// useCurrentStateStore.getState().actions.setCurrentState({
@@ -1013,6 +1204,18 @@ export const createQueryPanelSlice = (set, get) => ({
return { data: undefined, status: 'failed' };
}
},
+ triggerWorkflow: async (moduleId, workflowAppId, _blocking = false, params = {}, appEnvId) => {
+ const { getAllExposedValues } = get();
+ const currentState = getAllExposedValues();
+ const resolvedParams = get().resolveReferences(moduleId, params, currentState, {}, {});
+
+ try {
+ const executionResponse = await workflowExecutionsService.trigger(workflowAppId, resolvedParams, appEnvId);
+ return { data: executionResponse.result, status: 'ok' };
+ } catch (e) {
+ return { data: e?.message, status: 'failed' };
+ }
+ },
createProxy: (obj, path = '') => {
const { queryPanel } = get();
@@ -1209,6 +1412,48 @@ export const createQueryPanelSlice = (set, get) => ({
isQuerySelected: (queryId) => {
return get().queryPanel.selectedQuery?.id === queryId;
},
+
+ setupAsyncWorkflowHandler: ({
+ data,
+ queryId,
+ processQueryResults,
+ handleFailure,
+ shouldSetPreviewData,
+ setPreviewData,
+ setResolvedQuery,
+ }) => {
+ try {
+ const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
+ executionId: data.data.executionId,
+ queryId,
+ processQueryResults,
+ handleFailure,
+ shouldSetPreviewData,
+ setPreviewData,
+ setResolvedQuery,
+ });
+
+ // Process initial response and start SSE monitoring
+ const { __asyncCompletionPromise } = asyncHandler.processInitialResponse(data.data);
+
+ // Add the AsyncQueryHandler instance to asyncQueryRuns
+ get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
+
+ if (setResolvedQuery) {
+ setResolvedQuery(queryId, {
+ isLoading: true,
+ jobId: asyncHandler.jobId,
+ });
+ }
+
+ return {
+ handler: asyncHandler,
+ completionPromise: __asyncCompletionPromise,
+ };
+ } catch (error) {
+ return { error };
+ }
+ },
runQueryOnShortcut: () => {
const { queryPanel } = get();
const { runQuery, selectedQuery } = queryPanel;
diff --git a/frontend/src/AppBuilder/_utils/async-query-handler.js b/frontend/src/AppBuilder/_utils/async-query-handler.js
new file mode 100644
index 0000000000..ea2202797e
--- /dev/null
+++ b/frontend/src/AppBuilder/_utils/async-query-handler.js
@@ -0,0 +1,141 @@
+// AsyncQueryHandler manages long-running operations via server-sent events (SSE).
+export class AsyncQueryHandler {
+ /**
+ * Creates a new AsyncQueryHandler
+ * @param {Object} options - Configuration options
+ * @param {Function} options.streamSSE - Function that returns an EventSource for SSE status updates
+ * @param {Function} options.extractJobId - Function to extract job ID from response
+ * @param {Function} options.classifyEventStatus - Function to classify SSE events into status categories
+ * @param {Object} options.callbacks - Event callbacks
+ * @param {Function} options.callbacks.onProgress - Progress update handler
+ * @param {Function} options.callbacks.onComplete - Completion handler
+ * @param {Function} options.callbacks.onError - Error handler
+ * @param {Function} options.callbacks.onClose - Close handler
+ */
+ constructor(options = {}) {
+ this.config = {
+ streamSSE: () => {},
+ extractJobId: (response) => response.data?.id,
+ // Default implementation that doesn't make assumptions about specific status/type fields
+ classifyEventStatus: (data) => {
+ return {
+ // Default to treating all messages as progress updates
+ status: 'PROGRESS',
+ result: data.result || data,
+ // Return data for callback handlers
+ data,
+ };
+ },
+ callbacks: {
+ onProgress: () => {},
+ onComplete: () => {},
+ onError: () => {},
+ onClose: () => {},
+ },
+ ...options,
+ };
+ this.eventSource = null;
+ this.jobId = null;
+ }
+
+ /**
+ * Processes the initial query response and starts SSE monitoring
+ * @param {Object} response - The initial query response
+ * @returns {{ __jobId: string, __cancel: Function, __asyncCompletionPromise: Promise
}} Status object with jobId, control methods, and completion promise
+ */
+ processInitialResponse(response) {
+ const jobId = this.config.extractJobId(response);
+ if (!jobId) throw new Error('Could not extract job ID for async query');
+ this.jobId = jobId;
+ this.eventSource = this.startSSE(jobId);
+
+ // Return the reserved async completion promise for consumers
+ this.__asyncCompletionPromise =
+ this.__asyncCompletionPromise ||
+ new Promise((resolve, reject) => {
+ this.resolveCompletion = resolve;
+ this.rejectCompletion = reject;
+ });
+ return { __jobId: jobId, __cancel: () => this.cancel(), __asyncCompletionPromise: this.__asyncCompletionPromise };
+ }
+
+ /**
+ * Opens an SSE connection to receive real-time updates for the given job.
+ * @private
+ * @param {string} jobId - Identifier for the async job
+ * @returns {EventSource} SSE event source for updates
+ */
+ startSSE(jobId) {
+ const eventSource = this.config.streamSSE(jobId);
+ eventSource.onmessage = (event) => this.handleMessage(event, eventSource);
+ eventSource.onerror = (error) => this.handleError(error, eventSource);
+
+ return eventSource;
+ }
+
+ /**
+ * Processes incoming SSE messages and delegates to the appropriate callback.
+ * @private
+ * @param {MessageEvent} event - Incoming SSE message
+ * @param {EventSource} eventSource - EventSource instance for the SSE connection
+ */
+ handleMessage(event, eventSource) {
+ try {
+ const payload = JSON.parse(event.data);
+ const { status, result, data } = this.config.classifyEventStatus(payload);
+
+ switch (status) {
+ case 'PROGRESS':
+ this.config.callbacks.onProgress(data);
+ break;
+ case 'COMPLETE':
+ eventSource.close();
+ this.config.callbacks.onComplete(result);
+ this.resolveCompletion(result);
+ break;
+ case 'ERROR':
+ eventSource.close();
+ this.config.callbacks.onError(data);
+ this.rejectCompletion(data);
+ break;
+ case 'CLOSE':
+ eventSource.close();
+ this.config.callbacks.onClose(data);
+ break;
+ default:
+ this.config.callbacks.onProgress(data);
+ }
+ } catch (err) {
+ console.error('Error parsing SSE message:', err);
+ eventSource.close();
+ this.config.callbacks.onError({ message: 'Invalid server message', error: err });
+ }
+ }
+
+ /**
+ * Handles SSE connection errors and notifies onError if closed.
+ * @private
+ * @param {any} error - Error event or object
+ * @param {EventSource} eventSource - EventSource instance for the SSE connection
+ */
+ handleError(error, eventSource) {
+ if (eventSource.readyState === EventSource.CLOSED) {
+ this.config.callbacks.onError({ message: 'SSE connection closed', error });
+ }
+ }
+
+ /**
+ * Cancels the ongoing async operation and cleans up resources.
+ */
+ cancel() {
+ if (this.eventSource) {
+ this.eventSource.close();
+ }
+ // Notify backend to cancel the job if jobId exists
+ // if (this.jobId) {
+ // fetch(`${this.config.endpoint}/${this.jobId}/cancel`, { method: 'POST' }).catch((e) =>
+ // console.error('Failed to cancel async job', e)
+ // );
+ // }
+ }
+}
diff --git a/frontend/src/Editor/QueryManager/Components/DataSourcePicker.jsx b/frontend/src/Editor/QueryManager/Components/DataSourcePicker.jsx
index 11a9dd6d52..765152cbcc 100644
--- a/frontend/src/Editor/QueryManager/Components/DataSourcePicker.jsx
+++ b/frontend/src/Editor/QueryManager/Components/DataSourcePicker.jsx
@@ -14,6 +14,7 @@ import { useQueryPanelActions } from '@/_stores/queryPanelStore';
import { Tooltip } from 'react-tooltip';
import { canCreateDataSource } from '@/_helpers';
import SolidIcon from '@/_ui/Icon/SolidIcons';
+import { isWorkflowsFeatureEnabled } from '@/modules/common/helpers/utils';
import '../queryManager.theme.scss';
function DataSourcePicker({ dataSources, sampleDataSource, staticDataSources, darkMode, globalDataSources }) {
@@ -50,7 +51,7 @@ function DataSourcePicker({ dataSources, sampleDataSource, staticDataSources, da
navigate(`/${workspaceId}/data-sources`);
};
- const workflowsEnabled = window.public_config?.ENABLE_WORKFLOWS_FEATURE == 'true';
+ const workflowsEnabled = isWorkflowsFeatureEnabled();
return (
<>
diff --git a/frontend/src/Editor/QueryManager/Components/DataSourceSelect.jsx b/frontend/src/Editor/QueryManager/Components/DataSourceSelect.jsx
index d1afd378f4..aef29d8f52 100644
--- a/frontend/src/Editor/QueryManager/Components/DataSourceSelect.jsx
+++ b/frontend/src/Editor/QueryManager/Components/DataSourceSelect.jsx
@@ -14,8 +14,17 @@ import { DataBaseSources, ApiSources, CloudStorageSources } from '@/modules/comm
import { canCreateDataSource } from '@/_helpers';
import './../queryManager.theme.scss';
import { DATA_SOURCE_TYPE } from '@/_helpers/constants';
+import { workflowDefaultSources } from '../constants';
-function DataSourceSelect({ isDisabled, selectRef, closePopup, workflowDataSources, onNewNode, staticDataSources }) {
+function DataSourceSelect({
+ isDisabled,
+ selectRef,
+ closePopup,
+ workflowDataSources,
+ onNewNode,
+ staticDataSources,
+ sampleDataSources = [],
+}) {
const dataSources = useDataSources();
const globalDataSources = useGlobalDataSources();
const sampleDataSource = useSampleDataSource();
@@ -32,6 +41,10 @@ function DataSourceSelect({ isDisabled, selectRef, closePopup, workflowDataSourc
closePopup();
};
+ function cleanWord(word) {
+ return word.replace(/default/g, '');
+ }
+
useEffect(() => {
const shouldAddSampleDataSource = !!sampleDataSource;
const allDataSources = [...dataSources, ...globalDataSources, shouldAddSampleDataSource && sampleDataSource].filter(
@@ -132,6 +145,37 @@ function DataSourceSelect({ isDisabled, selectRef, closePopup, workflowDataSourc
...userDefinedSourcesOpts,
];
+ // Group sample data sources by kind
+ const groupedSampleDataSources =
+ sampleDataSources && sampleDataSources.length > 0
+ ? Object.entries(groupBy(sampleDataSources, 'kind')).map(([kind, sources]) => ({
+ label: (
+
+
+ {dataSourcesKinds.find((dsk) => dsk.kind === kind)?.name || kind}
+
+ ),
+ options: sources.map((source) => ({
+ label: (
+
+ {decodeEntities(source.name)}
+
+
+ ),
+ value: source.id,
+ isNested: true,
+ source,
+ })),
+ }))
+ : [];
+
const dataSourcesAvailable = [
{
label: (
@@ -146,7 +190,7 @@ function DataSourceSelect({ isDisabled, selectRef, closePopup, workflowDataSourc
label: (
{' '}
- {source?.name ?? source.kind}
+ {workflowDefaultSources[cleanWord(source.name)]?.name}
),
value: source.name,
@@ -154,6 +198,22 @@ function DataSourceSelect({ isDisabled, selectRef, closePopup, workflowDataSourc
})),
},
...userDefinedSourcesOpts,
+ // Sample data sources group header
+ ...(groupedSampleDataSources.length > 0
+ ? [
+ {
+ label: (
+
+
+ Sample data sources
+
+
+ ),
+ isDisabled: true,
+ },
+ ...groupedSampleDataSources,
+ ]
+ : []),
];
const dataSourceList = workflowDataSources && workflowDataSources ? dataSourcesAvailable : DataSourceOptions;
diff --git a/frontend/src/Editor/QueryManager/constants.js b/frontend/src/Editor/QueryManager/constants.js
index f06125ef3c..e8f7739429 100644
--- a/frontend/src/Editor/QueryManager/constants.js
+++ b/frontend/src/Editor/QueryManager/constants.js
@@ -106,3 +106,10 @@ export const defaultSources = {
runpy: { kind: 'runpy', id: 'runpy', name: 'Run Python code' },
workflows: { kind: 'workflows', id: 'null', name: 'Run Workflow' },
};
+
+export const workflowDefaultSources = {
+ ...defaultSources,
+ 'If condition': { kind: 'if', id: 'if', name: 'If condition' },
+ Response: { kind: 'response', id: 'response', name: 'Response' },
+ Loop: { kind: 'loop', id: 'loop', name: 'Loop' },
+};
diff --git a/frontend/src/HomePage/AppMenu.jsx b/frontend/src/HomePage/AppMenu.jsx
index e20137aff7..470ebd5cb9 100644
--- a/frontend/src/HomePage/AppMenu.jsx
+++ b/frontend/src/HomePage/AppMenu.jsx
@@ -85,6 +85,12 @@ export const AppMenu = function AppMenu({
)}
{canUpdateApp && canCreateApp && appType !== 'workflow' && (
<>
+ {appType !== 'workflow' && (
+ openAppActionModal('clone-app')}
+ />
+ )}
- )}
+
+ {appType !== 'workflow'
+ ? t('blankPage.importApplication', 'Import an app')
+ : t('blankPage.importWorkflow', 'Import a workflow')}
+