fix: Async workflow in runjs

This commit is contained in:
Akshay Sasidharan 2025-07-02 13:21:18 +05:30
parent 131ff6a288
commit 1cfdec70d7
8 changed files with 112 additions and 66 deletions

@ -1 +1 @@
Subproject commit b117b06fd7dbc43a45d317084ae27b0aedbf4a35
Subproject commit 820a61e32907263b62fcb9a2fb5ba1389627614c

View file

@ -9,6 +9,7 @@ import axios from 'axios';
import { validateMultilineCode } from '@/_helpers/utility';
import { convertMapSet, getQueryVariables } from '@/AppBuilder/_utils/queryPanel';
import { deepClone } from '@/_helpers/utilities/utils.helpers';
const queryManagerPreferences = JSON.parse(localStorage.getItem('queryManagerPreferences')) ?? {};
const initialState = {
@ -27,7 +28,6 @@ const initialState = {
loadingDataQueries: false,
isPreviewQueryLoading: false,
queryPanelSearchTem: '',
asyncQueryRuns: [], // Array to track active AsyncQueryHandler instances
};
export const createQueryPanelSlice = (set, get) => ({
@ -561,42 +561,27 @@ export const createQueryPanelSlice = (set, get) => ({
// Change this conditional to async query type check for other
// async queries in the future
if (query.kind === 'workflows') {
try {
const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
executionId: data.data.executionId,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
});
const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({
data,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
});
// Process initial response and start SSE monitoring
asyncHandler.processInitialResponse(data.data);
if (error) {
resolve({ status: 'failed', message: error });
return;
}
// Add the AsyncQueryHandler instance to asyncQueryRuns
get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
// Set initial state with jobId
setResolvedQuery(
queryId,
{
isLoading: true,
jobId: asyncHandler.jobId,
// data: data.data,
},
moduleId
);
// Resolve with async status
resolve({
jobId: asyncHandler.jobId,
// data: data.data,
});
} catch (error) {
toast.error(error.message || 'Failed to start async query');
resolve({ status: 'failed', message: error.message });
if (!error && completionPromise) {
// This early resolution pattern is temporary - once the UI fully supports
// tracking individual async queries through their lifecycle, we can refactor
// this to rely on the completion promise concurrently
const result = await completionPromise;
resolve(result);
}
return;
}
@ -758,44 +743,45 @@ export const createQueryPanelSlice = (set, get) => ({
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(finalData);
resolve({ status: 'failed', data: finalData });
return finalData;
return { status: 'failed', data: finalData };
}
}
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(finalData);
resolve({ status: 'ok', data: finalData });
return { status: 'ok', data: finalData };
};
const handleFailurePreview = (errorData) => {
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
if (!calledFromQuery) setPreviewData(errorData);
resolve({ status: 'failed', data: errorData });
return errorData;
return { status: 'failed', data: errorData };
};
try {
const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
executionId: data.data.executionId,
queryId: query.id,
processQueryResults: processQueryResultsPreview,
handleFailure: handleFailurePreview,
shouldSetPreviewData: true,
setPreviewData,
setResolvedQuery: () => {}, // No resolvedQuery for preview
});
// Process initial response and start SSE monitoring
asyncHandler.processInitialResponse(data.data);
get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
// Resolve immediately with jobId for UI tracking
resolve({ jobId: asyncHandler.jobId });
} catch (error) {
toast.error(error.message || 'Failed to start async preview query');
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
resolve({ status: 'failed', message: error.message });
const { error, completionPromise } = get().queryPanel.setupAsyncWorkflowHandler({
data,
queryId: query.id,
processQueryResults: processQueryResultsPreview,
handleFailure: handleFailurePreview,
shouldSetPreviewData: true,
setPreviewData,
setResolvedQuery: () => {}, // No resolvedQuery for preview
resolve,
});
if (!error && completionPromise) {
try {
// This early resolution pattern is temporary - once the UI fully supports
// tracking individual async queries through their lifecycle, we can refactor
// this to rely on the completion promise concurrently
const result = await completionPromise;
resolve(result);
} catch (error) {
toast.error('Async operation failed:', error);
setPreviewLoading(false);
setIsPreviewQueryLoading(false);
resolve({ status: 'failed', message: error?.message || 'Unknown error' });
}
}
return;
}
@ -1339,6 +1325,48 @@ export const createQueryPanelSlice = (set, get) => ({
isQuerySelected: (queryId) => {
return get().queryPanel.selectedQuery?.id === queryId;
},
setupAsyncWorkflowHandler: ({
data,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
}) => {
try {
const asyncHandler = get().queryPanel.createWorkflowAsyncHandler({
executionId: data.data.executionId,
queryId,
processQueryResults,
handleFailure,
shouldSetPreviewData,
setPreviewData,
setResolvedQuery,
});
// Process initial response and start SSE monitoring
const { __asyncCompletionPromise } = asyncHandler.processInitialResponse(data.data);
// Add the AsyncQueryHandler instance to asyncQueryRuns
get().queryPanel.setAsyncQueryRuns((currentRuns) => [...currentRuns, asyncHandler]);
if (setResolvedQuery) {
setResolvedQuery(queryId, {
isLoading: true,
jobId: asyncHandler.jobId,
});
}
return {
handler: asyncHandler,
completionPromise: __asyncCompletionPromise,
};
} catch (error) {
return { error };
}
},
runQueryOnShortcut: () => {
const { queryPanel } = get();
const { runQuery, selectedQuery } = queryPanel;

View file

@ -41,7 +41,7 @@ export class AsyncQueryHandler {
/**
* Processes the initial query response and starts SSE monitoring
* @param {Object} response - The initial query response
* @returns {{ jobId: string, cancel: Function }} Status object with jobId and control methods
* @returns {{ __jobId: string, __cancel: Function, __asyncCompletionPromise: Promise<any> }} Status object with jobId, control methods, and completion promise
*/
processInitialResponse(response) {
const jobId = this.config.extractJobId(response);
@ -49,7 +49,14 @@ export class AsyncQueryHandler {
this.jobId = jobId;
this.eventSource = this.startSSE(jobId);
return { jobId, cancel: () => this.cancel() };
// Return the reserved async completion promise for consumers
this.__asyncCompletionPromise =
this.__asyncCompletionPromise ||
new Promise((resolve, reject) => {
this.resolveCompletion = resolve;
this.rejectCompletion = reject;
});
return { __jobId: jobId, __cancel: () => this.cancel(), __asyncCompletionPromise: this.__asyncCompletionPromise };
}
/**
@ -84,10 +91,12 @@ export class AsyncQueryHandler {
case 'COMPLETE':
eventSource.close();
this.config.callbacks.onComplete(result);
this.resolveCompletion(result);
break;
case 'ERROR':
eventSource.close();
this.config.callbacks.onError(data);
this.rejectCompletion(data);
break;
case 'CLOSE':
eventSource.close();

@ -1 +1 @@
Subproject commit 61ab78568c7843d0fbee66768db04c7f9c482945
Subproject commit cde2ebcce6c552ba5d9ba49099cd4aae82ca5e1c

View file

@ -9,12 +9,15 @@ import { ValidAppGuard } from '../guards/valid-app.guard';
import { AppDecorator as App } from '@modules/app/decorators/app.decorator';
import { WorkflowService } from '../services/workflow.service';
import { IWorkflowController } from '../interfaces/IControllerWorkflow';
import { InitFeature } from '@modules/app/decorators/init-feature.decorator';
import { FEATURE_KEY } from '../constants';
@InitModule(MODULES.APP)
@Controller('apps')
export class WorkflowController implements IWorkflowController {
constructor(protected readonly workflowService: WorkflowService) {}
@InitFeature(FEATURE_KEY.GET)
@UseGuards(JwtAuthGuard, ValidAppGuard, FeatureAbilityGuard)
@Get(':id/workflows')
async fetchWorkflows(@App() app: AppEntity) {

View file

@ -30,7 +30,9 @@ export class AppsModule {
static async register(configs: { IS_GET_CONTEXT: boolean }): Promise<DynamicModule> {
const importPath = await getImportPath(configs.IS_GET_CONTEXT);
const { AppsController } = await import(`${importPath}/apps/controller`);
const { WorkflowController } = await import(`${importPath}/apps/controllers/workflow.controller`);
const { AppsService } = await import(`${importPath}/apps/service`);
const { WorkflowService } = await import(`${importPath}/apps/services/workflow.service`);
const { AppsUtilService } = await import(`${importPath}/apps/util.service`);
const { AppEnvironmentUtilService } = await import(`${importPath}/app-environments/util.service`);
const { PageService } = await import(`${importPath}/apps/services/page.service`);
@ -53,9 +55,10 @@ export class AppsModule {
await AppPermissionsModule.register(configs),
await UsersModule.register(configs),
],
controllers: [AppsController],
controllers: [AppsController, WorkflowController],
providers: [
AppsService,
WorkflowService,
VersionRepository,
AppsRepository,
AppGitRepository,

View file

@ -22,6 +22,7 @@ export function defineWorkflowVersionAbility(
FEATURE_KEY.UPDATE,
FEATURE_KEY.UPDATE_SETTINGS,
FEATURE_KEY.PROMOTE,
FEATURE_KEY.APP_VERSION_UPDATE,
],
App
);
@ -41,6 +42,7 @@ export function defineWorkflowVersionAbility(
FEATURE_KEY.UPDATE,
FEATURE_KEY.UPDATE_SETTINGS,
FEATURE_KEY.PROMOTE,
FEATURE_KEY.APP_VERSION_UPDATE,
],
App
);
@ -58,6 +60,7 @@ export function defineWorkflowVersionAbility(
FEATURE_KEY.UPDATE,
FEATURE_KEY.UPDATE_SETTINGS,
FEATURE_KEY.PROMOTE,
FEATURE_KEY.APP_VERSION_UPDATE,
],
App
);

View file

@ -67,7 +67,7 @@ export class WorkflowExecutionsController implements IWorkflowExecutionControlle
@InitFeature(FEATURE_KEY.WORKFLOW_EXECUTION_STATUS)
@Sse(':id/stream')
streamWorkflowExecution(@Param('id') id: string): Observable<MessageEvent> {
async streamWorkflowExecution(@Param('id') id: string): Promise<Observable<MessageEvent>> {
throw new Error('Method not implemented.');
}
}