mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into ZEPPELIN-1306
This commit is contained in:
commit
3127154ef5
30 changed files with 489 additions and 167 deletions
|
|
@ -34,7 +34,7 @@
|
|||
<description>Zeppelin flink support</description>
|
||||
|
||||
<properties>
|
||||
<flink.version>1.0.3</flink.version>
|
||||
<flink.version>1.1.2</flink.version>
|
||||
<flink.akka.version>2.3.7</flink.akka.version>
|
||||
<scala.macros.version>2.0.1</scala.macros.version>
|
||||
</properties>
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import java.util.*;
|
|||
import org.apache.flink.api.scala.FlinkILoop;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
|
||||
import org.apache.flink.runtime.util.EnvironmentInformation;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
|
|
@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import scala.Console;
|
||||
import scala.None;
|
||||
import scala.Option;
|
||||
import scala.Some;
|
||||
import scala.collection.JavaConversions;
|
||||
import scala.collection.immutable.Nil;
|
||||
|
|
@ -83,14 +85,25 @@ public class FlinkInterpreter extends Interpreter {
|
|||
startFlinkMiniCluster();
|
||||
}
|
||||
|
||||
flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out));
|
||||
flinkIloop = new FlinkILoop(getHost(),
|
||||
getPort(),
|
||||
flinkConf,
|
||||
(BufferedReader) null,
|
||||
new PrintWriter(out));
|
||||
|
||||
flinkIloop.settings_$eq(createSettings());
|
||||
flinkIloop.createInterpreter();
|
||||
|
||||
|
||||
imain = flinkIloop.intp();
|
||||
|
||||
org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv();
|
||||
env.getConfig().disableSysoutLogging();
|
||||
org.apache.flink.api.scala.ExecutionEnvironment benv =
|
||||
flinkIloop.scalaBenv();
|
||||
//new ExecutionEnvironment(remoteBenv)
|
||||
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
|
||||
flinkIloop.scalaSenv();
|
||||
|
||||
senv.getConfig().disableSysoutLogging();
|
||||
benv.getConfig().disableSysoutLogging();
|
||||
|
||||
// prepare bindings
|
||||
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
|
||||
|
|
@ -100,13 +113,19 @@ public class FlinkInterpreter extends Interpreter {
|
|||
imain.interpret("import scala.tools.nsc.io._");
|
||||
imain.interpret("import Properties.userHome");
|
||||
imain.interpret("import scala.compat.Platform.EOL");
|
||||
|
||||
|
||||
imain.interpret("import org.apache.flink.api.scala._");
|
||||
imain.interpret("import org.apache.flink.api.common.functions._");
|
||||
|
||||
binder.put("env", env);
|
||||
imain.interpret("val env = _binder.get(\"env\").asInstanceOf["
|
||||
+ env.getClass().getName() + "]");
|
||||
|
||||
binder.put("benv", benv);
|
||||
imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
|
||||
+ benv.getClass().getName() + "]");
|
||||
|
||||
binder.put("senv", senv);
|
||||
imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
|
||||
+ senv.getClass().getName() + "]");
|
||||
|
||||
}
|
||||
|
||||
private boolean localMode() {
|
||||
|
|
@ -313,8 +332,6 @@ public class FlinkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
|
@ -354,4 +371,5 @@ public class FlinkInterpreter extends Interpreter {
|
|||
static final String toString(Object o) {
|
||||
return (o instanceof String) ? (String) o : "";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ public class FlinkInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void testWordCount() {
|
||||
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
|
||||
flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
|
||||
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
|
||||
InterpreterResult result = flink.interpret("counts.print()", context);
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
|
|
|
|||
6
pom.xml
6
pom.xml
|
|
@ -531,7 +531,7 @@
|
|||
<profile>
|
||||
<id>scala-2.10</id>
|
||||
<activation>
|
||||
<property><name>!scala-2.11</name></property>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<properties>
|
||||
<scala.version>2.10.5</scala.version>
|
||||
|
|
@ -541,9 +541,6 @@
|
|||
|
||||
<profile>
|
||||
<id>scala-2.11</id>
|
||||
<activation>
|
||||
<property><name>scala-2.11</name></property>
|
||||
</activation>
|
||||
<properties>
|
||||
<scala.version>2.11.7</scala.version>
|
||||
<scala.binary.version>2.11</scala.binary.version>
|
||||
|
|
@ -828,5 +825,4 @@
|
|||
</profile>
|
||||
</profiles>
|
||||
|
||||
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -329,6 +329,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
setupConfForPySpark(conf);
|
||||
setupConfForSparkR(conf);
|
||||
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
|
||||
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
|
||||
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
|
||||
|
|
@ -443,6 +444,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
setupConfForPySpark(conf);
|
||||
setupConfForSparkR(conf);
|
||||
SparkContext sparkContext = new SparkContext(conf);
|
||||
return sparkContext;
|
||||
}
|
||||
|
|
@ -494,6 +496,35 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private void setupConfForSparkR(SparkConf conf) {
|
||||
String sparkRBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue();
|
||||
File sparkRPath;
|
||||
if (null == sparkRBasePath) {
|
||||
sparkRBasePath =
|
||||
new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue();
|
||||
sparkRPath = new File(sparkRBasePath,
|
||||
"interpreter" + File.separator + "spark" + File.separator + "R");
|
||||
} else {
|
||||
sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
|
||||
}
|
||||
|
||||
sparkRPath = new File(sparkRPath, "sparkr.zip");
|
||||
if (sparkRPath.exists() && sparkRPath.isFile()) {
|
||||
String archives = null;
|
||||
if (conf.contains("spark.yarn.dist.archives")) {
|
||||
archives = conf.get("spark.yarn.dist.archives");
|
||||
}
|
||||
if (archives != null) {
|
||||
archives = archives + "," + sparkRPath + "#sparkr";
|
||||
} else {
|
||||
archives = sparkRPath + "#sparkr";
|
||||
}
|
||||
conf.set("spark.yarn.dist.archives", archives);
|
||||
} else {
|
||||
logger.warn("sparkr.zip is not found, sparkr may not work.");
|
||||
}
|
||||
}
|
||||
|
||||
static final String toString(Object o) {
|
||||
return (o instanceof String) ? (String) o : "";
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,13 +52,19 @@ public class SparkVersion {
|
|||
if (pos > 0) {
|
||||
numberPart = versionString.substring(0, pos);
|
||||
}
|
||||
version = Integer.parseInt(numberPart.replaceAll("\\.", ""));
|
||||
|
||||
String versions[] = numberPart.split("\\.");
|
||||
int major = Integer.parseInt(versions[0]);
|
||||
int minor = Integer.parseInt(versions[1]);
|
||||
int patch = Integer.parseInt(versions[2]);
|
||||
// version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
|
||||
version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch));
|
||||
} catch (Exception e) {
|
||||
logger.error("Can not recognize Spark version " + versionString +
|
||||
". Assume it's a future release", e);
|
||||
|
||||
// assume it is future release
|
||||
version = 999;
|
||||
version = 99999;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
|
|||
# setup spark env
|
||||
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
|
||||
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
|
||||
if (version >= 200) {
|
||||
if (version >= 20000) {
|
||||
assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
|
||||
assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -107,9 +107,9 @@ class PyZeppelinContext(dict):
|
|||
|
||||
|
||||
class SparkVersion(object):
|
||||
SPARK_1_4_0 = 140
|
||||
SPARK_1_3_0 = 130
|
||||
SPARK_2_0_0 = 200
|
||||
SPARK_1_4_0 = 10400
|
||||
SPARK_1_3_0 = 10300
|
||||
SPARK_2_0_0 = 20000
|
||||
|
||||
def __init__(self, versionNumber):
|
||||
self.version = versionNumber
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ public class SparkVersionTest {
|
|||
|
||||
@Test
|
||||
public void testUnknownSparkVersion() {
|
||||
assertEquals(999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
|
||||
assertEquals(99999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -33,6 +33,8 @@ public class SparkVersionTest {
|
|||
assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion());
|
||||
assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion());
|
||||
assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion());
|
||||
// should support spark2 version of HDP 2.5
|
||||
assertFalse(SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245").isUnsupportedVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -40,6 +42,9 @@ public class SparkVersionTest {
|
|||
// test equals
|
||||
assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0"));
|
||||
assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
|
||||
assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
|
||||
// test spark2 version of HDP 2.5
|
||||
assertEquals(SparkVersion.SPARK_2_0_0, SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245"));
|
||||
|
||||
// test newer than
|
||||
assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
|
||||
|
|
@ -60,7 +65,7 @@ public class SparkVersionTest {
|
|||
assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0));
|
||||
|
||||
// conversion
|
||||
assertEquals(120, SparkVersion.SPARK_1_2_0.toNumber());
|
||||
assertEquals(10200, SparkVersion.SPARK_1_2_0.toNumber());
|
||||
assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,9 +95,6 @@
|
|||
<profiles>
|
||||
<profile>
|
||||
<id>scala-2.11</id>
|
||||
<activation>
|
||||
<property><name>scala-2.11</name></property>
|
||||
</activation>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang.modules</groupId>
|
||||
|
|
|
|||
|
|
@ -114,9 +114,6 @@
|
|||
<profiles>
|
||||
<profile>
|
||||
<id>scala-2.11</id>
|
||||
<activation>
|
||||
<property><name>scala-2.11</name></property>
|
||||
</activation>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ public class InterpreterProperty {
|
|||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (o == null) return false;
|
||||
return this.toString().equals(o.toString());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -527,4 +527,11 @@ public class RemoteInterpreter extends Interpreter {
|
|||
public void setEnv(Map<String, String> env) {
|
||||
this.env = env;
|
||||
}
|
||||
|
||||
public void addEnv(Map<String, String> env) {
|
||||
if (this.env == null) {
|
||||
this.env = new HashMap<>();
|
||||
}
|
||||
this.env.putAll(env);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,91 +23,89 @@ import static org.junit.Assert.fail;
|
|||
import org.junit.Test;
|
||||
|
||||
|
||||
|
||||
|
||||
public class InterpreterResultTest {
|
||||
|
||||
@Test
|
||||
public void testTextType() {
|
||||
|
||||
InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"this is a TEXT type");
|
||||
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%this is a TEXT type");
|
||||
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%\n");
|
||||
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
|
||||
}
|
||||
@Test
|
||||
public void testSimpleMagicType() {
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
}
|
||||
@Test
|
||||
public void testTextType() {
|
||||
|
||||
public void testComplexMagicType() {
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("some text before magic return magic",InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%html <h3> This is a hack </h3> %table\n col1\tcol2\naaa\t123\n");
|
||||
assertEquals("magic A before magic B return magic A",InterpreterResult.Type.HTML, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
|
||||
assertEquals("text & magic A before magic B return magic A" ,InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
|
||||
assertEquals("magic A, magic B, magic a' return magic A" ,InterpreterResult.Type.TABLE, result.type());
|
||||
InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "this is a TEXT type");
|
||||
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%this is a TEXT type");
|
||||
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%\n");
|
||||
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testSimpleMagicType() {
|
||||
InterpreterResult result = null;
|
||||
|
||||
@Test
|
||||
public void testSimpleMagicData() {
|
||||
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("%table col1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals("%table\ncol1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("some text before magic word %table col1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplexMagicData() {
|
||||
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("text before %table return %table","col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%html <h3> This is a hack </h3> %table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals("%html before %table return %html"," <h3> This is a hack </h3> %table\n" +
|
||||
"col1\tcol2\n" +
|
||||
"aaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
|
||||
assertEquals("text & %table befoe %html return %table","col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %html <h3> This is a hack </h3>", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
|
||||
assertEquals("%table, %html before %table return first %table","col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %html <h3> This is a hack </h3> %table col1\n" +
|
||||
"aaa\n" +
|
||||
"123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %table col1\naaa\n123\n");
|
||||
assertEquals("%table before %table return first %table","col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %table col1\n" +
|
||||
"aaa\n" +
|
||||
"123\n", result.message());
|
||||
}
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToString() {
|
||||
assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
|
||||
}
|
||||
public void testComplexMagicType() {
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("some text before magic return magic", InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\n col1\tcol2\naaa\t123\n");
|
||||
assertEquals("magic A before magic B return magic A", InterpreterResult.Type.HTML, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
|
||||
assertEquals("text & magic A before magic B return magic A", InterpreterResult.Type.TABLE, result.type());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
|
||||
assertEquals("magic A, magic B, magic a' return magic A", InterpreterResult.Type.TABLE, result.type());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMagicData() {
|
||||
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("%table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals("%table\ncol1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("some text before magic word %table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComplexMagicData() {
|
||||
|
||||
InterpreterResult result = null;
|
||||
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
|
||||
assertEquals("text before %table return %table", "col1\tcol2\naaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\ncol1\tcol2\naaa\t123\n");
|
||||
assertEquals("%html before %table return %html", " <h3> This is a hack </h3> %table\n" +
|
||||
"col1\tcol2\n" +
|
||||
"aaa\t123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
|
||||
assertEquals("text & %table befoe %html return %table", "col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %html <h3> This is a hack </h3>", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
|
||||
assertEquals("%table, %html before %table return first %table", "col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %html <h3> This is a hack </h3> %table col1\n" +
|
||||
"aaa\n" +
|
||||
"123\n", result.message());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %table col1\naaa\n123\n");
|
||||
assertEquals("%table before %table return first %table", "col1\tcol2\n" +
|
||||
"aaa\t123\n" +
|
||||
" %table col1\n" +
|
||||
"aaa\n" +
|
||||
"123\n", result.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToString() {
|
||||
assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -448,9 +448,6 @@
|
|||
<profiles>
|
||||
<profile>
|
||||
<id>scala-2.11</id>
|
||||
<activation>
|
||||
<property><name>scala-2.11</name></property>
|
||||
</activation>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
|||
|
|
@ -779,7 +779,8 @@ public class NotebookRestApi {
|
|||
LOG.info("Get notebook jobs for job manager");
|
||||
|
||||
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
|
||||
List<Map<String, Object>> notebookJobs = notebook.getJobListforNotebook(false, 0, subject);
|
||||
List<Map<String, Object>> notebookJobs = notebook
|
||||
.getJobListByUnixTime(false, 0, subject);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
|
|
@ -791,6 +792,8 @@ public class NotebookRestApi {
|
|||
/**
|
||||
* Get updated notebook jobs for job manager
|
||||
*
|
||||
* Return the `Note` change information within the post unix timestamp.
|
||||
*
|
||||
* @return JSON with status.OK
|
||||
* @throws IOException, IllegalArgumentException
|
||||
*/
|
||||
|
|
@ -804,7 +807,7 @@ public class NotebookRestApi {
|
|||
|
||||
List<Map<String, Object>> notebookJobs;
|
||||
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
|
||||
notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime, subject);
|
||||
notebookJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
|
|
|
|||
|
|
@ -104,6 +104,7 @@ public class ZeppelinServer extends Application {
|
|||
heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
|
||||
|
||||
notebook.addNotebookEventListener(heliumApplicationFactory);
|
||||
notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener());
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
|
|
|||
|
|
@ -241,8 +241,8 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
case LIST_NOTEBOOK_JOBS:
|
||||
unicastNotebookJobInfo(conn, messagereceived);
|
||||
break;
|
||||
case LIST_UPDATE_NOTEBOOK_JOBS:
|
||||
unicastUpdateNotebookJobInfo(conn, messagereceived);
|
||||
case UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS:
|
||||
unsubscribeNotebookJobInfo(conn);
|
||||
break;
|
||||
case GET_INTERPRETER_BINDINGS:
|
||||
getInterpreterBindings(conn, messagereceived);
|
||||
|
|
@ -398,9 +398,10 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
|
||||
public void unicastNotebookJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
|
||||
|
||||
addConnectionToNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn);
|
||||
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
|
||||
List<Map<String, Object>> notebookJobs = notebook().getJobListforNotebook(false, 0, subject);
|
||||
List<Map<String, Object>> notebookJobs = notebook()
|
||||
.getJobListByUnixTime(false, 0, subject);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
|
|
@ -410,21 +411,25 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
.put("notebookJobs", response)));
|
||||
}
|
||||
|
||||
public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage)
|
||||
throws IOException {
|
||||
double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime");
|
||||
long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue();
|
||||
|
||||
List<Map<String, Object>> notebookJobs;
|
||||
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
|
||||
notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime, subject);
|
||||
public void broadcastUpdateNotebookJobInfo(long lastUpdateUnixTime) throws IOException {
|
||||
List<Map<String, Object>> notebookJobs = new LinkedList<>();
|
||||
Notebook notebookObject = notebook();
|
||||
List<Map<String, Object>> jobNotes = null;
|
||||
if (notebookObject != null) {
|
||||
jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null);
|
||||
notebookJobs = jobNotes == null ? notebookJobs : jobNotes;
|
||||
}
|
||||
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notebookJobs);
|
||||
response.put("jobs", notebookJobs != null ? notebookJobs : new LinkedList<>());
|
||||
|
||||
conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS)
|
||||
.put("notebookRunningJobs", response)));
|
||||
broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
}
|
||||
|
||||
public void unsubscribeNotebookJobInfo(NotebookSocket conn) {
|
||||
removeConnectionFromNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn);
|
||||
}
|
||||
|
||||
public void saveInterpreterBindings(NotebookSocket conn, Message fromMessage) {
|
||||
|
|
@ -719,7 +724,10 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
if (fromMessage != null) {
|
||||
String noteName = (String) ((Map) fromMessage.get("notebook")).get("name");
|
||||
String noteJson = gson.toJson(fromMessage.get("notebook"));
|
||||
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
|
||||
AuthenticationInfo subject = null;
|
||||
if (fromMessage.principal != null) {
|
||||
subject = new AuthenticationInfo(fromMessage.principal);
|
||||
}
|
||||
note = notebook.importNote(noteJson, noteName, subject);
|
||||
note.persist(subject);
|
||||
broadcastNote(note);
|
||||
|
|
@ -1291,6 +1299,113 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
broadcast(noteId, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notebook Information Change event
|
||||
*/
|
||||
public static class NotebookInformationListener implements NotebookEventListener {
|
||||
private NotebookServer notebookServer;
|
||||
|
||||
public NotebookInformationListener(NotebookServer notebookServer) {
|
||||
this.notebookServer = notebookServer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphRemove(Paragraph p) {
|
||||
try {
|
||||
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("can not broadcast for job manager {}", ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoteRemove(Note note) {
|
||||
try {
|
||||
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("can not broadcast for job manager {}", ioe.getMessage());
|
||||
}
|
||||
|
||||
List<Map<String, Object>> notesInfo = new LinkedList<>();
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
info.put("notebookId", note.getId());
|
||||
// set paragraphs
|
||||
List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
|
||||
|
||||
// notebook json object root information.
|
||||
info.put("isRunningJob", false);
|
||||
info.put("unixTimeLastRun", 0);
|
||||
info.put("isRemoved", true);
|
||||
info.put("paragraphs", paragraphsInfo);
|
||||
notesInfo.add(info);
|
||||
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notesInfo);
|
||||
|
||||
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphCreate(Paragraph p) {
|
||||
Notebook notebook = notebookServer.notebook();
|
||||
List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(
|
||||
p.getId()
|
||||
);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notebookJobs);
|
||||
|
||||
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoteCreate(Note note) {
|
||||
Notebook notebook = notebookServer.notebook();
|
||||
List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId(
|
||||
note.getId()
|
||||
);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notebookJobs);
|
||||
|
||||
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onParagraphStatusChange(Paragraph p, Status status) {
|
||||
Notebook notebook = notebookServer.notebook();
|
||||
List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(
|
||||
p.getId()
|
||||
);
|
||||
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notebookJobs);
|
||||
|
||||
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
|
||||
Notebook notebook = notebookServer.notebook();
|
||||
List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId(
|
||||
note.getId()
|
||||
);
|
||||
Map<String, Object> response = new HashMap<>();
|
||||
response.put("lastResponseUnixTime", System.currentTimeMillis());
|
||||
response.put("jobs", notebookJobs);
|
||||
|
||||
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
|
||||
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Need description here.
|
||||
*
|
||||
|
|
@ -1334,6 +1449,12 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
}
|
||||
}
|
||||
notebookServer.broadcastNote(note);
|
||||
|
||||
try {
|
||||
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
|
||||
} catch (IOException e) {
|
||||
LOG.error("can not broadcast for job manager {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1374,6 +1495,10 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
return new ParagraphListenerImpl(this, note);
|
||||
}
|
||||
|
||||
public NotebookEventListener getNotebookInformationListener() {
|
||||
return new NotebookInformationListener(this);
|
||||
}
|
||||
|
||||
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
|
||||
List<InterpreterSetting> settings =
|
||||
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());
|
||||
|
|
|
|||
|
|
@ -32,14 +32,8 @@ angular.module('zeppelinWebApp')
|
|||
$scope.JobInfomationsByFilter = $scope.jobInfomations;
|
||||
|
||||
websocketMsgSrv.getNotebookJobsList();
|
||||
var refreshObj = $interval(function() {
|
||||
if ($scope.lastJobServerUnixTime !== undefined) {
|
||||
websocketMsgSrv.getUpdateNotebookJobsList($scope.lastJobServerUnixTime);
|
||||
}
|
||||
}, 1000);
|
||||
|
||||
$scope.$on('$destroy', function() {
|
||||
$interval.cancel(refreshObj);
|
||||
websocketMsgSrv.unsubscribeJobManager();
|
||||
});
|
||||
};
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ limitations under the License.
|
|||
Bind interpreter for this note.
|
||||
Click to Bind/Unbind interpreter.
|
||||
Drag and drop to reorder interpreters. <br />
|
||||
The first interpreter on the list becomes default. To create/remove interpreters, go to <a href="/#/interpreter">Interpreter</a> menu.
|
||||
The first interpreter on the list becomes default. To create/remove interpreters, go to <a href="#/interpreter">Interpreter</a> menu.
|
||||
</p>
|
||||
|
||||
<div class="interpreterSettings"
|
||||
|
|
|
|||
|
|
@ -195,7 +195,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope,
|
|||
},
|
||||
|
||||
unsubscribeJobManager: function() {
|
||||
websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_JOBMANAGER'});
|
||||
websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS'});
|
||||
},
|
||||
|
||||
getInterpreterBindings: function(noteID) {
|
||||
|
|
|
|||
|
|
@ -252,5 +252,14 @@
|
|||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<configuration combine.children="append">
|
||||
<forkMode>always</forkMode>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -672,7 +672,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
Interpreter interpreter;
|
||||
for (InterpreterInfo info : interpreterInfos) {
|
||||
if (option.isRemote()) {
|
||||
if (option.isConnectExistingProcess()) {
|
||||
if (option.isExistingProcess()) {
|
||||
interpreter =
|
||||
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
|
||||
properties);
|
||||
|
|
@ -1013,7 +1013,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener);
|
||||
remoteInterpreter.setEnv(env);
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
||||
return new LazyOpenInterpreter(remoteInterpreter);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,10 +85,6 @@ public class InterpreterOption {
|
|||
this.perNoteSession = perNoteSession;
|
||||
}
|
||||
|
||||
public boolean isConnectExistingProcess() {
|
||||
return (host != null && port != -1);
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,15 +24,18 @@ import java.util.Collections;
|
|||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.stream.JsonReader;
|
||||
import org.apache.commons.codec.binary.StringUtils;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
import org.quartz.CronTrigger;
|
||||
import org.quartz.JobBuilder;
|
||||
|
|
@ -157,6 +160,11 @@ public class Notebook implements NoteEventListener {
|
|||
bindInterpretersToNote(note.getId(), interpreterIds);
|
||||
}
|
||||
|
||||
if (subject != null && !"anonymous".equals(subject.getUser())) {
|
||||
Set<String> owners = new HashSet<String>();
|
||||
owners.add(subject.getUser());
|
||||
notebookAuthorization.setOwners(note.getId(), owners);
|
||||
}
|
||||
notebookIndex.addIndexDoc(note);
|
||||
note.persist(subject);
|
||||
fireNoteCreateEvent(note);
|
||||
|
|
@ -468,7 +476,7 @@ public class Notebook implements NoteEventListener {
|
|||
if (notebookRepo instanceof NotebookRepoSync) {
|
||||
NotebookRepoSync mainRepo = (NotebookRepoSync) notebookRepo;
|
||||
if (mainRepo.getRepoCount() > 1) {
|
||||
mainRepo.sync();
|
||||
mainRepo.sync(subject);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -573,7 +581,78 @@ public class Notebook implements NoteEventListener {
|
|||
return lastRunningUnixTime;
|
||||
}
|
||||
|
||||
public List<Map<String, Object>> getJobListforNotebook(boolean needsReload,
|
||||
public List<Map<String, Object>> getJobListByParagraphId(String paragraphID) {
|
||||
String gotNoteId = null;
|
||||
List<Note> notes = getAllNotes();
|
||||
for (Note note : notes) {
|
||||
Paragraph p = note.getParagraph(paragraphID);
|
||||
if (p != null) {
|
||||
gotNoteId = note.getId();
|
||||
}
|
||||
}
|
||||
return getJobListBymNotebookId(gotNoteId);
|
||||
}
|
||||
|
||||
public List<Map<String, Object>> getJobListBymNotebookId(String notebookID) {
|
||||
final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
|
||||
long lastRunningUnixTime = 0;
|
||||
boolean isNotebookRunning = false;
|
||||
Note jobNote = getNote(notebookID);
|
||||
List<Map<String, Object>> notesInfo = new LinkedList<>();
|
||||
if (jobNote == null) {
|
||||
return notesInfo;
|
||||
}
|
||||
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
|
||||
info.put("notebookId", jobNote.getId());
|
||||
String notebookName = jobNote.getName();
|
||||
if (notebookName != null && !notebookName.equals("")) {
|
||||
info.put("notebookName", jobNote.getName());
|
||||
} else {
|
||||
info.put("notebookName", "Note " + jobNote.getId());
|
||||
}
|
||||
// set notebook type ( cron or normal )
|
||||
if (jobNote.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && !jobNote.getConfig()
|
||||
.get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
|
||||
info.put("notebookType", "cron");
|
||||
} else {
|
||||
info.put("notebookType", "normal");
|
||||
}
|
||||
|
||||
// set paragraphs
|
||||
List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
|
||||
for (Paragraph paragraph : jobNote.getParagraphs()) {
|
||||
// check paragraph's status.
|
||||
if (paragraph.getStatus().isRunning()) {
|
||||
isNotebookRunning = true;
|
||||
}
|
||||
|
||||
// get data for the job manager.
|
||||
Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
|
||||
lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
|
||||
|
||||
paragraphsInfo.add(paragraphItem);
|
||||
}
|
||||
|
||||
// set interpreter bind type
|
||||
String interpreterGroupName = null;
|
||||
if (replFactory.getInterpreterSettings(jobNote.getId()) != null
|
||||
&& replFactory.getInterpreterSettings(jobNote.getId()).size() >= 1) {
|
||||
interpreterGroupName = replFactory.getInterpreterSettings(jobNote.getId()).get(0).getName();
|
||||
}
|
||||
|
||||
// notebook json object root information.
|
||||
info.put("interpreter", interpreterGroupName);
|
||||
info.put("isRunningJob", isNotebookRunning);
|
||||
info.put("unixTimeLastRun", lastRunningUnixTime);
|
||||
info.put("paragraphs", paragraphsInfo);
|
||||
notesInfo.add(info);
|
||||
|
||||
return notesInfo;
|
||||
};
|
||||
|
||||
public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload,
|
||||
long lastUpdateServerUnixTime, AuthenticationInfo subject) {
|
||||
final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
|
||||
|
||||
|
|
|
|||
|
|
@ -93,7 +93,8 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
}
|
||||
if (getRepoCount() > 1) {
|
||||
try {
|
||||
sync(0, 1);
|
||||
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
|
||||
sync(0, 1, subject);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to sync with secondary storage on start {}", e);
|
||||
}
|
||||
|
|
@ -175,12 +176,12 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
|
||||
void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException {
|
||||
LOG.info("Sync started");
|
||||
NotebookRepo srcRepo = getRepo(sourceRepoIndex);
|
||||
NotebookRepo dstRepo = getRepo(destRepoIndex);
|
||||
List <NoteInfo> srcNotes = srcRepo.list(null);
|
||||
List <NoteInfo> dstNotes = dstRepo.list(null);
|
||||
List <NoteInfo> srcNotes = srcRepo.list(subject);
|
||||
List <NoteInfo> dstNotes = dstRepo.list(subject);
|
||||
|
||||
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
|
||||
List<String> pushNoteIDs = noteIDs.get(pushKey);
|
||||
|
|
@ -192,7 +193,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (String id : pushNoteIDs) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
pushNotes(pushNoteIDs, srcRepo, dstRepo);
|
||||
pushNotes(subject, pushNoteIDs, srcRepo, dstRepo);
|
||||
} else {
|
||||
LOG.info("Nothing to push");
|
||||
}
|
||||
|
|
@ -202,7 +203,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (String id : pullNoteIDs) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
pushNotes(pullNoteIDs, dstRepo, srcRepo);
|
||||
pushNotes(subject, pullNoteIDs, dstRepo, srcRepo);
|
||||
} else {
|
||||
LOG.info("Nothing to pull");
|
||||
}
|
||||
|
|
@ -212,7 +213,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
for (String id : delDstNoteIDs) {
|
||||
LOG.info("ID : " + id);
|
||||
}
|
||||
deleteNotes(delDstNoteIDs, dstRepo);
|
||||
deleteNotes(subject, delDstNoteIDs, dstRepo);
|
||||
} else {
|
||||
LOG.info("Nothing to delete from dest");
|
||||
}
|
||||
|
|
@ -220,20 +221,21 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
LOG.info("Sync ended");
|
||||
}
|
||||
|
||||
public void sync() throws IOException {
|
||||
sync(0, 1);
|
||||
public void sync(AuthenticationInfo subject) throws IOException {
|
||||
sync(0, 1, subject);
|
||||
}
|
||||
|
||||
private void pushNotes(List<String> ids, NotebookRepo localRepo,
|
||||
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
|
||||
NotebookRepo remoteRepo) throws IOException {
|
||||
for (String id : ids) {
|
||||
remoteRepo.save(localRepo.get(id, null), null);
|
||||
remoteRepo.save(localRepo.get(id, subject), subject);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
|
||||
private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo)
|
||||
throws IOException {
|
||||
for (String id : ids) {
|
||||
repo.remove(id, null);
|
||||
repo.remove(id, subject);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -127,8 +127,8 @@ public class Message {
|
|||
APP_STATUS_CHANGE, // [s-c] on app status change
|
||||
|
||||
LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations
|
||||
LIST_UPDATE_NOTEBOOK_JOBS, // [c-s] get job management informations for until unixtime
|
||||
// @param unixTime
|
||||
LIST_UPDATE_NOTEBOOK_JOBS, // [s-c] get job management informations
|
||||
UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS, // [c-s] unsubscribe job information for job management
|
||||
GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
|
||||
// @param noteID
|
||||
SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import java.io.*;
|
|||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
|
@ -31,6 +33,7 @@ import org.apache.zeppelin.dep.Dependency;
|
|||
import org.apache.zeppelin.dep.DependencyResolver;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
||||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -52,7 +55,10 @@ public class InterpreterFactoryTest {
|
|||
tmpDir.mkdirs();
|
||||
new File(tmpDir, "conf").mkdirs();
|
||||
|
||||
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
|
||||
Map<String, InterpreterProperty> propertiesMockInterpreter1 = new HashMap<String, InterpreterProperty>();
|
||||
propertiesMockInterpreter1.put("PROPERTY_1", new InterpreterProperty("PROPERTY_1", "", "VALUE_1", "desc"));
|
||||
propertiesMockInterpreter1.put("property_2", new InterpreterProperty("", "property_2", "value_2", "desc"));
|
||||
MockInterpreter1.register("mock1", "mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1", propertiesMockInterpreter1);
|
||||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
|
||||
|
|
@ -95,6 +101,29 @@ public class InterpreterFactoryTest {
|
|||
assertNull(mock1Setting.getInterpreterGroup("sharedProcess").get("session"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteRepl() throws Exception {
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver);
|
||||
List<InterpreterSetting> all = factory.get();
|
||||
InterpreterSetting mock1Setting = null;
|
||||
for (InterpreterSetting setting : all) {
|
||||
if (setting.getName().equals("mock1")) {
|
||||
mock1Setting = setting;
|
||||
break;
|
||||
}
|
||||
}
|
||||
InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("sharedProcess");
|
||||
factory.createInterpretersForNote(mock1Setting, "sharedProcess", "session");
|
||||
// get interpreter
|
||||
assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
|
||||
assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
|
||||
LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
|
||||
assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
|
||||
assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
|
||||
assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFactoryDefaultList() throws IOException, RepositoryException {
|
||||
// get default settings
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ import org.apache.zeppelin.scheduler.Job;
|
|||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.user.Credentials;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
|
@ -209,6 +210,18 @@ public class NotebookTest implements JobListenerFactory{
|
|||
assertEquals(1, notebook2.getAllNotes().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateNoteWithSubject() throws IOException, SchedulerException, RepositoryException {
|
||||
AuthenticationInfo subject = new AuthenticationInfo("user1");
|
||||
Note note = notebook.createNote(subject);
|
||||
|
||||
assertNotNull(notebook.getNotebookAuthorization().getOwners(note.getId()));
|
||||
assertEquals(1, notebook.getNotebookAuthorization().getOwners(note.getId()).size());
|
||||
Set<String> owners = new HashSet<>();
|
||||
owners.add("user1");
|
||||
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(note.getId()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearParagraphOutput() throws IOException, SchedulerException{
|
||||
Note note = notebook.createNote(null);
|
||||
|
|
@ -351,7 +364,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
|
||||
@Test
|
||||
public void testExportAndImportNote() throws IOException, CloneNotSupportedException,
|
||||
InterruptedException {
|
||||
InterruptedException, InterpreterException, SchedulerException, RepositoryException {
|
||||
Note note = notebook.createNote(null);
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
|
|
@ -374,11 +387,20 @@ public class NotebookTest implements JobListenerFactory{
|
|||
assertEquals(p.getId(), p2.getId());
|
||||
assertEquals(p.text, p2.text);
|
||||
assertEquals(p.getResult().message(), p2.getResult().message());
|
||||
|
||||
// Verify import note with subject
|
||||
AuthenticationInfo subject = new AuthenticationInfo("user1");
|
||||
Note importedNote2 = notebook.importNote(exportedNoteJson, "Title2", subject);
|
||||
assertNotNull(notebook.getNotebookAuthorization().getOwners(importedNote2.getId()));
|
||||
assertEquals(1, notebook.getNotebookAuthorization().getOwners(importedNote2.getId()).size());
|
||||
Set<String> owners = new HashSet<>();
|
||||
owners.add("user1");
|
||||
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(importedNote2.getId()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloneNote() throws IOException, CloneNotSupportedException,
|
||||
InterruptedException {
|
||||
InterruptedException, InterpreterException, SchedulerException, RepositoryException {
|
||||
Note note = notebook.createNote(null);
|
||||
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
|
|
@ -396,6 +418,15 @@ public class NotebookTest implements JobListenerFactory{
|
|||
assertEquals(cp.getId(), p.getId());
|
||||
assertEquals(cp.text, p.text);
|
||||
assertEquals(cp.getResult().message(), p.getResult().message());
|
||||
|
||||
// Verify clone note with subject
|
||||
AuthenticationInfo subject = new AuthenticationInfo("user1");
|
||||
Note cloneNote2 = notebook.cloneNote(note.getId(), "clone note2", subject);
|
||||
assertNotNull(notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()));
|
||||
assertEquals(1, notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()).size());
|
||||
Set<String> owners = new HashSet<>();
|
||||
owners.add("user1");
|
||||
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -185,7 +185,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
assertEquals(0, notebookRepoSync.get(1,
|
||||
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
|
||||
/* apply sync */
|
||||
notebookRepoSync.sync();
|
||||
notebookRepoSync.sync(null);
|
||||
/* check whether added to second storage */
|
||||
assertEquals(1, notebookRepoSync.get(1,
|
||||
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
|
||||
|
|
|
|||
Loading…
Reference in a new issue