Change to make spark web UI accesible from interpreters page

This commit is contained in:
karuppayya 2016-11-08 23:20:09 +05:30
parent c9adf7161f
commit a1304a2288
15 changed files with 120 additions and 7 deletions

View file

@ -27,8 +27,6 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner;
@ -45,6 +43,7 @@ import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Pool;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.SparkUI;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -57,6 +56,7 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -803,6 +803,10 @@ public class SparkInterpreter extends Interpreter {
sparkSession = getSparkSession();
}
sc = getSparkContext();
RemoteInterpreterEventClient eventClient = getInterpreterGroup().getEventClient();
if (eventClient != null) {
eventClient.onMetaInfodReceived(getSparkUIUrl());
}
if (sc.getPoolForName("fair").isEmpty()) {
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
int minimumShare = 0;
@ -938,6 +942,17 @@ public class SparkInterpreter extends Interpreter {
numReferenceOfSparkContext.incrementAndGet();
}
private Map<String, String> getSparkUIUrl() {
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
SparkUI sparkUi = sparkUiOption.get();
String sparkWebUrl = sparkUi.appUIAddress();
Map<String, String> infos = new java.util.HashMap<>();
if (sparkWebUrl != null) {
infos.put("url", sparkWebUrl);
}
return infos;
}
private Results.Result interpret(String line) {
return (Results.Result) Utils.invokeMethod(
intp,

View file

@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Scheduler;
@ -50,6 +51,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
private RemoteInterpreterEventClient eventClient;
// map [notebook session, Interpreters in the group], to support per note session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
// List<Interpreter>>();
@ -82,6 +85,11 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
allInterpreterGroups.put(id, this);
}
public InterpreterGroup(String interpreterGroupId, RemoteInterpreterEventClient eventClient) {
this(interpreterGroupId);
this.eventClient = eventClient;
}
private static String generateId() {
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
+ new Random().nextInt();
@ -280,4 +288,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
this.angularRegistryPushed = angularRegistryPushed;
}
public RemoteInterpreterEventClient getEventClient() {
return eventClient;
}
}

View file

@ -279,6 +279,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
gson.toJson(appendOutput)));
}
public void onMetaInfodReceived(Map<String, String> infos) {
sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
gson.toJson(infos)));
}
/**
* Wait for eventQueue becomes empty
*/

View file

@ -195,6 +195,15 @@ public class RemoteInterpreterEventPoller extends Thread {
String status = appStatusUpdate.get("status");
appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
// on output update
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
}.getType());
String id = interpreterGroup.getId();
int indexOfColon = id.indexOf(":");
String settingId = id.substring(0, indexOfColon);
listener.onMetaInfosReceived(settingId, metaInfos);
}
logger.debug("Event from remoteproceess {}", event.getType());
} catch (Exception e) {

View file

@ -16,10 +16,13 @@
*/
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
* Event from remoteInterpreterProcess
*/
public interface RemoteInterpreterProcessListener {
public void onOutputAppend(String noteId, String paragraphId, String output);
public void onOutputUpdated(String noteId, String paragraphId, String output);
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
}

View file

@ -153,7 +153,7 @@ public class RemoteInterpreterServer
public void createInterpreter(String interpreterGroupId, String noteId, String
className, Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
interpreterGroup = new InterpreterGroup(interpreterGroupId, eventClient);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);

View file

@ -24,9 +24,6 @@
package org.apache.zeppelin.interpreter.thrift;
import java.util.Map;
import java.util.HashMap;
import org.apache.thrift.TEnum;
public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
NO_OP(1),
@ -39,7 +36,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10),
APP_STATUS_UPDATE(11);
APP_STATUS_UPDATE(11),
META_INFOS(12);
private final int value;
@ -82,6 +80,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_REGISTRY_PUSH;
case 11:
return APP_STATUS_UPDATE;
case 12:
return META_INFOS;
default:
return null;
}

View file

@ -29,6 +29,7 @@ import org.junit.Test;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -154,4 +155,9 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
public void onOutputUpdated(String noteId, String paragraphId, String output) {
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
}

View file

@ -299,4 +299,9 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
public void onOutputUpdated(String noteId, String paragraphId, String output) {
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
}
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.rest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -215,6 +216,24 @@ public class InterpreterRestApi {
return new JsonResponse(Status.CREATED).build();
}
/**
* get the property value
*/
@GET
@Path("getmetainfos/{settingId}")
public Response getMetaInfo(@PathParam("settingId") String settingId,
@PathParam("name") String propName) {
String url = null;
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
url = infos.get("url");
}
Map<String, String> respMap = new HashMap<>();
respMap.put("url", url);
return new JsonResponse<>(Status.OK, respMap).build();
}
/**
* Delete repository
*

View file

@ -1759,5 +1759,12 @@ public class NotebookServer extends WebSocketServlet implements
.put("interpreterSettings", availableSettings)));
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
InterpreterSetting interpreterSetting = notebook().getInterpreterFactory()
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}
}

View file

@ -691,6 +691,22 @@
getRepositories();
};
$scope.showSparkUI = function(settingId) {
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId)
.success(function(data, status, headers, config) {
var url = data.body.url;
if (!url) {
BootstrapDialog.alert({
message: 'No spark application running'
});
return;
}
window.open(url, '_blank');
}).error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
});
};
init();
}

View file

@ -128,6 +128,10 @@ limitations under the License.
</h3>
<span style="float:right" ng-show="!valueform.$visible" >
<button class="btn btn-default btn-xs"
ng-click="showSparkUI(setting.id)"
ng-show="setting.group == 'spark'">
<span class="fa fa-external-link"></span> spark ui</button>
<button class="btn btn-default btn-xs"
ng-click="valueform.$show();
copyOriginInterpreterSettingProperties(setting.id)">

View file

@ -982,6 +982,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// Check if dependency in specified path is changed
// If it did, overwrite old dependency jar with new one
if (intpSetting != null) {
//clean up metaInfos
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
stopJobAllInterpreter(intpSetting);

View file

@ -44,6 +44,8 @@ public class InterpreterSetting {
private String name;
// always be null in case of InterpreterSettingRef
private String group;
private transient Map<String, String> infos;
/**
* properties can be either Properties or Map<String, InterpreterProperty>
* properties should be:
@ -276,4 +278,12 @@ public class InterpreterSetting {
public void setErrorReason(String errorReason) {
this.errorReason = errorReason;
}
public void setInfos(Map<String, String> infos) {
this.infos = infos;
}
public Map<String, String> getInfos() {
return infos;
}
}