Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-1549

This commit is contained in:
hyonzin 2016-10-24 20:06:43 +09:00
commit 22aecb314d
7 changed files with 171 additions and 37 deletions

View file

@ -21,7 +21,6 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -38,7 +37,7 @@ import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.pdx.PdxInstance;
/**
* Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
* Apache Geode OQL Interpreter (http://geode.apache.org)
*
* <ul>
* <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
@ -87,30 +86,12 @@ public class GeodeOqlInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
public static final String DEFAULT_PORT = "10334";
public static final String DEFAULT_HOST = "localhost";
public static final String DEFAULT_MAX_RESULT = "1000";
private static final char NEWLINE = '\n';
private static final char TAB = '\t';
private static final char WHITESPACE = ' ';
private static final String TABLE_MAGIC_TAG = "%table ";
public static final String LOCATOR_HOST = "geode.locator.host";
public static final String LOCATOR_PORT = "geode.locator.port";
public static final String MAX_RESULT = "geode.max.result";
static {
Interpreter.register(
"oql",
"geode",
GeodeOqlInterpreter.class.getName(),
new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
.add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port")
.add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build());
}
private ClientCache clientCache = null;
private QueryService queryService = null;
private Exception exceptionOnConnect;
@ -122,8 +103,8 @@ public class GeodeOqlInterpreter extends Interpreter {
protected ClientCache getClientCache() {
String locatorHost = getProperty(LOCATOR_HOST);
int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
String locatorHost = getProperty("geode.locator.host");
int locatorPort = Integer.valueOf(getProperty("geode.locator.port"));
ClientCache clientCache =
new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
@ -139,7 +120,7 @@ public class GeodeOqlInterpreter extends Interpreter {
close();
try {
maxResult = Integer.valueOf(getProperty(MAX_RESULT));
maxResult = Integer.valueOf(getProperty("geode.max.result"));
clientCache = getClientCache();
queryService = clientCache.getQueryService();

View file

@ -0,0 +1,30 @@
[
{
"group": "geode",
"name": "oql",
"className": "org.apache.zeppelin.geode.GeodeOqlInterpreter",
"properties": {
"geode.locator.host": {
"envName": null,
"propertyName": "geode.locator.host",
"defaultValue": "localhost",
"description": "The Geode Locator Host."
},
"geode.locator.port": {
"envName": null,
"propertyName": "geode.locator.port",
"defaultValue": "10334",
"description": "The Geode Locator Port."
},
"geode.max.result": {
"envName": null,
"propertyName": "geode.max.result",
"defaultValue": "1000",
"description": "Max number of OQL result to display."
}
},
"editor": {
"language": "sql"
}
}
]

View file

@ -58,9 +58,9 @@ public class GeodeOqlInterpreterTest {
public void testOpenCommandIndempotency() {
Properties properties = new Properties();
properties.put(LOCATOR_HOST, DEFAULT_HOST);
properties.put(LOCATOR_PORT, DEFAULT_PORT);
properties.put(MAX_RESULT, DEFAULT_MAX_RESULT);
properties.put("geode.locator.host", "localhost");
properties.put("geode.locator.port", "10334");
properties.put("geode.max.result", "1000");
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties));

View file

@ -18,13 +18,20 @@
package org.apache.zeppelin.user;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/***
*
*/
public class AuthenticationInfo {
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInfo.class);
String user;
String ticket;
UserCredentials userCredentials;
public static final AuthenticationInfo ANONYMOUS = new AuthenticationInfo("anonymous",
"anonymous");
public AuthenticationInfo() {}
@ -66,4 +73,17 @@ public class AuthenticationInfo {
this.userCredentials = userCredentials;
}
public static boolean isAnonymous(AuthenticationInfo subject) {
if (subject == null) {
LOG.warn("Subject is null, assuming anonymous. "
+ "Not recommended to use subject as null except in tests");
return true;
}
return subject.isAnonymous();
}
public boolean isAnonymous() {
return ANONYMOUS.equals(this) || "anonymous".equalsIgnoreCase(this.getUser())
|| StringUtils.isEmpty(this.getUser());
}
}

View file

@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.httpclient.HttpClient;
@ -36,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.zeppelin.server.ZeppelinServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -135,6 +137,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
}
responseBody = put.getResponseBodyAsString();
put.releaseConnection();
} catch (IOException e) {
LOG.error("Cannot login user", e);
throw new AuthenticationException(e.getMessage());
@ -147,6 +150,13 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
throw new AuthenticationException("Cannot login to ZeppelinHub");
}
/* TODO(khalid): add proper roles and add listener */
HashSet<String> userAndRoles = new HashSet<String>();
userAndRoles.add(account.login);
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
return account;
}

View file

@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@ -90,14 +92,6 @@ public class NotebookRepoSync implements NotebookRepo {
LOG.info("No storage could be initialized, using default {} storage", defaultStorage);
initializeDefaultStorage(conf);
}
if (getRepoCount() > 1) {
try {
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
sync(0, 1, subject);
} catch (IOException e) {
LOG.warn("Failed to sync with secondary storage on start {}", e);
}
}
}
@SuppressWarnings("static-access")
@ -170,6 +164,10 @@ public class NotebookRepoSync implements NotebookRepo {
/* TODO(khalid): handle case when removing from secondary storage fails */
}
void remove(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException {
getRepo(repoIndex).remove(noteId, subject);
}
/**
* Copies new/updated notes from source to destination storage
*
@ -195,7 +193,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pushNoteIds) {
LOG.info("ID : " + id);
}
pushNotes(subject, pushNoteIds, srcRepo, dstRepo);
pushNotes(subject, pushNoteIDs, srcRepo, dstRepo, false);
} else {
LOG.info("Nothing to push");
}
@ -205,7 +203,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pullNoteIds) {
LOG.info("ID : " + id);
}
pushNotes(subject, pullNoteIds, dstRepo, srcRepo);
pushNotes(subject, pullNoteIDs, dstRepo, srcRepo, true);
} else {
LOG.info("Nothing to pull");
}
@ -228,16 +226,43 @@ public class NotebookRepoSync implements NotebookRepo {
}
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
NotebookRepo remoteRepo) {
NotebookRepo remoteRepo, boolean setPermissions) {
for (String id : ids) {
try {
remoteRepo.save(localRepo.get(id, subject), subject);
if (setPermissions && emptyNoteAcl(id)) {
makePrivate(id, subject);
}
} catch (IOException e) {
LOG.error("Failed to push note to storage, moving onto next one", e);
}
}
}
private boolean emptyNoteAcl(String noteId) {
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
return notebookAuthorization.getOwners(noteId).isEmpty()
&& notebookAuthorization.getReaders(noteId).isEmpty()
&& notebookAuthorization.getWriters(noteId).isEmpty();
}
private void makePrivate(String noteId, AuthenticationInfo subject) {
if (AuthenticationInfo.isAnonymous(subject)) {
LOG.info("User is anonymous, permissions are not set for pulled notes");
return;
}
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
Set<String> users = notebookAuthorization.getOwners(noteId);
users.add(subject.getUser());
notebookAuthorization.setOwners(noteId, users);
users = notebookAuthorization.getReaders(noteId);
users.add(subject.getUser());
notebookAuthorization.setReaders(noteId, users);
users = notebookAuthorization.getWriters(noteId);
users.add(subject.getUser());
notebookAuthorization.setWriters(noteId, users);
}
private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo)
throws IOException {
for (String id : ids) {

View file

@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -314,6 +316,72 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
notebookRepoSync.remove(note.getId(), anonymous);
}
@Test
public void testSyncWithAcl() throws IOException {
/* scenario 1 - note exists with acl on main storage */
AuthenticationInfo user1 = new AuthenticationInfo("user1");
Note note = notebookSync.createNote(user1);
assertEquals(0, note.getParagraphs().size());
// saved on both storages
assertEquals(1, notebookRepoSync.list(0, null).size());
assertEquals(1, notebookRepoSync.list(1, null).size());
/* check that user1 is the only owner */
NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
Set<String> entity = new HashSet<String>();
entity.add(user1.getUser());
assertEquals(true, authInfo.isOwner(note.getId(), entity));
assertEquals(1, authInfo.getOwners(note.getId()).size());
assertEquals(0, authInfo.getReaders(note.getId()).size());
assertEquals(0, authInfo.getWriters(note.getId()).size());
/* update note and save on secondary storage */
Paragraph p1 = note.addParagraph();
p1.setText("hello world");
assertEquals(1, note.getParagraphs().size());
notebookRepoSync.save(1, note, null);
/* check paragraph isn't saved into first storage */
assertEquals(0, notebookRepoSync.get(0,
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
/* check paragraph is saved into second storage */
assertEquals(1, notebookRepoSync.get(1,
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
/* now sync by user1 */
notebookRepoSync.sync(user1);
/* check that note updated and acl are same on main storage*/
assertEquals(1, notebookRepoSync.get(0,
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
assertEquals(true, authInfo.isOwner(note.getId(), entity));
assertEquals(1, authInfo.getOwners(note.getId()).size());
assertEquals(0, authInfo.getReaders(note.getId()).size());
assertEquals(0, authInfo.getWriters(note.getId()).size());
/* scenario 2 - note doesn't exist on main storage */
/* remove from main storage */
notebookRepoSync.remove(0, note.getId(), user1);
assertEquals(0, notebookRepoSync.list(0, null).size());
assertEquals(1, notebookRepoSync.list(1, null).size());
authInfo.removeNote(note.getId());
assertEquals(0, authInfo.getOwners(note.getId()).size());
assertEquals(0, authInfo.getReaders(note.getId()).size());
assertEquals(0, authInfo.getWriters(note.getId()).size());
/* now sync - should bring note from secondary storage with added acl */
notebookRepoSync.sync(user1);
assertEquals(1, notebookRepoSync.list(0, null).size());
assertEquals(1, notebookRepoSync.list(1, null).size());
assertEquals(1, authInfo.getOwners(note.getId()).size());
assertEquals(1, authInfo.getReaders(note.getId()).size());
assertEquals(1, authInfo.getWriters(note.getId()).size());
assertEquals(true, authInfo.isOwner(note.getId(), entity));
assertEquals(true, authInfo.isReader(note.getId(), entity));
assertEquals(true, authInfo.isWriter(note.getId(), entity));
}
static void delete(File file){
if(file.isFile()) file.delete();
else if(file.isDirectory()){