mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add authentication to zeppelin
This commit is contained in:
parent
1ca3d65a4e
commit
28f6bf7a48
6 changed files with 265 additions and 15 deletions
|
|
@ -28,6 +28,7 @@ import org.apache.zeppelin.notebook.Note;
|
|||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -58,10 +59,10 @@ public class ZeppelinHubRepo implements NotebookRepo {
|
|||
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
|
||||
|
||||
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
|
||||
getZeppelinhubWebsocketUri(conf), token);
|
||||
getZeppelinhubWebsocketUri(conf), token, conf);
|
||||
websocketClient.start();
|
||||
}
|
||||
|
||||
|
||||
private String getZeppelinHubWsUri(URI api) throws IOException {
|
||||
URI apiRoot = api;
|
||||
String scheme = apiRoot.getScheme();
|
||||
|
|
|
|||
|
|
@ -0,0 +1,228 @@
|
|||
package org.apache.zeppelin.notebook.repo.zeppelinhub.security;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.Key;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.crypto.Cipher;
|
||||
import javax.crypto.spec.IvParameterSpec;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
|
||||
import org.apache.commons.httpclient.NameValuePair;
|
||||
import org.apache.commons.httpclient.methods.GetMethod;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.apache.zeppelin.notebook.socket.Message.OP;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
/**
|
||||
* Authentication module.
|
||||
*
|
||||
*/
|
||||
public class Authentication implements Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Authentication.class);
|
||||
private String principal = "anonymous";
|
||||
private String ticket = "anonymous";
|
||||
private String roles = StringUtils.EMPTY;
|
||||
|
||||
private final HttpClient client;
|
||||
private String loginEndpoint;
|
||||
|
||||
// Cipher is an AES in CBC mode
|
||||
private static final String CIPHER_ALGORITHM = "AES";
|
||||
private static final String CIPHER_MODE = "AES/CBC/PKCS5PADDING";
|
||||
private static final String KEY = "AbtEr99DxsWWbJkP";
|
||||
private static final int ivSize = 16;
|
||||
|
||||
private static final String ZEPPELIN_CONF_ANONYMOUS_ALLOWED = "zeppelin.anonymous.allowed";
|
||||
private static final String ZEPPELINHUB_USER_KEY = "zeppelinhub.user.key";
|
||||
private String token;
|
||||
private boolean authEnabled;
|
||||
private boolean authenticated;
|
||||
String userKey;
|
||||
|
||||
private Gson gson = new Gson();
|
||||
private static Authentication instance = null;
|
||||
|
||||
public static Authentication initialize(String token, ZeppelinConfiguration conf) {
|
||||
if (instance == null && conf != null) {
|
||||
instance = new Authentication(token, conf);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static Authentication getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
private Authentication(String token, ZeppelinConfiguration conf) {
|
||||
MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
|
||||
client = new HttpClient(connectionManager);
|
||||
this.token = token;
|
||||
|
||||
authEnabled = !conf.getBoolean("ZEPPELIN_ALLOW_ANONYMOUS",
|
||||
ZEPPELIN_CONF_ANONYMOUS_ALLOWED, true);
|
||||
|
||||
userKey = conf.getString("ZEPPELINHUB_USER_KEY",
|
||||
ZEPPELINHUB_USER_KEY, "");
|
||||
|
||||
loginEndpoint = getLoginEndpoint(conf);
|
||||
}
|
||||
|
||||
public String getPrincipal() {
|
||||
return this.principal;
|
||||
}
|
||||
|
||||
public String getTicket() {
|
||||
return this.ticket;
|
||||
}
|
||||
|
||||
public String getRoles() {
|
||||
return this.roles;
|
||||
}
|
||||
|
||||
public boolean isAuthenticated() {
|
||||
return authenticated;
|
||||
}
|
||||
private String getLoginEndpoint(ZeppelinConfiguration conf) {
|
||||
int port = conf.getInt("ZEPPELIN_PORT", "zeppelin.server.port" , 8080);
|
||||
if (port <= 0) {
|
||||
port = 8080;
|
||||
}
|
||||
String scheme = "http";
|
||||
if (conf.useSsl()) {
|
||||
scheme = "https";
|
||||
}
|
||||
String endpoint = scheme + "://localhost:" + port + "/api/login";
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
public boolean authenticate() {
|
||||
if (authEnabled) {
|
||||
if (!StringUtils.isEmpty(userKey)) {
|
||||
String authKey = getAuthKey(userKey);
|
||||
Map<String, String> authCredentials = login(authKey, loginEndpoint);
|
||||
if (isEmptyMap(authCredentials)) {
|
||||
return false;
|
||||
}
|
||||
principal = authCredentials.containsKey("principal") ? authCredentials.get("principal")
|
||||
: principal;
|
||||
ticket = authCredentials.containsKey("ticket") ? authCredentials.get("ticket") : ticket;
|
||||
roles = authCredentials.containsKey("roles") ? authCredentials.get("roles") : roles;
|
||||
LOG.info("Authenticated into Zeppelin as {} and roles {}", principal, roles);
|
||||
return true;
|
||||
} else {
|
||||
LOG.warn("ZEPPELINHUB_USER_KEY isn't provided. Please provide your credentials"
|
||||
+ "for your instance in ZeppelinHub website and generate your key.");
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// returns login:password
|
||||
private String getAuthKey(String userKey) {
|
||||
LOG.debug("Encrypted user key is {}", userKey);
|
||||
if (StringUtils.isBlank(userKey)) {
|
||||
LOG.warn("ZEPPELINHUB_USER_KEY is blank");
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
//use hashed token as a salt
|
||||
String hashedToken = Integer.toString(token.hashCode());
|
||||
return decrypt(userKey, hashedToken);
|
||||
}
|
||||
|
||||
private String decrypt(String value, String initVector) {
|
||||
LOG.debug("IV is {}, IV length is {}", initVector, initVector.length());
|
||||
if (StringUtils.isBlank(value) || StringUtils.isBlank(initVector)) {
|
||||
LOG.error("String to decode or salt is not provided");
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
try {
|
||||
IvParameterSpec iv = generateIV(initVector);
|
||||
Key key = generateKey();
|
||||
|
||||
Cipher cipher = Cipher.getInstance(CIPHER_MODE);
|
||||
cipher.init(Cipher.DECRYPT_MODE, key, iv);
|
||||
|
||||
byte[] decryptedString = Base64.decodeBase64(toBytes(value));
|
||||
decryptedString = cipher.doFinal(decryptedString);
|
||||
return new String(decryptedString);
|
||||
} catch (GeneralSecurityException e) {
|
||||
LOG.error("Error when decrypting", e);
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, String> login(String authKey, String endpoint) {
|
||||
String[] credentials = authKey.split(":");
|
||||
if (credentials.length != 2) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
PostMethod post = new PostMethod(endpoint);
|
||||
post.addRequestHeader("Origin", "http://localhost");
|
||||
post.addParameter(new NameValuePair("userName", credentials[0]));
|
||||
post.addParameter(new NameValuePair("password", credentials[1]));
|
||||
try {
|
||||
int code = client.executeMethod(post);
|
||||
if (code == HttpStatus.SC_OK) {
|
||||
String content = post.getResponseBodyAsString();
|
||||
Map<String, Object> resp = gson.fromJson(content,
|
||||
new TypeToken<Map<String, Object>>() {}.getType());
|
||||
LOG.info("Received from Zeppelin LoginRestApi : " + content);
|
||||
return (Map<String, String>) resp.get("body");
|
||||
} else {
|
||||
LOG.error("Failed Zeppelin login {}, status code {}", endpoint, code);
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot login into Zeppelin", e);
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
|
||||
private Key generateKey() {
|
||||
return new SecretKeySpec(toBytes(KEY), CIPHER_ALGORITHM);
|
||||
}
|
||||
|
||||
private byte[] toBytes(String value) {
|
||||
byte[] bytes;
|
||||
try {
|
||||
bytes = value.getBytes("UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
LOG.warn("UTF-8 isn't supported ", e);
|
||||
bytes = value.getBytes();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private IvParameterSpec generateIV(String ivString) {
|
||||
byte[] ivFromBytes = toBytes(ivString);
|
||||
byte[] iv16ToBytes = new byte[ivSize];
|
||||
System.arraycopy(ivFromBytes, 0, iv16ToBytes, 0, Math.min(ivFromBytes.length, ivSize));
|
||||
return new IvParameterSpec(iv16ToBytes);
|
||||
}
|
||||
|
||||
private boolean isEmptyMap(Map<String, String> map) {
|
||||
return map == null || map.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
authenticated = authenticate();
|
||||
LOG.info("Scheduled authentication status is {}", authenticated);
|
||||
}
|
||||
}
|
||||
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -30,9 +31,10 @@ public class Client {
|
|||
private final ZeppelinClient zeppelinClient;
|
||||
private static Client instance = null;
|
||||
|
||||
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token) {
|
||||
public static Client initialize(String zeppelinUri, String zeppelinhubUri, String token,
|
||||
ZeppelinConfiguration conf) {
|
||||
if (instance == null) {
|
||||
instance = new Client(zeppelinUri, zeppelinhubUri, token);
|
||||
instance = new Client(zeppelinUri, zeppelinhubUri, token, conf);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
|
@ -41,10 +43,11 @@ public class Client {
|
|||
return instance;
|
||||
}
|
||||
|
||||
private Client(String zeppelinUri, String zeppelinhubUri, String token) {
|
||||
private Client(String zeppelinUri, String zeppelinhubUri, String token,
|
||||
ZeppelinConfiguration conf) {
|
||||
LOG.debug("Init Client");
|
||||
zeppelinhubClient = ZeppelinhubClient.initialize(zeppelinhubUri, token);
|
||||
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token);
|
||||
zeppelinClient = ZeppelinClient.initialize(zeppelinUri, token, conf);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
|
|
|||
|
|
@ -25,8 +25,11 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
|
||||
import org.apache.zeppelin.notebook.socket.Message;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
|
@ -51,9 +54,12 @@ public class ZeppelinClient {
|
|||
private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
|
||||
private static ZeppelinClient instance = null;
|
||||
|
||||
public static ZeppelinClient initialize(String zeppelinUrl, String token) {
|
||||
private Authentication authModule;
|
||||
|
||||
public static ZeppelinClient initialize(String zeppelinUrl, String token,
|
||||
ZeppelinConfiguration conf) {
|
||||
if (instance == null) {
|
||||
instance = new ZeppelinClient(zeppelinUrl, token);
|
||||
instance = new ZeppelinClient(zeppelinUrl, token, conf);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
|
@ -62,12 +68,16 @@ public class ZeppelinClient {
|
|||
return instance;
|
||||
}
|
||||
|
||||
private ZeppelinClient(String zeppelinUrl, String token) {
|
||||
private ZeppelinClient(String zeppelinUrl, String token, ZeppelinConfiguration conf) {
|
||||
zeppelinWebsocketUrl = URI.create(zeppelinUrl);
|
||||
zeppelinhubToken = token;
|
||||
wsClient = createNewWebsocketClient();
|
||||
gson = new Gson();
|
||||
zeppelinConnectionMap = new ConcurrentHashMap<>();
|
||||
authModule = Authentication.initialize(token, conf);
|
||||
if (authModule != null) {
|
||||
SchedulerService.getInstance().addOnce(authModule, 10);
|
||||
}
|
||||
LOG.info("Initialized Zeppelin websocket client on {}", zeppelinWebsocketUrl);
|
||||
}
|
||||
|
||||
|
|
@ -104,11 +114,19 @@ public class ZeppelinClient {
|
|||
}
|
||||
|
||||
public String serialize(Message zeppelinMsg) {
|
||||
// TODO(khalid): handle authentication
|
||||
if (credentialsAvailable()) {
|
||||
zeppelinMsg.principal = authModule.getPrincipal();
|
||||
zeppelinMsg.ticket = authModule.getTicket();
|
||||
zeppelinMsg.roles = authModule.getRoles();
|
||||
}
|
||||
String msg = gson.toJson(zeppelinMsg);
|
||||
return msg;
|
||||
}
|
||||
|
||||
private boolean credentialsAvailable() {
|
||||
return Authentication.getInstance() != null && Authentication.getInstance().isAuthenticated();
|
||||
}
|
||||
|
||||
public Message deserialize(String zeppelinMessage) {
|
||||
if (StringUtils.isBlank(zeppelinMessage)) {
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ public class ZeppelinClientTest {
|
|||
return;
|
||||
}
|
||||
// Initialize and start Zeppelin client
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token");
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "dummy token", null);
|
||||
client.start();
|
||||
LOG.info("Zeppelin websocket client started");
|
||||
|
||||
|
|
@ -85,7 +85,7 @@ public class ZeppelinClientTest {
|
|||
public void zeppelinClientSingletonTest() {
|
||||
ZeppelinClient client1 = ZeppelinClient.getInstance();
|
||||
if (client1 == null) {
|
||||
client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
client1 = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
|
||||
}
|
||||
assertNotNull(client1);
|
||||
ZeppelinClient client2 = ZeppelinClient.getInstance();
|
||||
|
|
@ -98,7 +98,7 @@ public class ZeppelinClientTest {
|
|||
Message msg = new Message(OP.LIST_NOTES);
|
||||
msg.data = Maps.newHashMap();
|
||||
msg.data.put("key", "value");
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
|
||||
String serializedMsg = client.serialize(msg);
|
||||
Message deserializedMsg = client.deserialize(serializedMsg);
|
||||
assertEquals(msg.op, deserializedMsg.op);
|
||||
|
|
@ -111,7 +111,7 @@ public class ZeppelinClientTest {
|
|||
|
||||
@Test
|
||||
public void sendToZeppelinTest() {
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN");
|
||||
ZeppelinClient client = ZeppelinClient.initialize(validWebsocketUrl, "TOKEN", null);
|
||||
client.start();
|
||||
Message msg = new Message(OP.LIST_NOTES);
|
||||
msg.data = Maps.newHashMap();
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ public class ZeppelinhubClientTest {
|
|||
|
||||
@Test
|
||||
public void runAllParagraphTest() throws Exception {
|
||||
Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN");
|
||||
Client.initialize(validWebsocketUrl, validWebsocketUrl, "TOKEN", null);
|
||||
Client.getInstance().start();
|
||||
ZeppelinhubClient zeppelinhubClient = ZeppelinhubClient.getInstance();
|
||||
boolean runStatus = zeppelinhubClient.runAllParagraph("2AB7SY361", runNotebookMsg);
|
||||
|
|
|
|||
Loading…
Reference in a new issue