Handle NPE when closing interpreters in group spark.

This commit is contained in:
Lee moon soo 2016-02-19 21:03:32 -08:00
parent 5047217ed3
commit feabb5fff0
4 changed files with 13 additions and 3 deletions

View file

@ -313,7 +313,11 @@ public class DepInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return getSparkInterpreter().getScheduler();
SparkInterpreter sparkInterpreter = getSparkInterpreter();
if (sparkInterpreter != null) {
return getSparkInterpreter().getScheduler();
} else {
return null;
}
}
}

View file

@ -931,6 +931,8 @@ public class SparkInterpreter extends Interpreter {
@Override
public void close() {
logger.info("Close interpreter");
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
sc.stop();
sc = null;

View file

@ -180,7 +180,7 @@ public class SparkSqlInterpreter extends Interpreter {
if (intp != null) {
return intp.getScheduler();
} else {
throw new InterpreterException("Can't find SparkInterpreter");
return null;
}
}
}

View file

@ -137,6 +137,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
* Close all interpreter instances in this group
*/
public void close() {
LOGGER.info("Close interpreter group " + getId());
List<Interpreter> intpToClose = new LinkedList<Interpreter>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToClose.addAll(intpGroupForNote);
@ -149,6 +150,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
* @param noteId
*/
public void close(String noteId) {
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
close(intpForNote);
}
@ -189,6 +191,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
* @param noteId
*/
public void destroy(String noteId) {
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
List<Interpreter> intpForNote = this.get(noteId);
destroy(intpForNote);
}
@ -198,6 +201,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
* Destroy all interpreter instances in this group
*/
public void destroy() {
LOGGER.info("Destroy interpreter group " + getId());
List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
for (List<Interpreter> intpGroupForNote : this.values()) {
intpToDestroy.addAll(intpGroupForNote);