mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Handle NPE when closing interpreters in group spark.
This commit is contained in:
parent
5047217ed3
commit
feabb5fff0
4 changed files with 13 additions and 3 deletions
|
|
@ -313,7 +313,11 @@ public class DepInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return getSparkInterpreter().getScheduler();
|
||||
SparkInterpreter sparkInterpreter = getSparkInterpreter();
|
||||
if (sparkInterpreter != null) {
|
||||
return getSparkInterpreter().getScheduler();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -931,6 +931,8 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
logger.info("Close interpreter");
|
||||
|
||||
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
|
||||
sc.stop();
|
||||
sc = null;
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
if (intp != null) {
|
||||
return intp.getScheduler();
|
||||
} else {
|
||||
throw new InterpreterException("Can't find SparkInterpreter");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -137,6 +137,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
* Close all interpreter instances in this group
|
||||
*/
|
||||
public void close() {
|
||||
LOGGER.info("Close interpreter group " + getId());
|
||||
List<Interpreter> intpToClose = new LinkedList<Interpreter>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToClose.addAll(intpGroupForNote);
|
||||
|
|
@ -149,6 +150,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
* @param noteId
|
||||
*/
|
||||
public void close(String noteId) {
|
||||
LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
close(intpForNote);
|
||||
}
|
||||
|
|
@ -189,6 +191,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
* @param noteId
|
||||
*/
|
||||
public void destroy(String noteId) {
|
||||
LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
|
||||
List<Interpreter> intpForNote = this.get(noteId);
|
||||
destroy(intpForNote);
|
||||
}
|
||||
|
|
@ -198,6 +201,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
* Destroy all interpreter instances in this group
|
||||
*/
|
||||
public void destroy() {
|
||||
LOGGER.info("Destroy interpreter group " + getId());
|
||||
List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
|
||||
for (List<Interpreter> intpGroupForNote : this.values()) {
|
||||
intpToDestroy.addAll(intpGroupForNote);
|
||||
|
|
|
|||
Loading…
Reference in a new issue