Fix unittest and update comment

This commit is contained in:
Lee moon soo 2015-05-11 19:55:53 +02:00
parent 27fc306a35
commit ebbd0dabc8
4 changed files with 17 additions and 58 deletions

View file

@ -27,7 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* The class override execute() method to create an PlanExecutor with
* jar file that packages classes from scala compiler.
*/
public class FlinkEnvironment extends ExecutionEnvironment {
Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
@ -79,52 +80,4 @@ public class FlinkEnvironment extends ExecutionEnvironment {
return jsonPlan;
}
/*
private File createJar() throws IOException {
// create execution environment
File jarFile = new File(System.getProperty("java.io.tmpdir")
+ "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar");
File[] classFiles = classDir.listFiles();
if (classFiles == null) {
return null;
}
byte buffer[] = new byte[10240];
// Open archive file
FileOutputStream stream = new FileOutputStream(jarFile);
JarOutputStream out = new JarOutputStream(stream, new Manifest());
for (int i = 0; i < classFiles.length; i++) {
File classFile = classFiles[i];
if (classFiles == null || !classFile.exists()
|| classFile.isDirectory())
continue;
// Add class
JarEntry jarAdd = new JarEntry(classFile.getName());
jarAdd.setTime(classFile.lastModified());
out.putNextEntry(jarAdd);
logger.info("add class {} into jar", classFile);
// Write file to archive
FileInputStream in = new FileInputStream(classFile);
while (true) {
int nRead = in.read(buffer, 0, buffer.length);
if (nRead <= 0)
break;
out.write(buffer, 0, nRead);
}
in.close();
}
out.close();
stream.close();
return jarFile;
}
*/
}

View file

@ -17,6 +17,7 @@ import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.IMain;
/**
* Scala compiler
*/
public class FlinkIMain extends IMain {
Logger logger = LoggerFactory.getLogger(FlinkIMain.class);

View file

@ -11,6 +11,9 @@ import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
/**
* This class copied from flink-scala-shell. Once the flink-0.9 is published in
* the maven repository, this class can be removed
*
* Provides utility services for jarring and unjarring files and directories.
* Note that a given instance of JarHelper is not threadsafe with respect to
* multiple jar operations.

View file

@ -23,25 +23,26 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Before;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class FlinkInterpreterTest {
private FlinkInterpreter flink;
private InterpreterContext context;
private static FlinkInterpreter flink;
private static InterpreterContext context;
@Before
public void setUp() {
@BeforeClass
public static void setUp() {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null);
}
@After
public void tearDown() {
@AfterClass
public static void tearDown() {
flink.close();
flink.destroy();
}
@ -53,12 +54,13 @@ public class FlinkInterpreterTest {
assertEquals("1", result.message());
}
@Test
public void testWordCount() {
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
flink.interpret("counts.print()", context);
InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
assertEquals("", result.message());
assertEquals(Code.SUCCESS, result.code());
}
}