mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-189: Handle responses containing reserved characters
This commit is contained in:
parent
14b8a37d05
commit
cd6294be73
3 changed files with 72 additions and 35 deletions
|
|
@ -29,11 +29,12 @@
|
|||
<artifactId>zeppelin-geode</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Apache Geode Interpreter</name>
|
||||
<name>Zeppelin: Apache Geode interpreter</name>
|
||||
<url>http://geode.incubator.apache.org/</url>
|
||||
|
||||
<properties>
|
||||
<gemfire.version>8.1.0</gemfire.version>
|
||||
<geode.version>1.0.0-incubating-SNAPSHOT</geode.version>
|
||||
<!-- <gemfire.version>8.1.0</gemfire.version> -->
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -44,10 +45,18 @@
|
|||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>com.gemstone.gemfire</groupId>
|
||||
<artifactId>gemfire</artifactId>
|
||||
<version>${gemfire.version}</version>
|
||||
</dependency>
|
||||
-->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.geode</groupId>
|
||||
<artifactId>gemfire-core</artifactId>
|
||||
<version>${geode.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
@ -147,10 +156,12 @@
|
|||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<!--
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>spring-gemstone-releases</id>
|
||||
<url>http://repo.springsource.org/gemstone-release-cache/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
-->
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
|
|
@ -45,7 +46,7 @@ import com.gemstone.gemfire.pdx.PdxInstance;
|
|||
* <p>
|
||||
* Sample usage: <br/>
|
||||
* {@code %geode.oql} <br/>
|
||||
* {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'}
|
||||
* {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95}
|
||||
* </p>
|
||||
*
|
||||
* The OQL spec and sample queries:
|
||||
|
|
@ -59,18 +60,24 @@ import com.gemstone.gemfire.pdx.PdxInstance;
|
|||
*/
|
||||
public class GeodeOqlInterpreter extends Interpreter {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
|
||||
int commandTimeOut = 600000;
|
||||
private static final String DEFAULT_PORT = "10334";
|
||||
private static final String DEFAULT_HOST = "localhost";
|
||||
|
||||
private static final String TABLE_MAGIC = "%table ";
|
||||
private static final char NEWLINE = '\n';
|
||||
private static final char TAB = '\t';
|
||||
private static final char WHITESPACE = ' ';
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
|
||||
|
||||
private static final String TABLE_MAGIC_TAG = "%table ";
|
||||
|
||||
public static final String LOCATOR_HOST = "geode.locator.host";
|
||||
public static final String LOCATOR_PORT = "geode.locator.port";
|
||||
|
||||
static {
|
||||
Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.")
|
||||
.add(LOCATOR_PORT, "10334", "The Geode Locator Port").build());
|
||||
new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
|
||||
.add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port").build());
|
||||
}
|
||||
|
||||
private ClientCache clientCache = null;
|
||||
|
|
@ -126,7 +133,6 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private InterpreterResult executeOql(String oql) {
|
||||
try {
|
||||
|
||||
|
|
@ -138,7 +144,7 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
SelectResults<Object> results =
|
||||
(SelectResults<Object>) getQueryService().newQuery(oql).execute();
|
||||
|
||||
StringBuilder msg = new StringBuilder(TABLE_MAGIC);
|
||||
StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG);
|
||||
boolean isTableHeaderSet = false;
|
||||
|
||||
Iterator<Object> iterator = results.iterator();
|
||||
|
|
@ -157,7 +163,7 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
isTableHeaderSet = true;
|
||||
msg.append('\n');
|
||||
msg.append(NEWLINE);
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS, msg.toString());
|
||||
|
|
@ -168,45 +174,56 @@ public class GeodeOqlInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
|
||||
/**
|
||||
* For %table response replace Tab and Newline characters from the content.
|
||||
*/
|
||||
private String replaceReservedChars(String str) {
|
||||
|
||||
if (StringUtils.isBlank(str)) {
|
||||
return str;
|
||||
}
|
||||
|
||||
return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
|
||||
}
|
||||
|
||||
private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
|
||||
Struct struct = (Struct) entry;
|
||||
if (!isTableHeaderSet) {
|
||||
if (!isHeaderSet) {
|
||||
for (String titleName : struct.getStructType().getFieldNames()) {
|
||||
msg.append(titleName).append('\t');
|
||||
msg.append(replaceReservedChars(titleName)).append(TAB);
|
||||
}
|
||||
msg.append('\n');
|
||||
msg.append(NEWLINE);
|
||||
}
|
||||
|
||||
for (String titleName : struct.getStructType().getFieldNames()) {
|
||||
msg.append(struct.get(titleName)).append('\t');
|
||||
msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB);
|
||||
}
|
||||
}
|
||||
|
||||
private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
|
||||
private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
|
||||
PdxInstance pdxEntry = (PdxInstance) entry;
|
||||
if (!isTableHeaderSet) {
|
||||
if (!isHeaderSet) {
|
||||
for (String titleName : pdxEntry.getFieldNames()) {
|
||||
msg.append(titleName).append('\t');
|
||||
msg.append(replaceReservedChars(titleName)).append(TAB);
|
||||
}
|
||||
msg.append('\n');
|
||||
msg.append(NEWLINE);
|
||||
}
|
||||
|
||||
for (String titleName : pdxEntry.getFieldNames()) {
|
||||
msg.append(pdxEntry.getField(titleName)).append('\t');
|
||||
msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
|
||||
if (!isTableHeaderSet) {
|
||||
msg.append("Result").append('\n');
|
||||
private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
|
||||
if (!isHeaderSet) {
|
||||
msg.append("Result").append(NEWLINE);
|
||||
}
|
||||
msg.append((Number) entry);
|
||||
}
|
||||
|
||||
private void handleUnsupportedTypeEntry(boolean isTableHeaderSet,
|
||||
Object entry, StringBuilder msg) {
|
||||
if (!isTableHeaderSet) {
|
||||
msg.append("Unsuppoted Type").append('\n');
|
||||
private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
|
||||
if (!isHeaderSet) {
|
||||
msg.append("Unsuppoted Type").append(NEWLINE);
|
||||
}
|
||||
msg.append("" + entry);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ public class GeodeOqlInterpreterTest {
|
|||
|
||||
@Test
|
||||
public void oqlNumberResponse() throws Exception {
|
||||
oqlTest(new ArrayList<Object>(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n");
|
||||
testOql(new ArrayList<Object>(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -56,15 +56,24 @@ public class GeodeOqlInterpreterTest {
|
|||
String[] fields = new String[] {"field1", "field2"};
|
||||
Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"});
|
||||
|
||||
oqlTest(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
|
||||
testOql(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
|
||||
"field1\tfield2\t\nval1\tval2\t\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void oqlStructResponseWithReservedCharacters() throws Exception {
|
||||
String[] fields = new String[] {"fi\teld1", "f\nield2"};
|
||||
Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"});
|
||||
|
||||
testOql(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
|
||||
"fi eld1\tf ield2\t\nv al 1\tval2\t\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void oqlPdxInstanceResponse() throws Exception {
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes());
|
||||
PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
|
||||
oqlTest(new ArrayList<Object>(Arrays.asList(pdxInstance)).iterator(), "\n\n");
|
||||
testOql(new ArrayList<Object>(Arrays.asList(pdxInstance)).iterator(), "\n\n");
|
||||
}
|
||||
|
||||
private static class DummyUnspportedType {
|
||||
|
|
@ -78,11 +87,11 @@ public class GeodeOqlInterpreterTest {
|
|||
public void oqlUnsupportedTypeResponse() throws Exception {
|
||||
DummyUnspportedType unspported1 = new DummyUnspportedType();
|
||||
DummyUnspportedType unspported2 = new DummyUnspportedType();
|
||||
oqlTest(new ArrayList<Object>(Arrays.asList(unspported1, unspported2)).iterator(),
|
||||
testOql(new ArrayList<Object>(Arrays.asList(unspported1, unspported2)).iterator(),
|
||||
"Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n");
|
||||
}
|
||||
|
||||
private void oqlTest(Iterator<Object> queryResponseIterator, String expectedOutput)
|
||||
private void testOql(Iterator<Object> queryResponseIterator, String expectedOutput)
|
||||
throws Exception {
|
||||
|
||||
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
|
||||
|
|
@ -124,12 +133,12 @@ public class GeodeOqlInterpreterTest {
|
|||
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
|
||||
|
||||
when(spyGeodeOqlInterpreter.getQueryService())
|
||||
.thenThrow(new RuntimeException("Test exception"));
|
||||
.thenThrow(new RuntimeException("Expected Test Exception!"));
|
||||
|
||||
InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
|
||||
|
||||
assertEquals(Code.ERROR, interpreterResult.code());
|
||||
assertEquals("Test exception", interpreterResult.message());
|
||||
assertEquals("Expected Test Exception!", interpreterResult.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in a new issue