mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-4714]. Flink table api doesn't work in multiple threads
This commit is contained in:
parent
b0ffd7e67a
commit
55d6135765
15 changed files with 246 additions and 75 deletions
|
|
@ -239,3 +239,4 @@ after_failure:
|
|||
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stdout
|
||||
- cat livy/target/tmp/livy-int-test/MiniYarnMain/target/org.apache.livy.test.framework.MiniYarnMain/*/*/*/stderr
|
||||
- cat zeppelin-zengine/target/org.apache.zeppelin.interpreter.MiniHadoopCluster/*/*/*/stdout
|
||||
- cat flink/*.log
|
||||
|
|
|
|||
|
|
@ -637,7 +637,10 @@
|
|||
<skip>false</skip>
|
||||
<forkCount>1</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<argLine>-Xmx3072m -XX:MaxPermSize=256m </argLine>
|
||||
<!-- set sun.zip.disableMemoryMapping=true because of
|
||||
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
|
||||
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
|
||||
<argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
|
||||
|
||||
<environmentVariables>
|
||||
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
|
||||
|
|
|
|||
|
|
@ -137,6 +137,13 @@ public class FlinkInterpreter extends Interpreter {
|
|||
return this.innerIntp.getDefaultSqlParallelism();
|
||||
}
|
||||
|
||||
/**
|
||||
* Workaround for issue of FLINK-16936.
|
||||
*/
|
||||
public void createPlannerAgain() {
|
||||
this.innerIntp.createPlannerAgain();
|
||||
}
|
||||
|
||||
public ClassLoader getFlinkScalaShellLoader() {
|
||||
return innerIntp.getFlinkScalaShellLoader();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
|
|||
private FlinkInterpreter flinkInterpreter;
|
||||
private InterpreterContext curInterpreterContext;
|
||||
private boolean opened = false;
|
||||
private ClassLoader originalClassLoader;
|
||||
|
||||
public IPyFlinkInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -78,16 +79,26 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
|
|||
public InterpreterResult internalInterpret(String st,
|
||||
InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
// set InterpreterContext in the python thread first, otherwise flink job could not be
|
||||
// associated with paragraph in JobListener
|
||||
this.curInterpreterContext = context;
|
||||
InterpreterResult result =
|
||||
super.internalInterpret("intp.setInterpreterContextInPythonThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
|
||||
result.toString());
|
||||
try {
|
||||
// set InterpreterContext in the python thread first, otherwise flink job could not be
|
||||
// associated with paragraph in JobListener
|
||||
this.curInterpreterContext = context;
|
||||
InterpreterResult result =
|
||||
super.internalInterpret("intp.initJavaThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
throw new InterpreterException("Fail to initJavaThread: " +
|
||||
result.toString());
|
||||
}
|
||||
return super.internalInterpret(st, context);
|
||||
} finally {
|
||||
if (getKernelProcessLauncher().isRunning()) {
|
||||
InterpreterResult result =
|
||||
super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.internalInterpret(st, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -105,8 +116,23 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
|
|||
}
|
||||
}
|
||||
|
||||
public void setInterpreterContextInPythonThread() {
|
||||
/**
|
||||
* Called by python process.
|
||||
*/
|
||||
public void initJavaThread() {
|
||||
InterpreterContext.set(curInterpreterContext);
|
||||
originalClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
|
||||
flinkInterpreter.createPlannerAgain();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by python process.
|
||||
*/
|
||||
public void resetClassLoaderInPythonThread() {
|
||||
if (originalClassLoader != null) {
|
||||
Thread.currentThread().setContextClassLoader(originalClassLoader);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
|
|||
private FlinkInterpreter flinkInterpreter;
|
||||
private InterpreterContext curInterpreterContext;
|
||||
private boolean isOpened = false;
|
||||
private ClassLoader originalClassLoader;
|
||||
|
||||
public PyFlinkInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
|
|
@ -103,22 +104,53 @@ public class PyFlinkInterpreter extends PythonInterpreter {
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
|
||||
if (isOpened) {
|
||||
// set InterpreterContext in the python thread first, otherwise flink job could not be
|
||||
// associated with paragraph in JobListener
|
||||
this.curInterpreterContext = context;
|
||||
InterpreterResult result =
|
||||
super.interpret("intp.setInterpreterContextInPythonThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
throw new InterpreterException("Fail to setInterpreterContextInPythonThread: " +
|
||||
result.toString());
|
||||
try {
|
||||
if (isOpened) {
|
||||
// set InterpreterContext in the python thread first, otherwise flink job could not be
|
||||
// associated with paragraph in JobListener
|
||||
this.curInterpreterContext = context;
|
||||
InterpreterResult result =
|
||||
super.interpret("intp.initJavaThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
throw new InterpreterException("Fail to initJavaThread: " +
|
||||
result.toString());
|
||||
}
|
||||
}
|
||||
flinkInterpreter.createPlannerAgain();
|
||||
return super.interpret(st, context);
|
||||
} finally {
|
||||
if (getPythonProcessLauncher().isRunning()) {
|
||||
InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
|
||||
if (result.code() != InterpreterResult.Code.SUCCESS) {
|
||||
LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.interpret(st, context);
|
||||
}
|
||||
|
||||
public void setInterpreterContextInPythonThread() {
|
||||
/**
|
||||
* Called by python process.
|
||||
*/
|
||||
public void initJavaThread() {
|
||||
InterpreterContext.set(curInterpreterContext);
|
||||
originalClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(flinkInterpreter.getFlinkScalaShellLoader());
|
||||
flinkInterpreter.createPlannerAgain();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by python process.
|
||||
*/
|
||||
public void resetClassLoaderInPythonThread() {
|
||||
if (originalClassLoader != null) {
|
||||
Thread.currentThread().setContextClassLoader(originalClassLoader);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) throws InterpreterException {
|
||||
super.cancel(context);
|
||||
flinkInterpreter.cancel(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -182,6 +182,20 @@ public class TableEnvFactory {
|
|||
settings.isStreamingMode());
|
||||
}
|
||||
|
||||
public void createPlanner(EnvironmentSettings settings) {
|
||||
Map<String, String> executorProperties = settings.toExecutorProperties();
|
||||
Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
|
||||
|
||||
Map<String, String> plannerProperties = settings.toPlannerProperties();
|
||||
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
|
||||
.create(
|
||||
plannerProperties,
|
||||
executor,
|
||||
tblConfig,
|
||||
blinkFunctionCatalog,
|
||||
catalogManager);
|
||||
}
|
||||
|
||||
public StreamTableEnvironment createJavaBlinkStreamTableEnvironment(
|
||||
EnvironmentSettings settings) {
|
||||
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
|
|||
|
||||
private var mode: ExecutionMode.Value = _
|
||||
|
||||
private var tblEnvFactory: TableEnvFactory = _
|
||||
private var benv: ExecutionEnvironment = _
|
||||
private var senv: StreamExecutionEnvironment = _
|
||||
|
||||
|
|
@ -229,7 +230,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
|
|||
config.externalJars.getOrElse(Array.empty[String]).mkString(":"))
|
||||
val classLoader = Thread.currentThread().getContextClassLoader
|
||||
try {
|
||||
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could find
|
||||
// use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
|
||||
// the TableFactory properly
|
||||
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
|
||||
val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
|
||||
|
|
@ -299,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
|
|||
val flinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
|
||||
val blinkFunctionCatalog = new FunctionCatalog(tblConfig, catalogManager, moduleManager);
|
||||
|
||||
val tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
|
||||
this.tblEnvFactory = new TableEnvFactory(this.benv, this.senv, tblConfig,
|
||||
catalogManager, moduleManager, flinkFunctionCatalog, blinkFunctionCatalog)
|
||||
|
||||
// blink planner
|
||||
|
|
@ -547,7 +548,23 @@ class FlinkScalaInterpreter(val properties: Properties) {
|
|||
field.get(obj)
|
||||
}
|
||||
|
||||
/**
|
||||
* This is just a workaround to make table api work in multiple threads.
|
||||
*/
|
||||
def createPlannerAgain(): Unit = {
|
||||
val originalClassLoader = Thread.currentThread().getContextClassLoader
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
|
||||
val stEnvSetting =
|
||||
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
|
||||
this.tblEnvFactory.createPlanner(stEnvSetting)
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(originalClassLoader)
|
||||
}
|
||||
}
|
||||
|
||||
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
|
||||
createPlannerAgain()
|
||||
val originalStdOut = System.out
|
||||
val originalStdErr = System.err;
|
||||
if (context != null) {
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
|
|||
// select which use scala udf
|
||||
context = getInterpreterContext();
|
||||
result = sqlInterpreter.interpret("SELECT addOne(id) as add_one FROM source_table", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
|
||||
resultMessages = context.out.toInterpreterResultMessage();
|
||||
assertEquals(1, resultMessages.size());
|
||||
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Properties;
|
||||
|
||||
|
|
@ -45,6 +46,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
|
|||
|
||||
private RemoteInterpreterEventClient mockIntpEventClient =
|
||||
mock(RemoteInterpreterEventClient.class);
|
||||
private LazyOpenInterpreter flinkScalaInterpreter;
|
||||
|
||||
protected Properties initIntpProperties() {
|
||||
Properties p = new Properties();
|
||||
|
|
@ -62,12 +64,12 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
|
|||
context.setIntpEventClient(mockIntpEventClient);
|
||||
InterpreterContext.set(context);
|
||||
|
||||
LazyOpenInterpreter flinkInterpreter = new LazyOpenInterpreter(
|
||||
this.flinkScalaInterpreter = new LazyOpenInterpreter(
|
||||
new FlinkInterpreter(properties));
|
||||
intpGroup = new InterpreterGroup();
|
||||
intpGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
intpGroup.get("session_1").add(flinkInterpreter);
|
||||
flinkInterpreter.setInterpreterGroup(intpGroup);
|
||||
intpGroup.get("session_1").add(flinkScalaInterpreter);
|
||||
flinkScalaInterpreter.setInterpreterGroup(intpGroup);
|
||||
|
||||
LazyOpenInterpreter pyFlinkInterpreter =
|
||||
new LazyOpenInterpreter(new PyFlinkInterpreter(properties));
|
||||
|
|
@ -94,17 +96,17 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void testBatchIPyFlink() throws InterpreterException {
|
||||
testBatchPyFlink(interpreter);
|
||||
testBatchPyFlink(interpreter, flinkScalaInterpreter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStreamIPyFlink() throws InterpreterException {
|
||||
testStreamPyFlink(interpreter);
|
||||
public void testStreamIPyFlink() throws InterpreterException, IOException {
|
||||
testStreamPyFlink(interpreter, flinkScalaInterpreter);
|
||||
}
|
||||
|
||||
public static void testBatchPyFlink(Interpreter interpreter) throws InterpreterException {
|
||||
public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
|
||||
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
|
||||
InterpreterResult result = interpreter.interpret(
|
||||
InterpreterResult result = pyflinkInterpreter.interpret(
|
||||
"import tempfile\n" +
|
||||
"import os\n" +
|
||||
"import shutil\n" +
|
||||
|
|
@ -131,6 +133,77 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
|
|||
"bt_env.execute(\"batch_job\")"
|
||||
, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
// use group by
|
||||
context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
|
||||
result = pyflinkInterpreter.interpret(
|
||||
"import tempfile\n" +
|
||||
"import os\n" +
|
||||
"import shutil\n" +
|
||||
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
|
||||
"if os.path.exists(sink_path):\n" +
|
||||
" if os.path.isfile(sink_path):\n" +
|
||||
" os.remove(sink_path)\n" +
|
||||
" else:\n" +
|
||||
" shutil.rmtree(sink_path)\n" +
|
||||
"b_env.set_parallelism(1)\n" +
|
||||
"t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
|
||||
"bt_env.connect(FileSystem().path(sink_path)) \\\n" +
|
||||
" .with_format(OldCsv()\n" +
|
||||
" .field_delimiter(',')\n" +
|
||||
" .field(\"a\", DataTypes.STRING())\n" +
|
||||
" .field(\"b\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"c\", DataTypes.BIGINT())) \\\n" +
|
||||
" .with_schema(Schema()\n" +
|
||||
" .field(\"a\", DataTypes.STRING())\n" +
|
||||
" .field(\"b\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"c\", DataTypes.BIGINT())) \\\n" +
|
||||
" .register_table_sink(\"batch_sink4\")\n" +
|
||||
"t.group_by(\"c\").select(\"c, sum(a), count(b)\").insert_into(\"batch_sink4\")\n" +
|
||||
"bt_env.execute(\"batch_job4\")"
|
||||
, context);
|
||||
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
// use scala udf in pyflink
|
||||
// define scala udf
|
||||
result = flinkScalaInterpreter.interpret(
|
||||
"class AddOne extends ScalarFunction {\n" +
|
||||
" def eval(a: java.lang.Long): String = a + \"\1\"\n" +
|
||||
"}", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
result = flinkScalaInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
|
||||
context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
|
||||
result = pyflinkInterpreter.interpret(
|
||||
"import tempfile\n" +
|
||||
"import os\n" +
|
||||
"import shutil\n" +
|
||||
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
|
||||
"if os.path.exists(sink_path):\n" +
|
||||
" if os.path.isfile(sink_path):\n" +
|
||||
" os.remove(sink_path)\n" +
|
||||
" else:\n" +
|
||||
" shutil.rmtree(sink_path)\n" +
|
||||
"b_env.set_parallelism(1)\n" +
|
||||
"t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
|
||||
"bt_env.connect(FileSystem().path(sink_path)) \\\n" +
|
||||
" .with_format(OldCsv()\n" +
|
||||
" .field_delimiter(',')\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .with_schema(Schema()\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .register_table_sink(\"batch_sink3\")\n" +
|
||||
"t.select(\"a, addOne(a), c\").insert_into(\"batch_sink3\")\n" +
|
||||
"bt_env.execute(\"batch_job3\")"
|
||||
, context);
|
||||
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -149,33 +222,33 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
|
|||
}
|
||||
}
|
||||
|
||||
public static void testStreamPyFlink(Interpreter interpreter) throws InterpreterException {
|
||||
public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
|
||||
InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
|
||||
InterpreterResult result = interpreter.interpret(
|
||||
"import tempfile\n" +
|
||||
"import os\n" +
|
||||
"import shutil\n" +
|
||||
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
|
||||
"if os.path.exists(sink_path):\n" +
|
||||
" if os.path.isfile(sink_path):\n" +
|
||||
" os.remove(sink_path)\n" +
|
||||
" else:\n" +
|
||||
" shutil.rmtree(sink_path)\n" +
|
||||
"s_env.set_parallelism(1)\n" +
|
||||
"t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
|
||||
"st_env.connect(FileSystem().path(sink_path)) \\\n" +
|
||||
" .with_format(OldCsv()\n" +
|
||||
" .field_delimiter(',')\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .with_schema(Schema()\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .register_table_sink(\"stream_sink\")\n" +
|
||||
"t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
|
||||
"st_env.execute(\"stream_job\")"
|
||||
"import tempfile\n" +
|
||||
"import os\n" +
|
||||
"import shutil\n" +
|
||||
"sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
|
||||
"if os.path.exists(sink_path):\n" +
|
||||
" if os.path.isfile(sink_path):\n" +
|
||||
" os.remove(sink_path)\n" +
|
||||
" else:\n" +
|
||||
" shutil.rmtree(sink_path)\n" +
|
||||
"s_env.set_parallelism(1)\n" +
|
||||
"t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
|
||||
"st_env.connect(FileSystem().path(sink_path)) \\\n" +
|
||||
" .with_format(OldCsv()\n" +
|
||||
" .field_delimiter(',')\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .with_schema(Schema()\n" +
|
||||
" .field(\"a\", DataTypes.BIGINT())\n" +
|
||||
" .field(\"b\", DataTypes.STRING())\n" +
|
||||
" .field(\"c\", DataTypes.STRING())) \\\n" +
|
||||
" .register_table_sink(\"stream_sink\")\n" +
|
||||
"t.select(\"a + 1, b, c\").insert_into(\"stream_sink\")\n" +
|
||||
"st_env.execute(\"stream_job\")"
|
||||
, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ package org.apache.zeppelin.flink;
|
|||
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
|
@ -32,6 +31,8 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
|
|||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.python.PythonInterpreterTest;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -48,7 +49,7 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
|
|||
private RemoteInterpreterEventClient mockRemoteEventClient =
|
||||
mock(RemoteInterpreterEventClient.class);
|
||||
|
||||
private Interpreter flinkInterpreter;
|
||||
private Interpreter flinkScalaInterpreter;
|
||||
private Interpreter streamSqlInterpreter;
|
||||
private Interpreter batchSqlInterpreter;
|
||||
|
||||
|
|
@ -77,9 +78,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
|
|||
.setIntpEventClient(mockRemoteEventClient)
|
||||
.build();
|
||||
InterpreterContext.set(context);
|
||||
flinkInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
|
||||
intpGroup.get("session_1").add(flinkInterpreter);
|
||||
flinkInterpreter.setInterpreterGroup(intpGroup);
|
||||
flinkScalaInterpreter = new LazyOpenInterpreter(new FlinkInterpreter(properties));
|
||||
intpGroup.get("session_1").add(flinkScalaInterpreter);
|
||||
flinkScalaInterpreter.setInterpreterGroup(intpGroup);
|
||||
|
||||
LazyOpenInterpreter iPyFlinkInterpreter =
|
||||
new LazyOpenInterpreter(new IPyFlinkInterpreter(properties));
|
||||
|
|
@ -108,9 +109,9 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPyFlink() throws InterpreterException {
|
||||
IPyFlinkInterpreterTest.testBatchPyFlink(interpreter);
|
||||
IPyFlinkInterpreterTest.testStreamPyFlink(interpreter);
|
||||
public void testPyFlink() throws InterpreterException, IOException {
|
||||
IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
|
||||
IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
|
||||
}
|
||||
|
||||
protected InterpreterContext getInterpreterContext() {
|
||||
|
|
|
|||
|
|
@ -15,11 +15,12 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
log4j.rootLogger = WARN, stdout
|
||||
log4j.rootLogger = INFO, stdout
|
||||
|
||||
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
|
||||
|
||||
log4j.logger.org.apache.hive=WARN
|
||||
log4j.logger.org.apache.flink=WARN
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
status = WARN
|
||||
status = INFO
|
||||
name = HiveLog4j2
|
||||
packages = org.apache.hadoop.hive.ql.log
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.zeppelin.python;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.io.Files;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
|
|
@ -60,7 +59,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
private static final int MAX_TIMEOUT_SEC = 30;
|
||||
|
||||
private GatewayServer gatewayServer;
|
||||
private PythonProcessLauncher pythonProcessLauncher;
|
||||
protected PythonProcessLauncher pythonProcessLauncher;
|
||||
private File pythonWorkDir;
|
||||
protected boolean useBuiltinPy4j = true;
|
||||
|
||||
|
|
@ -163,7 +162,6 @@ public class PythonInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public PythonProcessLauncher getPythonProcessLauncher() {
|
||||
return pythonProcessLauncher;
|
||||
}
|
||||
|
|
@ -572,7 +570,7 @@ public class PythonInterpreter extends Interpreter {
|
|||
LOGGER.debug("Python Process Output: " + message);
|
||||
}
|
||||
|
||||
class PythonProcessLauncher extends ProcessLauncher {
|
||||
public class PythonProcessLauncher extends ProcessLauncher {
|
||||
|
||||
PythonProcessLauncher(CommandLine commandLine, Map<String, String> envs) {
|
||||
super(commandLine, envs);
|
||||
|
|
|
|||
|
|
@ -146,7 +146,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
|
|||
assertEquals(Code.ERROR, result.code());
|
||||
output = context.out.toInterpreterResultMessage().get(0);
|
||||
assertTrue(output.getData(),
|
||||
output.getData().equals("Ipython kernel has been stopped. Please check logs. "
|
||||
output.getData().contains("Ipython kernel has been stopped. Please check logs. "
|
||||
+ "It might be because of an out of memory issue."));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.zeppelin.jupyter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
|
|
@ -213,7 +212,6 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
|
|||
return EnvironmentUtils.getProcEnvironment();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
|
||||
return jupyterKernelProcessLauncher;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue