mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Fix unittest and update comment
This commit is contained in:
parent
27fc306a35
commit
ebbd0dabc8
4 changed files with 17 additions and 58 deletions
|
|
@ -27,7 +27,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* The class override execute() method to create an PlanExecutor with
|
||||
* jar file that packages classes from scala compiler.
|
||||
*/
|
||||
public class FlinkEnvironment extends ExecutionEnvironment {
|
||||
Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
|
||||
|
|
@ -79,52 +80,4 @@ public class FlinkEnvironment extends ExecutionEnvironment {
|
|||
|
||||
return jsonPlan;
|
||||
}
|
||||
|
||||
/*
|
||||
private File createJar() throws IOException {
|
||||
// create execution environment
|
||||
File jarFile = new File(System.getProperty("java.io.tmpdir")
|
||||
+ "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar");
|
||||
|
||||
|
||||
File[] classFiles = classDir.listFiles();
|
||||
if (classFiles == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
byte buffer[] = new byte[10240];
|
||||
// Open archive file
|
||||
FileOutputStream stream = new FileOutputStream(jarFile);
|
||||
JarOutputStream out = new JarOutputStream(stream, new Manifest());
|
||||
|
||||
for (int i = 0; i < classFiles.length; i++) {
|
||||
File classFile = classFiles[i];
|
||||
if (classFiles == null || !classFile.exists()
|
||||
|| classFile.isDirectory())
|
||||
continue;
|
||||
|
||||
|
||||
// Add class
|
||||
JarEntry jarAdd = new JarEntry(classFile.getName());
|
||||
jarAdd.setTime(classFile.lastModified());
|
||||
out.putNextEntry(jarAdd);
|
||||
logger.info("add class {} into jar", classFile);
|
||||
|
||||
// Write file to archive
|
||||
FileInputStream in = new FileInputStream(classFile);
|
||||
while (true) {
|
||||
int nRead = in.read(buffer, 0, buffer.length);
|
||||
if (nRead <= 0)
|
||||
break;
|
||||
out.write(buffer, 0, nRead);
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
out.close();
|
||||
stream.close();
|
||||
return jarFile;
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import scala.tools.nsc.Settings;
|
|||
import scala.tools.nsc.interpreter.IMain;
|
||||
|
||||
/**
|
||||
* Scala compiler
|
||||
*/
|
||||
public class FlinkIMain extends IMain {
|
||||
Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
|
||||
|
|
|
|||
|
|
@ -11,6 +11,9 @@ import java.util.jar.JarInputStream;
|
|||
import java.util.jar.JarOutputStream;
|
||||
|
||||
/**
|
||||
* This class copied from flink-scala-shell. Once the flink-0.9 is published in
|
||||
* the maven repository, this class can be removed
|
||||
*
|
||||
* Provides utility services for jarring and unjarring files and directories.
|
||||
* Note that a given instance of JarHelper is not threadsafe with respect to
|
||||
* multiple jar operations.
|
||||
|
|
|
|||
|
|
@ -23,25 +23,26 @@ import java.util.Properties;
|
|||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FlinkInterpreterTest {
|
||||
|
||||
private FlinkInterpreter flink;
|
||||
private InterpreterContext context;
|
||||
private static FlinkInterpreter flink;
|
||||
private static InterpreterContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
Properties p = new Properties();
|
||||
flink = new FlinkInterpreter(p);
|
||||
flink.open();
|
||||
context = new InterpreterContext(null, null, null, null, null, null, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
flink.close();
|
||||
flink.destroy();
|
||||
}
|
||||
|
|
@ -53,12 +54,13 @@ public class FlinkInterpreterTest {
|
|||
assertEquals("1", result.message());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testWordCount() {
|
||||
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
|
||||
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
|
||||
flink.interpret("counts.print()", context);
|
||||
InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
|
||||
assertEquals("", result.message());
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue