First commit

This commit is contained in:
conker84 2017-06-25 01:24:09 +02:00
parent 300c9ef162
commit 35b4e29de8
11 changed files with 1144 additions and 2 deletions

137
neo4j/pom.xml Normal file
View file

@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-neo4j</artifactId>
<packaging>jar</packaging>
<version>0.8.0-SNAPSHOT</version>
<name>Zeppelin: Neo4j interpreter</name>
<properties>
<neo4j.driver.version>1.4.0</neo4j.driver.version>
<test.neo4j.kernel.version>3.2.1</test.neo4j.kernel.version>
<neo4j.version>3.2.1</neo4j.version>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>${neo4j.driver.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.neo4j.test</groupId>
<artifactId>neo4j-harness</artifactId>
<version>${neo4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/neo4j</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/neo4j</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,356 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.graph.neo4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.graph.neo4j.utils.Neo4jConversionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.graph.GraphResult;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.types.Node;
import org.neo4j.driver.v1.types.Relationship;
import org.neo4j.driver.v1.types.TypeSystem;
import org.neo4j.driver.v1.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Neo4j interpreter for Zeppelin.
*/
public class Neo4jCypherInterpreter extends Interpreter {
static final Logger LOGGER = LoggerFactory.getLogger(Neo4jCypherInterpreter.class);
public static final String NEO4J_SERVER_URL = "neo4j.url";
public static final String NEO4J_AUTH_TYPE = "neo4j.auth.type";
public static final String NEO4J_AUTH_USER = "neo4j.auth.user";
public static final String NEO4J_AUTH_PASSWORD = "neo4j.auth.password";
public static final String NEO4J_MAX_CONCURRENCY = "neo4j.max.concurrency";
private static final String TABLE = "%table";
public static final String NEW_LINE = "\n";
public static final String TAB = "\t";
private static final Pattern PROPERTY_PATTERN = Pattern.compile("\\{\\w+\\}");
private static final String REPLACE_CURLY_BRACKETS = "\\{|\\}";
private static final Pattern $_PATTERN = Pattern.compile("\\$\\w+\\}");
private static final String REPLACE_$ = "\\$";
private static final String MAP_KEY_TEMPLATE = "%s.%s";
private static final String ARAY_KEY_TEMPLATE = "%s[%d]";
/**
*
* Enum type for the AuthToken
*
*/
public enum Neo4jAuthType {NONE, BASIC}
private Driver driver = null;
private Map<String, String> labels;
private Set<String> types;
public Neo4jCypherInterpreter(Properties properties) {
super(properties);
}
private Driver getDriver() {
if (driver == null) {
Config config = Config.build()
.withMaxIdleSessions(Integer.parseInt(getProperty(NEO4J_MAX_CONCURRENCY)))
.toConfig();
String authType = getProperty(NEO4J_AUTH_TYPE);
AuthToken authToken = null;
switch (Neo4jAuthType.valueOf(authType.toUpperCase())) {
case BASIC:
authToken = AuthTokens.basic(getProperty(NEO4J_AUTH_USER),
getProperty(NEO4J_AUTH_PASSWORD));
break;
case NONE:
authToken = AuthTokens.none();
break;
default:
throw new RuntimeException("Neo4j authentication type not supported");
}
driver = GraphDatabase.driver(getProperty(NEO4J_SERVER_URL), authToken, config);
}
return driver;
}
@Override
public void open() {
getDriver();
}
@Override
public void close() {
getDriver().close();
}
public Map<String, String> getLabels(boolean refresh) {
if (labels == null || refresh) {
Map<String, String> old = labels == null ?
new LinkedHashMap<String, String>() : new LinkedHashMap<>(labels);
labels = new LinkedHashMap<>();
try (Session session = getDriver().session()) {
StatementResult result = session.run("CALL db.labels()");
Set<String> colors = new HashSet<>();
while (result.hasNext()) {
Record record = result.next();
String label = record.get("label").asString();
String color = old.get(label);
while (color == null || colors.contains(color)) {
color = Neo4jConversionUtils.getRandomLabelColor();
}
colors.add(color);
labels.put(label, color);
}
}
}
return labels;
}
private Set<String> getTypes(boolean refresh) {
if (types == null || refresh) {
types = new HashSet<>();
try (Session session = getDriver().session()) {
StatementResult result = session.run("CALL db.relationshipTypes()");
while (result.hasNext()) {
Record record = result.next();
types.add(record.get("relationshipType").asString());
}
}
}
return types;
}
@Override
public InterpreterResult interpret(String cypherQuery, InterpreterContext interpreterContext) {
logger.info("Opening session");
if (StringUtils.isEmpty(cypherQuery)) {
return new InterpreterResult(Code.ERROR, "Cypher query is Empty");
}
try (Session session = getDriver().session()){
StatementResult result = execute(session, cypherQuery, interpreterContext);
Set<Node> nodes = new HashSet<>();
Set<Relationship> relationships = new HashSet<>();
List<String> columns = new ArrayList<>();
List<List<String>> lines = new ArrayList<List<String>>();
while (result.hasNext()) {
Record record = result.next();
List<Pair<String, Value>> fields = record.fields();
List<String> line = new ArrayList<>();
for (Pair<String, Value> field : fields) {
if (field.value().hasType(session.typeSystem().NODE())) {
nodes.add(field.value().asNode());
} else if (field.value().hasType(session.typeSystem().RELATIONSHIP())) {
relationships.add(field.value().asRelationship());
} else if (field.value().hasType(session.typeSystem().PATH())) {
nodes.addAll(Iterables.asList(field.value().asPath().nodes()));
relationships.addAll(Iterables.asList(field.value().asPath().relationships()));
} else if (field.value().hasType(session.typeSystem().LIST())) {
List<Object> list = field.value().asList();
for (Object elem : list) {
List<String> lineList = new ArrayList<>();
setTabularResult(field.key(), elem, columns, lineList, session.typeSystem());
if (!lineList.isEmpty()) {
lines.add(lineList);
}
}
} else {
setTabularResult(field.key(), field.value(), columns, line, session.typeSystem());
}
}
if (!line.isEmpty()) {
lines.add(line);
}
}
if (!nodes.isEmpty()) {
return renderGraph(nodes, relationships);
} else {
return renderTable(columns, lines);
}
} catch (Exception e) {
logger.error("Exception while interpreting cypher query", e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
private void setTabularResult(String key, Object obj, List<String> columns, List<String> line,
TypeSystem typeSystem) {
if (obj instanceof Value) {
Value value = (Value) obj;
if (value.hasType(typeSystem.MAP())) {
Map<String, Object> map = value.asMap();
for (Entry<String, Object> entry : map.entrySet()) {
setTabularResult(String.format(MAP_KEY_TEMPLATE, key, entry.getKey()), entry.getValue(),
columns, line, typeSystem);
}
} else if (value.hasType(typeSystem.LIST())) {
List<Object> list = value.asList();
for (int i = 0; i < list.size(); i++) {
Object elem = list.get(i);
setTabularResult(String.format(ARAY_KEY_TEMPLATE, key, i), elem, columns,
line, typeSystem);
}
} else {
addLine(key, columns, line, value);
}
} else if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) obj;
for (Entry<String, Object> entry : map.entrySet()) {
setTabularResult(String.format(MAP_KEY_TEMPLATE, key, entry.getKey()), entry.getValue(),
columns, line, typeSystem);
}
} else if (obj instanceof List) {
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) obj;
for (int i = 0; i < list.size(); i++) {
Object elem = list.get(i);
setTabularResult(String.format(ARAY_KEY_TEMPLATE, key, i), elem, columns, line, typeSystem);
}
} else {
addLine(key, columns, line, obj);
}
}
private void addLine(String key, List<String> columns, List<String> line, Object value) {
if (!columns.contains(key)) {
columns.add(key);
}
int position = columns.indexOf(key);
if (line.size() < columns.size()) {
for (int i = line.size(); i < columns.size(); i++) {
line.add(null);
}
}
line.set(position, value == null ? null : value.toString());
}
private StatementResult execute(Session session, String cypherQuery,
InterpreterContext interpreterContext) {
Map<String, Object> params = new HashMap<>();
ResourcePool resourcePool = interpreterContext.getResourcePool();
Set<String> keys = extractParams(cypherQuery, PROPERTY_PATTERN, REPLACE_CURLY_BRACKETS);
keys.addAll(extractParams(cypherQuery, $_PATTERN, REPLACE_$));
for (String key : keys) {
Resource resource = resourcePool.get(key);
if (resource != null) {
params.put(key, resource.get());
}
}
logger.info("Executing cypher query {} with params {}", cypherQuery, params);
return params.isEmpty() ? session.run(cypherQuery) : session.run(cypherQuery, params);
}
private Set<String> extractParams(String cypherQuery, Pattern pattern, String replaceChar) {
Matcher matcher = pattern.matcher(cypherQuery);
Set<String> keys = new HashSet<>();
while (matcher.find()) {
keys.add(matcher.group().replaceAll(replaceChar, StringUtils.EMPTY));
}
return keys;
}
private InterpreterResult renderTable(List<String> cols, List<List<String>> lines) {
logger.info("Executing renderTable method");
StringBuilder msg = new StringBuilder(TABLE);
msg.append(NEW_LINE);
msg.append(StringUtils.join(cols, TAB));
msg.append(NEW_LINE);
for (List<String> line : lines) {
if (line.size() < cols.size()) {
for (int i = line.size(); i < cols.size(); i++) {
line.add(null);
}
}
msg.append(StringUtils.join(line, TAB));
msg.append(NEW_LINE);
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
}
private InterpreterResult renderGraph(Set<Node> nodes,
Set<Relationship> relationships) {
logger.info("Executing renderGraph method");
List<org.apache.zeppelin.tabledata.Node> nodesList = new ArrayList<>();
List<org.apache.zeppelin.tabledata.Relationship> relsList = new ArrayList<>();
for (Relationship rel : relationships) {
relsList.add(Neo4jConversionUtils.toZeppelinRelationship(rel));
}
Map<String, String> labels = getLabels(true);
for (Node node : nodes) {
nodesList.add(Neo4jConversionUtils.toZeppelinNode(node, labels));
}
return new GraphResult(Code.SUCCESS,
new GraphResult.Graph(nodesList, relsList, labels, getTypes(true), true));
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton()
.createOrGetParallelScheduler(Neo4jCypherInterpreter.class.getName() + this.hashCode(),
Integer.parseInt(getProperty(NEO4J_MAX_CONCURRENCY)));
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public void cancel(InterpreterContext context) {
}
}

View file

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.graph.neo4j.utils;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.neo4j.driver.v1.types.Node;
import org.neo4j.driver.v1.types.Relationship;
/**
* Neo4jConversionUtils
*/
public class Neo4jConversionUtils {
private Neo4jConversionUtils() {}
private static final String[] LETTERS = "0123456789ABCDEF".split("");
public static final String COLOR_GREY = "#D3D3D3";
public static org.apache.zeppelin.tabledata.Node toZeppelinNode(Node n,
Map<String, String> graphLabels) {
Set<String> labels = new LinkedHashSet<>();
String firstLabel = null;
for (String label : n.labels()) {
if (firstLabel == null) {
firstLabel = label;
}
labels.add(label);
}
return new org.apache.zeppelin.tabledata.Node(n.id(), n.asMap(),
labels);
}
public static org.apache.zeppelin.tabledata.Relationship
toZeppelinRelationship(Relationship r) {
return new org.apache.zeppelin.tabledata.Relationship(r.id(), r.asMap(),
r.startNodeId(), r.endNodeId(), r.type());
}
public static String getRandomLabelColor() {
char[] color = new char[7];
color[0] = '#';
for (int i = 1; i < color.length; i++) {
color[i] = LETTERS[(int) Math.floor(Math.random() * 16)].charAt(0);
}
return new String(color);
}
}

View file

@ -0,0 +1,43 @@
[
{
"group": "neo4j",
"name": "neo4j",
"className": "org.apache.zeppelin.graph.neo4j.Neo4jCypherInterpreter",
"properties": {
"neo4j.url": {
"envName": null,
"propertyName": "neo4j.url",
"defaultValue": "bolt://localhost:7687",
"description": "The Neo4j's BOLT url."
},
"neo4j.auth.type": {
"envName": null,
"propertyName": "neo4j.auth.type",
"defaultValue": "BASIC",
"description": "The Neo4j's authentication type (NONE, BASIC)."
},
"neo4j.auth.user": {
"envName": null,
"propertyName": "neo4j.auth.user",
"defaultValue": "neo4j",
"description": "The Neo4j user name."
},
"neo4j.auth.password": {
"envName": null,
"propertyName": "neo4j.auth.password",
"defaultValue": "neo4j",
"description": "The Neo4j user password."
},
"neo4j.max.concurrency": {
"envName": null,
"propertyName": "neo4j.max.concurrency",
"defaultValue": "50",
"description": "Max concurrency call from Zeppelin to Neo4j server."
}
},
"editor": {
"language": "cypher",
"editOnDblClick": false
}
}
]

View file

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.graph.neo4j;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.graph.neo4j.Neo4jCypherInterpreter.Neo4jAuthType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.graph.GraphResult;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.neo4j.harness.ServerControls;
import org.neo4j.harness.TestServerBuilders;
import com.google.gson.Gson;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class Neo4jCypherInterpreterTest {
private Neo4jCypherInterpreter interpreter;
private InterpreterContext context;
private static ServerControls server;
private static final Gson gson = new Gson();
private static final String LABEL_PERSON = "Person";
private static final String REL_KNOWS = "KNOWS";
private static final String CYPHER_FOREACH = "FOREACH (x in range(1,1000) | CREATE (:%s{name: \"name\" + x, age: %s}))";
private static final String CHPHER_UNWIND = "UNWIND range(1,1000) as x "
+ "MATCH (n), (m) WHERE id(n) = x AND id(m) = toInt(rand() * 1000) "
+ "CREATE (n)-[:%s]->(m)";
private static final String QUERY_TEMPLATE_SIMPLE_OBJECT = "RETURN %s as object";
@BeforeClass
public static void setUpNeo4jServer() throws Exception {
server = TestServerBuilders.newInProcessBuilder()
.withConfig("dbms.security.auth_enabled","false")
.withFixture(String.format(CYPHER_FOREACH, LABEL_PERSON, "x % 10"))
.withFixture(String.format(CHPHER_UNWIND, REL_KNOWS))
.newServer();
}
@AfterClass
public static void tearDownNeo4jServer() throws Exception {
server.close();
}
@Before
public void setUpZeppelin() {
Properties p = new Properties();
p.setProperty(Neo4jCypherInterpreter.NEO4J_SERVER_URL, server.boltURI().toString());
p.setProperty(Neo4jCypherInterpreter.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
p.setProperty(Neo4jCypherInterpreter.NEO4J_MAX_CONCURRENCY, "50");
interpreter = new Neo4jCypherInterpreter(p);
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(new InterpreterGroup().getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(null));
}
@After
public void tearDownZeppelin() throws Exception {
interpreter.close();
}
@Test
public void testRenderTable() {
interpreter.open();
InterpreterResult result = interpreter.interpret("MATCH (n:Person) "
+ "WHERE n.name IN ['name1', 'name2', 'name3'] "
+ "RETURN n.name AS name, n.age AS age", context);
assertEquals(Code.SUCCESS, result.code());
final String tableResult = "name\tage\n\"name1\"\t1\n\"name2\"\t2\n\"name3\"\t3\n";
assertEquals(tableResult, result.toString().replace("%table ", StringUtils.EMPTY));
}
@Test
public void testRenderArray() {
interpreter.open();
final String array = "[0, 1, 2]";
final String tableResult = "object\n0\n1\n2\n";
InterpreterResult result = interpreter.interpret(String.format(QUERY_TEMPLATE_SIMPLE_OBJECT, array), context);
assertEquals(Code.SUCCESS, result.code());
assertEquals(tableResult, result.toString().replace("%table ", StringUtils.EMPTY));
}
@Test
public void testRenderMap() {
interpreter.open();
final String json = "{key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]}";
final String objectKey = "object.key";
final String objectListKey0Inner = "object.listKey[0].inner";
final String objectListKey1Inner = "object.listKey[1].inner";
InterpreterResult result = interpreter.interpret(String.format(QUERY_TEMPLATE_SIMPLE_OBJECT, json), context);
assertEquals(Code.SUCCESS, result.code());
String[] rows = result.toString().replace("%table ", StringUtils.EMPTY).split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 2);
List<String> header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB));
assertEquals(header.contains(objectKey), true);
assertEquals(header.contains(objectListKey0Inner), true);
assertEquals(header.contains(objectListKey1Inner), true);
List<String> row = Arrays.asList(rows[1].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value");
assertEquals(row.get(header.indexOf(objectListKey0Inner)), "Map1");
assertEquals(row.get(header.indexOf(objectListKey1Inner)), "Map2");
final String jsonList = "[{key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]},"
+ "{key: \"value2\", listKey: [{inner: \"Map12\"}, {inner: \"Map22\"}]}]";
result = interpreter.interpret(String.format(QUERY_TEMPLATE_SIMPLE_OBJECT, jsonList), context);
assertEquals(Code.SUCCESS, result.code());
rows = result.toString().replace("%table ", StringUtils.EMPTY).split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 3);
header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB));
assertEquals(header.contains(objectKey), true);
assertEquals(header.contains(objectListKey0Inner), true);
assertEquals(header.contains(objectListKey1Inner), true);
row = Arrays.asList(rows[1].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value");
assertEquals(row.get(header.indexOf(objectListKey0Inner)), "Map1");
assertEquals(row.get(header.indexOf(objectListKey1Inner)), "Map2");
row = Arrays.asList(rows[2].split(Neo4jCypherInterpreter.TAB));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value2");
assertEquals(row.get(header.indexOf(objectListKey0Inner)), "Map12");
assertEquals(row.get(header.indexOf(objectListKey1Inner)), "Map22");
final String jsonListWithNull = "[{key: \"value\", listKey: [{inner: \"Map1\"}, {inner: \"Map2\"}]},"
+ "{key: \"value2\", listKey: null}]";
final String objectListKey = "object.listKey";
result = interpreter.interpret(String.format(QUERY_TEMPLATE_SIMPLE_OBJECT, jsonListWithNull), context);
assertEquals(Code.SUCCESS, result.code());
rows = result.toString().replace("%table ", StringUtils.EMPTY).split(Neo4jCypherInterpreter.NEW_LINE);
assertEquals(rows.length, 3);
header = Arrays.asList(rows[0].split(Neo4jCypherInterpreter.TAB, -1));
assertEquals(header.contains(objectKey), true);
assertEquals(header.contains(objectListKey0Inner), true);
assertEquals(header.contains(objectListKey1Inner), true);
assertEquals(header.contains(objectListKey), true);
row = Arrays.asList(rows[1].split(Neo4jCypherInterpreter.TAB, -1));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value");
assertEquals(row.get(header.indexOf(objectListKey0Inner)), "Map1");
assertEquals(row.get(header.indexOf(objectListKey1Inner)), "Map2");
assertEquals(row.get(header.indexOf(objectListKey)), "");
row = Arrays.asList(rows[2].split(Neo4jCypherInterpreter.TAB, -1));
assertEquals(row.size(), header.size());
assertEquals(row.get(header.indexOf(objectKey)), "value2");
assertEquals(row.get(header.indexOf(objectListKey0Inner)), "");
assertEquals(row.get(header.indexOf(objectListKey1Inner)), "");
assertEquals(row.get(header.indexOf(objectListKey)), "");
}
@Test
public void testRenderNetwork() {
interpreter.open();
InterpreterResult result = interpreter.interpret("MATCH (n)-[r:KNOWS]-(m) RETURN n, r, m LIMIT 1", context);
GraphResult.Graph graph = gson.fromJson(result.toString().replace("%network ", StringUtils.EMPTY), GraphResult.Graph.class);
assertEquals(2, graph.getNodes().size());
assertEquals(true, graph.getNodes().iterator().next().getLabel().equals(LABEL_PERSON));
assertEquals(1, graph.getEdges().size());
assertEquals(true, graph.getEdges().iterator().next().getLabel().equals(REL_KNOWS));
assertEquals(1, graph.getLabels().size());
assertEquals(1, graph.getTypes().size());
assertEquals(true, graph.getLabels().containsKey(LABEL_PERSON));
assertEquals(REL_KNOWS, graph.getTypes().iterator().next());
assertEquals(Code.SUCCESS, result.code());
}
@Test
public void testFallingQuery() {
interpreter.open();
final String ERROR_MSG_EMPTY = "%text Cypher query is Empty";
InterpreterResult result = interpreter.interpret(StringUtils.EMPTY, context);
assertEquals(Code.ERROR, result.code());
assertEquals(ERROR_MSG_EMPTY, result.toString());
result = interpreter.interpret(null, context);
assertEquals(Code.ERROR, result.code());
assertEquals(ERROR_MSG_EMPTY, result.toString());
result = interpreter.interpret("MATCH (n:Person{name: }) RETURN n.name AS name, n.age AS age", context);
assertEquals(Code.ERROR, result.code());
}
}

View file

@ -76,6 +76,7 @@
<module>bigquery</module>
<module>alluxio</module>
<module>scio</module>
<module>neo4j</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-jupyter</module>

View file

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.graph;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.tabledata.Node;
import org.apache.zeppelin.tabledata.Relationship;
import com.google.gson.Gson;
/**
* The intepreter result template for Networks
*
*/
public class GraphResult extends InterpreterResult {
/**
* The Graph structure parsed from the front-end
*
*/
public static class Graph {
private Collection<Node> nodes;
private Collection<Relationship> edges;
/**
* The node types in the whole graph, and the related colors
*
*/
private Map<String, String> labels;
/**
* The relationship types in the whole graph
*
*/
private Set<String> types;
/**
* Is a directed graph
*/
private boolean directed;
public Graph() {}
public Graph(Collection<Node> nodes, Collection<Relationship> edges,
Map<String, String> labels, Set<String> types, boolean directed) {
super();
this.setNodes(nodes);
this.setEdges(edges);
this.setLabels(labels);
this.setTypes(types);
this.setDirected(directed);
}
public Collection<Node> getNodes() {
return nodes;
}
public void setNodes(Collection<Node> nodes) {
this.nodes = nodes;
}
public Collection<Relationship> getEdges() {
return edges;
}
public void setEdges(Collection<Relationship> edges) {
this.edges = edges;
}
public Map<String, String> getLabels() {
return labels;
}
public void setLabels(Map<String, String> labels) {
this.labels = labels;
}
public Set<String> getTypes() {
return types;
}
public void setTypes(Set<String> types) {
this.types = types;
}
public boolean isDirected() {
return directed;
}
public void setDirected(boolean directed) {
this.directed = directed;
}
}
private static final Gson gson = new Gson();
public GraphResult(Code code, Graph graphObject) {
super(code, Type.NETWORK, gson.toJson(graphObject));
}
}

View file

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tabledata;
import java.util.Map;
/**
* The base network entity
*
*/
public abstract class GraphEntity {
private long id;
/**
* The data of the entity
*
*/
private Map<String, Object> data;
/**
* The primary type of the entity
*/
private String label;
public GraphEntity() {}
public GraphEntity(long id, Map<String, Object> data, String label) {
super();
this.setId(id);
this.setData(data);
this.setLabel(label);
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Map<String, Object> getData() {
return data;
}
public void setData(Map<String, Object> data) {
this.data = data;
}
public String getLabel() {
return label;
}
public void setLabel(String label) {
this.label = label;
}
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tabledata;
import java.util.Map;
import java.util.Set;
/**
* The Zeppelin Node Entity
*
*/
public class Node extends GraphEntity {
/**
* The labels (types) attached to a node
*/
private Set<String> labels;
public Node() {}
public Node(long id, Map<String, Object> data, Set<String> labels) {
super(id, data, labels.iterator().next());
}
public Set<String> getLabels() {
return labels;
}
public void setLabels(Set<String> labels) {
this.labels = labels;
}
}

View file

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tabledata;
import java.util.Map;
/**
* The Zeppelin Relationship entity
*
*/
public class Relationship extends GraphEntity {
/**
* Source node ID
*/
private long source;
/**
* End node ID
*/
private long target;
public Relationship() {}
public Relationship(long id, Map<String, Object> data, long source,
long target, String label) {
super(id, data, label);
this.setSource(source);
this.setTarget(target);
}
public long getSource() {
return source;
}
public void setSource(long startNodeId) {
this.source = startNodeId;
}
public long getTarget() {
return target;
}
public void setTarget(long endNodeId) {
this.target = endNodeId;
}
}

View file

@ -623,7 +623,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
+ "org.apache.zeppelin.beam.BeamInterpreter,"
+ "org.apache.zeppelin.scio.ScioInterpreter,"
+ "org.apache.zeppelin.groovy.GroovyInterpreter"
+ "org.apache.zeppelin.groovy.GroovyInterpreter,"
+ "org.apache.zeppelin.neo4j.Neo4jCypherInterpreter"
),
ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
@ -634,7 +635,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy"),
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy,neo4j"),
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),