mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-804 Refactoring registration mechanism on Interpreters
- Added a new initialization mechanism to use interpreter-setting.json - Adjusted new mechanism to SparkInterpreter for verification
This commit is contained in:
parent
a87d45ec04
commit
ca7b96c4c4
6 changed files with 274 additions and 46 deletions
|
|
@ -80,6 +80,7 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
|
|||
public class SparkInterpreter extends Interpreter {
|
||||
public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
|
||||
|
||||
/*
|
||||
static {
|
||||
Interpreter.register(
|
||||
"spark",
|
||||
|
|
@ -111,6 +112,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
.build()
|
||||
);
|
||||
}
|
||||
*/
|
||||
|
||||
private ZeppelinContext z;
|
||||
private SparkILoop interpreter;
|
||||
|
|
|
|||
57
spark/src/main/resources/interpreter-setting.json
Normal file
57
spark/src/main/resources/interpreter-setting.json
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
[
|
||||
{
|
||||
"interpreterGroup": "spark",
|
||||
"interpreterName": "spark",
|
||||
"interpreterClassName": "org.apache.zeppelin.spark.SparkInterpreter",
|
||||
"properties": {
|
||||
"spark.executor.memory": {
|
||||
"envName": null,
|
||||
"propertyName": "spark.executor.memory",
|
||||
"defaultValue": "",
|
||||
"description": "Executor memory per worker instance. ex) 512m, 32g"
|
||||
},
|
||||
"args": {
|
||||
"envName": null,
|
||||
"propertyName": null,
|
||||
"defaultValue": "",
|
||||
"description": "spark commandline args"
|
||||
},
|
||||
"zeppelin.spark.useHiveContext": {
|
||||
"envName": "ZEPPELIN_SPARK_USEHIVECONTEXT",
|
||||
"propertyName": "zeppelin.spark.useHiveContext",
|
||||
"defaultValue": "true",
|
||||
"description": "Use HiveContext instead of SQLContext if it is true."
|
||||
},
|
||||
"spark.app.name": {
|
||||
"envName": "SPARK_APP_NAME",
|
||||
"propertyName": "spark.app.name",
|
||||
"defaultValue": "Zeppelin",
|
||||
"description": "The name of spark application."
|
||||
},
|
||||
"zeppelin.spark.printREPLOutput": {
|
||||
"envName": null,
|
||||
"propertyName": null,
|
||||
"defaultValue": "true",
|
||||
"description": "Print REPL output"
|
||||
},
|
||||
"spark.cores.max": {
|
||||
"envName": null,
|
||||
"propertyName": "spark.cores.max",
|
||||
"defaultValue": "",
|
||||
"description": "Total number of cores to use. Empty value uses all available core."
|
||||
},
|
||||
"zeppelin.spark.maxResult": {
|
||||
"envName": "ZEPPELIN_SPARK_MAXRESULT",
|
||||
"propertyName": "zeppelin.spark.maxResult",
|
||||
"defaultValue": "1000",
|
||||
"description": "Max number of SparkSQL result to display."
|
||||
},
|
||||
"master": {
|
||||
"envName": "MASTER",
|
||||
"propertyName": "spark.master",
|
||||
"defaultValue": "local[*]",
|
||||
"description": "Spark master uri. ex) spark://masterhost:7077"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -129,6 +130,7 @@ public abstract class Interpreter {
|
|||
protected Properties property;
|
||||
|
||||
public Interpreter(Properties property) {
|
||||
logger.debug("Properties: {}", property);
|
||||
this.property = property;
|
||||
}
|
||||
|
||||
|
|
@ -140,13 +142,16 @@ public abstract class Interpreter {
|
|||
Properties p = new Properties();
|
||||
p.putAll(property);
|
||||
|
||||
Map<String, InterpreterProperty> defaultProperties = Interpreter
|
||||
.findRegisteredInterpreterByClassName(getClassName()).getProperties();
|
||||
for (String k : defaultProperties.keySet()) {
|
||||
if (!p.containsKey(k)) {
|
||||
String value = defaultProperties.get(k).getDefaultValue();
|
||||
if (value != null) {
|
||||
p.put(k, defaultProperties.get(k).getDefaultValue());
|
||||
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
|
||||
getClassName());
|
||||
if (null != registeredInterpreter) {
|
||||
Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
|
||||
for (String k : defaultProperties.keySet()) {
|
||||
if (!p.containsKey(k)) {
|
||||
String value = defaultProperties.get(k).getValue();
|
||||
if (value != null) {
|
||||
p.put(k, defaultProperties.get(k).getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -155,17 +160,7 @@ public abstract class Interpreter {
|
|||
}
|
||||
|
||||
public String getProperty(String key) {
|
||||
if (property.containsKey(key)) {
|
||||
return property.getProperty(key);
|
||||
}
|
||||
|
||||
Map<String, InterpreterProperty> defaultProperties = Interpreter
|
||||
.findRegisteredInterpreterByClassName(getClassName()).getProperties();
|
||||
if (defaultProperties.containsKey(key)) {
|
||||
return defaultProperties.get(key).getDefaultValue();
|
||||
}
|
||||
|
||||
return null;
|
||||
return getProperty().getProperty(key);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -228,8 +223,11 @@ public abstract class Interpreter {
|
|||
* Represent registered interpreter class
|
||||
*/
|
||||
public static class RegisteredInterpreter {
|
||||
private String name;
|
||||
@SerializedName("interpreterGroup")
|
||||
private String group;
|
||||
@SerializedName("interpreterName")
|
||||
private String name;
|
||||
@SerializedName("interpreterClassName")
|
||||
private String className;
|
||||
private Map<String, InterpreterProperty> properties;
|
||||
private String path;
|
||||
|
|
@ -267,6 +265,10 @@ public abstract class Interpreter {
|
|||
return path;
|
||||
}
|
||||
|
||||
public String getInterpreterKey() {
|
||||
return getGroup() + "." + getName();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -287,10 +289,16 @@ public abstract class Interpreter {
|
|||
register(name, group, className, new HashMap<String, InterpreterProperty>());
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static void register(String name, String group, String className,
|
||||
Map<String, InterpreterProperty> properties) {
|
||||
registeredInterpreters.put(group + "." + name, new RegisteredInterpreter(
|
||||
name, group, className, properties));
|
||||
Map<String, InterpreterProperty> properties) {
|
||||
logger.error("Static initialization is deprecated. You should change it to use " +
|
||||
"interpreter-setting.json in your jar or interpreter/{interpreter}/interpreter-setting.json");
|
||||
register(new RegisteredInterpreter(name, group, className, properties));
|
||||
}
|
||||
|
||||
public static void register(RegisteredInterpreter registeredInterpreter) {
|
||||
registeredInterpreters.put(registeredInterpreter.getInterpreterKey(), registeredInterpreter);
|
||||
}
|
||||
|
||||
public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {
|
||||
|
|
|
|||
|
|
@ -21,16 +21,39 @@ package org.apache.zeppelin.interpreter;
|
|||
* Represent property of interpreter
|
||||
*/
|
||||
public class InterpreterProperty {
|
||||
String envName;
|
||||
String propertyName;
|
||||
String defaultValue;
|
||||
String description;
|
||||
|
||||
public InterpreterProperty(String defaultValue,
|
||||
String description) {
|
||||
super();
|
||||
public InterpreterProperty(String envName, String propertyName, String defaultValue,
|
||||
String description) {
|
||||
this.envName = envName;
|
||||
this.propertyName = propertyName;
|
||||
this.defaultValue = defaultValue;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public InterpreterProperty(String defaultValue, String description) {
|
||||
this(null, null, defaultValue, description);
|
||||
}
|
||||
|
||||
public String getEnvName() {
|
||||
return envName;
|
||||
}
|
||||
|
||||
public void setEnvName(String envName) {
|
||||
this.envName = envName;
|
||||
}
|
||||
|
||||
public String getPropertyName() {
|
||||
return propertyName;
|
||||
}
|
||||
|
||||
public void setPropertyName(String propertyName) {
|
||||
this.propertyName = propertyName;
|
||||
}
|
||||
|
||||
public String getDefaultValue() {
|
||||
return defaultValue;
|
||||
}
|
||||
|
|
@ -46,4 +69,28 @@ public class InterpreterProperty {
|
|||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
//TODO(jongyoul): Remove SparkInterpreter's getSystemDefault method
|
||||
if (envName != null && !envName.isEmpty()) {
|
||||
String envValue = System.getenv().get(envName);
|
||||
if (envValue != null) {
|
||||
return envValue;
|
||||
}
|
||||
}
|
||||
|
||||
if (propertyName != null && !propertyName.isEmpty()) {
|
||||
String propValue = System.getProperty(propertyName);
|
||||
if (propValue != null) {
|
||||
return propValue;
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("{envName=%s, propertyName=%s, defaultValue=%s, description=%20s", envName,
|
||||
propertyName, defaultValue, description);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
private static final long serialVersionUID = 4749305895693848035L;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
|
||||
private static ZeppelinConfiguration conf;
|
||||
private String interpreterJson;
|
||||
|
||||
public ZeppelinConfiguration(URL url) throws ConfigurationException {
|
||||
setDelimiterParsingDisabled(true);
|
||||
|
|
@ -342,6 +343,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
|
||||
}
|
||||
|
||||
public String getInterpreterJson() {
|
||||
return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
|
||||
}
|
||||
|
||||
public String getInterpreterSettingPath() {
|
||||
return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
|
||||
}
|
||||
|
|
@ -480,6 +485,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
|
||||
+ "org.apache.zeppelin.jdbc.JDBCInterpreter,"
|
||||
+ "org.apache.zeppelin.hbase.HbaseInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.interpreter;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.commons.lang.NullArgumentException;
|
||||
|
|
@ -45,9 +46,13 @@ import org.sonatype.aether.repository.RemoteRepository;
|
|||
import java.io.*;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
|
@ -112,39 +117,95 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
|
||||
private void init() throws InterpreterException, IOException, RepositoryException {
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
String interpreterJson = conf.getInterpreterJson();
|
||||
|
||||
// Load classes
|
||||
Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {}.getType();
|
||||
ArrayList<RegisteredInterpreter> registeredInterpreters;
|
||||
|
||||
Boolean registered;
|
||||
File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
|
||||
if (interpreterDirs != null) {
|
||||
for (File path : interpreterDirs) {
|
||||
logger.info("Reading " + path.getAbsolutePath());
|
||||
URL[] urls = null;
|
||||
registered = false;
|
||||
String absolutePath = path.getAbsolutePath();
|
||||
logger.info("Reading " + absolutePath);
|
||||
|
||||
Path jsonPath = Paths.get(absolutePath, interpreterJson);
|
||||
if (Files.exists(jsonPath)) {
|
||||
logger.info("Reading {}", jsonPath.toString());
|
||||
FileReader fileReader = new FileReader(jsonPath.toString());
|
||||
registeredInterpreters = gson.fromJson(fileReader, registeredInterpreterListType);
|
||||
|
||||
registered = registerInterpreters(registeredInterpreters, absolutePath);
|
||||
if (registered) {
|
||||
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
|
||||
logger.info("Interpreter {} found. class={}",
|
||||
registeredInterpreter.getInterpreterKey(),
|
||||
registeredInterpreter.getClassName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
URLClassLoader ccl;
|
||||
try {
|
||||
urls = recursiveBuildLibList(path);
|
||||
URL[] urls = recursiveBuildLibList(path);
|
||||
ccl = new URLClassLoader(urls, oldcl);
|
||||
if (!registered) {
|
||||
|
||||
InputStream inputStream = ccl.getResourceAsStream(interpreterJson);
|
||||
|
||||
if (null != inputStream) {
|
||||
logger.info("Reading {} from resources in {}", interpreterJson, absolutePath);
|
||||
registeredInterpreters = gson.fromJson(new InputStreamReader(inputStream),
|
||||
registeredInterpreterListType);
|
||||
registered = registerInterpreters(registeredInterpreters, absolutePath);
|
||||
if (registered) {
|
||||
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
|
||||
logger.info("Interpreter {} found. class={}",
|
||||
registeredInterpreter.getInterpreterKey(),
|
||||
registeredInterpreter.getClassName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!registered) {
|
||||
for (String className : interpreterClassList) {
|
||||
try {
|
||||
// Load classes
|
||||
Class.forName(className, true, ccl);
|
||||
Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
|
||||
for (String interpreterKey : interpreterKeys) {
|
||||
if (className.equals(
|
||||
Interpreter.registeredInterpreters.get(interpreterKey).getClassName())) {
|
||||
Interpreter.registeredInterpreters.get(interpreterKey).setPath(absolutePath);
|
||||
logger.info("Interpreter " + interpreterKey + " found. class=" + className);
|
||||
cleanCl.put(absolutePath, ccl);
|
||||
registered = true;
|
||||
}
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
// nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (MalformedURLException e1) {
|
||||
logger.error("Can't load jars ", e1);
|
||||
}
|
||||
URLClassLoader ccl = new URLClassLoader(urls, oldcl);
|
||||
|
||||
for (String className : interpreterClassList) {
|
||||
try {
|
||||
Class.forName(className, true, ccl);
|
||||
Set<String> keys = Interpreter.registeredInterpreters.keySet();
|
||||
for (String intName : keys) {
|
||||
if (className.equals(
|
||||
Interpreter.registeredInterpreters.get(intName).getClassName())) {
|
||||
Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath());
|
||||
logger.info("Interpreter " + intName + " found. class=" + className);
|
||||
cleanCl.put(path.getAbsolutePath(), ccl);
|
||||
}
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
// nothing to do
|
||||
}
|
||||
if (!registered) {
|
||||
logger.info("Failed to register Interpreter from {}", absolutePath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (RegisteredInterpreter registeredInterpreter:
|
||||
Interpreter.registeredInterpreters.values()) {
|
||||
logger.debug("Registered: {} -> {}. Properties: {}",
|
||||
registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
|
||||
registeredInterpreter.getProperties());
|
||||
}
|
||||
|
||||
loadFromFile();
|
||||
|
||||
// if no interpreter settings are loaded, create default set
|
||||
|
|
@ -175,7 +236,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
|
||||
for (String k : info.getProperties().keySet()) {
|
||||
p.put(k, info.getProperties().get(k).getDefaultValue());
|
||||
p.put(k, info.getProperties().get(k).getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -201,6 +262,32 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
|
||||
String absolutePath) {
|
||||
if (validateRegisterInterpreters(registeredInterpreters)) {
|
||||
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
|
||||
registeredInterpreter.setPath(absolutePath);
|
||||
Interpreter.register(registeredInterpreter);
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean validateRegisterInterpreters(List<RegisteredInterpreter> registeredInterpreters) {
|
||||
Boolean isValid = true;
|
||||
|
||||
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
|
||||
isValid &= null != registeredInterpreter.getGroup() &&
|
||||
null != registeredInterpreter.getName() &&
|
||||
null != registeredInterpreter.getClassName();
|
||||
}
|
||||
|
||||
return isValid;
|
||||
}
|
||||
|
||||
private void loadFromFile() throws IOException {
|
||||
GsonBuilder builder = new GsonBuilder();
|
||||
builder.setPrettyPrinting();
|
||||
|
|
@ -745,6 +832,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
throws InterpreterException {
|
||||
logger.info("Create repl {} from {}", className, dirName);
|
||||
|
||||
updatePropertiesFromRegisteredInterpreter(property, className);
|
||||
|
||||
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
|
||||
|
|
@ -806,6 +895,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
|
||||
updatePropertiesFromRegisteredInterpreter(property, className);
|
||||
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout,
|
||||
|
|
@ -813,6 +905,22 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
return intp;
|
||||
}
|
||||
|
||||
private Properties updatePropertiesFromRegisteredInterpreter(Properties properties,
|
||||
String className) {
|
||||
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
|
||||
className);
|
||||
if (null != registeredInterpreter) {
|
||||
Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
|
||||
for (String key : defaultProperties.keySet()) {
|
||||
if (!properties.containsKey(key)) {
|
||||
properties.setProperty(key, defaultProperties.get(key).getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
|
||||
URL[] urls = new URL[0];
|
||||
|
|
|
|||
Loading…
Reference in a new issue