mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3976. Create AbstractInterprter for common usage
This commit is contained in:
parent
51ba1936a2
commit
7b42a42f3f
20 changed files with 158 additions and 63 deletions
|
|
@ -43,7 +43,7 @@ public class GroovyZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String showData(Object obj) {
|
||||
public String showData(Object obj) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class DevZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String showData(Object obj) {
|
||||
public String showData(Object obj) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.alias.CredentialProvider;
|
||||
import org.apache.hadoop.security.alias.CredentialProviderFactory;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -153,6 +154,11 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
maxLineResults = MAX_LINE_DEFAULT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean runKerberosLogin() {
|
||||
try {
|
||||
|
|
@ -727,7 +733,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
|
||||
String statementPrecode =
|
||||
getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, propertyKey));
|
||||
|
||||
|
||||
if (StringUtils.isNotBlank(statementPrecode)) {
|
||||
statement.execute(statementPrecode);
|
||||
}
|
||||
|
|
@ -803,9 +809,12 @@ public class JDBCInterpreter extends KerberosInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String originalCmd, InterpreterContext contextInterpreter) {
|
||||
String cmd = Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation")) ?
|
||||
interpolate(originalCmd, contextInterpreter.getResourcePool()) : originalCmd;
|
||||
protected boolean isInterpolate() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation", "false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult internalInterpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.debug("Run SQL command '{}'", cmd);
|
||||
String propertyKey = getPropertyKey(contextInterpreter);
|
||||
cmd = cmd.trim();
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package org.apache.zeppelin.jdbc;
|
|||
|
||||
import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
|
@ -77,7 +78,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testEnableDisableProperty() throws IOException {
|
||||
public void testEnableDisableProperty() throws IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -115,7 +116,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testNormalQueryInterpolation() throws IOException {
|
||||
public void testNormalQueryInterpolation() throws IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -154,7 +155,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testEscapedInterpolationPattern() throws IOException {
|
||||
public void testEscapedInterpolationPattern() throws IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -177,7 +178,7 @@ public class JDBCInterpreterInterpolationTest extends BasicJDBCTestCaseAdapter {
|
|||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals(1, interpreterResult.message().size());
|
||||
assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n",
|
||||
assertEquals("ID\tNAME\nkey\tkeyboard\nmou\tmouse\n",
|
||||
interpreterResult.message().get(0).getData());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testForMapPrefix() throws SQLException, IOException {
|
||||
public void testForMapPrefix() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -170,7 +170,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQuery() throws SQLException, IOException {
|
||||
public void testSelectQuery() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -191,7 +191,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testColumnAliasQuery() throws IOException {
|
||||
public void testColumnAliasQuery() throws IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -243,7 +243,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testQueryWithEscapedCharacters() throws SQLException, IOException {
|
||||
public void testQueryWithEscapedCharacters() throws SQLException, IOException,
|
||||
InterpreterException {
|
||||
String sqlQuery = "select '\\n', ';';" +
|
||||
"select replace('A\\;B', '\\', 'text');" +
|
||||
"select '\\', ';';" +
|
||||
|
|
@ -274,7 +275,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSelectMultipleQueries() throws SQLException, IOException {
|
||||
public void testSelectMultipleQueries() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -301,7 +302,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultSplitQuries() throws SQLException, IOException {
|
||||
public void testDefaultSplitQuries() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -324,7 +325,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQueryWithNull() throws SQLException, IOException {
|
||||
public void testSelectQueryWithNull() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -346,7 +347,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
|
||||
|
||||
@Test
|
||||
public void testSelectQueryMaxResult() throws SQLException, IOException {
|
||||
public void testSelectQueryMaxResult() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
@ -444,7 +445,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMultiTenant() throws SQLException, IOException {
|
||||
public void testMultiTenant() throws SQLException, IOException, InterpreterException {
|
||||
/*
|
||||
* assume that the database user is 'dbuser' and password is 'dbpassword'
|
||||
* 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property
|
||||
|
|
@ -513,7 +514,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPrecode() throws SQLException, IOException {
|
||||
public void testPrecode() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
|
|
@ -556,7 +557,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPrecodeWithAnotherPrefix() throws SQLException, IOException {
|
||||
public void testPrecodeWithAnotherPrefix() throws SQLException, IOException,
|
||||
InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("anotherPrefix.driver", "org.h2.Driver");
|
||||
properties.setProperty("anotherPrefix.url", getJdbcConnection());
|
||||
|
|
@ -585,7 +587,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStatementPrecode() throws SQLException, IOException {
|
||||
public void testStatementPrecode() throws SQLException, IOException, InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
|
|
@ -605,7 +607,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testIncorrectStatementPrecode() throws SQLException, IOException {
|
||||
public void testIncorrectStatementPrecode() throws SQLException, IOException,
|
||||
InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
|
|
@ -624,7 +627,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStatementPrecodeWithAnotherPrefix() throws SQLException, IOException {
|
||||
public void testStatementPrecodeWithAnotherPrefix() throws SQLException, IOException,
|
||||
InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("anotherPrefix.driver", "org.h2.Driver");
|
||||
properties.setProperty("anotherPrefix.url", getJdbcConnection());
|
||||
|
|
@ -652,7 +656,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSplitSqlQueryWithComments() throws SQLException, IOException {
|
||||
public void testSplitSqlQueryWithComments() throws SQLException, IOException,
|
||||
InterpreterException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class PythonZeppelinContext extends BaseZeppelinContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String showData(Object obj) {
|
||||
public String showData(Object obj) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.zeppelin.sap;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreter;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -36,7 +38,7 @@ import java.util.concurrent.TimeUnit;
|
|||
/**
|
||||
* SAP Universe interpreter for Zeppelin.
|
||||
*/
|
||||
public class UniverseInterpreter extends Interpreter {
|
||||
public class UniverseInterpreter extends AbstractInterpreter {
|
||||
|
||||
public UniverseInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
|
|
@ -80,10 +82,18 @@ public class UniverseInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String originalSt, InterpreterContext context)
|
||||
protected boolean isInterpolate() {
|
||||
return Boolean.parseBoolean(getProperty("universe.interpolation", "false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult internalInterpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
final String st = Boolean.parseBoolean(getProperty("universe.interpolation", "false")) ?
|
||||
interpolate(originalSt, context.getResourcePool()) : originalSt;
|
||||
try {
|
||||
InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
String paragraphId = context.getParagraphId();
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.apache.commons.exec.ExecuteException;
|
|||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -83,14 +84,22 @@ public class ShellInterpreter extends KerberosInterpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isInterpolate() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.shell.interpolation", "false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String originalCmd, InterpreterContext contextInterpreter) {
|
||||
String cmd = Boolean.parseBoolean(getProperty("zeppelin.shell.interpolation")) ?
|
||||
interpolate(originalCmd, contextInterpreter.getResourcePool()) : originalCmd;
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult internalInterpret(String cmd,
|
||||
InterpreterContext contextInterpreter) {
|
||||
LOGGER.debug("Run shell command '" + cmd + "'");
|
||||
OutputStream outStream = new ByteArrayOutputStream();
|
||||
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(shell);
|
||||
// the Windows CMD shell doesn't handle multiline statements,
|
||||
// they need to be delimited by '&&' instead
|
||||
|
|
@ -113,7 +122,7 @@ public class ShellInterpreter extends KerberosInterpreter {
|
|||
}
|
||||
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
LOGGER.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
LOGGER.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " return with exit value: " + exitVal);
|
||||
return new InterpreterResult(Code.SUCCESS, outStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.shell;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -51,7 +52,7 @@ public class ShellInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
public void test() throws InterpreterException {
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("dir", context);
|
||||
} else {
|
||||
|
|
@ -65,7 +66,7 @@ public class ShellInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidCommand(){
|
||||
public void testInvalidCommand() throws InterpreterException {
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("invalid_command\ndir", context);
|
||||
} else {
|
||||
|
|
@ -76,7 +77,7 @@ public class ShellInterpreterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testShellTimeout() {
|
||||
public void testShellTimeout() throws InterpreterException {
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("timeout 4", context);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.spark;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
||||
|
|
@ -29,7 +30,7 @@ import java.util.Properties;
|
|||
* Abstract class for SparkInterpreter. For the purpose of co-exist of NewSparkInterpreter
|
||||
* and OldSparkInterpreter
|
||||
*/
|
||||
public abstract class AbstractSparkInterpreter extends Interpreter {
|
||||
public abstract class AbstractSparkInterpreter extends AbstractInterpreter {
|
||||
|
||||
private SparkInterpreter parentSparkInterpreter;
|
||||
|
||||
|
|
@ -49,8 +50,6 @@ public abstract class AbstractSparkInterpreter extends Interpreter {
|
|||
|
||||
public abstract JavaSparkContext getJavaSparkContext();
|
||||
|
||||
public abstract SparkZeppelinContext getZeppelinContext();
|
||||
|
||||
public abstract String getSparkUIUrl();
|
||||
|
||||
public abstract boolean isUnsupportedSparkVersion();
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
|
||||
|
|
@ -143,11 +144,7 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterContext.set(context);
|
||||
z.setGui(context.getGui());
|
||||
z.setNoteGui(context.getNoteGui());
|
||||
z.setInterpreterContext(context);
|
||||
public InterpreterResult internalInterpret(String st, InterpreterContext context) {
|
||||
sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
|
||||
// set spark.scheduler.pool to null to clear the pool assosiated with this paragraph
|
||||
// sc.setLocalProperty("spark.scheduler.pool", null) will clean the pool
|
||||
|
|
|
|||
|
|
@ -998,12 +998,11 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
|
|||
* Interpret a single line.
|
||||
*/
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
public InterpreterResult internalInterpret(String line, InterpreterContext context) {
|
||||
if (isUnsupportedSparkVersion()) {
|
||||
return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
|
||||
+ " is not supported");
|
||||
}
|
||||
z.setInterpreterContext(context);
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ public class PySparkInterpreter extends PythonInterpreter {
|
|||
return "python";
|
||||
}
|
||||
|
||||
public SparkZeppelinContext getZeppelinContext() {
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
if (sparkInterpreter != null) {
|
||||
return sparkInterpreter.getZeppelinContext();
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ package org.apache.zeppelin.spark;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
|
@ -68,7 +68,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
public InterpreterResult internalInterpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
return delegation.interpret(st, context);
|
||||
}
|
||||
|
|
@ -132,7 +132,7 @@ public class SparkInterpreter extends AbstractSparkInterpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public SparkZeppelinContext getZeppelinContext() {
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return delegation.getZeppelinContext();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
|
||||
}
|
||||
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
|
||||
ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
|
||||
ZeppelinRContext.setZeppelinContext((SparkZeppelinContext) sparkInterpreter.getZeppelinContext());
|
||||
|
||||
zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this);
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.spark;
|
|||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreter;
|
||||
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
|
@ -37,7 +39,7 @@ import java.util.Properties;
|
|||
/**
|
||||
* Spark SQL interpreter for Zeppelin.
|
||||
*/
|
||||
public class SparkSqlInterpreter extends Interpreter {
|
||||
public class SparkSqlInterpreter extends AbstractInterpreter {
|
||||
private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
|
||||
|
||||
private SparkInterpreter sparkInterpreter;
|
||||
|
|
@ -59,7 +61,17 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
public void close() {}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
protected boolean isInterpolate() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation", "false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseZeppelinContext getZeppelinContext() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult internalInterpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
if (sparkInterpreter.isUnsupportedSparkVersion()) {
|
||||
return new InterpreterResult(Code.ERROR, "Spark "
|
||||
|
|
@ -73,11 +85,9 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
|
||||
|
||||
try {
|
||||
String effectiveSQL = Boolean.parseBoolean(getProperty("zeppelin.spark.sql.interpolation")) ?
|
||||
interpolate(st, context.getResourcePool()) : st;
|
||||
Method method = sqlc.getClass().getMethod("sql", String.class);
|
||||
String msg = sparkInterpreter.getZeppelinContext().showData(
|
||||
method.invoke(sqlc, effectiveSQL));
|
||||
method.invoke(sqlc, st));
|
||||
sc.clearJobGroup();
|
||||
return new InterpreterResult(Code.SUCCESS, msg);
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public abstract class AbstractInterpreter extends Interpreter {
|
||||
|
||||
public AbstractInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st,
|
||||
InterpreterContext context) throws InterpreterException {
|
||||
InterpreterContext.set(context);
|
||||
BaseZeppelinContext z = getZeppelinContext();
|
||||
if (z != null) {
|
||||
z.setGui(context.getGui());
|
||||
z.setNoteGui(context.getNoteGui());
|
||||
z.setInterpreterContext(context);
|
||||
}
|
||||
boolean interpolate = isInterpolate() ||
|
||||
Boolean.parseBoolean(context.getLocalProperties().getOrDefault("interpolate", "false"));
|
||||
if (interpolate) {
|
||||
st = interpolate(st, context.getResourcePool());
|
||||
}
|
||||
return internalInterpret(st, context);
|
||||
}
|
||||
|
||||
public abstract BaseZeppelinContext getZeppelinContext();
|
||||
|
||||
protected boolean isInterpolate() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected abstract InterpreterResult internalInterpret(
|
||||
String st,
|
||||
InterpreterContext context) throws InterpreterException;
|
||||
}
|
||||
|
|
@ -71,7 +71,7 @@ public abstract class BaseZeppelinContext {
|
|||
* @param obj
|
||||
* @return
|
||||
*/
|
||||
protected abstract String showData(Object obj);
|
||||
public abstract String showData(Object obj);
|
||||
|
||||
/**
|
||||
* @deprecated use z.textbox instead
|
||||
|
|
@ -225,7 +225,7 @@ public abstract class BaseZeppelinContext {
|
|||
public void setMaxResult(int maxResult) {
|
||||
this.maxResult = maxResult;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* display special types of objects for interpreter.
|
||||
* Each interpreter can has its own supported classes.
|
||||
|
|
|
|||
|
|
@ -38,13 +38,13 @@ import org.slf4j.LoggerFactory;
|
|||
* startKerberosLoginThread() needs to be called inside the open() and
|
||||
* shutdownExecutorService() inside close().
|
||||
*
|
||||
*
|
||||
*
|
||||
* Environment variables defined in zeppelin-env.sh
|
||||
* KERBEROS_REFRESH_INTERVAL controls the refresh interval for Kerberos ticket. The default value
|
||||
* is 1d.
|
||||
* KINIT_FAIL_THRESHOLD controls how many times should kinit retry. The default value is 5.
|
||||
*/
|
||||
public abstract class KerberosInterpreter extends Interpreter {
|
||||
public abstract class KerberosInterpreter extends AbstractInterpreter {
|
||||
|
||||
private Integer kinitFailCount = 0;
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ public class BaseZeppelinContextTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected String showData(Object obj) {
|
||||
public String showData(Object obj) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue