Address feedback

This commit is contained in:
karuppayya 2016-11-09 21:58:58 +05:30
parent 4d971961e2
commit 55c45c9c9e
11 changed files with 101 additions and 34 deletions

View file

@ -315,6 +315,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(context);
if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");

View file

@ -56,7 +56,6 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -112,6 +111,7 @@ public class SparkInterpreter extends Interpreter {
private SparkOutputStream out;
private SparkDependencyResolver dep;
private String sparkUrl;
/**
* completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
@ -803,10 +803,6 @@ public class SparkInterpreter extends Interpreter {
sparkSession = getSparkSession();
}
sc = getSparkContext();
RemoteInterpreterEventClient eventClient = getInterpreterGroup().getEventClient();
if (eventClient != null) {
eventClient.onMetaInfodReceived(getSparkUIUrl());
}
if (sc.getPoolForName("fair").isEmpty()) {
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
int minimumShare = 0;
@ -942,15 +938,11 @@ public class SparkInterpreter extends Interpreter {
numReferenceOfSparkContext.incrementAndGet();
}
private Map<String, String> getSparkUIUrl() {
private String getSparkUIUrl() {
Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
SparkUI sparkUi = sparkUiOption.get();
String sparkWebUrl = sparkUi.appUIAddress();
Map<String, String> infos = new java.util.HashMap<>();
if (sparkWebUrl != null) {
infos.put("url", sparkWebUrl);
}
return infos;
return sparkWebUrl;
}
private Results.Result interpret(String line) {
@ -961,6 +953,18 @@ public class SparkInterpreter extends Interpreter {
new Object[] {line});
}
public void populateSparkWebUrl(InterpreterContext ctx) {
if (sparkUrl == null) {
String url = getSparkUIUrl();
Map<String, String> infos = new java.util.HashMap<>();
if (url != null) {
infos.put("url", url);
logger.info("Sending metainfos to Zeppelin server: {}", infos.toString());
ctx.getClient().onMetaInfosReceived(infos);
}
}
}
private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
@ -1100,7 +1104,7 @@ public class SparkInterpreter extends Interpreter {
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+ " is not supported");
}
populateSparkWebUrl(context);
z.setInterpreterContext(context);
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);

View file

@ -97,6 +97,7 @@ public class SparkRInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
getSparkInterpreter().populateSparkWebUrl(interpreterContext);
String imageWidth = getProperty("zeppelin.R.image.width");
String[] sl = lines.split("\n");

View file

@ -96,6 +96,7 @@ public class SparkSqlInterpreter extends Interpreter {
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
sparkInterpreter.populateSparkWebUrl(context);
sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
if (concurrentSQL()) {

View file

@ -20,10 +20,12 @@ package org.apache.zeppelin.interpreter;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.ResourcePool;
/**
@ -57,6 +59,7 @@ public class InterpreterContext {
private ResourcePool resourcePool;
private List<InterpreterContextRunner> runners;
private String className;
private RemoteEventClientWrapper client;
public InterpreterContext(String noteId,
String paragraphId,
@ -83,6 +86,22 @@ public class InterpreterContext {
this.out = out;
}
public InterpreterContext(String noteId,
String paragraphId,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry,
ResourcePool resourcePool,
List<InterpreterContextRunner> contextRunners,
InterpreterOutput output,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
angularObjectRegistry, resourcePool, contextRunners, output);
this.client = new RemoteEventClient(eventClient);
}
public String getNoteId() {
return noteId;
@ -131,4 +150,8 @@ public class InterpreterContext {
public void setClassName(String className) {
this.className = className;
}
public RemoteEventClientWrapper getClient() {
return client;
}
}

View file

@ -22,7 +22,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Scheduler;
@ -51,8 +50,6 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
private RemoteInterpreterEventClient eventClient;
// map [notebook session, Interpreters in the group], to support per note session interpreters
//Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
// List<Interpreter>>();
@ -85,11 +82,6 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
allInterpreterGroups.put(id, this);
}
public InterpreterGroup(String interpreterGroupId, RemoteInterpreterEventClient eventClient) {
this(interpreterGroupId);
this.eventClient = eventClient;
}
private static String generateId() {
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
+ new Random().nextInt();
@ -288,8 +280,4 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public void setAngularRegistryPushed(boolean angularRegistryPushed) {
this.angularRegistryPushed = angularRegistryPushed;
}
public RemoteInterpreterEventClient getEventClient() {
return eventClient;
}
}

View file

@ -0,0 +1,24 @@
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
*
* Wrapper arnd RemoteInterpreterEventClient
* to expose methods in the client
*
*/
public class RemoteEventClient implements RemoteEventClientWrapper {
private RemoteInterpreterEventClient client;
public RemoteEventClient(RemoteInterpreterEventClient client) {
this.client = client;
}
@Override
public void onMetaInfosReceived(Map<String, String> infos) {
client.onMetaInfodReceived(infos);
}
}

View file

@ -0,0 +1,15 @@
package org.apache.zeppelin.interpreter.remote;
import java.util.Map;
/**
*
* Wrapper interface for RemoterInterpreterEventClient
* to expose only required methods from EventClient
*
*/
public interface RemoteEventClientWrapper {
public void onMetaInfosReceived(Map<String, String> infos);
}

View file

@ -153,7 +153,7 @@ public class RemoteInterpreterServer
public void createInterpreter(String interpreterGroupId, String noteId, String
className, Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId, eventClient);
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
@ -552,7 +552,7 @@ public class RemoteInterpreterServer
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
interpreterGroup.getResourcePool(),
contextRunners, output);
contextRunners, output, eventClient);
}

View file

@ -22,6 +22,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@ -29,6 +31,7 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@ -217,20 +220,27 @@ public class InterpreterRestApi {
}
/**
* get the property value
* get the metainfo property value
*/
@GET
@Path("getmetainfos/{settingId}")
public Response getMetaInfo(@PathParam("settingId") String settingId,
@PathParam("name") String propName) {
String url = null;
public Response getMetaInfo(@Context HttpServletRequest req,
@PathParam("settingId") String settingId) {
String propName = req.getParameter("propName");
if (propName == null) {
return new JsonResponse<>(Status.BAD_REQUEST).build();
}
String propValue = null;
InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
url = infos.get("url");
propValue = infos.get(propName);
}
Map<String, String> respMap = new HashMap<>();
respMap.put("url", url);
respMap.put(propName, propValue);
logger.debug("Get meta info");
logger.debug("Interpretersetting Id: {}, property Name:{}, property value: {}", settingId,
propName, propValue);
return new JsonResponse<>(Status.OK, respMap).build();
}

View file

@ -692,7 +692,7 @@
};
$scope.showSparkUI = function(settingId) {
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId)
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/getmetainfos/' + settingId + '?propName=url')
.success(function(data, status, headers, config) {
var url = data.body.url;
if (!url) {