mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Moved business logic to the connection manager
This commit is contained in:
parent
8e4690e80c
commit
2f88e9896d
3 changed files with 42 additions and 48 deletions
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.graph.neo4j;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
@ -29,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.neo4j.driver.v1.AuthToken;
|
||||
import org.neo4j.driver.v1.AuthTokens;
|
||||
import org.neo4j.driver.v1.Config;
|
||||
import org.neo4j.driver.v1.Driver;
|
||||
import org.neo4j.driver.v1.GraphDatabase;
|
||||
|
|
@ -42,6 +44,12 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class Neo4jConnectionManager {
|
||||
static final Logger LOGGER = LoggerFactory.getLogger(Neo4jConnectionManager.class);
|
||||
|
||||
public static final String NEO4J_SERVER_URL = "neo4j.url";
|
||||
public static final String NEO4J_AUTH_TYPE = "neo4j.auth.type";
|
||||
public static final String NEO4J_AUTH_USER = "neo4j.auth.user";
|
||||
public static final String NEO4J_AUTH_PASSWORD = "neo4j.auth.password";
|
||||
public static final String NEO4J_MAX_CONCURRENCY = "neo4j.max.concurrency";
|
||||
|
||||
private static final Pattern PROPERTY_PATTERN = Pattern.compile("\\{\\w+\\}");
|
||||
private static final String REPLACE_CURLY_BRACKETS = "\\{|\\}";
|
||||
|
|
@ -57,11 +65,34 @@ public class Neo4jConnectionManager {
|
|||
|
||||
private final AuthToken authToken;
|
||||
|
||||
public Neo4jConnectionManager(String neo4jUrl, AuthToken authToken,
|
||||
Config config) {
|
||||
this.neo4jUrl = neo4jUrl;
|
||||
this.authToken = authToken;
|
||||
this.config = config;
|
||||
/**
|
||||
*
|
||||
* Enum type for the AuthToken
|
||||
*
|
||||
*/
|
||||
public enum Neo4jAuthType {NONE, BASIC}
|
||||
|
||||
public Neo4jConnectionManager(Properties properties) {
|
||||
this.neo4jUrl = properties.getProperty(NEO4J_SERVER_URL);
|
||||
this.config = Config.build()
|
||||
.withMaxIdleSessions(Integer.parseInt(properties.getProperty(NEO4J_MAX_CONCURRENCY)))
|
||||
.toConfig();
|
||||
String authType = properties.getProperty(NEO4J_AUTH_TYPE);
|
||||
switch (Neo4jAuthType.valueOf(authType.toUpperCase())) {
|
||||
case BASIC:
|
||||
String username = properties.getProperty(NEO4J_AUTH_USER);
|
||||
String password = properties.getProperty(NEO4J_AUTH_PASSWORD);
|
||||
LOGGER.debug("Creating a BASIC authentication to neo4j with user '{}' and password '{}'",
|
||||
username, password);
|
||||
this.authToken = AuthTokens.basic(username, password);
|
||||
break;
|
||||
case NONE:
|
||||
LOGGER.debug("Creating NONE authentication");
|
||||
this.authToken = AuthTokens.none();
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Neo4j authentication type not supported");
|
||||
}
|
||||
}
|
||||
|
||||
private Driver getDriver() {
|
||||
|
|
|
|||
|
|
@ -37,9 +37,6 @@ import org.apache.zeppelin.scheduler.Scheduler;
|
|||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.neo4j.driver.internal.types.InternalTypeSystem;
|
||||
import org.neo4j.driver.internal.util.Iterables;
|
||||
import org.neo4j.driver.v1.AuthToken;
|
||||
import org.neo4j.driver.v1.AuthTokens;
|
||||
import org.neo4j.driver.v1.Config;
|
||||
import org.neo4j.driver.v1.Record;
|
||||
import org.neo4j.driver.v1.StatementResult;
|
||||
import org.neo4j.driver.v1.Value;
|
||||
|
|
@ -52,12 +49,6 @@ import org.neo4j.driver.v1.util.Pair;
|
|||
* Neo4j interpreter for Zeppelin.
|
||||
*/
|
||||
public class Neo4jCypherInterpreter extends Interpreter {
|
||||
public static final String NEO4J_SERVER_URL = "neo4j.url";
|
||||
public static final String NEO4J_AUTH_TYPE = "neo4j.auth.type";
|
||||
public static final String NEO4J_AUTH_USER = "neo4j.auth.user";
|
||||
public static final String NEO4J_AUTH_PASSWORD = "neo4j.auth.password";
|
||||
public static final String NEO4J_MAX_CONCURRENCY = "neo4j.max.concurrency";
|
||||
|
||||
private static final String TABLE = "%table";
|
||||
public static final String NEW_LINE = "\n";
|
||||
public static final String TAB = "\t";
|
||||
|
|
@ -65,13 +56,6 @@ public class Neo4jCypherInterpreter extends Interpreter {
|
|||
private static final String MAP_KEY_TEMPLATE = "%s.%s";
|
||||
private static final String ARAY_KEY_TEMPLATE = "%s[%d]";
|
||||
|
||||
/**
|
||||
*
|
||||
* Enum type for the AuthToken
|
||||
*
|
||||
*/
|
||||
public enum Neo4jAuthType {NONE, BASIC}
|
||||
|
||||
private Map<String, String> labels;
|
||||
|
||||
private Set<String> types;
|
||||
|
|
@ -80,28 +64,7 @@ public class Neo4jCypherInterpreter extends Interpreter {
|
|||
|
||||
public Neo4jCypherInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
Config config = Config.build()
|
||||
.withMaxIdleSessions(Integer.parseInt(getProperty(NEO4J_MAX_CONCURRENCY)))
|
||||
.toConfig();
|
||||
String authType = getProperty(NEO4J_AUTH_TYPE);
|
||||
AuthToken authToken = null;
|
||||
switch (Neo4jAuthType.valueOf(authType.toUpperCase())) {
|
||||
case BASIC:
|
||||
String username = getProperty(NEO4J_AUTH_USER);
|
||||
String password = getProperty(NEO4J_AUTH_PASSWORD);
|
||||
logger.debug("Creating a BASIC authentication to neo4j with user '{}' and password '{}'",
|
||||
username, password);
|
||||
authToken = AuthTokens.basic(username, password);
|
||||
break;
|
||||
case NONE:
|
||||
logger.debug("Creating NONE authentication");
|
||||
authToken = AuthTokens.none();
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Neo4j authentication type not supported");
|
||||
}
|
||||
this.neo4jConnectionManager = new Neo4jConnectionManager(
|
||||
getProperty(NEO4J_SERVER_URL), authToken, config);
|
||||
this.neo4jConnectionManager = new Neo4jConnectionManager(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -293,7 +256,7 @@ public class Neo4jCypherInterpreter extends Interpreter {
|
|||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton()
|
||||
.createOrGetParallelScheduler(Neo4jCypherInterpreter.class.getName() + this.hashCode(),
|
||||
Integer.parseInt(getProperty(NEO4J_MAX_CONCURRENCY)));
|
||||
Integer.parseInt(getProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import java.util.Properties;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.graph.neo4j.Neo4jCypherInterpreter.Neo4jAuthType;
|
||||
import org.apache.zeppelin.graph.neo4j.Neo4jConnectionManager.Neo4jAuthType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -87,9 +87,9 @@ public class Neo4jCypherInterpreterTest {
|
|||
@Before
|
||||
public void setUpZeppelin() {
|
||||
Properties p = new Properties();
|
||||
p.setProperty(Neo4jCypherInterpreter.NEO4J_SERVER_URL, server.boltURI().toString());
|
||||
p.setProperty(Neo4jCypherInterpreter.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
|
||||
p.setProperty(Neo4jCypherInterpreter.NEO4J_MAX_CONCURRENCY, "50");
|
||||
p.setProperty(Neo4jConnectionManager.NEO4J_SERVER_URL, server.boltURI().toString());
|
||||
p.setProperty(Neo4jConnectionManager.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
|
||||
p.setProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY, "50");
|
||||
interpreter = new Neo4jCypherInterpreter(p);
|
||||
context = new InterpreterContext("note", "id", null, "title", "text",
|
||||
new AuthenticationInfo(),
|
||||
|
|
|
|||
Loading…
Reference in a new issue