Revert changes to HiveInterpreter, Update tests

This commit is contained in:
Prasad Wagle 2016-05-11 14:24:51 -07:00
parent f2628e3a56
commit 623d42a199
13 changed files with 238 additions and 96 deletions

View file

@ -23,9 +23,11 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -36,9 +38,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.UserCredentials;
import org.apache.zeppelin.user.UsernamePassword;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,23 +74,30 @@ public class HiveInterpreter extends Interpreter {
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
private final HashMap<String, Properties> propertiesMap;
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
private final Map<String, Connection> paragraphIdConnectionMap;
static {
Interpreter.register(
"hql",
"hive",
HiveInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT, "Maximum line of results")
.add(DEFAULT_DRIVER, "org.apache.hive.jdbc.HiveDriver", "Hive JDBC driver")
.add(DEFAULT_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
.add(DEFAULT_USER, "hive", "The hive user")
.add(DEFAULT_PASSWORD, "", "The password for the hive user").build());
"hql",
"hive",
HiveInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT, "Maximum line of results")
.add(DEFAULT_DRIVER, "org.apache.hive.jdbc.HiveDriver", "Hive JDBC driver")
.add(DEFAULT_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
.add(DEFAULT_USER, "hive", "The hive user")
.add(DEFAULT_PASSWORD, "", "The password for the hive user").build());
}
public HiveInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
propertyKeyUnusedConnectionListMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
paragraphIdConnectionMap = new HashMap<>();
}
public HashMap<String, Properties> getPropertiesMap() {
@ -137,25 +143,101 @@ public class HiveInterpreter extends Interpreter {
logger.debug("propertiesMap: {}", propertiesMap);
}
public InterpreterResult executeSql(String propertyKey, String sql,
String user, String password,
InterpreterContext interpreterContext) {
Connection connection = null;
Statement statement = null;
@Override
public void close() {
try {
for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
for (Connection c : connectionList) {
c.close();
}
}
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();
for (Connection connection : paragraphIdConnectionMap.values()) {
connection.close();
}
paragraphIdConnectionMap.clear();
} catch (SQLException e) {
logger.error("Error while closing...", e);
}
}
public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
Connection connection = null;
if (propertyKey == null || propertiesMap.get(propertyKey) == null) {
return null;
}
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
if (0 != connectionList.size()) {
connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
if (null != connection && connection.isClosed()) {
connection.close();
connection = null;
}
}
}
if (null == connection) {
Properties properties = propertiesMap.get(propertyKey);
Class.forName(properties.getProperty(DRIVER_KEY));
String url = properties.getProperty(URL_KEY);
if (user != null) {
String user = properties.getProperty(USER_KEY);
String password = properties.getProperty(PASSWORD_KEY);
if (null != user && null != password) {
connection = DriverManager.getConnection(url, user, password);
} else {
connection = DriverManager.getConnection(url, properties);
}
if (connection == null) {
return null;
}
}
return connection;
}
public Statement getStatement(String propertyKey, String paragraphId)
throws SQLException, ClassNotFoundException {
Connection connection;
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
// Never enter for now.
connection = paragraphIdConnectionMap.get(paragraphId);
} else {
connection = getConnection(propertyKey);
}
if (connection == null) {
return null;
}
Statement statement = connection.createStatement();
if (isStatementClosed(statement)) {
connection = getConnection(propertyKey);
statement = connection.createStatement();
}
paragraphIdConnectionMap.put(paragraphId, connection);
paragraphIdStatementMap.put(paragraphId, statement);
return statement;
}
private boolean isStatementClosed(Statement statement) {
try {
return statement.isClosed();
} catch (Throwable t) {
logger.debug("{} doesn't support isClosed method", statement);
return false;
}
}
public InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
try {
Statement statement = getStatement(propertyKey, paragraphId);
if (statement == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
@ -207,14 +289,13 @@ public class HiveInterpreter extends Interpreter {
msg.append(updateCount).append(NEWLINE);
}
} finally {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
try {
if (resultSet != null) {
resultSet.close();
}
statement.close();
}
if (connection != null) {
connection.close();
} finally {
moveConnectionToUnused(propertyKey, paragraphId);
}
}
@ -226,6 +307,21 @@ public class HiveInterpreter extends Interpreter {
}
}
private void moveConnectionToUnused(String propertyKey, String paragraphId) {
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
Connection connection = paragraphIdConnectionMap.remove(paragraphId);
if (null != connection) {
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
propertyKeyUnusedConnectionListMap.get(propertyKey).add(connection);
} else {
ArrayList<Connection> connectionList = new ArrayList<>();
connectionList.add(connection);
propertyKeyUnusedConnectionListMap.put(propertyKey, connectionList);
}
}
}
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
String propertyKey = getPropertyKey(cmd);
@ -236,27 +332,9 @@ public class HiveInterpreter extends Interpreter {
cmd = cmd.trim();
logger.info("PropertyKey: {} User: {} SQL command: '{}'", propertyKey,
contextInterpreter.getAuthenticationInfo().getUser(), cmd);
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
UsernamePassword usernamePassword = null;
String username = null;
String password = null;
AuthenticationInfo authenticationInfo = contextInterpreter.getAuthenticationInfo();
UserCredentials userCredentials = authenticationInfo.getUserCredentials();
logger.info(userCredentials.toString());
if (userCredentials != null) {
usernamePassword = userCredentials.getUsernamePassword("hive(" + propertyKey + ")");
}
if (usernamePassword != null) {
username = usernamePassword.getUsername();
password = usernamePassword.getPassword();
}
if (username == null) {
username = authenticationInfo.getUser();
}
return executeSql(propertyKey, cmd, username, password, contextInterpreter);
return executeSql(propertyKey, cmd, contextInterpreter);
}
private int getMaxResult() {
@ -280,13 +358,19 @@ public class HiveInterpreter extends Interpreter {
}
}
public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
return null;
@Override
public void cancel(InterpreterContext context) {
String paragraphId = context.getParagraphId();
try {
paragraphIdStatementMap.get(paragraphId).cancel();
} catch (SQLException e) {
logger.error("Error while cancelling...", e);
}
}
@Override
public void close() {
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
@ -294,19 +378,10 @@ public class HiveInterpreter extends Interpreter {
return 0;
}
@Override
public void cancel(InterpreterContext context) {};
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
HiveInterpreter.class.getName() + this.hashCode(), 50);
HiveInterpreter.class.getName() + this.hashCode(), 10);
}
@Override

View file

@ -471,6 +471,7 @@
<exclude>**/zeppelin-distribution/src/bin_license/**</exclude>
<exclude>conf/interpreter.json</exclude>
<exclude>conf/notebook-authorization.json</exclude>
<exclude>conf/credentials.json</exclude>
<exclude>conf/zeppelin-env.sh</exclude>
<exclude>spark-*-bin*/**</exclude>
<exclude>.spark-dist/**</exclude>

View file

@ -33,15 +33,16 @@ import java.util.Map;
public class Credentials {
private static final Logger LOG = LoggerFactory.getLogger(Credentials.class);
private static Credentials credentials = null;
private Map<String, UserCredentials> credentialsMap;
private Gson gson;
private Boolean credentialsPersist = true;
File credentialsFile;
private Credentials(Boolean credentialsPersist, String credentialsPath) {
public Credentials(Boolean credentialsPersist, String credentialsPath) {
this.credentialsPersist = credentialsPersist;
credentialsFile = new File(credentialsPath);
if (credentialsPath != null) {
credentialsFile = new File(credentialsPath);
}
credentialsMap = new HashMap<>();
if (credentialsPersist) {
GsonBuilder builder = new GsonBuilder();
@ -51,17 +52,6 @@ public class Credentials {
}
}
public static synchronized void initCredentials(Boolean credentialsPersist,
String credentialsPath) {
if (credentials == null) {
credentials = new Credentials(credentialsPersist, credentialsPath);
}
}
public static Credentials getCredentials() {
return credentials;
}
public UserCredentials getUserCredentials(String username) {
UserCredentials uc = credentialsMap.get(username);
if (uc == null) {
@ -108,7 +98,7 @@ public class Credentials {
private void saveToFile() throws IOException {
String jsonString;
synchronized (credentials) {
synchronized (credentialsMap) {
CredentialsInfoSaving info = new CredentialsInfoSaving();
info.credentialsMap = credentialsMap;
jsonString = gson.toJson(info);

View file

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.user;
import static org.junit.Assert.*;
import org.junit.Test;
import java.io.IOException;
public class CredentialsTest {
@Test
public void testDefaultProperty() throws IOException {
Credentials credentials = new Credentials(false, null);
UserCredentials userCredentials = new UserCredentials();
UsernamePassword up1 = new UsernamePassword("user2", "password");
userCredentials.putUsernamePassword("hive(vertica)", up1);
credentials.putUserCredentials("user1", userCredentials);
UserCredentials uc2 = credentials.getUserCredentials("user1");
UsernamePassword up2 = uc2.getUsernamePassword("hive(vertica)");
assertEquals(up1.getUsername(), up2.getUsername());
assertEquals(up1.getPassword(), up2.getPassword());
}
}

View file

@ -50,7 +50,11 @@ public class CredentialRestApi {
private HttpServletRequest servReq;
public CredentialRestApi() {
this.credentials = Credentials.getCredentials();
}
public CredentialRestApi(Credentials credentials) {
this.credentials = credentials;
}
/**

View file

@ -68,6 +68,7 @@ public class ZeppelinServer extends Application {
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
@ -81,9 +82,10 @@ public class ZeppelinServer extends Application {
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();
this.notebookAuthorization = new NotebookAuthorization(conf);
this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf,
notebookRepo, schedulerFactory, replFactory, notebookWsServer,
notebookIndex, notebookAuthorization);
notebookIndex, notebookAuthorization, credentials);
}
public static void main(String[] args) throws InterruptedException {
@ -91,9 +93,6 @@ public class ZeppelinServer extends Application {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
Credentials.initCredentials(conf.credentialsPersist(),
conf.getCredentialsPath());
jettyWebServer = setupJettyServer(conf);
ContextHandlerCollection contexts = new ContextHandlerCollection();
@ -296,7 +295,7 @@ public class ZeppelinServer extends Application {
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
singletons.add(interpreterApi);
CredentialRestApi credentialApi = new CredentialRestApi();
CredentialRestApi credentialApi = new CredentialRestApi(credentials);
singletons.add(credentialApi);
SecurityRestApi securityApi = new SecurityRestApi();

View file

@ -40,6 +40,7 @@ import org.apache.zeppelin.search.SearchService;
import com.google.gson.Gson;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +71,7 @@ public class Note implements Serializable, JobListener {
private transient NotebookRepo repo;
private transient SearchService index;
private transient ScheduledFuture delayedPersist;
private transient Credentials credentials;
/**
* note configurations.
@ -89,11 +91,12 @@ public class Note implements Serializable, JobListener {
public Note() {}
public Note(NotebookRepo repo, NoteInterpreterLoader replLoader,
JobListenerFactory jlFactory, SearchService noteIndex) {
JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials) {
this.repo = repo;
this.replLoader = replLoader;
this.jobListenerFactory = jlFactory;
this.index = noteIndex;
this.credentials = credentials;
generateId();
}
@ -145,6 +148,15 @@ public class Note implements Serializable, JobListener {
this.index = index;
}
public Credentials getCredentials() {
return credentials;
};
public void setCredentials(Credentials credentials) {
this.credentials = credentials;
}
@SuppressWarnings("rawtypes")
public Map<String, List<AngularObject>> getAngularObjects() {
return angularObjects;

View file

@ -43,6 +43,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.Credentials;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@ -78,6 +79,7 @@ public class Notebook {
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
/**
* Main constructor \w manual Dependency Injection
@ -97,7 +99,8 @@ public class Notebook {
SchedulerFactory schedulerFactory,
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory,
SearchService notebookIndex,
NotebookAuthorization notebookAuthorization) throws IOException, SchedulerException {
NotebookAuthorization notebookAuthorization,
Credentials credentials) throws IOException, SchedulerException {
this.conf = conf;
this.notebookRepo = notebookRepo;
this.schedulerFactory = schedulerFactory;
@ -105,6 +108,7 @@ public class Notebook {
this.jobListenerFactory = jobListenerFactory;
this.notebookIndex = notebookIndex;
this.notebookAuthorization = notebookAuthorization;
this.credentials = credentials;
quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
quartzSched = quertzSchedFact.getScheduler();
quartzSched.start();
@ -146,7 +150,7 @@ public class Notebook {
*/
public Note createNote(List<String> interpreterIds) throws IOException {
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex, credentials);
intpLoader.setNoteId(note.id());
synchronized (notes) {
notes.put(note.id(), note);
@ -335,6 +339,7 @@ public class Notebook {
//Manually inject ALL dependencies, as DI constructor was NOT used
note.setIndex(this.notebookIndex);
note.setCredentials(this.credentials);
NoteInterpreterLoader replLoader = new NoteInterpreterLoader(replFactory);
note.setReplLoader(replLoader);

View file

@ -337,9 +337,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
final Paragraph self = this;
Credentials credentials = Credentials.getCredentials();
UserCredentials userCredentials = credentials.getUserCredentials(authenticationInfo.getUser());
authenticationInfo.setUserCredentials(userCredentials);
Credentials credentials = note.getCredentials();
if (authenticationInfo != null) {
UserCredentials userCredentials = credentials.getUserCredentials(
authenticationInfo.getUser());
authenticationInfo.setUserCredentials(userCredentials);
}
InterpreterContext interpreterContext = new InterpreterContext(
note.id(),

View file

@ -46,6 +46,7 @@ import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.Credentials;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -65,9 +66,12 @@ public class NotebookTest implements JobListenerFactory{
private NotebookRepo notebookRepo;
private InterpreterFactory factory;
private DependencyResolver depResolver;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
@Before
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
@ -90,8 +94,12 @@ public class NotebookTest implements JobListenerFactory{
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search, notebookAuthorization);
notebookAuthorization = new NotebookAuthorization(conf);
credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search,
notebookAuthorization, credentials);
}
@After
@ -175,7 +183,7 @@ public class NotebookTest implements JobListenerFactory{
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory,
new InterpreterFactory(conf, null, null, null, depResolver), this, null, null);
new InterpreterFactory(conf, null, null, null, depResolver), this, null, null, null);
assertEquals(1, notebook2.getAllNotes().size());
}

View file

@ -42,6 +42,7 @@ import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.search.LuceneSearch;
import org.apache.zeppelin.user.Credentials;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -63,6 +64,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
private DependencyResolver depResolver;
private SearchService search;
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSyncTest.class);
@Before
@ -97,7 +99,9 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);
notebookAuthorization = new NotebookAuthorization(conf);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search, notebookAuthorization);
credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search,
notebookAuthorization, credentials);
}
@After
@ -222,7 +226,8 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
ZeppelinConfiguration vConf = ZeppelinConfiguration.create();
NotebookRepoSync vRepoSync = new NotebookRepoSync(vConf);
Notebook vNotebookSync = new Notebook(vConf, vRepoSync, schedulerFactory, factory, this, search, null);
Notebook vNotebookSync = new Notebook(vConf, vRepoSync, schedulerFactory, factory, this, search,
notebookAuthorization, credentials);
// one git versioned storage initialized
assertThat(vRepoSync.getRepoCount()).isEqualTo(1);

View file

@ -80,7 +80,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search, null);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, search, null, null);
}
@After

View file

@ -286,7 +286,7 @@ public class LuceneSearchTest {
}
private Note newNote(String name) {
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex);
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null);
note.setName(name);
return note;
}