add authentication to zeppelin

This commit is contained in:
Khalid Huseynov 2016-05-14 19:17:31 +09:00
parent 1ca3d65a4e
commit 28f6bf7a48
6 changed files with 265 additions and 15 deletions

View file

@ -28,6 +28,7 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,10 +59,10 @@ public class ZeppelinHubRepo implements NotebookRepo {
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token);
getZeppelinhubWebsocketUri(conf), token, conf);
websocketClient.start();
}
private String getZeppelinHubWsUri(URI api) throws IOException {
URI apiRoot = api;
String scheme = apiRoot.getScheme();

View file

@ -0,0 +1,228 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.security;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.util.Collections;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* Authentication module.
*
*/
public class Authentication implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Authentication.class);
private String principal = "anonymous";
private String ticket = "anonymous";
private String roles = StringUtils.EMPTY;
private final HttpClient client;
private String loginEndpoint;
// Cipher is an AES in CBC mode
private static final String CIPHER_ALGORITHM = "AES";
private static final String CIPHER_MODE = "AES/CBC/PKCS5PADDING";
private static final String KEY = "AbtEr99DxsWWbJkP";
private static final int ivSize = 16;
private static final String ZEPPELIN_CONF_ANONYMOUS_ALLOWED = "zeppelin.anonymous.allowed";
private static final String ZEPPELINHUB_USER_KEY = "zeppelinhub.user.key";
private String token;
private boolean authEnabled;
private boolean authenticated;
String userKey;
private Gson gson = new Gson();
private static Authentication instance = null;
public static Authentication initialize(String token, ZeppelinConfiguration conf) {
if (instance == null && conf != null) {
instance = new Authentication(token, conf);
}
return instance;
}
public static Authentication getInstance() {
return instance;
}
private Authentication(String token, ZeppelinConfiguration conf) {
MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
client = new HttpClient(connectionManager);
this.token = token;
authEnabled = !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS",
ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true);
userKey = conf.getString("ZEPPELINHUB_USER_KEY",
ZEPPELINHUB_USER_KEY, "");
loginEndpoint = getLoginEndpoint(conf);
}
public String getPrincipal() {
return this.principal;
}
public String getTicket() {
return this.ticket;
}
public String getRoles() {
return this.roles;
}
public boolean isAuthenticated() {
return authenticated;
}
private String getLoginEndpoint(ZeppelinConfiguration conf) {
int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port" , 8080);
if (port <= 0) {
port = 8080;
}
String scheme = "http";
if (conf.useSsl()) {
scheme = "https";
}
String endpoint = scheme + "://localhost:" + port + "/api/login";
return endpoint;
}
public boolean authenticate() {
if (authEnabled) {
if (!StringUtils.isEmpty(userKey)) {
String authKey = getAuthKey(userKey);
Map<String, String> authCredentials = login(authKey, loginEndpoint);
if (isEmptyMap(authCredentials)) {
return false;
}
principal = authCredentials.containsKey("principal") ? authCredentials.get("principal")
: principal;
ticket = authCredentials.containsKey("ticket") ? authCredentials.get("ticket") : ticket;
roles = authCredentials.containsKey("roles") ? authCredentials.get("roles") : roles;
LOG.info("Authenticated into Zeppelin as {} and roles {}", principal, roles);
return true;
} else {
LOG.warn("ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials"
+ "for your instance in ZeppelinHub website and generate your key.");
}
}
return false;
}
// returns login:password
private String getAuthKey(String userKey) {
LOG.debug("Encrypted user key is {}", userKey);
if (StringUtils.isBlank(userKey)) {
LOG.warn("ZEPPELINHUB_USER_KEY is blank");
return StringUtils.EMPTY;
}
//use hashed token as a salt
String hashedToken = Integer.toString(token.hashCode());
return decrypt(userKey, hashedToken);
}
private String decrypt(String value, String initVector) {
LOG.debug("IV is {}, IV length is {}", initVector, initVector.length());
if (StringUtils.isBlank(value) || StringUtils.isBlank(initVector)) {
LOG.error("String to decode or salt is not provided");
return StringUtils.EMPTY;
}
try {
IvParameterSpec iv = generateIV(initVector);
Key key = generateKey();
Cipher cipher = Cipher.getInstance(CIPHER_MODE);
cipher.init(Cipher.DECRYPT_MODE, key, iv);
byte[] decryptedString = Base64.decodeBase64(toBytes(value));
decryptedString = cipher.doFinal(decryptedString);
return new String(decryptedString);
} catch (GeneralSecurityException e) {
LOG.error("Error when decrypting", e);
return StringUtils.EMPTY;
}
}
@SuppressWarnings("unchecked")
private Map<String, String> login(String authKey, String endpoint) {
String[] credentials = authKey.split(":");
if (credentials.length != 2) {
return Collections.emptyMap();
}
PostMethod post = new PostMethod(endpoint);
post.addRequestHeader("Origin", "http://localhost");
post.addParameter(new NameValuePair("userName", credentials[0]));
post.addParameter(new NameValuePair("password", credentials[1]));
try {
int code = client.executeMethod(post);
if (code == HttpStatus.SC_OK) {
String content = post.getResponseBodyAsString();
Map<String, Object> resp = gson.fromJson(content,
new TypeToken<Map<String, Object>>() {}.getType());
LOG.info("Received from Zeppelin LoginRestApi : " + content);
return (Map<String, String>) resp.get("body");
} else {
LOG.error("Failed Zeppelin login {}, status code {}", endpoint, code);
return Collections.emptyMap();
}
} catch (IOException e) {
LOG.error("Cannot login into Zeppelin", e);
return Collections.emptyMap();
}
}
private Key generateKey() {
return new SecretKeySpec(toBytes(KEY), CIPHER_ALGORITHM);
}
private byte[] toBytes(String value) {
byte[] bytes;
try {
bytes = value.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
LOG.warn("UTF-8 isn't supported ", e);
bytes = value.getBytes();
}
return bytes;
}
private IvParameterSpec generateIV(String ivString) {
byte[] ivFromBytes = toBytes(ivString);
byte[] iv16ToBytes = new byte[ivSize];
System.arraycopy(ivFromBytes, 0, iv16ToBytes, 0, Math.min(ivFromBytes.length, ivSize));
return new IvParameterSpec(iv16ToBytes);
}
private boolean isEmptyMap(Map<String, String> map) {
return map == null || map.isEmpty();
}
@Override
public void run() {
authenticated = authenticate();
LOG.info("Scheduled authentication status is {}", authenticated);
}
}

View file

@ -16,6 +16,7 @@
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.socket.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,9 +31,10 @@ public class Client {
private final ZeppelinClient zeppelinClient;
private static Client instance = null;
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token) {
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token,
ZeppelinConfiguration conf) {
if (instance == null) {
instance = new Client(zeppelinUri, zeppelinhubUri, token);
instance = new Client(zeppelinUri, zeppelinhubUri, token, conf);
}
return instance;
}
@ -41,10 +43,11 @@ public class Client {
return instance;
}
private Client(String zeppelinUri, String zeppelinhubUri, String token) {
private Client(String zeppelinUri, String zeppelinhubUri, String token,
ZeppelinConfiguration conf) {
LOG.debug("Init Client");
zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token);
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token, conf);
}
public void start() {

View file

@ -25,8 +25,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.socket.Message;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
@ -51,9 +54,12 @@ public class ZeppelinClient {
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
private static ZeppelinClient instance = null;
public static ZeppelinClient initialize(String zeppelinUrl, String token) {
private Authentication authModule;
public static ZeppelinClient initialize(String zeppelinUrl, String token,
ZeppelinConfiguration conf) {
if (instance == null) {
instance = new ZeppelinClient(zeppelinUrl, token);
instance = new ZeppelinClient(zeppelinUrl, token, conf);
}
return instance;
}
@ -62,12 +68,16 @@ public class ZeppelinClient {
return instance;
}
private ZeppelinClient(String zeppelinUrl, String token) {
private ZeppelinClient(String zeppelinUrl, String token, ZeppelinConfiguration conf) {
zeppelinWebsocketUrl = URI.create(zeppelinUrl);
zeppelinhubToken = token;
wsClient = createNewWebsocketClient();
gson = new Gson();
zeppelinConnectionMap = new ConcurrentHashMap<>();
authModule = Authentication.initialize(token, conf);
if (authModule != null) {
SchedulerService.getInstance().addOnce(authModule, 10);
}
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
}
@ -104,11 +114,19 @@ public class ZeppelinClient {
}
public String serialize(Message zeppelinMsg) {
// TODO(khalid): handle authentication
if (credentialsAvailable()) {
zeppelinMsg.principal = authModule.getPrincipal();
zeppelinMsg.ticket = authModule.getTicket();
zeppelinMsg.roles = authModule.getRoles();
}
String msg = gson.toJson(zeppelinMsg);
return msg;
}
private boolean credentialsAvailable() {
return Authentication.getInstance() != null && Authentication.getInstance().isAuthenticated();
}
public Message deserialize(String zeppelinMessage) {
if (StringUtils.isBlank(zeppelinMessage)) {
return null;

View file

@ -53,7 +53,7 @@ public class ZeppelinClientTest {
return;
}
// Initialize and start Zeppelin client
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token");
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token", null);
client.start();
LOG.info("Zeppelin websocket client started");
@ -85,7 +85,7 @@ public class ZeppelinClientTest {
public void zeppelinClientSingletonTest() {
ZeppelinClient client1 = ZeppelinClient.getInstance();
if (client1 == null) {
client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
}
assertNotNull(client1);
ZeppelinClient client2 = ZeppelinClient.getInstance();
@ -98,7 +98,7 @@ public class ZeppelinClientTest {
Message msg = new Message(OP.LIST_NOTES);
msg.data = Maps.newHashMap();
msg.data.put("key", "value");
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
String serializedMsg = client.serialize(msg);
Message deserializedMsg = client.deserialize(serializedMsg);
assertEquals(msg.op, deserializedMsg.op);
@ -111,7 +111,7 @@ public class ZeppelinClientTest {
@Test
public void sendToZeppelinTest() {
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
client.start();
Message msg = new Message(OP.LIST_NOTES);
msg.data = Maps.newHashMap();

View file

@ -59,7 +59,7 @@ public class ZeppelinhubClientTest {
@Test
public void runAllParagraphTest() throws Exception {
Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN");
Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN", null);
Client.getInstance().start();
ZeppelinhubClient zeppelinhubClient = ZeppelinhubClient.getInstance();
boolean runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", runNotebookMsg);