mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
fix(workflows): make Flowable schema upgrades idempotent to survive partial migrations (#27234)
* fix(workflows): make Flowable schema upgrades idempotent to survive partial migrations Fixes #26048. When the server crashed mid-startup during a Flowable schema upgrade, the DB was left in a partially-migrated state. On restart, Flowable re-ran the same DDL and failed on already-existing objects (indexes, tables, columns), permanently wedging both the server and migrate --force. Changes: 1. WorkflowHandler: webserver now uses DB_SCHEMA_UPDATE_FALSE — it validates the schema but never runs DDL. Only migrate CLI uses DB_SCHEMA_UPDATE_TRUE. 2. OpenMetadataOperations: explicit WorkflowHandler.initialize(config, true) inside the migrate command so Flowable DDL always runs during migration. 3. WorkflowHandler: catches FlowableWrongDbException on webserver startup and rethrows with an actionable message directing the operator to run migrate. 4. IdempotentDdlDataSource + IdempotentDdlStatement: JDBC DataSource wrapper used exclusively in migration context. Intercepts execute(sql) for CREATE INDEX, CREATE TABLE, and ALTER TABLE ADD COLUMN and pre-checks existence via standard DatabaseMetaData (getIndexInfo, getTables, getColumns) before executing. If the object already exists it logs a skip and returns — no SQL state codes, no string matching, works on MySQL and PostgreSQL. Unit tests cover schema-update mode selection in both contexts. * fix(workflows): address review comments on idempotent DDL wrapper - Extract shouldSkip() helper; apply idempotency checks to all execute() and executeUpdate() overloads, not just execute(String) - Tighten ALTER TABLE regex with negative lookahead to exclude SQL keywords (CONSTRAINT, PRIMARY, UNIQUE, FOREIGN, CHECK, INDEX, KEY) from being matched as column names - IdempotentDdlDataSource now wraps a DataSource delegate instead of calling DriverManager directly; uses migrationDataSource() helper in WorkflowHandler to resolve from existing DataSource or JDBC params - Fix InvocationTargetException wrapping in Connection proxy — unwrap cause so callers receive the original SQLException - Wrap all createStatement() variants in the proxy, not just the no-arg form - Contextual error message in WorkflowHandler — distinguish between server startup and migration context - Add IdempotentDdlStatementTest: 11 tests covering skip/execute for CREATE INDEX, CREATE UNIQUE INDEX, CREATE TABLE, ALTER TABLE ADD COLUMN, keyword-guarded ALTER TABLE, executeUpdate overload, and pass-through * fix(workflows): include DB/library versions in FlowableWrongDbException message * test(workflows): add IdempotentDdlDataSourceTest for proxy wrapping and exception surfacing * test(workflows): assert exception identity in proxy exception-surfacing tests * fix(workflows): catalog-aware identifier normalization in IdempotentDdlStatement On MySQL with lower_case_table_names=0 (default on Linux), table names are stored as-is and catalog=null metadata lookups can miss existing objects. - Use connection.getCatalog() for all getIndexInfo/getTables/getColumns calls - Normalize identifiers via DatabaseMetaData.storesLowerCaseIdentifiers() / storesUpperCaseIdentifiers() instead of unconditional toLowerCase() - stripIdentifierQuotes() handles backtick, double-quote and bracket quoting - extractObjectName() handles schema-qualified names (schema.table) - columnExists now iterates and normalizes COLUMN_NAME from ResultSet - Test: added MySQL uppercase storage case to IdempotentDdlStatementTest * fix(workflows): null guard in shouldSkip, drop-create Flowable init, robust test indexing - shouldSkip() returns false immediately for null SQL, preserving JDBC contract (delegate handles null and throws the driver's own error) - drop-create command now calls WorkflowHandler.initialize(config, true) after native migrations so it produces a fully startable DB including Flowable tables - WorkflowHandlerSchemaUpdateTest: replace brittle get(1) with getLast() so the test is not sensitive to how many StandaloneProcessEngineConfiguration instances are constructed before initializeNewProcessEngine runs
This commit is contained in:
parent
bd51ae119b
commit
d2c64d8ac7
7 changed files with 1135 additions and 42 deletions
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLFeatureNotSupportedException;
|
||||
import java.sql.Statement;
|
||||
import java.util.logging.Logger;
|
||||
import javax.sql.DataSource;
|
||||
|
||||
/**
|
||||
* A {@link DataSource} wrapper that intercepts every {@link Connection} it vends and ensures
|
||||
* that all {@link Statement} variants created from that connection go through
|
||||
* {@link IdempotentDdlStatement}. Only used in migration context so Flowable upgrade scripts
|
||||
* skip already-existing DDL objects instead of failing.
|
||||
*/
|
||||
final class IdempotentDdlDataSource implements DataSource {
|
||||
|
||||
private final DataSource delegate;
|
||||
|
||||
IdempotentDdlDataSource(DataSource delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws SQLException {
|
||||
return wrapConnection(delegate.getConnection());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection(String username, String password) throws SQLException {
|
||||
return wrapConnection(delegate.getConnection(username, password));
|
||||
}
|
||||
|
||||
private Connection wrapConnection(Connection real) {
|
||||
return (Connection)
|
||||
Proxy.newProxyInstance(
|
||||
real.getClass().getClassLoader(),
|
||||
new Class<?>[] {Connection.class},
|
||||
(proxy, method, args) -> {
|
||||
if ("createStatement".equals(method.getName())) {
|
||||
Statement stmt = (Statement) invokeDelegate(method, real, args);
|
||||
return new IdempotentDdlStatement(stmt, real);
|
||||
}
|
||||
return invokeDelegate(method, real, args);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes a method on the real connection, unwrapping any {@link InvocationTargetException}
|
||||
* so callers receive the original exception type (e.g. {@link SQLException}).
|
||||
*/
|
||||
private static Object invokeDelegate(Method method, Object target, Object[] args)
|
||||
throws Throwable {
|
||||
try {
|
||||
return method.invoke(target, args);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrintWriter getLogWriter() throws SQLException {
|
||||
return delegate.getLogWriter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLogWriter(PrintWriter out) throws SQLException {
|
||||
delegate.setLogWriter(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLoginTimeout(int seconds) throws SQLException {
|
||||
delegate.setLoginTimeout(seconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getLoginTimeout() throws SQLException {
|
||||
return delegate.getLoginTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
|
||||
return delegate.getParentLogger();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T unwrap(Class<T> iface) throws SQLException {
|
||||
return delegate.unwrap(iface);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWrapperFor(Class<?> iface) throws SQLException {
|
||||
return delegate.isWrapperFor(iface);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,382 @@
|
|||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLWarning;
|
||||
import java.sql.Statement;
|
||||
import java.util.Locale;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* A {@link Statement} wrapper that makes Flowable DDL upgrade scripts idempotent using
|
||||
* standard JDBC {@link DatabaseMetaData} pre-checks. Before executing a CREATE INDEX,
|
||||
* CREATE TABLE, or ALTER TABLE ADD COLUMN statement, it checks whether the object
|
||||
* already exists and skips execution if so. This allows interrupted Flowable schema
|
||||
* upgrades to resume cleanly.
|
||||
*/
|
||||
@Slf4j
|
||||
final class IdempotentDdlStatement implements Statement {
|
||||
|
||||
private static final Pattern CREATE_INDEX_PATTERN =
|
||||
Pattern.compile("(?i)create\\s+(?:unique\\s+)?index\\s+(\\S+)\\s+on\\s+(\\S+)\\s*\\(");
|
||||
private static final Pattern CREATE_TABLE_PATTERN =
|
||||
Pattern.compile("(?i)create\\s+table\\s+(?:if\\s+not\\s+exists\\s+)?(\\S+)\\s*\\(");
|
||||
// Negative lookahead prevents matching SQL keywords (CONSTRAINT, PRIMARY, UNIQUE, etc.)
|
||||
// as column names when the ADD clause is not a column definition.
|
||||
private static final Pattern ALTER_TABLE_ADD_COLUMN_PATTERN =
|
||||
Pattern.compile(
|
||||
"(?i)alter\\s+table\\s+(\\S+)\\s+add\\s+(?:column\\s+)?"
|
||||
+ "(?!constraint\\b|primary\\b|unique\\b|foreign\\b|check\\b|index\\b|key\\b)(\\S+)\\s");
|
||||
|
||||
private final Statement delegate;
|
||||
private final Connection connection;
|
||||
|
||||
IdempotentDdlStatement(Statement delegate, Connection connection) {
|
||||
this.delegate = delegate;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
private String stripIdentifierQuotes(String identifier) {
|
||||
String trimmed = identifier.trim();
|
||||
if (trimmed.length() >= 2) {
|
||||
char first = trimmed.charAt(0);
|
||||
char last = trimmed.charAt(trimmed.length() - 1);
|
||||
if ((first == '`' && last == '`')
|
||||
|| (first == '"' && last == '"')
|
||||
|| (first == '[' && last == ']')) {
|
||||
return trimmed.substring(1, trimmed.length() - 1);
|
||||
}
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
private String extractObjectName(String identifier) {
|
||||
String unquoted = stripIdentifierQuotes(identifier);
|
||||
int dot = unquoted.lastIndexOf('.');
|
||||
return dot >= 0 ? stripIdentifierQuotes(unquoted.substring(dot + 1)) : unquoted;
|
||||
}
|
||||
|
||||
private String normalizeIdentifier(DatabaseMetaData meta, String identifier) throws SQLException {
|
||||
String name = extractObjectName(identifier);
|
||||
if (meta.storesLowerCaseIdentifiers()) return name.toLowerCase(Locale.ROOT);
|
||||
if (meta.storesUpperCaseIdentifiers()) return name.toUpperCase(Locale.ROOT);
|
||||
return name;
|
||||
}
|
||||
|
||||
private boolean shouldSkip(String sql) throws SQLException {
|
||||
if (sql == null) return false;
|
||||
DatabaseMetaData meta = connection.getMetaData();
|
||||
|
||||
Matcher indexMatcher = CREATE_INDEX_PATTERN.matcher(sql);
|
||||
if (indexMatcher.find()) {
|
||||
String indexName = normalizeIdentifier(meta, indexMatcher.group(1));
|
||||
String tableName = normalizeIdentifier(meta, indexMatcher.group(2));
|
||||
if (indexExists(meta, indexName, tableName)) {
|
||||
LOG.info("Skipping already-existing index: {} on {}", indexName, tableName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Matcher tableMatcher = CREATE_TABLE_PATTERN.matcher(sql);
|
||||
if (tableMatcher.find()) {
|
||||
String tableName = normalizeIdentifier(meta, tableMatcher.group(1));
|
||||
if (tableExists(meta, tableName)) {
|
||||
LOG.info("Skipping already-existing table: {}", tableName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
Matcher alterMatcher = ALTER_TABLE_ADD_COLUMN_PATTERN.matcher(sql);
|
||||
if (alterMatcher.find()) {
|
||||
String tableName = normalizeIdentifier(meta, alterMatcher.group(1));
|
||||
String columnName = normalizeIdentifier(meta, alterMatcher.group(2));
|
||||
if (columnExists(meta, tableName, columnName)) {
|
||||
LOG.info("Skipping already-existing column: {}.{}", tableName, columnName);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean execute(String sql) throws SQLException {
|
||||
if (shouldSkip(sql)) return false;
|
||||
return delegate.execute(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
|
||||
if (shouldSkip(sql)) return false;
|
||||
return delegate.execute(sql, autoGeneratedKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
|
||||
if (shouldSkip(sql)) return false;
|
||||
return delegate.execute(sql, columnIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean execute(String sql, String[] columnNames) throws SQLException {
|
||||
if (shouldSkip(sql)) return false;
|
||||
return delegate.execute(sql, columnNames);
|
||||
}
|
||||
|
||||
private boolean indexExists(DatabaseMetaData meta, String indexName, String tableName)
|
||||
throws SQLException {
|
||||
String catalog = connection.getCatalog();
|
||||
try (ResultSet rs = meta.getIndexInfo(catalog, null, tableName, false, false)) {
|
||||
while (rs.next()) {
|
||||
String existing = rs.getString("INDEX_NAME");
|
||||
if (existing != null && normalizeIdentifier(meta, existing).equals(indexName)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean tableExists(DatabaseMetaData meta, String tableName) throws SQLException {
|
||||
String catalog = connection.getCatalog();
|
||||
try (ResultSet rs = meta.getTables(catalog, null, tableName, null)) {
|
||||
return rs.next();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean columnExists(DatabaseMetaData meta, String tableName, String columnName)
|
||||
throws SQLException {
|
||||
String catalog = connection.getCatalog();
|
||||
try (ResultSet rs = meta.getColumns(catalog, null, tableName, null)) {
|
||||
while (rs.next()) {
|
||||
String existing = rs.getString("COLUMN_NAME");
|
||||
if (existing != null && normalizeIdentifier(meta, existing).equals(columnName)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// All remaining Statement methods delegate to the underlying statement.
|
||||
|
||||
@Override
|
||||
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
|
||||
return delegate.executeQuery(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int executeUpdate(String sql) throws SQLException {
|
||||
if (shouldSkip(sql)) return 0;
|
||||
return delegate.executeUpdate(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
|
||||
if (shouldSkip(sql)) return 0;
|
||||
return delegate.executeUpdate(sql, autoGeneratedKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
|
||||
if (shouldSkip(sql)) return 0;
|
||||
return delegate.executeUpdate(sql, columnIndexes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
|
||||
if (shouldSkip(sql)) return 0;
|
||||
return delegate.executeUpdate(sql, columnNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws SQLException {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxFieldSize() throws SQLException {
|
||||
return delegate.getMaxFieldSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxFieldSize(int max) throws SQLException {
|
||||
delegate.setMaxFieldSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxRows() throws SQLException {
|
||||
return delegate.getMaxRows();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxRows(int max) throws SQLException {
|
||||
delegate.setMaxRows(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEscapeProcessing(boolean enable) throws SQLException {
|
||||
delegate.setEscapeProcessing(enable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueryTimeout() throws SQLException {
|
||||
return delegate.getQueryTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueryTimeout(int seconds) throws SQLException {
|
||||
delegate.setQueryTimeout(seconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() throws SQLException {
|
||||
delegate.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SQLWarning getWarnings() throws SQLException {
|
||||
return delegate.getWarnings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearWarnings() throws SQLException {
|
||||
delegate.clearWarnings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCursorName(String name) throws SQLException {
|
||||
delegate.setCursorName(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.sql.ResultSet getResultSet() throws SQLException {
|
||||
return delegate.getResultSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUpdateCount() throws SQLException {
|
||||
return delegate.getUpdateCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getMoreResults() throws SQLException {
|
||||
return delegate.getMoreResults();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFetchDirection(int direction) throws SQLException {
|
||||
delegate.setFetchDirection(direction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFetchDirection() throws SQLException {
|
||||
return delegate.getFetchDirection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFetchSize(int rows) throws SQLException {
|
||||
delegate.setFetchSize(rows);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFetchSize() throws SQLException {
|
||||
return delegate.getFetchSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getResultSetConcurrency() throws SQLException {
|
||||
return delegate.getResultSetConcurrency();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getResultSetType() throws SQLException {
|
||||
return delegate.getResultSetType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBatch(String sql) throws SQLException {
|
||||
delegate.addBatch(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearBatch() throws SQLException {
|
||||
delegate.clearBatch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int[] executeBatch() throws SQLException {
|
||||
return delegate.executeBatch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws SQLException {
|
||||
return delegate.getConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getMoreResults(int current) throws SQLException {
|
||||
return delegate.getMoreResults(current);
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.sql.ResultSet getGeneratedKeys() throws SQLException {
|
||||
return delegate.getGeneratedKeys();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getResultSetHoldability() throws SQLException {
|
||||
return delegate.getResultSetHoldability();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() throws SQLException {
|
||||
return delegate.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPoolable(boolean poolable) throws SQLException {
|
||||
delegate.setPoolable(poolable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPoolable() throws SQLException {
|
||||
return delegate.isPoolable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeOnCompletion() throws SQLException {
|
||||
delegate.closeOnCompletion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCloseOnCompletion() throws SQLException {
|
||||
return delegate.isCloseOnCompletion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T unwrap(Class<T> iface) throws SQLException {
|
||||
return delegate.unwrap(iface);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWrapperFor(Class<?> iface) throws SQLException {
|
||||
return delegate.isWrapperFor(iface);
|
||||
}
|
||||
}
|
||||
|
|
@ -3,6 +3,8 @@ package org.openmetadata.service.governance.workflows;
|
|||
import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName;
|
||||
import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId;
|
||||
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
|
@ -15,10 +17,12 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import javax.sql.DataSource;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.bpmn.converter.BpmnXMLConverter;
|
||||
import org.flowable.common.engine.api.FlowableObjectNotFoundException;
|
||||
import org.flowable.common.engine.api.FlowableWrongDbException;
|
||||
import org.flowable.common.engine.impl.el.DefaultExpressionManager;
|
||||
import org.flowable.engine.HistoryService;
|
||||
import org.flowable.engine.ManagementService;
|
||||
|
|
@ -74,13 +78,16 @@ public class WorkflowHandler {
|
|||
|
||||
private WorkflowHandler(OpenMetadataApplicationConfig config, boolean isMigrationContext) {
|
||||
this.isMigrationContext = isMigrationContext;
|
||||
ProcessEngineConfiguration processEngineConfiguration =
|
||||
new StandaloneProcessEngineConfiguration()
|
||||
.setJdbcUrl(config.getDataSourceFactory().getUrl())
|
||||
.setJdbcUsername(config.getDataSourceFactory().getUser())
|
||||
.setJdbcPassword(config.getDataSourceFactory().getPassword())
|
||||
.setJdbcDriver(config.getDataSourceFactory().getDriverClass())
|
||||
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
|
||||
StandaloneProcessEngineConfiguration processEngineConfiguration =
|
||||
new StandaloneProcessEngineConfiguration();
|
||||
processEngineConfiguration.setJdbcUrl(config.getDataSourceFactory().getUrl());
|
||||
processEngineConfiguration.setJdbcUsername(config.getDataSourceFactory().getUser());
|
||||
processEngineConfiguration.setJdbcPassword(config.getDataSourceFactory().getPassword());
|
||||
processEngineConfiguration.setJdbcDriver(config.getDataSourceFactory().getDriverClass());
|
||||
processEngineConfiguration.setDatabaseSchemaUpdate(
|
||||
isMigrationContext
|
||||
? ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE
|
||||
: ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
|
||||
|
||||
if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) {
|
||||
processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL);
|
||||
|
|
@ -100,6 +107,57 @@ public class WorkflowHandler {
|
|||
config.getPipelineServiceClientConfiguration()));
|
||||
}
|
||||
|
||||
private static DataSource migrationDataSource(ProcessEngineConfiguration config) {
|
||||
if (config.getDataSource() != null) {
|
||||
return config.getDataSource();
|
||||
}
|
||||
String url = config.getJdbcUrl();
|
||||
String user = config.getJdbcUsername();
|
||||
String password = config.getJdbcPassword();
|
||||
return new DataSource() {
|
||||
@Override
|
||||
public java.io.PrintWriter getLogWriter() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLogWriter(java.io.PrintWriter out) {}
|
||||
|
||||
@Override
|
||||
public void setLoginTimeout(int seconds) {}
|
||||
|
||||
@Override
|
||||
public int getLoginTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.util.logging.Logger getParentLogger() {
|
||||
return java.util.logging.Logger.getLogger("migration");
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T unwrap(Class<T> iface) throws SQLException {
|
||||
throw new SQLException("Not a wrapper");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWrapperFor(Class<?> iface) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.sql.Connection getConnection() throws SQLException {
|
||||
return DriverManager.getConnection(url, user, password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.sql.Connection getConnection(String u, String p) throws SQLException {
|
||||
return DriverManager.getConnection(url, u, p);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void initializeNewProcessEngine(
|
||||
ProcessEngineConfiguration currentProcessEngineConfiguration) {
|
||||
ProcessEngines.destroy();
|
||||
|
|
@ -109,40 +167,44 @@ public class WorkflowHandler {
|
|||
StandaloneProcessEngineConfiguration processEngineConfiguration =
|
||||
new StandaloneProcessEngineConfiguration();
|
||||
|
||||
// Setting Database Configuration
|
||||
processEngineConfiguration
|
||||
.setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl())
|
||||
.setJdbcUsername(currentProcessEngineConfiguration.getJdbcUsername())
|
||||
.setJdbcPassword(currentProcessEngineConfiguration.getJdbcPassword())
|
||||
.setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver())
|
||||
.setDatabaseType(currentProcessEngineConfiguration.getDatabaseType())
|
||||
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
|
||||
|
||||
// Setting Async Executor Configuration
|
||||
processEngineConfiguration
|
||||
.setAsyncExecutorActivate(!isMigrationContext)
|
||||
.setAsyncExecutorCorePoolSize(workflowSettings.getExecutorConfiguration().getCorePoolSize())
|
||||
.setAsyncExecutorMaxPoolSize(workflowSettings.getExecutorConfiguration().getMaxPoolSize())
|
||||
.setAsyncExecutorThreadPoolQueueSize(
|
||||
workflowSettings.getExecutorConfiguration().getQueueSize())
|
||||
.setAsyncExecutorAsyncJobLockTimeInMillis(
|
||||
workflowSettings.getExecutorConfiguration().getJobLockTimeInMillis())
|
||||
.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(
|
||||
workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition())
|
||||
.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(
|
||||
workflowSettings.getExecutorConfiguration().getAsyncJobAcquisitionInterval())
|
||||
.setAsyncExecutorDefaultTimerJobAcquireWaitTime(
|
||||
workflowSettings.getExecutorConfiguration().getTimerJobAcquisitionInterval());
|
||||
|
||||
// Setting History CleanUp - disable during migration to prevent race conditions
|
||||
processEngineConfiguration
|
||||
.setAsyncHistoryEnabled(true)
|
||||
.setEnableHistoryCleaning(!isMigrationContext)
|
||||
.setCleanInstancesEndedAfter(
|
||||
Duration.ofDays(
|
||||
workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays()))
|
||||
.setHistoryCleaningTimeCycleConfig(
|
||||
workflowSettings.getHistoryCleanUpConfiguration().getTimeCycleConfig());
|
||||
if (isMigrationContext) {
|
||||
processEngineConfiguration.setDataSource(
|
||||
new IdempotentDdlDataSource(migrationDataSource(currentProcessEngineConfiguration)));
|
||||
} else {
|
||||
processEngineConfiguration.setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl());
|
||||
processEngineConfiguration.setJdbcUsername(
|
||||
currentProcessEngineConfiguration.getJdbcUsername());
|
||||
processEngineConfiguration.setJdbcPassword(
|
||||
currentProcessEngineConfiguration.getJdbcPassword());
|
||||
processEngineConfiguration.setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver());
|
||||
}
|
||||
processEngineConfiguration.setDatabaseType(currentProcessEngineConfiguration.getDatabaseType());
|
||||
processEngineConfiguration.setDatabaseSchemaUpdate(
|
||||
isMigrationContext
|
||||
? ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE
|
||||
: ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
|
||||
processEngineConfiguration.setAsyncExecutorActivate(!isMigrationContext);
|
||||
processEngineConfiguration.setAsyncExecutorCorePoolSize(
|
||||
workflowSettings.getExecutorConfiguration().getCorePoolSize());
|
||||
processEngineConfiguration.setAsyncExecutorMaxPoolSize(
|
||||
workflowSettings.getExecutorConfiguration().getMaxPoolSize());
|
||||
processEngineConfiguration.setAsyncExecutorThreadPoolQueueSize(
|
||||
workflowSettings.getExecutorConfiguration().getQueueSize());
|
||||
processEngineConfiguration.setAsyncExecutorAsyncJobLockTimeInMillis(
|
||||
workflowSettings.getExecutorConfiguration().getJobLockTimeInMillis());
|
||||
processEngineConfiguration.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(
|
||||
workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition());
|
||||
processEngineConfiguration.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(
|
||||
workflowSettings.getExecutorConfiguration().getAsyncJobAcquisitionInterval());
|
||||
processEngineConfiguration.setAsyncExecutorDefaultTimerJobAcquireWaitTime(
|
||||
workflowSettings.getExecutorConfiguration().getTimerJobAcquisitionInterval());
|
||||
processEngineConfiguration.setAsyncHistoryEnabled(true);
|
||||
processEngineConfiguration.setEnableHistoryCleaning(!isMigrationContext);
|
||||
processEngineConfiguration.setCleanInstancesEndedAfter(
|
||||
Duration.ofDays(
|
||||
workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays()));
|
||||
processEngineConfiguration.setHistoryCleaningTimeCycleConfig(
|
||||
workflowSettings.getHistoryCleanUpConfiguration().getTimeCycleConfig());
|
||||
|
||||
// Add Expression Manager
|
||||
processEngineConfiguration.setExpressionManager(new DefaultExpressionManager(expressionMap));
|
||||
|
|
@ -150,7 +212,21 @@ public class WorkflowHandler {
|
|||
// Add Global Failure Listener
|
||||
processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener()));
|
||||
|
||||
this.processEngine = processEngineConfiguration.buildProcessEngine();
|
||||
try {
|
||||
this.processEngine = processEngineConfiguration.buildProcessEngine();
|
||||
} catch (FlowableWrongDbException e) {
|
||||
String hint =
|
||||
isMigrationContext
|
||||
? String.format(
|
||||
"Flowable schema version mismatch during migration: DB has '%s', library expects '%s'. "
|
||||
+ "Re-running migrate should auto-heal any partial upgrade.",
|
||||
e.getDbVersion(), e.getLibraryVersion())
|
||||
: String.format(
|
||||
"Flowable schema not initialized or at unexpected version (DB: '%s', expected: '%s'). "
|
||||
+ "Run `openmetadata-ops.sh migrate` before starting the server.",
|
||||
e.getDbVersion(), e.getLibraryVersion());
|
||||
throw new IllegalStateException(hint, e);
|
||||
}
|
||||
|
||||
// Add SqlMapper
|
||||
processEngine
|
||||
|
|
|
|||
|
|
@ -954,6 +954,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
LOG.info("Running the Native Migrations.");
|
||||
validateAndRunSystemDataMigrations(true);
|
||||
LOG.info("OpenMetadata Database Schema is Updated.");
|
||||
WorkflowHandler.initialize(config, true);
|
||||
LOG.info("create indexes.");
|
||||
searchRepository.createIndexes();
|
||||
searchRepository.createOrUpdateIndexTemplates();
|
||||
|
|
@ -1035,6 +1036,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
LOG.info("Migrating the OpenMetadata Schema.");
|
||||
parseConfig();
|
||||
validateAndRunSystemDataMigrations(force);
|
||||
LOG.info("Running Flowable schema upgrade.");
|
||||
WorkflowHandler.initialize(config, true);
|
||||
LOG.info("Update Search Indexes.");
|
||||
searchRepository.updateIndexes();
|
||||
LOG.info("Update Index Templates.");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import javax.sql.DataSource;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class IdempotentDdlDataSourceTest {
|
||||
|
||||
private DataSource delegate;
|
||||
private Connection realConnection;
|
||||
private IdempotentDdlDataSource dataSource;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
delegate = mock(DataSource.class);
|
||||
realConnection = mock(Connection.class);
|
||||
when(delegate.getConnection()).thenReturn(realConnection);
|
||||
dataSource = new IdempotentDdlDataSource(delegate);
|
||||
}
|
||||
|
||||
@Test
|
||||
void createStatementNoArgIsWrapped() throws Exception {
|
||||
when(realConnection.createStatement()).thenReturn(mock(Statement.class));
|
||||
|
||||
Statement stmt = dataSource.getConnection().createStatement();
|
||||
|
||||
assertInstanceOf(IdempotentDdlStatement.class, stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
void createStatementTwoArgIsWrapped() throws Exception {
|
||||
when(realConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
|
||||
.thenReturn(mock(Statement.class));
|
||||
|
||||
Statement stmt =
|
||||
dataSource
|
||||
.getConnection()
|
||||
.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
|
||||
assertInstanceOf(IdempotentDdlStatement.class, stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
void createStatementThreeArgIsWrapped() throws Exception {
|
||||
when(realConnection.createStatement(
|
||||
ResultSet.TYPE_FORWARD_ONLY,
|
||||
ResultSet.CONCUR_READ_ONLY,
|
||||
ResultSet.HOLD_CURSORS_OVER_COMMIT))
|
||||
.thenReturn(mock(Statement.class));
|
||||
|
||||
Statement stmt =
|
||||
dataSource
|
||||
.getConnection()
|
||||
.createStatement(
|
||||
ResultSet.TYPE_FORWARD_ONLY,
|
||||
ResultSet.CONCUR_READ_ONLY,
|
||||
ResultSet.HOLD_CURSORS_OVER_COMMIT);
|
||||
|
||||
assertInstanceOf(IdempotentDdlStatement.class, stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
void sqlExceptionFromConnectionSurfacesDirectly() throws Exception {
|
||||
SQLException expected = new SQLException("connection refused");
|
||||
when(realConnection.createStatement()).thenThrow(expected);
|
||||
|
||||
Connection proxied = dataSource.getConnection();
|
||||
|
||||
SQLException actual = assertThrows(SQLException.class, proxied::createStatement);
|
||||
assertSame(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
void sqlExceptionFromNonCreateStatementMethodSurfacesDirectly() throws Exception {
|
||||
SQLException expected = new SQLException("autocommit failed");
|
||||
when(realConnection.getAutoCommit()).thenThrow(expected);
|
||||
|
||||
Connection proxied = dataSource.getConnection();
|
||||
|
||||
SQLException actual = assertThrows(SQLException.class, proxied::getAutoCommit);
|
||||
assertSame(expected, actual);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,245 @@
|
|||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Statement;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class IdempotentDdlStatementTest {
|
||||
|
||||
private Statement delegate;
|
||||
private Connection connection;
|
||||
private DatabaseMetaData meta;
|
||||
private IdempotentDdlStatement stmt;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
delegate = mock(Statement.class);
|
||||
connection = mock(Connection.class);
|
||||
meta = mock(DatabaseMetaData.class);
|
||||
when(connection.getMetaData()).thenReturn(meta);
|
||||
when(connection.getCatalog()).thenReturn("openmetadata");
|
||||
// Default: DB stores lowercase (PostgreSQL behaviour)
|
||||
when(meta.storesLowerCaseIdentifiers()).thenReturn(true);
|
||||
when(meta.storesUpperCaseIdentifiers()).thenReturn(false);
|
||||
stmt = new IdempotentDdlStatement(delegate, connection);
|
||||
}
|
||||
|
||||
// --- CREATE INDEX ---
|
||||
|
||||
@Test
|
||||
void skipsCreateIndexWhenIndexExists() throws Exception {
|
||||
ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
|
||||
when(meta.getIndexInfo(
|
||||
eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(rs);
|
||||
|
||||
boolean result =
|
||||
stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
|
||||
|
||||
assertFalse(result);
|
||||
verify(delegate, never()).execute(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void executesCreateIndexWhenIndexAbsent() throws Exception {
|
||||
ResultSet rs = emptyResultSet();
|
||||
when(meta.getIndexInfo(
|
||||
eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(rs);
|
||||
when(delegate.execute(anyString())).thenReturn(false);
|
||||
|
||||
stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
|
||||
|
||||
verify(delegate)
|
||||
.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void skipsCreateUniqueIndex() throws Exception {
|
||||
ResultSet rs = mockResultSetWithIndexName("ACT_UNIQ_MEMB");
|
||||
when(meta.getIndexInfo(
|
||||
eq("openmetadata"), isNull(), eq("act_id_membership"), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(rs);
|
||||
|
||||
boolean result =
|
||||
stmt.execute(
|
||||
"CREATE UNIQUE INDEX ACT_UNIQ_MEMB ON ACT_ID_MEMBERSHIP (USER_ID_, GROUP_ID_)");
|
||||
|
||||
assertFalse(result);
|
||||
verify(delegate, never()).execute(anyString());
|
||||
}
|
||||
|
||||
// --- CREATE TABLE ---
|
||||
|
||||
@Test
|
||||
void skipsCreateTableWhenTableExists() throws Exception {
|
||||
ResultSet rs = singleRowResultSet();
|
||||
when(meta.getTables(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
|
||||
.thenReturn(rs);
|
||||
|
||||
boolean result = stmt.execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
|
||||
|
||||
assertFalse(result);
|
||||
verify(delegate, never()).execute(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void executesCreateTableWhenTableAbsent() throws Exception {
|
||||
ResultSet rs = emptyResultSet();
|
||||
when(meta.getTables(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
|
||||
.thenReturn(rs);
|
||||
when(delegate.execute(anyString())).thenReturn(false);
|
||||
|
||||
stmt.execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
|
||||
|
||||
verify(delegate).execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
|
||||
}
|
||||
|
||||
// --- ALTER TABLE ADD COLUMN ---
|
||||
|
||||
@Test
|
||||
void skipsAlterTableAddColumnWhenColumnExists() throws Exception {
|
||||
ResultSet rs = mockResultSetWithColumnName("completed_by_");
|
||||
when(meta.getColumns(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
|
||||
.thenReturn(rs);
|
||||
|
||||
boolean result =
|
||||
stmt.execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
|
||||
|
||||
assertFalse(result);
|
||||
verify(delegate, never()).execute(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void executesAlterTableAddColumnWhenColumnAbsent() throws Exception {
|
||||
ResultSet rs = emptyResultSet();
|
||||
when(meta.getColumns(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
|
||||
.thenReturn(rs);
|
||||
when(delegate.execute(anyString())).thenReturn(false);
|
||||
|
||||
stmt.execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
|
||||
|
||||
verify(delegate).execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void doesNotMatchAlterTableAddConstraint() throws Exception {
|
||||
when(delegate.execute(anyString())).thenReturn(false);
|
||||
|
||||
stmt.execute("ALTER TABLE ACT_RU_TASK ADD CONSTRAINT PK_RU PRIMARY KEY (ID_)");
|
||||
|
||||
verify(delegate).execute("ALTER TABLE ACT_RU_TASK ADD CONSTRAINT PK_RU PRIMARY KEY (ID_)");
|
||||
verify(meta, never()).getColumns(anyString(), anyString(), anyString(), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void doesNotMatchAlterTableAddPrimaryKey() throws Exception {
|
||||
when(delegate.execute(anyString())).thenReturn(false);
|
||||
|
||||
stmt.execute("ALTER TABLE ACT_RU_TASK ADD PRIMARY KEY (ID_)");
|
||||
|
||||
verify(delegate).execute("ALTER TABLE ACT_RU_TASK ADD PRIMARY KEY (ID_)");
|
||||
verify(meta, never()).getColumns(anyString(), anyString(), anyString(), anyString());
|
||||
}
|
||||
|
||||
// --- executeUpdate overloads ---
|
||||
|
||||
@Test
|
||||
void skipsExecuteUpdateForExistingIndex() throws Exception {
|
||||
ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
|
||||
when(meta.getIndexInfo(
|
||||
eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(rs);
|
||||
|
||||
int result =
|
||||
stmt.executeUpdate("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
|
||||
|
||||
assertEquals(0, result);
|
||||
verify(delegate, never()).executeUpdate(anyString());
|
||||
}
|
||||
|
||||
// --- Pass-through ---
|
||||
|
||||
@Test
|
||||
void passesThroughNonDdlStatements() throws Exception {
|
||||
String sql = "SELECT * FROM ACT_GE_PROPERTY WHERE NAME_ = 'common.schema.version'";
|
||||
when(delegate.execute(sql)).thenReturn(true);
|
||||
|
||||
stmt.execute(sql);
|
||||
|
||||
verify(delegate).execute(sql);
|
||||
}
|
||||
|
||||
@Test
|
||||
void skipsCreateIndexOnMysqlWithUpperCaseStoredIdentifiers() throws Exception {
|
||||
// MySQL lower_case_table_names=0: identifiers stored as-is (uppercase)
|
||||
when(meta.storesLowerCaseIdentifiers()).thenReturn(false);
|
||||
when(meta.storesUpperCaseIdentifiers()).thenReturn(true);
|
||||
|
||||
ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
|
||||
when(meta.getIndexInfo(
|
||||
eq("openmetadata"), isNull(), eq("ACT_GE_BYTEARRAY"), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(rs);
|
||||
|
||||
boolean result =
|
||||
stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
|
||||
|
||||
assertFalse(result);
|
||||
verify(delegate, never()).execute(anyString());
|
||||
}
|
||||
|
||||
// --- Helpers ---
|
||||
|
||||
private ResultSet mockResultSetWithIndexName(String name) throws Exception {
|
||||
ResultSet rs = mock(ResultSet.class);
|
||||
when(rs.next()).thenReturn(true, false);
|
||||
when(rs.getString("INDEX_NAME")).thenReturn(name);
|
||||
return rs;
|
||||
}
|
||||
|
||||
private ResultSet mockResultSetWithColumnName(String name) throws Exception {
|
||||
ResultSet rs = mock(ResultSet.class);
|
||||
when(rs.next()).thenReturn(true, false);
|
||||
when(rs.getString("COLUMN_NAME")).thenReturn(name);
|
||||
return rs;
|
||||
}
|
||||
|
||||
private ResultSet singleRowResultSet() throws Exception {
|
||||
ResultSet rs = mock(ResultSet.class);
|
||||
when(rs.next()).thenReturn(true);
|
||||
return rs;
|
||||
}
|
||||
|
||||
private ResultSet emptyResultSet() throws Exception {
|
||||
ResultSet rs = mock(ResultSet.class);
|
||||
when(rs.next()).thenReturn(false);
|
||||
return rs;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Copyright 2024 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.governance.workflows;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.mockConstruction;
|
||||
import static org.mockito.Mockito.mockStatic;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import org.flowable.common.engine.api.FlowableWrongDbException;
|
||||
import org.flowable.engine.ProcessEngine;
|
||||
import org.flowable.engine.ProcessEngineConfiguration;
|
||||
import org.flowable.engine.ProcessEngines;
|
||||
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.MockedConstruction;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.openmetadata.schema.configuration.WorkflowSettings;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
|
||||
import org.openmetadata.service.jdbi3.HikariCPDataSourceFactory;
|
||||
import org.openmetadata.service.jdbi3.SystemRepository;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
class WorkflowHandlerSchemaUpdateTest {
|
||||
|
||||
@BeforeEach
|
||||
@AfterEach
|
||||
void resetWorkflowHandlerState() throws ReflectiveOperationException {
|
||||
setStaticField("initialized", false);
|
||||
setStaticField("instance", null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void runtimeModeWrapsFlowableWrongDbExceptionWithActionableMessage() {
|
||||
try (MockedConstruction<StandaloneProcessEngineConfiguration> engineMock =
|
||||
mockConstruction(
|
||||
StandaloneProcessEngineConfiguration.class,
|
||||
(mock, ctx) ->
|
||||
when(mock.buildProcessEngine())
|
||||
.thenThrow(new FlowableWrongDbException("7.2.0.2", "7.1.0.0")));
|
||||
MockedStatic<ProcessEngines> ignored = mockStatic(ProcessEngines.class);
|
||||
MockedStatic<Entity> entityMock = mockStatic(Entity.class);
|
||||
MockedStatic<PipelineServiceClientFactory> pscMock =
|
||||
mockStatic(PipelineServiceClientFactory.class)) {
|
||||
|
||||
setupEntityMock(entityMock);
|
||||
pscMock
|
||||
.when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any()))
|
||||
.thenReturn(null);
|
||||
|
||||
IllegalStateException ex =
|
||||
assertThrows(
|
||||
IllegalStateException.class,
|
||||
() -> WorkflowHandler.initialize(buildMockConfig(), false));
|
||||
|
||||
assertTrue(ex.getMessage().contains("openmetadata-ops.sh migrate"));
|
||||
assertInstanceOf(FlowableWrongDbException.class, ex.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void migrationModeSetsDbSchemaUpdateTrue() {
|
||||
ProcessEngine mockEngine = mock(ProcessEngine.class, RETURNS_DEEP_STUBS);
|
||||
|
||||
try (MockedConstruction<StandaloneProcessEngineConfiguration> engineMock =
|
||||
mockConstruction(
|
||||
StandaloneProcessEngineConfiguration.class,
|
||||
(mock, ctx) -> when(mock.buildProcessEngine()).thenReturn(mockEngine));
|
||||
MockedStatic<ProcessEngines> ignored = mockStatic(ProcessEngines.class);
|
||||
MockedStatic<Entity> entityMock = mockStatic(Entity.class);
|
||||
MockedStatic<PipelineServiceClientFactory> pscMock =
|
||||
mockStatic(PipelineServiceClientFactory.class)) {
|
||||
|
||||
setupEntityMock(entityMock);
|
||||
pscMock
|
||||
.when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any()))
|
||||
.thenReturn(null);
|
||||
|
||||
WorkflowHandler.initialize(buildMockConfig(), true);
|
||||
|
||||
StandaloneProcessEngineConfiguration engineConfig = engineMock.constructed().getLast();
|
||||
verify(engineConfig)
|
||||
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void runtimeModeSetsDbSchemaUpdateFalse() {
|
||||
try (MockedConstruction<StandaloneProcessEngineConfiguration> engineMock =
|
||||
mockConstruction(
|
||||
StandaloneProcessEngineConfiguration.class,
|
||||
(mock, ctx) ->
|
||||
when(mock.buildProcessEngine())
|
||||
.thenThrow(new FlowableWrongDbException("7.2.0.2", "7.1.0.0")));
|
||||
MockedStatic<ProcessEngines> ignored = mockStatic(ProcessEngines.class);
|
||||
MockedStatic<Entity> entityMock = mockStatic(Entity.class);
|
||||
MockedStatic<PipelineServiceClientFactory> pscMock =
|
||||
mockStatic(PipelineServiceClientFactory.class)) {
|
||||
|
||||
setupEntityMock(entityMock);
|
||||
pscMock
|
||||
.when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any()))
|
||||
.thenReturn(null);
|
||||
|
||||
assertThrows(
|
||||
IllegalStateException.class, () -> WorkflowHandler.initialize(buildMockConfig(), false));
|
||||
|
||||
StandaloneProcessEngineConfiguration engineConfig = engineMock.constructed().getLast();
|
||||
verify(engineConfig)
|
||||
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
private void setupEntityMock(MockedStatic<Entity> entityMock) {
|
||||
SystemRepository systemRepository = mock(SystemRepository.class);
|
||||
WorkflowSettings workflowSettings = mock(WorkflowSettings.class, RETURNS_DEEP_STUBS);
|
||||
entityMock.when(Entity::getSystemRepository).thenReturn(systemRepository);
|
||||
lenient().when(systemRepository.getWorkflowSettingsOrDefault()).thenReturn(workflowSettings);
|
||||
}
|
||||
|
||||
private OpenMetadataApplicationConfig buildMockConfig() {
|
||||
OpenMetadataApplicationConfig config = mock(OpenMetadataApplicationConfig.class);
|
||||
HikariCPDataSourceFactory dsf = mock(HikariCPDataSourceFactory.class);
|
||||
lenient().when(config.getDataSourceFactory()).thenReturn(dsf);
|
||||
lenient().when(dsf.getUrl()).thenReturn("jdbc:postgresql://localhost:5432/openmetadata_db");
|
||||
lenient().when(dsf.getUser()).thenReturn("openmetadata_user");
|
||||
lenient().when(dsf.getPassword()).thenReturn("openmetadata_password");
|
||||
lenient().when(dsf.getDriverClass()).thenReturn("org.postgresql.Driver");
|
||||
lenient().when(config.getPipelineServiceClientConfiguration()).thenReturn(null);
|
||||
return config;
|
||||
}
|
||||
|
||||
private static void setStaticField(String fieldName, Object value)
|
||||
throws ReflectiveOperationException {
|
||||
Field field = WorkflowHandler.class.getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
field.set(null, value);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue