[ZEPPELIN-4714]. Flink table api doesn't work in multiple threads

This commit is contained in:
Jeff Zhang 2020-04-02 11:43:38 +08:00
parent b0ffd7e67a
commit 55d6135765
15 changed files with 246 additions and 75 deletions

View file

@ -239,3 +239,4 @@ after_failure:
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
- cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
- cat flink/*.log

View file

@ -637,7 +637,10 @@
<skip>false</skip>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx3072m -XX:MaxPermSize=256m </argLine>
<!-- set sun.zip.disableMemoryMapping=true because of
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
<argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
<environmentVariables>
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>

View file

@ -137,6 +137,13 @@ public class FlinkInterpreter extends Interpreter {
return this.innerIntp.getDefaultSqlParallelism();
}
/**
* Workaround for issue of FLINK-16936.
*/
public void createPlannerAgain() {
this.innerIntp.createPlannerAgain();
}
public ClassLoader getFlinkScalaShellLoader() {
return innerIntp.getFlinkScalaShellLoader();
}

View file

@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean opened = false;
private ClassLoader originalClassLoader;
public IPyFlinkInterpreter(Properties property) {
super(property);
@ -78,16 +79,26 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
public InterpreterResult internalInterpret(String st,
InterpreterContext context)
throws InterpreterException {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
result.toString());
try {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.internalInterpret("intp.initJavaThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to initJavaThread: " +
result.toString());
}
return super.internalInterpret(st, context);
} finally {
if (getKernelProcessLauncher().isRunning()) {
InterpreterResult result =
super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
}
}
}
return super.internalInterpret(st, context);
}
@Override
@ -105,8 +116,23 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
}
}
public void setInterpreterContextInPythonThread() {
/**
* Called by python process.
*/
public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
flinkInterpreter.createPlannerAgain();
}
/**
* Called by python process.
*/
public void resetClassLoaderInPythonThread() {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
@Override

View file

@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
private FlinkInterpreter flinkInterpreter;
private InterpreterContext curInterpreterContext;
private boolean isOpened = false;
private ClassLoader originalClassLoader;
public PyFlinkInterpreter(Properties properties) {
super(properties);
@ -103,22 +104,53 @@ public class PyFlinkInterpreter extends PythonInterpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
if (isOpened) {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.interpret("intp.setInterpreterContextInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
result.toString());
try {
if (isOpened) {
// set InterpreterContext in the python thread first, otherwise flink job could not be
// associated with paragraph in JobListener
this.curInterpreterContext = context;
InterpreterResult result =
super.interpret("intp.initJavaThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
throw new InterpreterException("Fail to initJavaThread: " +
result.toString());
}
}
flinkInterpreter.createPlannerAgain();
return super.interpret(st, context);
} finally {
if (getPythonProcessLauncher().isRunning()) {
InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
}
}
}
return super.interpret(st, context);
}
public void setInterpreterContextInPythonThread() {
/**
* Called by python process.
*/
public void initJavaThread() {
InterpreterContext.set(curInterpreterContext);
originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
flinkInterpreter.createPlannerAgain();
}
/**
* Called by python process.
*/
public void resetClassLoaderInPythonThread() {
if (originalClassLoader != null) {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
flinkInterpreter.cancel(context);
}
@Override

View file

@ -182,6 +182,20 @@ public class TableEnvFactory {
settings.isStreamingMode());
}
public void createPlanner(EnvironmentSettings settings) {
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
Map<String, String> plannerProperties = settings.toPlannerProperties();
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tblConfig,
blinkFunctionCatalog,
catalogManager);
}
public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
EnvironmentSettings settings) {

View file

@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
private var mode: ExecutionMode.Value = _
private var tblEnvFactory: TableEnvFactory = _
private var benv: ExecutionEnvironment = _
private var senv: StreamExecutionEnvironment = _
@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
val classLoader = Thread.currentThread().getContextClassLoader
try {
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
// the TableFactory properly
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
// blink planner
@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
field.get(obj)
}
/**
* This is just a workaround to make table api work in multiple threads.
*/
def createPlannerAgain(): Unit = {
val originalClassLoader = Thread.currentThread().getContextClassLoader
try {
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
val stEnvSetting =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
this.tblEnvFactory.createPlanner(stEnvSetting)
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader)
}
}
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
createPlannerAgain()
val originalStdOut = System.out
val originalStdErr = System.err;
if (context != null) {

View file

@ -84,7 +84,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
// select which use scala udf
context = getInterpreterContext();
result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());

View file

@ -33,6 +33,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
@ -45,6 +46,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
private RemoteInterpreterEventClient mockIntpEventClient =
mock(RemoteInterpreterEventClient.class);
private LazyOpenInterpreter flinkScalaInterpreter;
protected Properties initIntpProperties() {
Properties p = new Properties();
@ -62,12 +64,12 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
context.setIntpEventClient(mockIntpEventClient);
InterpreterContext.set(context);
LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
this.flinkScalaInterpreter = new LazyOpenInterpreter(
new FlinkInterpreter(properties));
intpGroup = new InterpreterGroup();
intpGroup.put("session_1", new ArrayList<Interpreter>());
intpGroup.get("session_1").add(flinkInterpreter);
flinkInterpreter.setInterpreterGroup(intpGroup);
intpGroup.get("session_1").add(flinkScalaInterpreter);
flinkScalaInterpreter.setInterpreterGroup(intpGroup);
LazyOpenInterpreter pyFlinkInterpreter =
new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
@ -94,17 +96,17 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
@Test
public void testBatchIPyFlink() throws InterpreterException {
testBatchPyFlink(interpreter);
testBatchPyFlink(interpreter, flinkScalaInterpreter);
}
@Test
public void testStreamIPyFlink() throws InterpreterException {
testStreamPyFlink(interpreter);
public void testStreamIPyFlink() throws InterpreterException, IOException {
testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
InterpreterResult result = interpreter.interpret(
InterpreterResult result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
@ -131,6 +133,77 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
"bt_env.execute(\"batch_job\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// use group by
context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
"if os.path.exists(sink_path):\n" +
" if os.path.isfile(sink_path):\n" +
" os.remove(sink_path)\n" +
" else:\n" +
" shutil.rmtree(sink_path)\n" +
"b_env.set_parallelism(1)\n" +
"t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
"bt_env.connect(FileSystem().path(sink_path)) \\\n" +
" .with_format(OldCsv()\n" +
" .field_delimiter(',')\n" +
" .field(\"a\", DataTypes.STRING())\n" +
" .field(\"b\", DataTypes.BIGINT())\n" +
" .field(\"c\", DataTypes.BIGINT())) \\\n" +
" .with_schema(Schema()\n" +
" .field(\"a\", DataTypes.STRING())\n" +
" .field(\"b\", DataTypes.BIGINT())\n" +
" .field(\"c\", DataTypes.BIGINT())) \\\n" +
" .register_table_sink(\"batch_sink4\")\n" +
"t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" +
"bt_env.execute(\"batch_job4\")"
, context);
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
// use scala udf in pyflink
// define scala udf
result = flinkScalaInterpreter.interpret(
"class AddOne extends ScalarFunction {\n" +
" def eval(a: java.lang.Long): String = a + \"\1\"\n" +
"}", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = flinkScalaInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
"if os.path.exists(sink_path):\n" +
" if os.path.isfile(sink_path):\n" +
" os.remove(sink_path)\n" +
" else:\n" +
" shutil.rmtree(sink_path)\n" +
"b_env.set_parallelism(1)\n" +
"t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
"bt_env.connect(FileSystem().path(sink_path)) \\\n" +
" .with_format(OldCsv()\n" +
" .field_delimiter(',')\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .with_schema(Schema()\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .register_table_sink(\"batch_sink3\")\n" +
"t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
"bt_env.execute(\"batch_job3\")"
, context);
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
}
@Override
@ -149,33 +222,33 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
}
}
public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
InterpreterResult result = interpreter.interpret(
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
"if os.path.exists(sink_path):\n" +
" if os.path.isfile(sink_path):\n" +
" os.remove(sink_path)\n" +
" else:\n" +
" shutil.rmtree(sink_path)\n" +
"s_env.set_parallelism(1)\n" +
"t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
"st_env.connect(FileSystem().path(sink_path)) \\\n" +
" .with_format(OldCsv()\n" +
" .field_delimiter(',')\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .with_schema(Schema()\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .register_table_sink(\"stream_sink\")\n" +
"t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
"st_env.execute(\"stream_job\")"
"import tempfile\n" +
"import os\n" +
"import shutil\n" +
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
"if os.path.exists(sink_path):\n" +
" if os.path.isfile(sink_path):\n" +
" os.remove(sink_path)\n" +
" else:\n" +
" shutil.rmtree(sink_path)\n" +
"s_env.set_parallelism(1)\n" +
"t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
"st_env.connect(FileSystem().path(sink_path)) \\\n" +
" .with_format(OldCsv()\n" +
" .field_delimiter(',')\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .with_schema(Schema()\n" +
" .field(\"a\", DataTypes.BIGINT())\n" +
" .field(\"b\", DataTypes.STRING())\n" +
" .field(\"c\", DataTypes.STRING())) \\\n" +
" .register_table_sink(\"stream_sink\")\n" +
"t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
"st_env.execute(\"stream_job\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}

View file

@ -19,7 +19,6 @@ package org.apache.zeppelin.flink;
import com.google.common.io.Files;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -32,6 +31,8 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreterTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@ -48,7 +49,7 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
private RemoteInterpreterEventClient mockRemoteEventClient =
mock(RemoteInterpreterEventClient.class);
private Interpreter flinkInterpreter;
private Interpreter flinkScalaInterpreter;
private Interpreter streamSqlInterpreter;
private Interpreter batchSqlInterpreter;
@ -77,9 +78,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
.setIntpEventClient(mockRemoteEventClient)
.build();
InterpreterContext.set(context);
flinkInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
intpGroup.get("session_1").add(flinkInterpreter);
flinkInterpreter.setInterpreterGroup(intpGroup);
flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
intpGroup.get("session_1").add(flinkScalaInterpreter);
flinkScalaInterpreter.setInterpreterGroup(intpGroup);
LazyOpenInterpreter iPyFlinkInterpreter =
new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
@ -108,9 +109,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
}
@Test
public void testPyFlink() throws InterpreterException {
IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
public void testPyFlink() throws InterpreterException, IOException {
IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
protected InterpreterContext getInterpreterContext() {

View file

@ -15,11 +15,12 @@
# limitations under the License.
#
log4j.rootLogger = WARN, stdout
log4j.rootLogger = INFO, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
log4j.logger.org.apache.hive=WARN
log4j.logger.org.apache.flink=WARN

View file

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
status = WARN
status = INFO
name = HiveLog4j2
packages = org.apache.hadoop.hive.ql.log

View file

@ -17,7 +17,6 @@
package org.apache.zeppelin.python;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.gson.Gson;
import org.apache.commons.exec.CommandLine;
@ -60,7 +59,7 @@ public class PythonInterpreter extends Interpreter {
private static final int MAX_TIMEOUT_SEC = 30;
private GatewayServer gatewayServer;
private PythonProcessLauncher pythonProcessLauncher;
protected PythonProcessLauncher pythonProcessLauncher;
private File pythonWorkDir;
protected boolean useBuiltinPy4j = true;
@ -163,7 +162,6 @@ public class PythonInterpreter extends Interpreter {
}
}
@VisibleForTesting
public PythonProcessLauncher getPythonProcessLauncher() {
return pythonProcessLauncher;
}
@ -572,7 +570,7 @@ public class PythonInterpreter extends Interpreter {
LOGGER.debug("Python Process Output: " + message);
}
class PythonProcessLauncher extends ProcessLauncher {
public class PythonProcessLauncher extends ProcessLauncher {
PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
super(commandLine, envs);

View file

@ -146,7 +146,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
assertEquals(Code.ERROR, result.code());
output = context.out.toInterpreterResultMessage().get(0);
assertTrue(output.getData(),
output.getData().equals("Ipython kernel has been stopped. Please check logs. "
output.getData().contains("Ipython kernel has been stopped. Please check logs. "
+ "It might be because of an out of memory issue."));
}

View file

@ -17,7 +17,6 @@
package org.apache.zeppelin.jupyter;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.environment.EnvironmentUtils;
@ -213,7 +212,6 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
return EnvironmentUtils.getProcEnvironment();
}
@VisibleForTesting
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
return jupyterKernelProcessLauncher;
}