[ZEPPELIN-4390]. ExecutorService is not properly shutdown

This commit is contained in:
Jeff Zhang 2019-10-23 18:10:10 +08:00
parent 54ce30f492
commit 439221492a
7 changed files with 23 additions and 7 deletions

View file

@ -130,7 +130,8 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
result = interpreter.interpret(codeKillKernel, context);
assertEquals(Code.ERROR, result.code());
output = context.out.toInterpreterResultMessage().get(0);
assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
assertTrue(output.getData(),
output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+ "It might be because of an out of memory issue."));
}

View file

@ -167,6 +167,7 @@ public class InterpreterGroup {
for (Interpreter interpreter : session) {
try {
interpreter.close();
interpreter.getScheduler().stop();
} catch (InterpreterException e) {
LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e);
}

View file

@ -74,6 +74,7 @@ import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -255,7 +256,9 @@ public class RemoteInterpreterServer extends Thread
}
}
}
if (!isTest) {
SchedulerFactory.singleton().destroy();
}
server.stop();
// server.stop() does not always finish server.serve() loop

View file

@ -76,7 +76,7 @@ public abstract class AbstractScheduler implements Scheduler {
@Override
public void run() {
while (!terminate) {
while (!terminate && !Thread.currentThread().isInterrupted()) {
Job runningJob = null;
try {
runningJob = queue.take();

View file

@ -17,7 +17,7 @@
package org.apache.zeppelin.scheduler;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@ -25,12 +25,12 @@ import java.util.concurrent.Executors;
*/
public class FIFOScheduler extends AbstractScheduler {
private Executor executor;
private ExecutorService executor;
FIFOScheduler(String name) {
super(name);
executor = Executors.newSingleThreadExecutor(
new SchedulerThreadFactory("FIFOScheduler-Worker-"));
new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
}
@Override
@ -38,4 +38,10 @@ public class FIFOScheduler extends AbstractScheduler {
// run job in the SingleThreadExecutor since this is FIFO.
executor.execute(() -> runJob(job));
}
@Override
public void stop() {
super.stop();
executor.shutdownNow();
}
}

View file

@ -64,7 +64,11 @@ public class SchedulerFactory {
}
public void destroy() {
ExecutorFactory.singleton().shutdown("SchedulerFactory");
LOGGER.info("Destroy all executors");
ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
this.executor.shutdownNow();
this.executor = null;
singleton = null;
}
public Scheduler createOrGetFIFOScheduler(String name) {

View file

@ -140,6 +140,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
if (appendFuture != null) {
appendFuture.cancel(true);
}
appendService.shutdownNow();
LOGGER.info("RemoteInterpreterEventServer is stopped");
}