mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-4390]. ExecutorService is not properly shutdown
This commit is contained in:
parent
54ce30f492
commit
439221492a
7 changed files with 23 additions and 7 deletions
|
|
@ -130,7 +130,8 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
result = interpreter.interpret(codeKillKernel, context);
|
||||
assertEquals(Code.ERROR, result.code());
|
||||
output = context.out.toInterpreterResultMessage().get(0);
|
||||
assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
|
||||
assertTrue(output.getData(),
|
||||
output.getData().equals("Ipython kernel has been stopped. Please check logs. "
|
||||
+ "It might be because of an out of memory issue."));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -167,6 +167,7 @@ public class InterpreterGroup {
|
|||
for (Interpreter interpreter : session) {
|
||||
try {
|
||||
interpreter.close();
|
||||
interpreter.getScheduler().stop();
|
||||
} catch (InterpreterException e) {
|
||||
LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ import org.apache.zeppelin.scheduler.Job;
|
|||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -255,7 +256,9 @@ public class RemoteInterpreterServer extends Thread
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isTest) {
|
||||
SchedulerFactory.singleton().destroy();
|
||||
}
|
||||
server.stop();
|
||||
|
||||
// server.stop() does not always finish server.serve() loop
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public abstract class AbstractScheduler implements Scheduler {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!terminate) {
|
||||
while (!terminate && !Thread.currentThread().isInterrupted()) {
|
||||
Job runningJob = null;
|
||||
try {
|
||||
runningJob = queue.take();
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.scheduler;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
|
|
@ -25,12 +25,12 @@ import java.util.concurrent.Executors;
|
|||
*/
|
||||
public class FIFOScheduler extends AbstractScheduler {
|
||||
|
||||
private Executor executor;
|
||||
private ExecutorService executor;
|
||||
|
||||
FIFOScheduler(String name) {
|
||||
super(name);
|
||||
executor = Executors.newSingleThreadExecutor(
|
||||
new SchedulerThreadFactory("FIFOScheduler-Worker-"));
|
||||
new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -38,4 +38,10 @@ public class FIFOScheduler extends AbstractScheduler {
|
|||
// run job in the SingleThreadExecutor since this is FIFO.
|
||||
executor.execute(() -> runJob(job));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
super.stop();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,11 @@ public class SchedulerFactory {
|
|||
}
|
||||
|
||||
public void destroy() {
|
||||
ExecutorFactory.singleton().shutdown("SchedulerFactory");
|
||||
LOGGER.info("Destroy all executors");
|
||||
ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
|
||||
this.executor.shutdownNow();
|
||||
this.executor = null;
|
||||
singleton = null;
|
||||
}
|
||||
|
||||
public Scheduler createOrGetFIFOScheduler(String name) {
|
||||
|
|
|
|||
|
|
@ -140,6 +140,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
|
|||
if (appendFuture != null) {
|
||||
appendFuture.cancel(true);
|
||||
}
|
||||
appendService.shutdownNow();
|
||||
LOGGER.info("RemoteInterpreterEventServer is stopped");
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue