mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-70: Add jdbc driver name as configuration property
This commit is contained in:
parent
841d67ca4a
commit
60544a1856
2 changed files with 112 additions and 60 deletions
|
|
@ -40,8 +40,6 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class PostgreSqlInterpreter extends Interpreter {
|
||||
|
||||
private static final String UPDATE_COUNT_HEADER = "Update Count";
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(PostgreSqlInterpreter.class);
|
||||
|
||||
private static final char WITESPACE = ' ';
|
||||
|
|
@ -49,12 +47,17 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
private static final char TAB = '\t';
|
||||
private static final String TABLE_MAGIC_TAG = "%table ";
|
||||
private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
|
||||
private static final String UPDATE_COUNT_HEADER = "Update Count";
|
||||
|
||||
private static final String POSTGRESQL_DRIVER_NAME = "org.postgresql.Driver";
|
||||
static final String DEFAULT_JDBC_URL = "jdbc:postgresql://localhost:5432/";
|
||||
static final String DEFAULT_JDBC_USER_PASSWORD = "";
|
||||
static final String DEFAULT_JDBC_USER_NAME = "gpadmin";
|
||||
static final String DEFAULT_JDBC_DRIVER_NAME = "org.postgresql.Driver";
|
||||
|
||||
static final String POSTGRESQL_SERVER_URL = "postgresql.url";
|
||||
static final String POSTGRESQL_SERVER_USER = "postgresql.user";
|
||||
static final String POSTGRESQL_SERVER_PASSWORD = "postgresql.password";
|
||||
static final String POSTGRESQL_SERVER_DRIVER_NAME = "postgresql.driver.name";
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
|
|
@ -62,46 +65,42 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
"psql",
|
||||
PostgreSqlInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(POSTGRESQL_SERVER_URL, "jdbc:postgresql://localhost:5432/",
|
||||
"The URL for PostgreSQL.")
|
||||
.add(POSTGRESQL_SERVER_USER, "ambari", "The PostgreSQL user")
|
||||
.add(POSTGRESQL_SERVER_PASSWORD, "", "The password for the PostgreSQL user").build());
|
||||
.add(POSTGRESQL_SERVER_URL, DEFAULT_JDBC_URL, "The URL for PostgreSQL.")
|
||||
.add(POSTGRESQL_SERVER_USER, DEFAULT_JDBC_USER_NAME, "The PostgreSQL user name")
|
||||
.add(POSTGRESQL_SERVER_PASSWORD, DEFAULT_JDBC_USER_PASSWORD,
|
||||
"The PostgreSQL user password")
|
||||
.add(POSTGRESQL_SERVER_DRIVER_NAME, DEFAULT_JDBC_DRIVER_NAME, "JDBC Driver Name")
|
||||
.build());
|
||||
}
|
||||
|
||||
private Connection jdbcConnection;
|
||||
private Exception exceptionOnConnect;
|
||||
private Statement currentStatement;
|
||||
private Exception exceptionOnConnect;
|
||||
|
||||
public PostgreSqlInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
// Test only method
|
||||
protected Connection getJdbcConnection() {
|
||||
return jdbcConnection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
logger.info("Jdbc open connection called!");
|
||||
|
||||
logger.info("Open psql connection!");
|
||||
|
||||
try {
|
||||
Class.forName(POSTGRESQL_DRIVER_NAME);
|
||||
} catch (ClassNotFoundException e) {
|
||||
logger.error("Cannot open connection", e);
|
||||
exceptionOnConnect = e;
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
String driverName = getProperty(POSTGRESQL_SERVER_DRIVER_NAME);
|
||||
String url = getProperty(POSTGRESQL_SERVER_URL);
|
||||
String user = getProperty(POSTGRESQL_SERVER_USER);
|
||||
String password = getProperty(POSTGRESQL_SERVER_PASSWORD);
|
||||
|
||||
Class.forName(driverName);
|
||||
|
||||
jdbcConnection = DriverManager.getConnection(url, user, password);
|
||||
|
||||
exceptionOnConnect = null;
|
||||
logger.info("Successfully created Jdbc connection");
|
||||
} catch (SQLException e) {
|
||||
logger.info("Successfully created psql connection");
|
||||
|
||||
} catch (ClassNotFoundException | SQLException e) {
|
||||
logger.error("Cannot open connection", e);
|
||||
exceptionOnConnect = e;
|
||||
}
|
||||
|
|
@ -109,6 +108,9 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
logger.info("Close psql connection!");
|
||||
|
||||
try {
|
||||
if (getJdbcConnection() != null) {
|
||||
getJdbcConnection().close();
|
||||
|
|
@ -122,16 +124,16 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
|
||||
private InterpreterResult executeSql(String sql) {
|
||||
try {
|
||||
|
||||
|
||||
if (exceptionOnConnect != null) {
|
||||
return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage());
|
||||
}
|
||||
|
||||
|
||||
currentStatement = getJdbcConnection().createStatement();
|
||||
|
||||
|
||||
StringBuilder msg = null;
|
||||
boolean isTableType = false;
|
||||
|
||||
|
||||
if (StringUtils.containsIgnoreCase(sql, EXPLAIN_PREDICATE)) {
|
||||
msg = new StringBuilder();
|
||||
} else {
|
||||
|
|
@ -153,20 +155,21 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
if (i > 1) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
msg.append(clean(isTableType, md.getColumnName(i)));
|
||||
msg.append(replaceReservedChars(isTableType, md.getColumnName(i)));
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
|
||||
while (resultSet.next()) {
|
||||
for (int i = 1; i < md.getColumnCount() + 1; i++) {
|
||||
msg.append(clean(isTableType, resultSet.getString(i)));
|
||||
msg.append(replaceReservedChars(isTableType, resultSet.getString(i)));
|
||||
if (i != md.getColumnCount()) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
}
|
||||
} else { // it is an update count or there are no results
|
||||
} else {
|
||||
// Response contains either an update count or there are no results.
|
||||
int updateCount = currentStatement.getUpdateCount();
|
||||
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
|
||||
msg.append(updateCount).append(NEWLINE);
|
||||
|
|
@ -182,8 +185,8 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, msg.toString());
|
||||
return rett;
|
||||
return new InterpreterResult(Code.SUCCESS, msg.toString());
|
||||
|
||||
} catch (SQLException ex) {
|
||||
logger.error("Cannot run " + sql, ex);
|
||||
return new InterpreterResult(Code.ERROR, ex.getMessage());
|
||||
|
|
@ -193,7 +196,7 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
/**
|
||||
* For %table response replace Tab and Newline characters from the content.
|
||||
*/
|
||||
private String clean(boolean isTableResponseType, String str) {
|
||||
private String replaceReservedChars(boolean isTableResponseType, String str) {
|
||||
return (!isTableResponseType) ? str : str.replace(TAB, WITESPACE).replace(NEWLINE, WITESPACE);
|
||||
}
|
||||
|
||||
|
|
@ -235,4 +238,9 @@ public class PostgreSqlInterpreter extends Interpreter {
|
|||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Test only method
|
||||
protected Connection getJdbcConnection() {
|
||||
return jdbcConnection;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,10 @@
|
|||
*/
|
||||
package org.apache.zeppelin.postgresql;
|
||||
|
||||
import static org.apache.zeppelin.postgresql.PostgreSqlInterpreter.*;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
|
@ -34,48 +37,72 @@ import com.mockrunner.mock.jdbc.MockResultSet;
|
|||
* PostgreSQL interpreter unit tests
|
||||
*/
|
||||
public class PostgreSqlInterpreterTest extends BasicJDBCTestCaseAdapter {
|
||||
|
||||
|
||||
private PostgreSqlInterpreter psqlInterpreter = null;
|
||||
private MockResultSet result = null;
|
||||
|
||||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
MockConnection connection = getJDBCMockObjectFactory().getMockConnection();
|
||||
|
||||
StatementResultSetHandler statementHandler = connection.getStatementResultSetHandler();
|
||||
|
||||
StatementResultSetHandler statementHandler = connection.getStatementResultSetHandler();
|
||||
result = statementHandler.createResultSet();
|
||||
statementHandler.prepareGlobalResultSet(result);
|
||||
|
||||
psqlInterpreter = spy(new PostgreSqlInterpreter(new Properties()));
|
||||
when(psqlInterpreter.getJdbcConnection()).thenReturn(connection);
|
||||
when(psqlInterpreter.getJdbcConnection()).thenReturn(connection);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testOpenClose() throws SQLException {
|
||||
Properties props = new Properties();
|
||||
props.put(PostgreSqlInterpreter.POSTGRESQL_SERVER_URL, "url");
|
||||
props.put(PostgreSqlInterpreter.POSTGRESQL_SERVER_USER, "user");
|
||||
props.put(PostgreSqlInterpreter.POSTGRESQL_SERVER_PASSWORD, "pass");
|
||||
|
||||
PostgreSqlInterpreter psqlInterpreter = spy(new PostgreSqlInterpreter(props));
|
||||
|
||||
when(psqlInterpreter.getJdbcConnection()).thenReturn(getJDBCMockObjectFactory().getMockConnection());
|
||||
public void testDefaultProperties() throws SQLException {
|
||||
|
||||
PostgreSqlInterpreter psqlInterpreter = new PostgreSqlInterpreter(new Properties());
|
||||
|
||||
assertEquals(DEFAULT_JDBC_DRIVER_NAME, psqlInterpreter.getProperty(POSTGRESQL_SERVER_DRIVER_NAME));
|
||||
assertEquals(DEFAULT_JDBC_URL, psqlInterpreter.getProperty(POSTGRESQL_SERVER_URL));
|
||||
assertEquals(DEFAULT_JDBC_USER_NAME, psqlInterpreter.getProperty(POSTGRESQL_SERVER_USER));
|
||||
assertEquals(DEFAULT_JDBC_USER_PASSWORD, psqlInterpreter.getProperty(POSTGRESQL_SERVER_PASSWORD));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectionClose() throws SQLException {
|
||||
|
||||
PostgreSqlInterpreter psqlInterpreter = spy(new PostgreSqlInterpreter(new Properties()));
|
||||
|
||||
when(psqlInterpreter.getJdbcConnection()).thenReturn(
|
||||
getJDBCMockObjectFactory().getMockConnection());
|
||||
|
||||
psqlInterpreter.open();
|
||||
psqlInterpreter.close();
|
||||
|
||||
|
||||
verifyAllResultSetsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
verifyConnectionClosed();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testStatementCancel() throws SQLException {
|
||||
|
||||
PostgreSqlInterpreter psqlInterpreter = spy(new PostgreSqlInterpreter(new Properties()));
|
||||
|
||||
when(psqlInterpreter.getJdbcConnection()).thenReturn(
|
||||
getJDBCMockObjectFactory().getMockConnection());
|
||||
|
||||
psqlInterpreter.cancel(null);
|
||||
|
||||
verifyAllResultSetsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
assertFalse("Cancel operation should not close the connection", psqlInterpreter
|
||||
.getJdbcConnection().isClosed());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQuery() throws SQLException {
|
||||
|
||||
String sqlQuery = "select * from t";
|
||||
|
||||
result.addColumn("col1", new String[]{"val11", "val12"});
|
||||
result.addColumn("col2", new String[]{"val21", "val22"});
|
||||
|
||||
result.addColumn("col1", new String[] {"val11", "val12"});
|
||||
result.addColumn("col2", new String[] {"val21", "val22"});
|
||||
|
||||
InterpreterResult interpreterResult = psqlInterpreter.interpret(sqlQuery, null);
|
||||
|
||||
|
|
@ -85,17 +112,16 @@ public class PostgreSqlInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
|
||||
verifySQLStatementExecuted(sqlQuery);
|
||||
verifyAllResultSetsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSelectQueryWithSpecialCharacters() throws SQLException {
|
||||
|
||||
String sqlQuery = "select * from t";
|
||||
|
||||
result.addColumn("co\tl1", new String[]{"val11", "va\tl1\n2"});
|
||||
result.addColumn("co\nl2", new String[]{"v\nal21", "val\t22"});
|
||||
|
||||
result.addColumn("co\tl1", new String[] {"val11", "va\tl1\n2"});
|
||||
result.addColumn("co\nl2", new String[] {"v\nal21", "val\t22"});
|
||||
|
||||
InterpreterResult interpreterResult = psqlInterpreter.interpret(sqlQuery, null);
|
||||
|
||||
|
|
@ -112,8 +138,8 @@ public class PostgreSqlInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
public void testExplainQuery() throws SQLException {
|
||||
|
||||
String sqlQuery = "explain select * from t";
|
||||
|
||||
result.addColumn("col1", new String[]{"val11", "val12"});
|
||||
|
||||
result.addColumn("col1", new String[] {"val11", "val12"});
|
||||
|
||||
InterpreterResult interpreterResult = psqlInterpreter.interpret(sqlQuery, null);
|
||||
|
||||
|
|
@ -125,4 +151,22 @@ public class PostgreSqlInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
verifyAllResultSetsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplainQueryWithSpecialCharachters() throws SQLException {
|
||||
|
||||
String sqlQuery = "explain select * from t";
|
||||
|
||||
result.addColumn("co\tl\n1", new String[] {"va\nl11", "va\tl\n12"});
|
||||
|
||||
InterpreterResult interpreterResult = psqlInterpreter.interpret(sqlQuery, null);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, interpreterResult.type());
|
||||
assertEquals("co\tl\n1\nva\nl11\nva\tl\n12\n", interpreterResult.message());
|
||||
|
||||
verifySQLStatementExecuted(sqlQuery);
|
||||
verifyAllResultSetsClosed();
|
||||
verifyAllStatementsClosed();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue