diff --git a/frontend/ee b/frontend/ee index b117b06fd7..820a61e329 160000 --- a/frontend/ee +++ b/frontend/ee @@ -1 +1 @@ -Subproject commit b117b06fd7dbc43a45d317084ae27b0aedbf4a35 +Subproject commit 820a61e32907263b62fcb9a2fb5ba1389627614c diff --git a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js index fa917e54b6..776542d831 100644 --- a/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js +++ b/frontend/src/AppBuilder/_stores/slices/queryPanelSlice.js @@ -9,6 +9,7 @@ import axios from 'axios'; import { validateMultilineCode } from '@/_helpers/utility'; import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel'; import { deepClone } from '@/_helpers/utilities/utils.helpers'; + const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {}; const initialState = { @@ -27,7 +28,6 @@ const initialState = { loadingDataQueries: false, isPreviewQueryLoading: false, queryPanelSearchTem: '', - asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances }; export const createQueryPanelSlice = (set, get) => ({ @@ -561,42 +561,27 @@ export const createQueryPanelSlice = (set, get) => ({ // Change this conditional to async query type check for other // async queries in the future if (query.kind === 'workflows') { - try { - const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ - executionId: data.data.executionId, - queryId, - processQueryResults, - handleFailure, - shouldSetPreviewData, - setPreviewData, - setResolvedQuery, - }); + const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({ + data, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); - // Process initial response and start SSE monitoring - asyncHandler.processInitialResponse(data.data); + if (error) { + resolve({ status: 'failed', message: error }); + return; + } - // Add the AsyncQueryHandler instance to asyncQueryRuns - get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); - - // Set initial state with jobId - setResolvedQuery( - queryId, - { - isLoading: true, - jobId: asyncHandler.jobId, - // data: data.data, - }, - moduleId - ); - - // Resolve with async status - resolve({ - jobId: asyncHandler.jobId, - // data: data.data, - }); - } catch (error) { - toast.error(error.message || 'Failed to start async query'); - resolve({ status: 'failed', message: error.message }); + if (!error && completionPromise) { + // This early resolution pattern is temporary - once the UI fully supports + // tracking individual async queries through their lifecycle, we can refactor + // this to rely on the completion promise concurrently + const result = await completionPromise; + resolve(result); } return; } @@ -758,44 +743,45 @@ export const createQueryPanelSlice = (set, get) => ({ setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(finalData); - resolve({ status: 'failed', data: finalData }); - return finalData; + return { status: 'failed', data: finalData }; } } setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(finalData); - resolve({ status: 'ok', data: finalData }); return { status: 'ok', data: finalData }; }; const handleFailurePreview = (errorData) => { setPreviewLoading(false); setIsPreviewQueryLoading(false); if (!calledFromQuery) setPreviewData(errorData); - resolve({ status: 'failed', data: errorData }); - return errorData; + return { status: 'failed', data: errorData }; }; - try { - const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ - executionId: data.data.executionId, - queryId: query.id, - processQueryResults: processQueryResultsPreview, - handleFailure: handleFailurePreview, - shouldSetPreviewData: true, - setPreviewData, - setResolvedQuery: () => {}, // No resolvedQuery for preview - }); - // Process initial response and start SSE monitoring - asyncHandler.processInitialResponse(data.data); - get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); - // Resolve immediately with jobId for UI tracking - resolve({ jobId: asyncHandler.jobId }); - } catch (error) { - toast.error(error.message || 'Failed to start async preview query'); - setPreviewLoading(false); - setIsPreviewQueryLoading(false); - resolve({ status: 'failed', message: error.message }); + const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({ + data, + queryId: query.id, + processQueryResults: processQueryResultsPreview, + handleFailure: handleFailurePreview, + shouldSetPreviewData: true, + setPreviewData, + setResolvedQuery: () => {}, // No resolvedQuery for preview + resolve, + }); + + if (!error && completionPromise) { + try { + // This early resolution pattern is temporary - once the UI fully supports + // tracking individual async queries through their lifecycle, we can refactor + // this to rely on the completion promise concurrently + const result = await completionPromise; + resolve(result); + } catch (error) { + toast.error('Async operation failed:', error); + setPreviewLoading(false); + setIsPreviewQueryLoading(false); + resolve({ status: 'failed', message: error?.message || 'Unknown error' }); + } } return; } @@ -1339,6 +1325,48 @@ export const createQueryPanelSlice = (set, get) => ({ isQuerySelected: (queryId) => { return get().queryPanel.selectedQuery?.id === queryId; }, + + setupAsyncWorkflowHandler: ({ + data, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }) => { + try { + const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({ + executionId: data.data.executionId, + queryId, + processQueryResults, + handleFailure, + shouldSetPreviewData, + setPreviewData, + setResolvedQuery, + }); + + // Process initial response and start SSE monitoring + const { __asyncCompletionPromise } = asyncHandler.processInitialResponse(data.data); + + // Add the AsyncQueryHandler instance to asyncQueryRuns + get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]); + + if (setResolvedQuery) { + setResolvedQuery(queryId, { + isLoading: true, + jobId: asyncHandler.jobId, + }); + } + + return { + handler: asyncHandler, + completionPromise: __asyncCompletionPromise, + }; + } catch (error) { + return { error }; + } + }, runQueryOnShortcut: () => { const { queryPanel } = get(); const { runQuery, selectedQuery } = queryPanel; diff --git a/frontend/src/AppBuilder/_utils/async-query-handler.js b/frontend/src/AppBuilder/_utils/async-query-handler.js index b23590343f..ea2202797e 100644 --- a/frontend/src/AppBuilder/_utils/async-query-handler.js +++ b/frontend/src/AppBuilder/_utils/async-query-handler.js @@ -41,7 +41,7 @@ export class AsyncQueryHandler { /** * Processes the initial query response and starts SSE monitoring * @param {Object} response - The initial query response - * @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods + * @returns {{ __jobId: string, __cancel: Function, __asyncCompletionPromise: Promise }} Status object with jobId, control methods, and completion promise */ processInitialResponse(response) { const jobId = this.config.extractJobId(response); @@ -49,7 +49,14 @@ export class AsyncQueryHandler { this.jobId = jobId; this.eventSource = this.startSSE(jobId); - return { jobId, cancel: () => this.cancel() }; + // Return the reserved async completion promise for consumers + this.__asyncCompletionPromise = + this.__asyncCompletionPromise || + new Promise((resolve, reject) => { + this.resolveCompletion = resolve; + this.rejectCompletion = reject; + }); + return { __jobId: jobId, __cancel: () => this.cancel(), __asyncCompletionPromise: this.__asyncCompletionPromise }; } /** @@ -84,10 +91,12 @@ export class AsyncQueryHandler { case 'COMPLETE': eventSource.close(); this.config.callbacks.onComplete(result); + this.resolveCompletion(result); break; case 'ERROR': eventSource.close(); this.config.callbacks.onError(data); + this.rejectCompletion(data); break; case 'CLOSE': eventSource.close(); diff --git a/server/ee b/server/ee index 61ab78568c..cde2ebcce6 160000 --- a/server/ee +++ b/server/ee @@ -1 +1 @@ -Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945 +Subproject commit cde2ebcce6c552ba5d9ba49099cd4aae82ca5e1c diff --git a/server/src/modules/apps/controllers/workflow.controller.ts b/server/src/modules/apps/controllers/workflow.controller.ts index cc91daf852..32f724cfd0 100644 --- a/server/src/modules/apps/controllers/workflow.controller.ts +++ b/server/src/modules/apps/controllers/workflow.controller.ts @@ -9,12 +9,15 @@ import { ValidAppGuard } from '../guards/valid-app.guard'; import { AppDecorator as App } from '@modules/app/decorators/app.decorator'; import { WorkflowService } from '../services/workflow.service'; import { IWorkflowController } from '../interfaces/IControllerWorkflow'; +import { InitFeature } from '@modules/app/decorators/init-feature.decorator'; +import { FEATURE_KEY } from '../constants'; @InitModule(MODULES.APP) @Controller('apps') export class WorkflowController implements IWorkflowController { constructor(protected readonly workflowService: WorkflowService) {} + @InitFeature(FEATURE_KEY.GET) @UseGuards(JwtAuthGuard, ValidAppGuard, FeatureAbilityGuard) @Get(':id/workflows') async fetchWorkflows(@App() app: AppEntity) { diff --git a/server/src/modules/apps/module.ts b/server/src/modules/apps/module.ts index af6c809b6e..4bbaa2c6fe 100644 --- a/server/src/modules/apps/module.ts +++ b/server/src/modules/apps/module.ts @@ -30,7 +30,9 @@ export class AppsModule { static async register(configs: { IS_GET_CONTEXT: boolean }): Promise { const importPath = await getImportPath(configs.IS_GET_CONTEXT); const { AppsController } = await import(`${importPath}/apps/controller`); + const { WorkflowController } = await import(`${importPath}/apps/controllers/workflow.controller`); const { AppsService } = await import(`${importPath}/apps/service`); + const { WorkflowService } = await import(`${importPath}/apps/services/workflow.service`); const { AppsUtilService } = await import(`${importPath}/apps/util.service`); const { AppEnvironmentUtilService } = await import(`${importPath}/app-environments/util.service`); const { PageService } = await import(`${importPath}/apps/services/page.service`); @@ -53,9 +55,10 @@ export class AppsModule { await AppPermissionsModule.register(configs), await UsersModule.register(configs), ], - controllers: [AppsController], + controllers: [AppsController, WorkflowController], providers: [ AppsService, + WorkflowService, VersionRepository, AppsRepository, AppGitRepository, diff --git a/server/src/modules/versions/ability/workflow-version.ability.ts b/server/src/modules/versions/ability/workflow-version.ability.ts index 16c54f8e92..e3733a5b3a 100644 --- a/server/src/modules/versions/ability/workflow-version.ability.ts +++ b/server/src/modules/versions/ability/workflow-version.ability.ts @@ -22,6 +22,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); @@ -41,6 +42,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); @@ -58,6 +60,7 @@ export function defineWorkflowVersionAbility( FEATURE_KEY.UPDATE, FEATURE_KEY.UPDATE_SETTINGS, FEATURE_KEY.PROMOTE, + FEATURE_KEY.APP_VERSION_UPDATE, ], App ); diff --git a/server/src/modules/workflows/controllers/workflow-executions.controller.ts b/server/src/modules/workflows/controllers/workflow-executions.controller.ts index c6ce798e38..0917173d7f 100644 --- a/server/src/modules/workflows/controllers/workflow-executions.controller.ts +++ b/server/src/modules/workflows/controllers/workflow-executions.controller.ts @@ -67,7 +67,7 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle @InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS) @Sse(':id/stream') - streamWorkflowExecution(@Param('id') id: string): Observable { + async streamWorkflowExecution(@Param('id') id: string): Promise> { throw new Error('Method not implemented.'); } }