Merge branch 'master' into ZEPPELIN-1306

This commit is contained in:
astroshim 2016-09-16 22:18:35 +09:00
commit 3127154ef5
30 changed files with 489 additions and 167 deletions

View file

@ -34,7 +34,7 @@
<description>Zeppelin flink support</description>
<properties>
<flink.version>1.0.3</flink.version>
<flink.version>1.1.2</flink.version>
<flink.akka.version>2.3.7</flink.akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
</properties>

View file

@ -30,6 +30,7 @@ import java.util.*;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import scala.Console;
import scala.None;
import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
@ -83,14 +85,25 @@ public class FlinkInterpreter extends Interpreter {
startFlinkMiniCluster();
}
flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out));
flinkIloop = new FlinkILoop(getHost(),
getPort(),
flinkConf,
(BufferedReader) null,
new PrintWriter(out));
flinkIloop.settings_$eq(createSettings());
flinkIloop.createInterpreter();
imain = flinkIloop.intp();
org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv();
env.getConfig().disableSysoutLogging();
org.apache.flink.api.scala.ExecutionEnvironment benv =
flinkIloop.scalaBenv();
//new ExecutionEnvironment(remoteBenv)
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
flinkIloop.scalaSenv();
senv.getConfig().disableSysoutLogging();
benv.getConfig().disableSysoutLogging();
// prepare bindings
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@ -100,13 +113,19 @@ public class FlinkInterpreter extends Interpreter {
imain.interpret("import scala.tools.nsc.io._");
imain.interpret("import Properties.userHome");
imain.interpret("import scala.compat.Platform.EOL");
imain.interpret("import org.apache.flink.api.scala._");
imain.interpret("import org.apache.flink.api.common.functions._");
binder.put("env", env);
imain.interpret("val env = _binder.get(\"env\").asInstanceOf["
+ env.getClass().getName() + "]");
binder.put("benv", benv);
imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
+ benv.getClass().getName() + "]");
binder.put("senv", senv);
imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
+ senv.getClass().getName() + "]");
}
private boolean localMode() {
@ -313,8 +332,6 @@ public class FlinkInterpreter extends Interpreter {
}
}
@Override
public void cancel(InterpreterContext context) {
}
@ -354,4 +371,5 @@ public class FlinkInterpreter extends Interpreter {
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}
}

View file

@ -81,7 +81,7 @@ public class FlinkInterpreterTest {
@Test
public void testWordCount() {
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
InterpreterResult result = flink.interpret("counts.print()", context);
assertEquals(Code.SUCCESS, result.code());

View file

@ -531,7 +531,7 @@
<profile>
<id>scala-2.10</id>
<activation>
<property><name>!scala-2.11</name></property>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.version>2.10.5</scala.version>
@ -541,9 +541,6 @@
<profile>
<id>scala-2.11</id>
<activation>
<property><name>scala-2.11</name></property>
</activation>
<properties>
<scala.version>2.11.7</scala.version>
<scala.binary.version>2.11</scala.binary.version>
@ -828,5 +825,4 @@
</profile>
</profiles>
</project>

View file

@ -329,6 +329,7 @@ public class SparkInterpreter extends Interpreter {
}
setupConfForPySpark(conf);
setupConfForSparkR(conf);
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@ -443,6 +444,7 @@ public class SparkInterpreter extends Interpreter {
}
}
setupConfForPySpark(conf);
setupConfForSparkR(conf);
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}
@ -494,6 +496,35 @@ public class SparkInterpreter extends Interpreter {
}
}
private void setupConfForSparkR(SparkConf conf) {
String sparkRBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue();
File sparkRPath;
if (null == sparkRBasePath) {
sparkRBasePath =
new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue();
sparkRPath = new File(sparkRBasePath,
"interpreter" + File.separator + "spark" + File.separator + "R");
} else {
sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
}
sparkRPath = new File(sparkRPath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
String archives = null;
if (conf.contains("spark.yarn.dist.archives")) {
archives = conf.get("spark.yarn.dist.archives");
}
if (archives != null) {
archives = archives + "," + sparkRPath + "#sparkr";
} else {
archives = sparkRPath + "#sparkr";
}
conf.set("spark.yarn.dist.archives", archives);
} else {
logger.warn("sparkr.zip is not found, sparkr may not work.");
}
}
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}

View file

@ -52,13 +52,19 @@ public class SparkVersion {
if (pos > 0) {
numberPart = versionString.substring(0, pos);
}
version = Integer.parseInt(numberPart.replaceAll("\\.", ""));
String versions[] = numberPart.split("\\.");
int major = Integer.parseInt(versions[0]);
int minor = Integer.parseInt(versions[1]);
int patch = Integer.parseInt(versions[2]);
// version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch));
} catch (Exception e) {
logger.error("Can not recognize Spark version " + versionString +
". Assume it's a future release", e);
// assume it is future release
version = 999;
version = 99999;
}
}

View file

@ -42,7 +42,7 @@ assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
# setup spark env
assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv)
assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
if (version >= 200) {
if (version >= 20000) {
assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
}

View file

@ -107,9 +107,9 @@ class PyZeppelinContext(dict):
class SparkVersion(object):
SPARK_1_4_0 = 140
SPARK_1_3_0 = 130
SPARK_2_0_0 = 200
SPARK_1_4_0 = 10400
SPARK_1_3_0 = 10300
SPARK_2_0_0 = 20000
def __init__(self, versionNumber):
self.version = versionNumber

View file

@ -24,7 +24,7 @@ public class SparkVersionTest {
@Test
public void testUnknownSparkVersion() {
assertEquals(999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
assertEquals(99999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
}
@Test
@ -33,6 +33,8 @@ public class SparkVersionTest {
assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion());
assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion());
assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion());
// should support spark2 version of HDP 2.5
assertFalse(SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245").isUnsupportedVersion());
}
@Test
@ -40,6 +42,9 @@ public class SparkVersionTest {
// test equals
assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0"));
assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
// test spark2 version of HDP 2.5
assertEquals(SparkVersion.SPARK_2_0_0, SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245"));
// test newer than
assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
@ -60,7 +65,7 @@ public class SparkVersionTest {
assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0));
// conversion
assertEquals(120, SparkVersion.SPARK_1_2_0.toNumber());
assertEquals(10200, SparkVersion.SPARK_1_2_0.toNumber());
assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString());
}
}

View file

@ -95,9 +95,6 @@
<profiles>
<profile>
<id>scala-2.11</id>
<activation>
<property><name>scala-2.11</name></property>
</activation>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>

View file

@ -114,9 +114,6 @@
<profiles>
<profile>
<id>scala-2.11</id>
<activation>
<property><name>scala-2.11</name></property>
</activation>
<dependencyManagement>
<dependencies>
<dependency>

View file

@ -75,6 +75,7 @@ public class InterpreterProperty {
}
public boolean equals(Object o) {
if (o == null) return false;
return this.toString().equals(o.toString());
}

View file

@ -527,4 +527,11 @@ public class RemoteInterpreter extends Interpreter {
public void setEnv(Map<String, String> env) {
this.env = env;
}
public void addEnv(Map<String, String> env) {
if (this.env == null) {
this.env = new HashMap<>();
}
this.env.putAll(env);
}
}

View file

@ -23,91 +23,89 @@ import static org.junit.Assert.fail;
import org.junit.Test;
public class InterpreterResultTest {
@Test
public void testTextType() {
InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"this is a TEXT type");
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%this is a TEXT type");
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%\n");
assertEquals("No magic",InterpreterResult.Type.TEXT, result.type());
}
@Test
public void testSimpleMagicType() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table\ncol1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
}
@Test
public void testTextType() {
public void testComplexMagicType() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic return magic",InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%html <h3> This is a hack </h3> %table\n col1\tcol2\naaa\t123\n");
assertEquals("magic A before magic B return magic A",InterpreterResult.Type.HTML, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & magic A before magic B return magic A" ,InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("magic A, magic B, magic a' return magic A" ,InterpreterResult.Type.TABLE, result.type());
InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "this is a TEXT type");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%this is a TEXT type");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%\n");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
}
}
@Test
public void testSimpleMagicType() {
InterpreterResult result = null;
@Test
public void testSimpleMagicData() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n");
assertEquals("%table col1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table\ncol1\tcol2\naaa\t123\n");
assertEquals("%table\ncol1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic word %table col1\tcol2\naaa\t123\n","col1\tcol2\naaa\t123\n", result.message());
}
@Test
public void testComplexMagicData() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before %table col1\tcol2\naaa\t123\n");
assertEquals("text before %table return %table","col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%html <h3> This is a hack </h3> %table\ncol1\tcol2\naaa\t123\n");
assertEquals("%html before %table return %html"," <h3> This is a hack </h3> %table\n" +
"col1\tcol2\n" +
"aaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & %table befoe %html return %table","col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3>", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("%table, %html before %table return first %table","col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3> %table col1\n" +
"aaa\n" +
"123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS,"%table col1\tcol2\naaa\t123\n %table col1\naaa\n123\n");
assertEquals("%table before %table return first %table","col1\tcol2\n" +
"aaa\t123\n" +
" %table col1\n" +
"aaa\n" +
"123\n", result.message());
}
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
}
@Test
public void testToString() {
assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
}
public void testComplexMagicType() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic return magic", InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\n col1\tcol2\naaa\t123\n");
assertEquals("magic A before magic B return magic A", InterpreterResult.Type.HTML, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & magic A before magic B return magic A", InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("magic A, magic B, magic a' return magic A", InterpreterResult.Type.TABLE, result.type());
}
@Test
public void testSimpleMagicData() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
assertEquals("%table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
assertEquals("%table\ncol1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic word %table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
}
@Test
public void testComplexMagicData() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
assertEquals("text before %table return %table", "col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\ncol1\tcol2\naaa\t123\n");
assertEquals("%html before %table return %html", " <h3> This is a hack </h3> %table\n" +
"col1\tcol2\n" +
"aaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & %table befoe %html return %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3>", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("%table, %html before %table return first %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3> %table col1\n" +
"aaa\n" +
"123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %table col1\naaa\n123\n");
assertEquals("%table before %table return first %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %table col1\n" +
"aaa\n" +
"123\n", result.message());
}
@Test
public void testToString() {
assertEquals("%html hello", new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html hello").toString());
}
}

View file

@ -448,9 +448,6 @@
<profiles>
<profile>
<id>scala-2.11</id>
<activation>
<property><name>scala-2.11</name></property>
</activation>
<dependencyManagement>
<dependencies>
<dependency>

View file

@ -779,7 +779,8 @@ public class NotebookRestApi {
LOG.info("Get notebook jobs for job manager");
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
List<Map<String, Object>> notebookJobs = notebook.getJobListforNotebook(false, 0, subject);
List<Map<String, Object>> notebookJobs = notebook
.getJobListByUnixTime(false, 0, subject);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
@ -791,6 +792,8 @@ public class NotebookRestApi {
/**
* Get updated notebook jobs for job manager
*
* Return the `Note` change information within the post unix timestamp.
*
* @return JSON with status.OK
* @throws IOException, IllegalArgumentException
*/
@ -804,7 +807,7 @@ public class NotebookRestApi {
List<Map<String, Object>> notebookJobs;
AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime, subject);
notebookJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());

View file

@ -104,6 +104,7 @@ public class ZeppelinServer extends Application {
heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
notebook.addNotebookEventListener(heliumApplicationFactory);
notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener());
}
public static void main(String[] args) throws InterruptedException {

View file

@ -241,8 +241,8 @@ public class NotebookServer extends WebSocketServlet implements
case LIST_NOTEBOOK_JOBS:
unicastNotebookJobInfo(conn, messagereceived);
break;
case LIST_UPDATE_NOTEBOOK_JOBS:
unicastUpdateNotebookJobInfo(conn, messagereceived);
case UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS:
unsubscribeNotebookJobInfo(conn);
break;
case GET_INTERPRETER_BINDINGS:
getInterpreterBindings(conn, messagereceived);
@ -398,9 +398,10 @@ public class NotebookServer extends WebSocketServlet implements
}
public void unicastNotebookJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
addConnectionToNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn);
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
List<Map<String, Object>> notebookJobs = notebook().getJobListforNotebook(false, 0, subject);
List<Map<String, Object>> notebookJobs = notebook()
.getJobListByUnixTime(false, 0, subject);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
@ -410,21 +411,25 @@ public class NotebookServer extends WebSocketServlet implements
.put("notebookJobs", response)));
}
public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage)
throws IOException {
double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime");
long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue();
List<Map<String, Object>> notebookJobs;
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime, subject);
public void broadcastUpdateNotebookJobInfo(long lastUpdateUnixTime) throws IOException {
List<Map<String, Object>> notebookJobs = new LinkedList<>();
Notebook notebookObject = notebook();
List<Map<String, Object>> jobNotes = null;
if (notebookObject != null) {
jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null);
notebookJobs = jobNotes == null ? notebookJobs : jobNotes;
}
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);
response.put("jobs", notebookJobs != null ? notebookJobs : new LinkedList<>());
conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS)
.put("notebookRunningJobs", response)));
broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
public void unsubscribeNotebookJobInfo(NotebookSocket conn) {
removeConnectionFromNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn);
}
public void saveInterpreterBindings(NotebookSocket conn, Message fromMessage) {
@ -719,7 +724,10 @@ public class NotebookServer extends WebSocketServlet implements
if (fromMessage != null) {
String noteName = (String) ((Map) fromMessage.get("notebook")).get("name");
String noteJson = gson.toJson(fromMessage.get("notebook"));
AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
AuthenticationInfo subject = null;
if (fromMessage.principal != null) {
subject = new AuthenticationInfo(fromMessage.principal);
}
note = notebook.importNote(noteJson, noteName, subject);
note.persist(subject);
broadcastNote(note);
@ -1291,6 +1299,113 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(noteId, msg);
}
/**
* Notebook Information Change event
*/
public static class NotebookInformationListener implements NotebookEventListener {
private NotebookServer notebookServer;
public NotebookInformationListener(NotebookServer notebookServer) {
this.notebookServer = notebookServer;
}
@Override
public void onParagraphRemove(Paragraph p) {
try {
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
} catch (IOException ioe) {
LOG.error("can not broadcast for job manager {}", ioe.getMessage());
}
}
@Override
public void onNoteRemove(Note note) {
try {
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
} catch (IOException ioe) {
LOG.error("can not broadcast for job manager {}", ioe.getMessage());
}
List<Map<String, Object>> notesInfo = new LinkedList<>();
Map<String, Object> info = new HashMap<>();
info.put("notebookId", note.getId());
// set paragraphs
List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
// notebook json object root information.
info.put("isRunningJob", false);
info.put("unixTimeLastRun", 0);
info.put("isRemoved", true);
info.put("paragraphs", paragraphsInfo);
notesInfo.add(info);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notesInfo);
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
@Override
public void onParagraphCreate(Paragraph p) {
Notebook notebook = notebookServer.notebook();
List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(
p.getId()
);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
@Override
public void onNoteCreate(Note note) {
Notebook notebook = notebookServer.notebook();
List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId(
note.getId()
);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
@Override
public void onParagraphStatusChange(Paragraph p, Status status) {
Notebook notebook = notebookServer.notebook();
List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(
p.getId()
);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
@Override
public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
Notebook notebook = notebookServer.notebook();
List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId(
note.getId()
);
Map<String, Object> response = new HashMap<>();
response.put("lastResponseUnixTime", System.currentTimeMillis());
response.put("jobs", notebookJobs);
notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
}
}
/**
* Need description here.
*
@ -1334,6 +1449,12 @@ public class NotebookServer extends WebSocketServlet implements
}
}
notebookServer.broadcastNote(note);
try {
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000);
} catch (IOException e) {
LOG.error("can not broadcast for job manager {}", e);
}
}
/**
@ -1374,6 +1495,10 @@ public class NotebookServer extends WebSocketServlet implements
return new ParagraphListenerImpl(this, note);
}
public NotebookEventListener getNotebookInformationListener() {
return new NotebookInformationListener(this);
}
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
List<InterpreterSetting> settings =
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());

View file

@ -32,14 +32,8 @@ angular.module('zeppelinWebApp')
$scope.JobInfomationsByFilter = $scope.jobInfomations;
websocketMsgSrv.getNotebookJobsList();
var refreshObj = $interval(function() {
if ($scope.lastJobServerUnixTime !== undefined) {
websocketMsgSrv.getUpdateNotebookJobsList($scope.lastJobServerUnixTime);
}
}, 1000);
$scope.$on('$destroy', function() {
$interval.cancel(refreshObj);
websocketMsgSrv.unsubscribeJobManager();
});
};

View file

@ -26,7 +26,7 @@ limitations under the License.
Bind interpreter for this note.
Click to Bind/Unbind interpreter.
Drag and drop to reorder interpreters. <br />
The first interpreter on the list becomes default. To create/remove interpreters, go to <a href="/#/interpreter">Interpreter</a> menu.
The first interpreter on the list becomes default. To create/remove interpreters, go to <a href="#/interpreter">Interpreter</a> menu.
</p>
<div class="interpreterSettings"

View file

@ -195,7 +195,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope,
},
unsubscribeJobManager: function() {
websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_JOBMANAGER'});
websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS'});
},
getInterpreterBindings: function(noteID) {

View file

@ -252,5 +252,14 @@
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration combine.children="append">
<forkMode>always</forkMode>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -672,7 +672,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
Interpreter interpreter;
for (InterpreterInfo info : interpreterInfos) {
if (option.isRemote()) {
if (option.isConnectExistingProcess()) {
if (option.isExistingProcess()) {
interpreter =
connectToRemoteRepl(noteId, info.getClassName(), option.getHost(), option.getPort(),
properties);
@ -1013,7 +1013,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener);
remoteInterpreter.setEnv(env);
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);
}

View file

@ -85,10 +85,6 @@ public class InterpreterOption {
this.perNoteSession = perNoteSession;
}
public boolean isConnectExistingProcess() {
return (host != null && port != -1);
}
public String getHost() {
return host;
}

View file

@ -24,15 +24,18 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.commons.codec.binary.StringUtils;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@ -157,6 +160,11 @@ public class Notebook implements NoteEventListener {
bindInterpretersToNote(note.getId(), interpreterIds);
}
if (subject != null && !"anonymous".equals(subject.getUser())) {
Set<String> owners = new HashSet<String>();
owners.add(subject.getUser());
notebookAuthorization.setOwners(note.getId(), owners);
}
notebookIndex.addIndexDoc(note);
note.persist(subject);
fireNoteCreateEvent(note);
@ -468,7 +476,7 @@ public class Notebook implements NoteEventListener {
if (notebookRepo instanceof NotebookRepoSync) {
NotebookRepoSync mainRepo = (NotebookRepoSync) notebookRepo;
if (mainRepo.getRepoCount() > 1) {
mainRepo.sync();
mainRepo.sync(subject);
}
}
@ -573,7 +581,78 @@ public class Notebook implements NoteEventListener {
return lastRunningUnixTime;
}
public List<Map<String, Object>> getJobListforNotebook(boolean needsReload,
public List<Map<String, Object>> getJobListByParagraphId(String paragraphID) {
String gotNoteId = null;
List<Note> notes = getAllNotes();
for (Note note : notes) {
Paragraph p = note.getParagraph(paragraphID);
if (p != null) {
gotNoteId = note.getId();
}
}
return getJobListBymNotebookId(gotNoteId);
}
public List<Map<String, Object>> getJobListBymNotebookId(String notebookID) {
final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
long lastRunningUnixTime = 0;
boolean isNotebookRunning = false;
Note jobNote = getNote(notebookID);
List<Map<String, Object>> notesInfo = new LinkedList<>();
if (jobNote == null) {
return notesInfo;
}
Map<String, Object> info = new HashMap<>();
info.put("notebookId", jobNote.getId());
String notebookName = jobNote.getName();
if (notebookName != null && !notebookName.equals("")) {
info.put("notebookName", jobNote.getName());
} else {
info.put("notebookName", "Note " + jobNote.getId());
}
// set notebook type ( cron or normal )
if (jobNote.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && !jobNote.getConfig()
.get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
info.put("notebookType", "cron");
} else {
info.put("notebookType", "normal");
}
// set paragraphs
List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
for (Paragraph paragraph : jobNote.getParagraphs()) {
// check paragraph's status.
if (paragraph.getStatus().isRunning()) {
isNotebookRunning = true;
}
// get data for the job manager.
Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph);
lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
paragraphsInfo.add(paragraphItem);
}
// set interpreter bind type
String interpreterGroupName = null;
if (replFactory.getInterpreterSettings(jobNote.getId()) != null
&& replFactory.getInterpreterSettings(jobNote.getId()).size() >= 1) {
interpreterGroupName = replFactory.getInterpreterSettings(jobNote.getId()).get(0).getName();
}
// notebook json object root information.
info.put("interpreter", interpreterGroupName);
info.put("isRunningJob", isNotebookRunning);
info.put("unixTimeLastRun", lastRunningUnixTime);
info.put("paragraphs", paragraphsInfo);
notesInfo.add(info);
return notesInfo;
};
public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload,
long lastUpdateServerUnixTime, AuthenticationInfo subject) {
final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";

View file

@ -93,7 +93,8 @@ public class NotebookRepoSync implements NotebookRepo {
}
if (getRepoCount() > 1) {
try {
sync(0, 1);
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
sync(0, 1, subject);
} catch (IOException e) {
LOG.warn("Failed to sync with secondary storage on start {}", e);
}
@ -175,12 +176,12 @@ public class NotebookRepoSync implements NotebookRepo {
*
* @throws IOException
*/
void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException {
LOG.info("Sync started");
NotebookRepo srcRepo = getRepo(sourceRepoIndex);
NotebookRepo dstRepo = getRepo(destRepoIndex);
List <NoteInfo> srcNotes = srcRepo.list(null);
List <NoteInfo> dstNotes = dstRepo.list(null);
List <NoteInfo> srcNotes = srcRepo.list(subject);
List <NoteInfo> dstNotes = dstRepo.list(subject);
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
List<String> pushNoteIDs = noteIDs.get(pushKey);
@ -192,7 +193,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pushNoteIDs) {
LOG.info("ID : " + id);
}
pushNotes(pushNoteIDs, srcRepo, dstRepo);
pushNotes(subject, pushNoteIDs, srcRepo, dstRepo);
} else {
LOG.info("Nothing to push");
}
@ -202,7 +203,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : pullNoteIDs) {
LOG.info("ID : " + id);
}
pushNotes(pullNoteIDs, dstRepo, srcRepo);
pushNotes(subject, pullNoteIDs, dstRepo, srcRepo);
} else {
LOG.info("Nothing to pull");
}
@ -212,7 +213,7 @@ public class NotebookRepoSync implements NotebookRepo {
for (String id : delDstNoteIDs) {
LOG.info("ID : " + id);
}
deleteNotes(delDstNoteIDs, dstRepo);
deleteNotes(subject, delDstNoteIDs, dstRepo);
} else {
LOG.info("Nothing to delete from dest");
}
@ -220,20 +221,21 @@ public class NotebookRepoSync implements NotebookRepo {
LOG.info("Sync ended");
}
public void sync() throws IOException {
sync(0, 1);
public void sync(AuthenticationInfo subject) throws IOException {
sync(0, 1, subject);
}
private void pushNotes(List<String> ids, NotebookRepo localRepo,
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
NotebookRepo remoteRepo) throws IOException {
for (String id : ids) {
remoteRepo.save(localRepo.get(id, null), null);
remoteRepo.save(localRepo.get(id, subject), subject);
}
}
private void deleteNotes(List<String> ids, NotebookRepo repo) throws IOException {
private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo)
throws IOException {
for (String id : ids) {
repo.remove(id, null);
repo.remove(id, subject);
}
}

View file

@ -127,8 +127,8 @@ public class Message {
APP_STATUS_CHANGE, // [s-c] on app status change
LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations
LIST_UPDATE_NOTEBOOK_JOBS, // [c-s] get job management informations for until unixtime
// @param unixTime
LIST_UPDATE_NOTEBOOK_JOBS, // [s-c] get job management informations
UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS, // [c-s] unsubscribe job information for job management
GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
// @param noteID
SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings

View file

@ -21,6 +21,8 @@ import java.io.*;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
@ -31,6 +33,7 @@ import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -52,7 +55,10 @@ public class InterpreterFactoryTest {
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
Map<String, InterpreterProperty> propertiesMockInterpreter1 = new HashMap<String, InterpreterProperty>();
propertiesMockInterpreter1.put("PROPERTY_1", new InterpreterProperty("PROPERTY_1", "", "VALUE_1", "desc"));
propertiesMockInterpreter1.put("property_2", new InterpreterProperty("", "property_2", "value_2", "desc"));
MockInterpreter1.register("mock1", "mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1", propertiesMockInterpreter1);
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
@ -95,6 +101,29 @@ public class InterpreterFactoryTest {
assertNull(mock1Setting.getInterpreterGroup("sharedProcess").get("session"));
}
@Test
public void testRemoteRepl() throws Exception {
factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver);
List<InterpreterSetting> all = factory.get();
InterpreterSetting mock1Setting = null;
for (InterpreterSetting setting : all) {
if (setting.getName().equals("mock1")) {
mock1Setting = setting;
break;
}
}
InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("sharedProcess");
factory.createInterpretersForNote(mock1Setting, "sharedProcess", "session");
// get interpreter
assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
}
@Test
public void testFactoryDefaultList() throws IOException, RepositoryException {
// get default settings

View file

@ -42,6 +42,7 @@ import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.junit.After;
import org.junit.Before;
@ -209,6 +210,18 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(1, notebook2.getAllNotes().size());
}
@Test
public void testCreateNoteWithSubject() throws IOException, SchedulerException, RepositoryException {
AuthenticationInfo subject = new AuthenticationInfo("user1");
Note note = notebook.createNote(subject);
assertNotNull(notebook.getNotebookAuthorization().getOwners(note.getId()));
assertEquals(1, notebook.getNotebookAuthorization().getOwners(note.getId()).size());
Set<String> owners = new HashSet<>();
owners.add("user1");
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(note.getId()));
}
@Test
public void testClearParagraphOutput() throws IOException, SchedulerException{
Note note = notebook.createNote(null);
@ -351,7 +364,7 @@ public class NotebookTest implements JobListenerFactory{
@Test
public void testExportAndImportNote() throws IOException, CloneNotSupportedException,
InterruptedException {
InterruptedException, InterpreterException, SchedulerException, RepositoryException {
Note note = notebook.createNote(null);
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
@ -374,11 +387,20 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(p.getId(), p2.getId());
assertEquals(p.text, p2.text);
assertEquals(p.getResult().message(), p2.getResult().message());
// Verify import note with subject
AuthenticationInfo subject = new AuthenticationInfo("user1");
Note importedNote2 = notebook.importNote(exportedNoteJson, "Title2", subject);
assertNotNull(notebook.getNotebookAuthorization().getOwners(importedNote2.getId()));
assertEquals(1, notebook.getNotebookAuthorization().getOwners(importedNote2.getId()).size());
Set<String> owners = new HashSet<>();
owners.add("user1");
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(importedNote2.getId()));
}
@Test
public void testCloneNote() throws IOException, CloneNotSupportedException,
InterruptedException {
InterruptedException, InterpreterException, SchedulerException, RepositoryException {
Note note = notebook.createNote(null);
factory.setInterpreters(note.getId(), factory.getDefaultInterpreterSettingList());
@ -396,6 +418,15 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(cp.getId(), p.getId());
assertEquals(cp.text, p.text);
assertEquals(cp.getResult().message(), p.getResult().message());
// Verify clone note with subject
AuthenticationInfo subject = new AuthenticationInfo("user1");
Note cloneNote2 = notebook.cloneNote(note.getId(), "clone note2", subject);
assertNotNull(notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()));
assertEquals(1, notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()).size());
Set<String> owners = new HashSet<>();
owners.add("user1");
assertEquals(owners, notebook.getNotebookAuthorization().getOwners(cloneNote2.getId()));
}
@Test

View file

@ -185,7 +185,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
assertEquals(0, notebookRepoSync.get(1,
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
/* apply sync */
notebookRepoSync.sync();
notebookRepoSync.sync(null);
/* check whether added to second storage */
assertEquals(1, notebookRepoSync.get(1,
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());