mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-1549
This commit is contained in:
commit
22aecb314d
7 changed files with 171 additions and 37 deletions
|
|
@ -21,7 +21,6 @@ import java.util.Properties;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -38,7 +37,7 @@ import com.gemstone.gemfire.cache.query.Struct;
|
|||
import com.gemstone.gemfire.pdx.PdxInstance;
|
||||
|
||||
/**
|
||||
* Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
|
||||
* Apache Geode OQL Interpreter (http://geode.apache.org)
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
|
||||
|
|
@ -87,30 +86,12 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
|
||||
private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
|
||||
|
||||
public static final String DEFAULT_PORT = "10334";
|
||||
public static final String DEFAULT_HOST = "localhost";
|
||||
public static final String DEFAULT_MAX_RESULT = "1000";
|
||||
|
||||
private static final char NEWLINE = '\n';
|
||||
private static final char TAB = '\t';
|
||||
private static final char WHITESPACE = ' ';
|
||||
|
||||
private static final String TABLE_MAGIC_TAG = "%table ";
|
||||
|
||||
public static final String LOCATOR_HOST = "geode.locator.host";
|
||||
public static final String LOCATOR_PORT = "geode.locator.port";
|
||||
public static final String MAX_RESULT = "geode.max.result";
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"oql",
|
||||
"geode",
|
||||
GeodeOqlInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
|
||||
.add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port")
|
||||
.add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build());
|
||||
}
|
||||
|
||||
private ClientCache clientCache = null;
|
||||
private QueryService queryService = null;
|
||||
private Exception exceptionOnConnect;
|
||||
|
|
@ -122,8 +103,8 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
|
||||
protected ClientCache getClientCache() {
|
||||
|
||||
String locatorHost = getProperty(LOCATOR_HOST);
|
||||
int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
|
||||
String locatorHost = getProperty("geode.locator.host");
|
||||
int locatorPort = Integer.valueOf(getProperty("geode.locator.port"));
|
||||
|
||||
ClientCache clientCache =
|
||||
new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
|
||||
|
|
@ -139,7 +120,7 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
close();
|
||||
|
||||
try {
|
||||
maxResult = Integer.valueOf(getProperty(MAX_RESULT));
|
||||
maxResult = Integer.valueOf(getProperty("geode.max.result"));
|
||||
|
||||
clientCache = getClientCache();
|
||||
queryService = clientCache.getQueryService();
|
||||
|
|
|
|||
30
geode/src/main/resources/interpreter-setting.json
Normal file
30
geode/src/main/resources/interpreter-setting.json
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
[
|
||||
{
|
||||
"group": "geode",
|
||||
"name": "oql",
|
||||
"className": "org.apache.zeppelin.geode.GeodeOqlInterpreter",
|
||||
"properties": {
|
||||
"geode.locator.host": {
|
||||
"envName": null,
|
||||
"propertyName": "geode.locator.host",
|
||||
"defaultValue": "localhost",
|
||||
"description": "The Geode Locator Host."
|
||||
},
|
||||
"geode.locator.port": {
|
||||
"envName": null,
|
||||
"propertyName": "geode.locator.port",
|
||||
"defaultValue": "10334",
|
||||
"description": "The Geode Locator Port."
|
||||
},
|
||||
"geode.max.result": {
|
||||
"envName": null,
|
||||
"propertyName": "geode.max.result",
|
||||
"defaultValue": "1000",
|
||||
"description": "Max number of OQL result to display."
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
"language": "sql"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
@ -58,9 +58,9 @@ public class GeodeOqlInterpreterTest {
|
|||
public void testOpenCommandIndempotency() {
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put(LOCATOR_HOST, DEFAULT_HOST);
|
||||
properties.put(LOCATOR_PORT, DEFAULT_PORT);
|
||||
properties.put(MAX_RESULT, DEFAULT_MAX_RESULT);
|
||||
properties.put("geode.locator.host", "localhost");
|
||||
properties.put("geode.locator.port", "10334");
|
||||
properties.put("geode.max.result", "1000");
|
||||
|
||||
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties));
|
||||
|
||||
|
|
|
|||
|
|
@ -18,13 +18,20 @@
|
|||
|
||||
package org.apache.zeppelin.user;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/***
|
||||
*
|
||||
*/
|
||||
public class AuthenticationInfo {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInfo.class);
|
||||
String user;
|
||||
String ticket;
|
||||
UserCredentials userCredentials;
|
||||
public static final AuthenticationInfo ANONYMOUS = new AuthenticationInfo("anonymous",
|
||||
"anonymous");
|
||||
|
||||
public AuthenticationInfo() {}
|
||||
|
||||
|
|
@ -66,4 +73,17 @@ public class AuthenticationInfo {
|
|||
this.userCredentials = userCredentials;
|
||||
}
|
||||
|
||||
public static boolean isAnonymous(AuthenticationInfo subject) {
|
||||
if (subject == null) {
|
||||
LOG.warn("Subject is null, assuming anonymous. "
|
||||
+ "Not recommended to use subject as null except in tests");
|
||||
return true;
|
||||
}
|
||||
return subject.isAnonymous();
|
||||
}
|
||||
|
||||
public boolean isAnonymous() {
|
||||
return ANONYMOUS.equals(this) || "anonymous".equalsIgnoreCase(this.getUser())
|
||||
|| StringUtils.isEmpty(this.getUser());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
|
|
@ -36,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken;
|
|||
import org.apache.shiro.authz.AuthorizationInfo;
|
||||
import org.apache.shiro.realm.AuthorizingRealm;
|
||||
import org.apache.shiro.subject.PrincipalCollection;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -135,6 +137,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
|
|||
}
|
||||
responseBody = put.getResponseBodyAsString();
|
||||
put.releaseConnection();
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot login user", e);
|
||||
throw new AuthenticationException(e.getMessage());
|
||||
|
|
@ -147,6 +150,13 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
|
|||
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
|
||||
throw new AuthenticationException("Cannot login to ZeppelinHub");
|
||||
}
|
||||
|
||||
/* TODO(khalid): add proper roles and add listener */
|
||||
HashSet<String> userAndRoles = new HashSet<String>();
|
||||
userAndRoles.add(account.login);
|
||||
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
|
||||
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
|
||||
|
||||
return account;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,8 +24,10 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
|
|
@ -90,14 +92,6 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
LOG.info("No storage could be initialized, using default {} storage", defaultStorage);
|
||||
initializeDefaultStorage(conf);
|
||||
}
|
||||
if (getRepoCount() > 1) {
|
||||
try {
|
||||
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
|
||||
sync(0, 1, subject);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to sync with secondary storage on start {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
|
|
@ -170,6 +164,10 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
/* TODO(khalid): handle case when removing from secondary storage fails */
|
||||
}
|
||||
|
||||
void remove(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException {
|
||||
getRepo(repoIndex).remove(noteId, subject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copies new/updated notes from source to destination storage
|
||||
*
|
||||
|
|
@ -195,7 +193,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (String id : pushNoteIds) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
pushNotes(subject, pushNoteIds, srcRepo, dstRepo);
|
||||
pushNotes(subject, pushNoteIDs, srcRepo, dstRepo, false);
|
||||
} else {
|
||||
LOG.info("Nothing to push");
|
||||
}
|
||||
|
|
@ -205,7 +203,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (String id : pullNoteIds) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
pushNotes(subject, pullNoteIds, dstRepo, srcRepo);
|
||||
pushNotes(subject, pullNoteIDs, dstRepo, srcRepo, true);
|
||||
} else {
|
||||
LOG.info("Nothing to pull");
|
||||
}
|
||||
|
|
@ -228,16 +226,43 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
}
|
||||
|
||||
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
|
||||
NotebookRepo remoteRepo) {
|
||||
NotebookRepo remoteRepo, boolean setPermissions) {
|
||||
for (String id : ids) {
|
||||
try {
|
||||
remoteRepo.save(localRepo.get(id, subject), subject);
|
||||
if (setPermissions && emptyNoteAcl(id)) {
|
||||
makePrivate(id, subject);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to push note to storage, moving onto next one", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean emptyNoteAcl(String noteId) {
|
||||
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
|
||||
return notebookAuthorization.getOwners(noteId).isEmpty()
|
||||
&& notebookAuthorization.getReaders(noteId).isEmpty()
|
||||
&& notebookAuthorization.getWriters(noteId).isEmpty();
|
||||
}
|
||||
|
||||
private void makePrivate(String noteId, AuthenticationInfo subject) {
|
||||
if (AuthenticationInfo.isAnonymous(subject)) {
|
||||
LOG.info("User is anonymous, permissions are not set for pulled notes");
|
||||
return;
|
||||
}
|
||||
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
|
||||
Set<String> users = notebookAuthorization.getOwners(noteId);
|
||||
users.add(subject.getUser());
|
||||
notebookAuthorization.setOwners(noteId, users);
|
||||
users = notebookAuthorization.getReaders(noteId);
|
||||
users.add(subject.getUser());
|
||||
notebookAuthorization.setReaders(noteId, users);
|
||||
users = notebookAuthorization.getWriters(noteId);
|
||||
users.add(subject.getUser());
|
||||
notebookAuthorization.setWriters(noteId, users);
|
||||
}
|
||||
|
||||
private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo)
|
||||
throws IOException {
|
||||
for (String id : ids) {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -314,6 +316,72 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
notebookRepoSync.remove(note.getId(), anonymous);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncWithAcl() throws IOException {
|
||||
/* scenario 1 - note exists with acl on main storage */
|
||||
AuthenticationInfo user1 = new AuthenticationInfo("user1");
|
||||
Note note = notebookSync.createNote(user1);
|
||||
assertEquals(0, note.getParagraphs().size());
|
||||
|
||||
// saved on both storages
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
|
||||
/* check that user1 is the only owner */
|
||||
NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
|
||||
Set<String> entity = new HashSet<String>();
|
||||
entity.add(user1.getUser());
|
||||
assertEquals(true, authInfo.isOwner(note.getId(), entity));
|
||||
assertEquals(1, authInfo.getOwners(note.getId()).size());
|
||||
assertEquals(0, authInfo.getReaders(note.getId()).size());
|
||||
assertEquals(0, authInfo.getWriters(note.getId()).size());
|
||||
|
||||
/* update note and save on secondary storage */
|
||||
Paragraph p1 = note.addParagraph();
|
||||
p1.setText("hello world");
|
||||
assertEquals(1, note.getParagraphs().size());
|
||||
notebookRepoSync.save(1, note, null);
|
||||
|
||||
/* check paragraph isn't saved into first storage */
|
||||
assertEquals(0, notebookRepoSync.get(0,
|
||||
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
|
||||
/* check paragraph is saved into second storage */
|
||||
assertEquals(1, notebookRepoSync.get(1,
|
||||
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
|
||||
|
||||
/* now sync by user1 */
|
||||
notebookRepoSync.sync(user1);
|
||||
|
||||
/* check that note updated and acl are same on main storage*/
|
||||
assertEquals(1, notebookRepoSync.get(0,
|
||||
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
|
||||
assertEquals(true, authInfo.isOwner(note.getId(), entity));
|
||||
assertEquals(1, authInfo.getOwners(note.getId()).size());
|
||||
assertEquals(0, authInfo.getReaders(note.getId()).size());
|
||||
assertEquals(0, authInfo.getWriters(note.getId()).size());
|
||||
|
||||
/* scenario 2 - note doesn't exist on main storage */
|
||||
/* remove from main storage */
|
||||
notebookRepoSync.remove(0, note.getId(), user1);
|
||||
assertEquals(0, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
authInfo.removeNote(note.getId());
|
||||
assertEquals(0, authInfo.getOwners(note.getId()).size());
|
||||
assertEquals(0, authInfo.getReaders(note.getId()).size());
|
||||
assertEquals(0, authInfo.getWriters(note.getId()).size());
|
||||
|
||||
/* now sync - should bring note from secondary storage with added acl */
|
||||
notebookRepoSync.sync(user1);
|
||||
assertEquals(1, notebookRepoSync.list(0, null).size());
|
||||
assertEquals(1, notebookRepoSync.list(1, null).size());
|
||||
assertEquals(1, authInfo.getOwners(note.getId()).size());
|
||||
assertEquals(1, authInfo.getReaders(note.getId()).size());
|
||||
assertEquals(1, authInfo.getWriters(note.getId()).size());
|
||||
assertEquals(true, authInfo.isOwner(note.getId(), entity));
|
||||
assertEquals(true, authInfo.isReader(note.getId(), entity));
|
||||
assertEquals(true, authInfo.isWriter(note.getId(), entity));
|
||||
}
|
||||
|
||||
static void delete(File file){
|
||||
if(file.isFile()) file.delete();
|
||||
else if(file.isDirectory()){
|
||||
|
|
|
|||
Loading…
Reference in a new issue