ZEPPELIN-804 Refactoring registration mechanism on Interpreters

- Added a new initialization mechanism to use interpreter-setting.json
- Adjusted new mechanism to SparkInterpreter for verification
This commit is contained in:
Jongyoul Lee 2016-04-14 12:03:26 +09:00
parent a87d45ec04
commit ca7b96c4c4
6 changed files with 274 additions and 46 deletions

View file

@ -80,6 +80,7 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
public class SparkInterpreter extends Interpreter {
public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
/*
static {
Interpreter.register(
"spark",
@ -111,6 +112,7 @@ public class SparkInterpreter extends Interpreter {
.build()
);
}
*/
private ZeppelinContext z;
private SparkILoop interpreter;

View file

@ -0,0 +1,57 @@
[
{
"interpreterGroup": "spark",
"interpreterName": "spark",
"interpreterClassName": "org.apache.zeppelin.spark.SparkInterpreter",
"properties": {
"spark.executor.memory": {
"envName": null,
"propertyName": "spark.executor.memory",
"defaultValue": "",
"description": "Executor memory per worker instance. ex) 512m, 32g"
},
"args": {
"envName": null,
"propertyName": null,
"defaultValue": "",
"description": "spark commandline args"
},
"zeppelin.spark.useHiveContext": {
"envName": "ZEPPELIN_SPARK_USEHIVECONTEXT",
"propertyName": "zeppelin.spark.useHiveContext",
"defaultValue": "true",
"description": "Use HiveContext instead of SQLContext if it is true."
},
"spark.app.name": {
"envName": "SPARK_APP_NAME",
"propertyName": "spark.app.name",
"defaultValue": "Zeppelin",
"description": "The name of spark application."
},
"zeppelin.spark.printREPLOutput": {
"envName": null,
"propertyName": null,
"defaultValue": "true",
"description": "Print REPL output"
},
"spark.cores.max": {
"envName": null,
"propertyName": "spark.cores.max",
"defaultValue": "",
"description": "Total number of cores to use. Empty value uses all available core."
},
"zeppelin.spark.maxResult": {
"envName": "ZEPPELIN_SPARK_MAXRESULT",
"propertyName": "zeppelin.spark.maxResult",
"defaultValue": "1000",
"description": "Max number of SparkSQL result to display."
},
"master": {
"envName": "MASTER",
"propertyName": "spark.master",
"defaultValue": "local[*]",
"description": "Spark master uri. ex) spark://masterhost:7077"
}
}
}
]

View file

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.google.gson.annotations.SerializedName;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -129,6 +130,7 @@ public abstract class Interpreter {
protected Properties property;
public Interpreter(Properties property) {
logger.debug("Properties: {}", property);
this.property = property;
}
@ -140,13 +142,16 @@ public abstract class Interpreter {
Properties p = new Properties();
p.putAll(property);
Map<String, InterpreterProperty> defaultProperties = Interpreter
.findRegisteredInterpreterByClassName(getClassName()).getProperties();
for (String k : defaultProperties.keySet()) {
if (!p.containsKey(k)) {
String value = defaultProperties.get(k).getDefaultValue();
if (value != null) {
p.put(k, defaultProperties.get(k).getDefaultValue());
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
getClassName());
if (null != registeredInterpreter) {
Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
for (String k : defaultProperties.keySet()) {
if (!p.containsKey(k)) {
String value = defaultProperties.get(k).getValue();
if (value != null) {
p.put(k, defaultProperties.get(k).getValue());
}
}
}
}
@ -155,17 +160,7 @@ public abstract class Interpreter {
}
public String getProperty(String key) {
if (property.containsKey(key)) {
return property.getProperty(key);
}
Map<String, InterpreterProperty> defaultProperties = Interpreter
.findRegisteredInterpreterByClassName(getClassName()).getProperties();
if (defaultProperties.containsKey(key)) {
return defaultProperties.get(key).getDefaultValue();
}
return null;
return getProperty().getProperty(key);
}
@ -228,8 +223,11 @@ public abstract class Interpreter {
* Represent registered interpreter class
*/
public static class RegisteredInterpreter {
private String name;
@SerializedName("interpreterGroup")
private String group;
@SerializedName("interpreterName")
private String name;
@SerializedName("interpreterClassName")
private String className;
private Map<String, InterpreterProperty> properties;
private String path;
@ -267,6 +265,10 @@ public abstract class Interpreter {
return path;
}
public String getInterpreterKey() {
return getGroup() + "." + getName();
}
}
/**
@ -287,10 +289,16 @@ public abstract class Interpreter {
register(name, group, className, new HashMap<String, InterpreterProperty>());
}
@Deprecated
public static void register(String name, String group, String className,
Map<String, InterpreterProperty> properties) {
registeredInterpreters.put(group + "." + name, new RegisteredInterpreter(
name, group, className, properties));
Map<String, InterpreterProperty> properties) {
logger.error("Static initialization is deprecated. You should change it to use " +
"interpreter-setting.json in your jar or interpreter/{interpreter}/interpreter-setting.json");
register(new RegisteredInterpreter(name, group, className, properties));
}
public static void register(RegisteredInterpreter registeredInterpreter) {
registeredInterpreters.put(registeredInterpreter.getInterpreterKey(), registeredInterpreter);
}
public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) {

View file

@ -21,16 +21,39 @@ package org.apache.zeppelin.interpreter;
* Represent property of interpreter
*/
public class InterpreterProperty {
String envName;
String propertyName;
String defaultValue;
String description;
public InterpreterProperty(String defaultValue,
String description) {
super();
public InterpreterProperty(String envName, String propertyName, String defaultValue,
String description) {
this.envName = envName;
this.propertyName = propertyName;
this.defaultValue = defaultValue;
this.description = description;
}
public InterpreterProperty(String defaultValue, String description) {
this(null, null, defaultValue, description);
}
public String getEnvName() {
return envName;
}
public void setEnvName(String envName) {
this.envName = envName;
}
public String getPropertyName() {
return propertyName;
}
public void setPropertyName(String propertyName) {
this.propertyName = propertyName;
}
public String getDefaultValue() {
return defaultValue;
}
@ -46,4 +69,28 @@ public class InterpreterProperty {
public void setDescription(String description) {
this.description = description;
}
public String getValue() {
//TODO(jongyoul): Remove SparkInterpreter's getSystemDefault method
if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
}
if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
}
return defaultValue;
}
@Override
public String toString() {
return String.format("{envName=%s, propertyName=%s, defaultValue=%s, description=%20s", envName,
propertyName, defaultValue, description);
}
}

View file

@ -39,6 +39,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
private static final long serialVersionUID = 4749305895693848035L;
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
private static ZeppelinConfiguration conf;
private String interpreterJson;
public ZeppelinConfiguration(URL url) throws ConfigurationException {
setDelimiterParsingDisabled(true);
@ -342,6 +343,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
}
public String getInterpreterJson() {
return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
}
public String getInterpreterSettingPath() {
return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
}
@ -480,6 +485,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+ "org.apache.zeppelin.jdbc.JDBCInterpreter,"
+ "org.apache.zeppelin.hbase.HbaseInterpreter"),
ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.interpreter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NullArgumentException;
@ -45,9 +46,13 @@ import org.sonatype.aether.repository.RemoteRepository;
import java.io.*;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
/**
@ -112,39 +117,95 @@ public class InterpreterFactory implements InterpreterGroupFactory {
private void init() throws InterpreterException, IOException, RepositoryException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
String interpreterJson = conf.getInterpreterJson();
// Load classes
Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {}.getType();
ArrayList<RegisteredInterpreter> registeredInterpreters;
Boolean registered;
File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
if (interpreterDirs != null) {
for (File path : interpreterDirs) {
logger.info("Reading " + path.getAbsolutePath());
URL[] urls = null;
registered = false;
String absolutePath = path.getAbsolutePath();
logger.info("Reading " + absolutePath);
Path jsonPath = Paths.get(absolutePath, interpreterJson);
if (Files.exists(jsonPath)) {
logger.info("Reading {}", jsonPath.toString());
FileReader fileReader = new FileReader(jsonPath.toString());
registeredInterpreters = gson.fromJson(fileReader, registeredInterpreterListType);
registered = registerInterpreters(registeredInterpreters, absolutePath);
if (registered) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
logger.info("Interpreter {} found. class={}",
registeredInterpreter.getInterpreterKey(),
registeredInterpreter.getClassName());
}
}
}
URLClassLoader ccl;
try {
urls = recursiveBuildLibList(path);
URL[] urls = recursiveBuildLibList(path);
ccl = new URLClassLoader(urls, oldcl);
if (!registered) {
InputStream inputStream = ccl.getResourceAsStream(interpreterJson);
if (null != inputStream) {
logger.info("Reading {} from resources in {}", interpreterJson, absolutePath);
registeredInterpreters = gson.fromJson(new InputStreamReader(inputStream),
registeredInterpreterListType);
registered = registerInterpreters(registeredInterpreters, absolutePath);
if (registered) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
logger.info("Interpreter {} found. class={}",
registeredInterpreter.getInterpreterKey(),
registeredInterpreter.getClassName());
}
}
}
}
if (!registered) {
for (String className : interpreterClassList) {
try {
// Load classes
Class.forName(className, true, ccl);
Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
for (String interpreterKey : interpreterKeys) {
if (className.equals(
Interpreter.registeredInterpreters.get(interpreterKey).getClassName())) {
Interpreter.registeredInterpreters.get(interpreterKey).setPath(absolutePath);
logger.info("Interpreter " + interpreterKey + " found. class=" + className);
cleanCl.put(absolutePath, ccl);
registered = true;
}
}
} catch (ClassNotFoundException e) {
// nothing to do
}
}
}
} catch (MalformedURLException e1) {
logger.error("Can't load jars ", e1);
}
URLClassLoader ccl = new URLClassLoader(urls, oldcl);
for (String className : interpreterClassList) {
try {
Class.forName(className, true, ccl);
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
if (className.equals(
Interpreter.registeredInterpreters.get(intName).getClassName())) {
Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath());
logger.info("Interpreter " + intName + " found. class=" + className);
cleanCl.put(path.getAbsolutePath(), ccl);
}
}
} catch (ClassNotFoundException e) {
// nothing to do
}
if (!registered) {
logger.info("Failed to register Interpreter from {}", absolutePath);
}
}
}
for (RegisteredInterpreter registeredInterpreter:
Interpreter.registeredInterpreters.values()) {
logger.debug("Registered: {} -> {}. Properties: {}",
registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
registeredInterpreter.getProperties());
}
loadFromFile();
// if no interpreter settings are loaded, create default set
@ -175,7 +236,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
for (String k : info.getProperties().keySet()) {
p.put(k, info.getProperties().get(k).getDefaultValue());
p.put(k, info.getProperties().get(k).getValue());
}
}
@ -201,6 +262,32 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
private boolean registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
String absolutePath) {
if (validateRegisterInterpreters(registeredInterpreters)) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
registeredInterpreter.setPath(absolutePath);
Interpreter.register(registeredInterpreter);
}
return true;
} else {
return false;
}
}
private boolean validateRegisterInterpreters(List<RegisteredInterpreter> registeredInterpreters) {
Boolean isValid = true;
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
isValid &= null != registeredInterpreter.getGroup() &&
null != registeredInterpreter.getName() &&
null != registeredInterpreter.getClassName();
}
return isValid;
}
private void loadFromFile() throws IOException {
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
@ -745,6 +832,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
throws InterpreterException {
logger.info("Create repl {} from {}", className, dirName);
updatePropertiesFromRegisteredInterpreter(property, className);
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
try {
@ -806,6 +895,9 @@ public class InterpreterFactory implements InterpreterGroupFactory {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
updatePropertiesFromRegisteredInterpreter(property, className);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
@ -813,6 +905,22 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return intp;
}
private Properties updatePropertiesFromRegisteredInterpreter(Properties properties,
String className) {
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
className);
if (null != registeredInterpreter) {
Map<String, InterpreterProperty> defaultProperties = registeredInterpreter.getProperties();
for (String key : defaultProperties.keySet()) {
if (!properties.containsKey(key)) {
properties.setProperty(key, defaultProperties.get(key).getValue());
}
}
}
return properties;
}
private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
URL[] urls = new URL[0];