mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Change to make spark web UI accesible from interpreters page
This commit is contained in:
parent
c9adf7161f
commit
a1304a2288
15 changed files with 120 additions and 7 deletions
|
|
@ -27,8 +27,6 @@ import java.lang.reflect.Method;
|
|||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
|
@ -45,6 +43,7 @@ import org.apache.spark.scheduler.ActiveJob;
|
|||
import org.apache.spark.scheduler.DAGScheduler;
|
||||
import org.apache.spark.scheduler.Pool;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.ui.SparkUI;
|
||||
import org.apache.spark.ui.jobs.JobProgressListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
|
@ -57,6 +56,7 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
|
|||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -803,6 +803,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
sparkSession = getSparkSession();
|
||||
}
|
||||
sc = getSparkContext();
|
||||
RemoteInterpreterEventClient eventClient = getInterpreterGroup().getEventClient();
|
||||
if (eventClient != null) {
|
||||
eventClient.onMetaInfodReceived(getSparkUIUrl());
|
||||
}
|
||||
if (sc.getPoolForName("fair").isEmpty()) {
|
||||
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
|
||||
int minimumShare = 0;
|
||||
|
|
@ -938,6 +942,17 @@ public class SparkInterpreter extends Interpreter {
|
|||
numReferenceOfSparkContext.incrementAndGet();
|
||||
}
|
||||
|
||||
private Map<String, String> getSparkUIUrl() {
|
||||
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
|
||||
SparkUI sparkUi = sparkUiOption.get();
|
||||
String sparkWebUrl = sparkUi.appUIAddress();
|
||||
Map<String, String> infos = new java.util.HashMap<>();
|
||||
if (sparkWebUrl != null) {
|
||||
infos.put("url", sparkWebUrl);
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
private Results.Result interpret(String line) {
|
||||
return (Results.Result) Utils.invokeMethod(
|
||||
intp,
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -50,6 +51,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
ResourcePool resourcePool;
|
||||
boolean angularRegistryPushed = false;
|
||||
|
||||
private RemoteInterpreterEventClient eventClient;
|
||||
|
||||
// map [notebook session, Interpreters in the group], to support per note session interpreters
|
||||
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
|
||||
// List<Interpreter>>();
|
||||
|
|
@ -82,6 +85,11 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
allInterpreterGroups.put(id, this);
|
||||
}
|
||||
|
||||
public InterpreterGroup(String interpreterGroupId, RemoteInterpreterEventClient eventClient) {
|
||||
this(interpreterGroupId);
|
||||
this.eventClient = eventClient;
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
|
||||
+ new Random().nextInt();
|
||||
|
|
@ -280,4 +288,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
|
||||
this.angularRegistryPushed = angularRegistryPushed;
|
||||
}
|
||||
|
||||
public RemoteInterpreterEventClient getEventClient() {
|
||||
return eventClient;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -279,6 +279,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
gson.toJson(appendOutput)));
|
||||
}
|
||||
|
||||
public void onMetaInfodReceived(Map<String, String> infos) {
|
||||
sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
|
||||
gson.toJson(infos)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for eventQueue becomes empty
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -195,6 +195,15 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String status = appStatusUpdate.get("status");
|
||||
|
||||
appListener.onStatusChange(noteId, paragraphId, appId, status);
|
||||
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
|
||||
// on output update
|
||||
Map<String, String> metaInfos = gson.fromJson(event.getData(),
|
||||
new TypeToken<Map<String, String>>() {
|
||||
}.getType());
|
||||
String id = interpreterGroup.getId();
|
||||
int indexOfColon = id.indexOf(":");
|
||||
String settingId = id.substring(0, indexOfColon);
|
||||
listener.onMetaInfosReceived(settingId, metaInfos);
|
||||
}
|
||||
logger.debug("Event from remoteproceess {}", event.getType());
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -16,10 +16,13 @@
|
|||
*/
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Event from remoteInterpreterProcess
|
||||
*/
|
||||
public interface RemoteInterpreterProcessListener {
|
||||
public void onOutputAppend(String noteId, String paragraphId, String output);
|
||||
public void onOutputUpdated(String noteId, String paragraphId, String output);
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,7 +153,7 @@ public class RemoteInterpreterServer
|
|||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
className, Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId, eventClient);
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
|
||||
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
|
||||
|
|
|
|||
|
|
@ -24,9 +24,6 @@
|
|||
package org.apache.zeppelin.interpreter.thrift;
|
||||
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.thrift.TEnum;
|
||||
|
||||
public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
||||
NO_OP(1),
|
||||
|
|
@ -39,7 +36,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
OUTPUT_APPEND(8),
|
||||
OUTPUT_UPDATE(9),
|
||||
ANGULAR_REGISTRY_PUSH(10),
|
||||
APP_STATUS_UPDATE(11);
|
||||
APP_STATUS_UPDATE(11),
|
||||
META_INFOS(12);
|
||||
|
||||
private final int value;
|
||||
|
||||
|
|
@ -82,6 +80,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
return ANGULAR_REGISTRY_PUSH;
|
||||
case 11:
|
||||
return APP_STATUS_UPDATE;
|
||||
case 12:
|
||||
return META_INFOS;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -154,4 +155,9 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
public void onOutputUpdated(String noteId, String paragraphId, String output) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -299,4 +299,9 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
public void onOutputUpdated(String noteId, String paragraphId, String output) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.rest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
|
@ -215,6 +216,24 @@ public class InterpreterRestApi {
|
|||
return new JsonResponse(Status.CREATED).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* get the property value
|
||||
*/
|
||||
@GET
|
||||
@Path("getmetainfos/{settingId}")
|
||||
public Response getMetaInfo(@PathParam("settingId") String settingId,
|
||||
@PathParam("name") String propName) {
|
||||
String url = null;
|
||||
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
|
||||
Map<String, String> infos = interpreterSetting.getInfos();
|
||||
if (infos != null) {
|
||||
url = infos.get("url");
|
||||
}
|
||||
Map<String, String> respMap = new HashMap<>();
|
||||
respMap.put("url", url);
|
||||
return new JsonResponse<>(Status.OK, respMap).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete repository
|
||||
*
|
||||
|
|
|
|||
|
|
@ -1759,5 +1759,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.put("interpreterSettings", availableSettings)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
|
||||
InterpreterSetting interpreterSetting = notebook().getInterpreterFactory()
|
||||
.get(settingId);
|
||||
interpreterSetting.setInfos(metaInfos);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -691,6 +691,22 @@
|
|||
getRepositories();
|
||||
};
|
||||
|
||||
$scope.showSparkUI = function(settingId) {
|
||||
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId)
|
||||
.success(function(data, status, headers, config) {
|
||||
var url = data.body.url;
|
||||
if (!url) {
|
||||
BootstrapDialog.alert({
|
||||
message: 'No spark application running'
|
||||
});
|
||||
return;
|
||||
}
|
||||
window.open(url, '_blank');
|
||||
}).error(function(data, status, headers, config) {
|
||||
console.log('Error %o %o', status, data.message);
|
||||
});
|
||||
};
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -128,6 +128,10 @@ limitations under the License.
|
|||
|
||||
</h3>
|
||||
<span style="float:right" ng-show="!valueform.$visible" >
|
||||
<button class="btn btn-default btn-xs"
|
||||
ng-click="showSparkUI(setting.id)"
|
||||
ng-show="setting.group == 'spark'">
|
||||
<span class="fa fa-external-link"></span> spark ui</button>
|
||||
<button class="btn btn-default btn-xs"
|
||||
ng-click="valueform.$show();
|
||||
copyOriginInterpreterSettingProperties(setting.id)">
|
||||
|
|
|
|||
|
|
@ -982,6 +982,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
// Check if dependency in specified path is changed
|
||||
// If it did, overwrite old dependency jar with new one
|
||||
if (intpSetting != null) {
|
||||
//clean up metaInfos
|
||||
intpSetting.setInfos(null);
|
||||
copyDependenciesFromLocalPath(intpSetting);
|
||||
|
||||
stopJobAllInterpreter(intpSetting);
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ public class InterpreterSetting {
|
|||
private String name;
|
||||
// always be null in case of InterpreterSettingRef
|
||||
private String group;
|
||||
private transient Map<String, String> infos;
|
||||
|
||||
/**
|
||||
* properties can be either Properties or Map<String, InterpreterProperty>
|
||||
* properties should be:
|
||||
|
|
@ -276,4 +278,12 @@ public class InterpreterSetting {
|
|||
public void setErrorReason(String errorReason) {
|
||||
this.errorReason = errorReason;
|
||||
}
|
||||
|
||||
public void setInfos(Map<String, String> infos) {
|
||||
this.infos = infos;
|
||||
}
|
||||
|
||||
public Map<String, String> getInfos() {
|
||||
return infos;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue