ZEPPELIN-804 Refactoring registration mechanism on Interpreters

- Extracted new initialization logic into another methods
This commit is contained in:
Jongyoul Lee 2016-04-22 16:44:27 +09:00
parent 5d63d91505
commit c5b7d545da

View file

@ -50,6 +50,7 @@ import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -116,90 +117,52 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private void init() throws InterpreterException, IOException, RepositoryException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
String interpreterJson = conf.getInterpreterJson();
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {}.getType();
ArrayList<RegisteredInterpreter> registeredInterpreters;
Boolean registered;
File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
if (interpreterDirs != null) {
for (File path : interpreterDirs) {
registered = false;
String absolutePath = path.getAbsolutePath();
logger.info("Reading " + absolutePath);
Path jsonPath = Paths.get(absolutePath, interpreterJson);
if (Files.exists(jsonPath)) {
logger.info("Reading {}", jsonPath.toString());
FileReader fileReader = new FileReader(jsonPath.toString());
registeredInterpreters = gson.fromJson(fileReader, registeredInterpreterListType);
registered = registerInterpreters(registeredInterpreters, absolutePath);
if (registered) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
logger.info("Interpreter {} found. class={}",
registeredInterpreter.getInterpreterKey(),
registeredInterpreter.getClassName());
}
for (Path interpreterDir : Files.newDirectoryStream(Paths.get(conf.getInterpreterDir()),
new DirectoryStream.Filter<Path>() {
@Override
public boolean accept(Path entry) throws IOException {
return Files.isDirectory(entry);
}
}
})) {
String interpreterDirString = interpreterDir.toString();
URLClassLoader ccl;
try {
URL[] urls = recursiveBuildLibList(path);
ccl = new URLClassLoader(urls, oldcl);
if (!registered) {
registerInterpreterFromPath(interpreterDirString, interpreterJson);
InputStream inputStream = ccl.getResourceAsStream(interpreterJson);
registerInterpreterFromResource(cl, interpreterDirString, interpreterJson);
if (null != inputStream) {
logger.info("Reading {} from resources in {}", interpreterJson, absolutePath);
registeredInterpreters = gson.fromJson(new InputStreamReader(inputStream),
registeredInterpreterListType);
registered = registerInterpreters(registeredInterpreters, absolutePath);
if (registered) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
logger.info("Interpreter {} found. class={}",
registeredInterpreter.getInterpreterKey(),
registeredInterpreter.getClassName());
}
/**
* TODO(jongyoul)
* - Remove these codes below because of legacy code
* - Support ThreadInterpreter
*/
URLClassLoader ccl = new URLClassLoader(recursiveBuildLibList(interpreterDir.toFile()), cl);
for (String className : interpreterClassList) {
// Check whether className is already registered through above logic
if (null == Interpreter.findRegisteredInterpreterByClassName(className)) {
try {
// Load classes
Class.forName(className, true, ccl);
Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
for (String interpreterKey : interpreterKeys) {
if (className.equals(
Interpreter.registeredInterpreters.get(interpreterKey).getClassName())) {
Interpreter.registeredInterpreters.get(interpreterKey).setPath(
interpreterDirString);
logger.info("Interpreter " + interpreterKey + " found. class=" + className);
cleanCl.put(interpreterDirString, ccl);
}
}
} catch (ClassNotFoundException e) {
// nothing to do
}
if (!registered) {
for (String className : interpreterClassList) {
try {
// Load classes
Class.forName(className, true, ccl);
Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
for (String interpreterKey : interpreterKeys) {
if (className.equals(
Interpreter.registeredInterpreters.get(interpreterKey).getClassName())) {
Interpreter.registeredInterpreters.get(interpreterKey).setPath(absolutePath);
logger.info("Interpreter " + interpreterKey + " found. class=" + className);
cleanCl.put(absolutePath, ccl);
registered = true;
}
}
} catch (ClassNotFoundException e) {
// nothing to do
}
}
}
} catch (MalformedURLException e1) {
logger.error("Can't load jars ", e1);
}
if (!registered) {
logger.info("Failed to register Interpreter from {}", absolutePath);
}
}
}
for (RegisteredInterpreter registeredInterpreter:
for (RegisteredInterpreter registeredInterpreter :
Interpreter.registeredInterpreters.values()) {
logger.debug("Registered: {} -> {}. Properties: {}",
registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
@ -211,8 +174,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// if no interpreter settings are loaded, create default set
synchronized (interpreterSettings) {
if (interpreterSettings.size() == 0) {
HashMap<String, List<RegisteredInterpreter>> groupClassNameMap =
new HashMap<String, List<RegisteredInterpreter>>();
HashMap<String, List<RegisteredInterpreter>> groupClassNameMap = new HashMap<>();
for (String k : Interpreter.registeredInterpreters.keySet()) {
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k);
@ -242,11 +204,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
if (found) {
// add all interpreters in group
add(groupName,
groupName,
new LinkedList<Dependency>(),
defaultOption,
p);
add(groupName, groupName, new LinkedList<Dependency>(), defaultOption, p);
groupClassNameMap.remove(groupName);
break;
}
@ -257,35 +215,68 @@ public class InterpreterFactory implements InterpreterGroupFactory {
for (String settingId : interpreterSettings.keySet()) {
InterpreterSetting setting = interpreterSettings.get(settingId);
logger.info("Interpreter setting group {} : id={}, name={}",
setting.getGroup(), settingId, setting.getName());
logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId,
setting.getName());
}
}
private boolean registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
String absolutePath) {
if (validateRegisterInterpreters(registeredInterpreters)) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
private void registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
String interpreterJson)
throws MalformedURLException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
InputStream inputStream = tempClassLoader.getResourceAsStream(interpreterJson);
if (null != inputStream) {
logger.debug("Reading {} from resources in {}", interpreterJson, interpreterDir);
List<RegisteredInterpreter> registeredInterpreterList = getInterpreterListFromJson(
inputStream);
registerInterpreters(registeredInterpreterList, interpreterDir);
}
}
private void registerInterpreterFromPath(String interpreterDir,
String interpreterJson) throws IOException {
Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
if (Files.exists(interpreterJsonPath)) {
logger.debug("Reading {}", interpreterJsonPath);
List<RegisteredInterpreter> registeredInterpreterList = getInterpreterListFromJson(
interpreterJsonPath);
registerInterpreters(registeredInterpreterList, interpreterDir);
}
}
private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
throws FileNotFoundException {
return getInterpreterListFromJson(new FileInputStream(filename.toFile()));
}
private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
}.getType();
return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
}
private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
String absolutePath) {
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
String className = registeredInterpreter.getClassName();
if (validateRegisterInterpreter(registeredInterpreter) &&
null == Interpreter.findRegisteredInterpreterByClassName(className)) {
registeredInterpreter.setPath(absolutePath);
Interpreter.register(registeredInterpreter);
logger.debug("Registered. key: {}, className: {}, path: {}",
registeredInterpreter.getInterpreterKey(), registeredInterpreter.getClassName(),
registeredInterpreter.getProperties());
}
return true;
} else {
return false;
}
}
private boolean validateRegisterInterpreters(List<RegisteredInterpreter> registeredInterpreters) {
Boolean isValid = true;
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
isValid &= null != registeredInterpreter.getGroup() &&
null != registeredInterpreter.getName() &&
null != registeredInterpreter.getClassName();
}
return isValid;
private boolean validateRegisterInterpreter(RegisteredInterpreter registeredInterpreter) {
return null != registeredInterpreter.getGroup() && null != registeredInterpreter.getName() &&
null != registeredInterpreter.getClassName();
}
private void loadFromFile() throws IOException {