fix impersonation

This commit is contained in:
astroshim 2016-11-11 17:15:10 +09:00
parent 99621818ae
commit a2f568707d
3 changed files with 279 additions and 160 deletions

View file

@ -105,16 +105,12 @@ public class JDBCInterpreter extends Interpreter {
static final String EMPTY_COLUMN_VALUE = "";
private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection";
private final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
private final HashMap<String, Properties> propertiesMap;
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, PoolingDriver> poolingDriverMap;
private final HashMap<String, Properties> basePropretiesMap;
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
private final Map<String, SqlCompleter> propertyKeySqlCompleterMap;
private static final Function<CharSequence, InterpreterCompletion> sequenceToStringTransformer =
@ -128,14 +124,13 @@ public class JDBCInterpreter extends Interpreter {
public JDBCInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
poolingDriverMap = new HashMap<>();
jdbcUserConfigurationsMap = new HashMap<>();
propertyKeySqlCompleterMap = new HashMap<>();
basePropretiesMap = new HashMap<>();
}
public HashMap<String, Properties> getPropertiesMap() {
return propertiesMap;
return basePropretiesMap;
}
@Override
@ -145,21 +140,22 @@ public class JDBCInterpreter extends Interpreter {
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
logger.info("key: {}, value: {}", keyValue[0], keyValue[1]);
Properties prefixProperties;
if (propertiesMap.containsKey(keyValue[0])) {
prefixProperties = propertiesMap.get(keyValue[0]);
if (basePropretiesMap.containsKey(keyValue[0])) {
prefixProperties = basePropretiesMap.get(keyValue[0]);
} else {
prefixProperties = new Properties();
propertiesMap.put(keyValue[0], prefixProperties);
basePropretiesMap.put(keyValue[0], prefixProperties);
}
prefixProperties.put(keyValue[1], property.getProperty(propertyKey));
}
}
Set<String> removeKeySet = new HashSet<>();
for (String key : propertiesMap.keySet()) {
for (String key : basePropretiesMap.keySet()) {
if (!COMMON_KEY.equals(key)) {
Properties properties = propertiesMap.get(key);
Properties properties = basePropretiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
@ -169,15 +165,14 @@ public class JDBCInterpreter extends Interpreter {
}
for (String key : removeKeySet) {
propertiesMap.remove(key);
basePropretiesMap.remove(key);
}
logger.debug("propertiesMap: {}", propertiesMap);
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
if (!StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
JDBCSecurityImpl.createSecureConfiguration(property);
}
for (String propertyKey : propertiesMap.keySet()) {
for (String propertyKey : basePropretiesMap.keySet()) {
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(null));
}
}
@ -199,127 +194,24 @@ public class JDBCInterpreter extends Interpreter {
return completer;
}
private boolean isConnectionInPool(String driverName) {
return poolingDriverMap.containsKey(driverName);
}
private void createConnectionPool(String url, String propertyKey, Properties properties) {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);
PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey, connectionPool);
poolingDriverMap.put(propertyKey, driver);
}
private Connection getConnectionFromPool(String url, String propertyKey, Properties properties)
throws SQLException {
if (!isConnectionInPool(propertyKey)) {
createConnectionPool(url, propertyKey, properties);
}
return DriverManager.getConnection(DBCP_STRING + propertyKey);
}
public Connection getConnection(String propertyKey, String user)
throws ClassNotFoundException, SQLException, InterpreterException {
Connection connection = null;
if (propertyKey == null || propertiesMap.get(propertyKey) == null) {
return null;
}
if (null == connection) {
final Properties properties = (Properties) propertiesMap.get(propertyKey).clone();
logger.info(properties.getProperty(DRIVER_KEY));
Class.forName(properties.getProperty(DRIVER_KEY));
final String url = properties.getProperty(URL_KEY);
if (StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
connection = getConnectionFromPool(url, propertyKey, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
switch (authType) {
case KERBEROS:
if (user == null) {
connection = getConnectionFromPool(url, propertyKey, properties);
} else {
if ("hive".equalsIgnoreCase(propertyKey)) {
connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user,
propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
final String poolKey = propertyKey;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return getConnectionFromPool(url, poolKey, properties);
}
});
} catch (Exception e) {
logger.error("Error in doAs", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
}
}
break;
default:
connection = getConnectionFromPool(url, propertyKey, properties);
}
}
}
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(connection));
return connection;
}
private void initStatementMap() {
for (Statement statement : paragraphIdStatementMap.values()) {
for (JDBCUserConfigurations configurations : jdbcUserConfigurationsMap.values()) {
try {
statement.close();
configurations.initStatementMap();
} catch (Exception e) {
logger.error("Error while closing paragraphIdStatementMap statement...", e);
}
}
paragraphIdStatementMap.clear();
}
private void initConnectionPoolMap() throws SQLException {
Iterator<String> it = poolingDriverMap.keySet().iterator();
while (it.hasNext()) {
String driverName = it.next();
poolingDriverMap.get(driverName).closePool(driverName);
it.remove();
private void initConnectionPoolMap() {
for (JDBCUserConfigurations configurations : jdbcUserConfigurationsMap.values()) {
try {
configurations.initConnectionPoolMap();
} catch (Exception e) {
logger.error("Error while closing initConnectionPoolMap...", e);
}
}
poolingDriverMap.clear();
}
private void saveStatement(String key, Statement statement) throws SQLException {
paragraphIdStatementMap.put(key, statement);
statement.setMaxRows(getMaxResult());
}
private void removeStatement(String key) {
paragraphIdStatementMap.remove(key);
}
@Override
@ -332,15 +224,6 @@ public class JDBCInterpreter extends Interpreter {
}
}
private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
String replName) {
UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials();
if (uc != null) {
return uc.getUsernamePassword(replName);
}
return null;
}
private String getEntityName(String replName) {
StringBuffer entityName = new StringBuffer();
entityName.append(INTERPRETER_NAME);
@ -349,30 +232,168 @@ public class JDBCInterpreter extends Interpreter {
return entityName.toString();
}
private boolean existAccountInProperty() {
private String getJDBCDriverName(String user, String propertyKey) {
StringBuffer driverName = new StringBuffer();
driverName.append(DBCP_STRING);
driverName.append(propertyKey);
driverName.append(user);
return driverName.toString();
}
private boolean existAccountInBaseProperty() {
return property.containsKey(JDBC_DEFAULT_USER_KEY) &&
property.containsKey(JDBC_DEFAULT_PASSWORD_KEY);
}
public void setAccountOfCredential(String propertyKey, InterpreterContext interpreterContext) {
private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
String replName) {
UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials();
if (uc != null) {
return uc.getUsernamePassword(replName);
}
return null;
}
private JDBCUserConfigurations getJDBCConfiguration(String user) {
JDBCUserConfigurations jdbcUserConfigurations =
jdbcUserConfigurationsMap.get(user);
if (jdbcUserConfigurations == null) {
jdbcUserConfigurations = new JDBCUserConfigurations();
jdbcUserConfigurationsMap.put(user, jdbcUserConfigurations);
}
return jdbcUserConfigurations;
}
public void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
throws SQLException {
String user = interpreterContext.getAuthenticationInfo().getUser();
JDBCUserConfigurations jdbcUserConfigurations =
getJDBCConfiguration(user);
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropretiesMap.get(propertyKey));
if (existAccountInBaseProperty()) {
return;
}
jdbcUserConfigurations.cleanUserProperty(propertyKey);
UsernamePassword usernamePassword = getUsernamePassword(interpreterContext,
getEntityName(interpreterContext.getReplName()));
if (usernamePassword != null && !existAccountInProperty()) {
propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername());
propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword());
if (usernamePassword != null) {
jdbcUserConfigurations.setUserProperty(propertyKey, usernamePassword);
} else {
PoolingDriver poolingDriver = jdbcUserConfigurations.removeDBDriverPool(propertyKey);
poolingDriver.closePool(propertyKey + user);
}
}
private void createConnectionPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);
PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
Class.forName(properties.getProperty(DRIVER_KEY));
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey + user, connectionPool);
jdbcUserConfigurationsMap.get(user).saveDBDriverPool(propertyKey, driver);
}
private Connection getConnectionFromPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
String jdbcDriver = getJDBCDriverName(user, propertyKey);
if (!jdbcUserConfigurationsMap.get(user).isConnectionInDBDriverPool(propertyKey)) {
createConnectionPool(url, user, propertyKey, properties);
}
return DriverManager.getConnection(jdbcDriver);
}
public Connection getConnection(String propertyKey, InterpreterContext interpreterContext)
throws ClassNotFoundException, SQLException, InterpreterException {
final String user = interpreterContext.getAuthenticationInfo().getUser();
Connection connection;
if (propertyKey == null || basePropretiesMap.get(propertyKey) == null) {
return null;
}
JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
setUserProperty(propertyKey, interpreterContext);
final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
final String url = properties.getProperty(URL_KEY);
if (StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
switch (authType) {
case KERBEROS:
if (user == null) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
if ("hive".equalsIgnoreCase(propertyKey)) {
connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user,
user, propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
final String poolKey = propertyKey;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return getConnectionFromPool(url, user, poolKey, properties);
}
});
} catch (Exception e) {
logger.error("Error in doAs", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
}
}
break;
default:
connection = getConnectionFromPool(url, user, propertyKey, properties);
}
}
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(connection));
return connection;
}
private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
String user = interpreterContext.getAuthenticationInfo().getUser();
Connection connection;
Statement statement;
ResultSet resultSet = null;
JDBCUserConfigurations jdbcUserConfigurations = jdbcUserConfigurationsMap.get(user);
try {
setAccountOfCredential(propertyKey, interpreterContext);
connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
connection = getConnection(propertyKey, interpreterContext);
if (connection == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
}
@ -393,8 +414,7 @@ public class JDBCInterpreter extends Interpreter {
}
try {
saveStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser(), statement);
jdbcUserConfigurations.saveStatement(paragraphId, statement);
boolean isResultSetAvailable = statement.execute(sql);
@ -452,8 +472,7 @@ public class JDBCInterpreter extends Interpreter {
connection.close();
} catch (SQLException e) { /*ignored*/ }
}
removeStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser());
jdbcUserConfigurations.removeStatement(paragraphId);
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
@ -465,8 +484,8 @@ public class JDBCInterpreter extends Interpreter {
String errorMsg = new String(baos.toByteArray(), StandardCharsets.UTF_8);
try {
PoolingDriver driver = poolingDriverMap.remove(propertyKey);
driver.closePool(propertyKey);
PoolingDriver poolingDriver = jdbcUserConfigurations.removeDBDriverPool(propertyKey);
poolingDriver.closePool(propertyKey + user);
} catch (SQLException e1) {
e1.printStackTrace();
}
@ -502,12 +521,12 @@ public class JDBCInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
logger.info("Cancel current query statement.");
String paragraphId = context.getParagraphId();
JDBCUserConfigurations jdbcUserConfigurations =
getJDBCConfiguration(context.getAuthenticationInfo().getUser());
try {
paragraphIdStatementMap.get(paragraphId + context.getAuthenticationInfo().getUser()).cancel();
jdbcUserConfigurations.cancelStatement(paragraphId);
} catch (SQLException e) {
logger.error("Error while cancelling...", e);
}
@ -564,7 +583,7 @@ public class JDBCInterpreter extends Interpreter {
public int getMaxResult() {
return Integer.valueOf(
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
basePropretiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
}
boolean isConcurrentExecution() {

View file

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zeppelin.jdbc;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.zeppelin.user.UsernamePassword;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
/**
* UserConfigurations for JDBC impersonation.
*/
public class JDBCUserConfigurations {
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, PoolingDriver> poolingDriverMap;
private final HashMap<String, Properties> propertiesMap;
public JDBCUserConfigurations() {
paragraphIdStatementMap = new HashMap<>();
poolingDriverMap = new HashMap<>();
propertiesMap = new HashMap<>();
}
public void initStatementMap() throws SQLException {
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();
}
public void initConnectionPoolMap() throws SQLException {
Iterator<String> it = poolingDriverMap.keySet().iterator();
while (it.hasNext()) {
String driverName = it.next();
poolingDriverMap.get(driverName).closePool(driverName);
it.remove();
}
poolingDriverMap.clear();
}
public void setPropertyMap(String key, Properties properties) {
Properties p = (Properties) properties.clone();
propertiesMap.put(key, p);
}
public Properties getPropertyMap(String key) {
return propertiesMap.get(key);
}
public void cleanUserProperty(String propertyKey) {
propertiesMap.get(propertyKey).remove("user");
propertiesMap.get(propertyKey).remove("password");
}
public void setUserProperty(String propertyKey, UsernamePassword usernamePassword) {
propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername());
propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword());
}
public void saveStatement(String key, Statement statement) throws SQLException {
paragraphIdStatementMap.put(key, statement);
}
public void cancelStatement(String key) throws SQLException {
paragraphIdStatementMap.get(key).cancel();
}
public void removeStatement(String key) {
paragraphIdStatementMap.remove(key);
}
public void saveDBDriverPool(String key, PoolingDriver driver) throws SQLException {
poolingDriverMap.put(key, driver);
}
public PoolingDriver removeDBDriverPool(String key) throws SQLException {
return poolingDriverMap.remove(key);
}
public boolean isConnectionInDBDriverPool(String key) {
return poolingDriverMap.containsKey(key);
}
}

View file

@ -293,7 +293,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterContext itCtx1 = new InterpreterContext("", "1", replName1, "", "", authInfo, null, null, null, null,
null, null);
jdbcInterpreter1.setAccountOfCredential("default", itCtx1);
jdbcInterpreter1.setUserProperty("default", itCtx1);
assertEquals(properties.getProperty("default.user"),
jdbcInterpreter1.getPropertiesMap().get("default").getProperty("user"));
@ -314,7 +314,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterContext itCtx2 = new InterpreterContext("", "1", replName1, "", "", authInfo, null, null, null, null,
null, null);
jdbcInterpreter2.setAccountOfCredential("default", itCtx2);
jdbcInterpreter2.setUserProperty("default", itCtx2);
assertEquals(up.getUsername(),
jdbcInterpreter2.getPropertiesMap().get("default").getProperty("user"));
@ -336,7 +336,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterContext itCtx3 = new InterpreterContext("", "1", replName2, "", "", authInfo, null, null, null, null,
null, null);
jdbcInterpreter3.setAccountOfCredential("default", itCtx3);
jdbcInterpreter3.setUserProperty("default", itCtx3);
assertNull(jdbcInterpreter3.getPropertiesMap().get("default").getProperty("user"));
assertNull(jdbcInterpreter3.getPropertiesMap().get("default").getProperty("password"));