mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Address feedback
This commit is contained in:
parent
4d971961e2
commit
55c45c9c9e
11 changed files with 101 additions and 34 deletions
|
|
@ -315,6 +315,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
sparkInterpreter.populateSparkWebUrl(context);
|
||||
if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
|
||||
return new InterpreterResult(Code.ERROR, "Spark "
|
||||
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
|
||||
|
|
|
|||
|
|
@ -56,7 +56,6 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
|
|||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -112,6 +111,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
private SparkOutputStream out;
|
||||
private SparkDependencyResolver dep;
|
||||
private String sparkUrl;
|
||||
|
||||
/**
|
||||
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
|
||||
|
|
@ -803,10 +803,6 @@ public class SparkInterpreter extends Interpreter {
|
|||
sparkSession = getSparkSession();
|
||||
}
|
||||
sc = getSparkContext();
|
||||
RemoteInterpreterEventClient eventClient = getInterpreterGroup().getEventClient();
|
||||
if (eventClient != null) {
|
||||
eventClient.onMetaInfodReceived(getSparkUIUrl());
|
||||
}
|
||||
if (sc.getPoolForName("fair").isEmpty()) {
|
||||
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
|
||||
int minimumShare = 0;
|
||||
|
|
@ -942,15 +938,11 @@ public class SparkInterpreter extends Interpreter {
|
|||
numReferenceOfSparkContext.incrementAndGet();
|
||||
}
|
||||
|
||||
private Map<String, String> getSparkUIUrl() {
|
||||
private String getSparkUIUrl() {
|
||||
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
|
||||
SparkUI sparkUi = sparkUiOption.get();
|
||||
String sparkWebUrl = sparkUi.appUIAddress();
|
||||
Map<String, String> infos = new java.util.HashMap<>();
|
||||
if (sparkWebUrl != null) {
|
||||
infos.put("url", sparkWebUrl);
|
||||
}
|
||||
return infos;
|
||||
return sparkWebUrl;
|
||||
}
|
||||
|
||||
private Results.Result interpret(String line) {
|
||||
|
|
@ -961,6 +953,18 @@ public class SparkInterpreter extends Interpreter {
|
|||
new Object[] {line});
|
||||
}
|
||||
|
||||
public void populateSparkWebUrl(InterpreterContext ctx) {
|
||||
if (sparkUrl == null) {
|
||||
String url = getSparkUIUrl();
|
||||
Map<String, String> infos = new java.util.HashMap<>();
|
||||
if (url != null) {
|
||||
infos.put("url", url);
|
||||
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
|
||||
ctx.getClient().onMetaInfosReceived(infos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<File> currentClassPath() {
|
||||
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
|
||||
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
|
||||
|
|
@ -1100,7 +1104,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
|
||||
+ " is not supported");
|
||||
}
|
||||
|
||||
populateSparkWebUrl(context);
|
||||
z.setInterpreterContext(context);
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
|
|
|
|||
|
|
@ -97,6 +97,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
|
||||
|
||||
getSparkInterpreter().populateSparkWebUrl(interpreterContext);
|
||||
String imageWidth = getProperty("zeppelin.R.image.width");
|
||||
|
||||
String[] sl = lines.split("\n");
|
||||
|
|
|
|||
|
|
@ -96,6 +96,7 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
|
||||
}
|
||||
|
||||
sparkInterpreter.populateSparkWebUrl(context);
|
||||
sqlc = getSparkInterpreter().getSQLContext();
|
||||
SparkContext sc = sqlc.sparkContext();
|
||||
if (concurrentSQL()) {
|
||||
|
|
|
|||
|
|
@ -20,10 +20,12 @@ package org.apache.zeppelin.interpreter;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.zeppelin.annotation.ZeppelinApi;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
||||
/**
|
||||
|
|
@ -57,6 +59,7 @@ public class InterpreterContext {
|
|||
private ResourcePool resourcePool;
|
||||
private List<InterpreterContextRunner> runners;
|
||||
private String className;
|
||||
private RemoteEventClientWrapper client;
|
||||
|
||||
public InterpreterContext(String noteId,
|
||||
String paragraphId,
|
||||
|
|
@ -83,6 +86,22 @@ public class InterpreterContext {
|
|||
this.out = out;
|
||||
}
|
||||
|
||||
public InterpreterContext(String noteId,
|
||||
String paragraphId,
|
||||
String paragraphTitle,
|
||||
String paragraphText,
|
||||
AuthenticationInfo authenticationInfo,
|
||||
Map<String, Object> config,
|
||||
GUI gui,
|
||||
AngularObjectRegistry angularObjectRegistry,
|
||||
ResourcePool resourcePool,
|
||||
List<InterpreterContextRunner> contextRunners,
|
||||
InterpreterOutput output,
|
||||
RemoteInterpreterEventClient eventClient) {
|
||||
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
|
||||
angularObjectRegistry, resourcePool, contextRunners, output);
|
||||
this.client = new RemoteEventClient(eventClient);
|
||||
}
|
||||
|
||||
public String getNoteId() {
|
||||
return noteId;
|
||||
|
|
@ -131,4 +150,8 @@ public class InterpreterContext {
|
|||
public void setClassName(String className) {
|
||||
this.className = className;
|
||||
}
|
||||
|
||||
public RemoteEventClientWrapper getClient() {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -51,8 +50,6 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
ResourcePool resourcePool;
|
||||
boolean angularRegistryPushed = false;
|
||||
|
||||
private RemoteInterpreterEventClient eventClient;
|
||||
|
||||
// map [notebook session, Interpreters in the group], to support per note session interpreters
|
||||
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
|
||||
// List<Interpreter>>();
|
||||
|
|
@ -85,11 +82,6 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
allInterpreterGroups.put(id, this);
|
||||
}
|
||||
|
||||
public InterpreterGroup(String interpreterGroupId, RemoteInterpreterEventClient eventClient) {
|
||||
this(interpreterGroupId);
|
||||
this.eventClient = eventClient;
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
|
||||
+ new Random().nextInt();
|
||||
|
|
@ -288,8 +280,4 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
|
||||
this.angularRegistryPushed = angularRegistryPushed;
|
||||
}
|
||||
|
||||
public RemoteInterpreterEventClient getEventClient() {
|
||||
return eventClient;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
* Wrapper arnd RemoteInterpreterEventClient
|
||||
* to expose methods in the client
|
||||
*
|
||||
*/
|
||||
public class RemoteEventClient implements RemoteEventClientWrapper {
|
||||
|
||||
private RemoteInterpreterEventClient client;
|
||||
|
||||
public RemoteEventClient(RemoteInterpreterEventClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetaInfosReceived(Map<String, String> infos) {
|
||||
client.onMetaInfodReceived(infos);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
* Wrapper interface for RemoterInterpreterEventClient
|
||||
* to expose only required methods from EventClient
|
||||
*
|
||||
*/
|
||||
public interface RemoteEventClientWrapper {
|
||||
|
||||
public void onMetaInfosReceived(Map<String, String> infos);
|
||||
|
||||
}
|
||||
|
|
@ -153,7 +153,7 @@ public class RemoteInterpreterServer
|
|||
public void createInterpreter(String interpreterGroupId, String noteId, String
|
||||
className, Map<String, String> properties) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId, eventClient);
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
|
||||
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
|
||||
|
|
@ -552,7 +552,7 @@ public class RemoteInterpreterServer
|
|||
gson.fromJson(ric.getGui(), GUI.class),
|
||||
interpreterGroup.getAngularObjectRegistry(),
|
||||
interpreterGroup.getResourcePool(),
|
||||
contextRunners, output);
|
||||
contextRunners, output, eventClient);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
|
|
@ -29,6 +31,7 @@ import javax.ws.rs.PUT;
|
|||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
|
|
@ -217,20 +220,27 @@ public class InterpreterRestApi {
|
|||
}
|
||||
|
||||
/**
|
||||
* get the property value
|
||||
* get the metainfo property value
|
||||
*/
|
||||
@GET
|
||||
@Path("getmetainfos/{settingId}")
|
||||
public Response getMetaInfo(@PathParam("settingId") String settingId,
|
||||
@PathParam("name") String propName) {
|
||||
String url = null;
|
||||
public Response getMetaInfo(@Context HttpServletRequest req,
|
||||
@PathParam("settingId") String settingId) {
|
||||
String propName = req.getParameter("propName");
|
||||
if (propName == null) {
|
||||
return new JsonResponse<>(Status.BAD_REQUEST).build();
|
||||
}
|
||||
String propValue = null;
|
||||
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
|
||||
Map<String, String> infos = interpreterSetting.getInfos();
|
||||
if (infos != null) {
|
||||
url = infos.get("url");
|
||||
propValue = infos.get(propName);
|
||||
}
|
||||
Map<String, String> respMap = new HashMap<>();
|
||||
respMap.put("url", url);
|
||||
respMap.put(propName, propValue);
|
||||
logger.debug("Get meta info");
|
||||
logger.debug("Interpretersetting Id: {}, property Name:{}, property value: {}", settingId,
|
||||
propName, propValue);
|
||||
return new JsonResponse<>(Status.OK, respMap).build();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -692,7 +692,7 @@
|
|||
};
|
||||
|
||||
$scope.showSparkUI = function(settingId) {
|
||||
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId)
|
||||
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId + '?propName=url')
|
||||
.success(function(data, status, headers, config) {
|
||||
var url = data.body.url;
|
||||
if (!url) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue