ZEPPELIN-189: Handle responses containing reserved characters

This commit is contained in:
tzolov 2015-08-01 15:56:10 +02:00
parent 14b8a37d05
commit cd6294be73
3 changed files with 72 additions and 35 deletions

View file

@ -29,11 +29,12 @@
<artifactId>zeppelin-geode</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Apache Geode Interpreter</name>
<name>Zeppelin: Apache Geode interpreter</name>
<url>http://geode.incubator.apache.org/</url>
<properties>
<gemfire.version>8.1.0</gemfire.version>
<geode.version>1.0.0-incubating-SNAPSHOT</geode.version>
<!-- <gemfire.version>8.1.0</gemfire.version> -->
</properties>
<dependencies>
@ -44,10 +45,18 @@
<scope>provided</scope>
</dependency>
<!--
<dependency>
<groupId>com.gemstone.gemfire</groupId>
<artifactId>gemfire</artifactId>
<version>${gemfire.version}</version>
</dependency>
-->
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>gemfire-core</artifactId>
<version>${geode.version}</version>
</dependency>
<dependency>
@ -147,10 +156,12 @@
</plugin>
</plugins>
</build>
<!--
<repositories>
<repository>
<id>spring-gemstone-releases</id>
<url>http://repo.springsource.org/gemstone-release-cache/</url>
</repository>
</repositories>
-->
</project>

View file

@ -18,6 +18,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
@ -45,7 +46,7 @@ import com.gemstone.gemfire.pdx.PdxInstance;
* <p>
* Sample usage: <br/>
* {@code %geode.oql} <br/>
* {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'}
* {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95}
* </p>
*
* The OQL spec and sample queries:
@ -59,18 +60,24 @@ import com.gemstone.gemfire.pdx.PdxInstance;
*/
public class GeodeOqlInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
int commandTimeOut = 600000;
private static final String DEFAULT_PORT = "10334";
private static final String DEFAULT_HOST = "localhost";
private static final String TABLE_MAGIC = "%table ";
private static final char NEWLINE = '\n';
private static final char TAB = '\t';
private static final char WHITESPACE = ' ';
Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
private static final String TABLE_MAGIC_TAG = "%table ";
public static final String LOCATOR_HOST = "geode.locator.host";
public static final String LOCATOR_PORT = "geode.locator.port";
static {
Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(),
new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.")
.add(LOCATOR_PORT, "10334", "The Geode Locator Port").build());
new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
.add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port").build());
}
private ClientCache clientCache = null;
@ -126,7 +133,6 @@ public class GeodeOqlInterpreter extends Interpreter {
}
}
private InterpreterResult executeOql(String oql) {
try {
@ -138,7 +144,7 @@ public class GeodeOqlInterpreter extends Interpreter {
SelectResults<Object> results =
(SelectResults<Object>) getQueryService().newQuery(oql).execute();
StringBuilder msg = new StringBuilder(TABLE_MAGIC);
StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG);
boolean isTableHeaderSet = false;
Iterator<Object> iterator = results.iterator();
@ -157,7 +163,7 @@ public class GeodeOqlInterpreter extends Interpreter {
}
isTableHeaderSet = true;
msg.append('\n');
msg.append(NEWLINE);
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
@ -168,45 +174,56 @@ public class GeodeOqlInterpreter extends Interpreter {
}
}
private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
/**
* For %table response replace Tab and Newline characters from the content.
*/
private String replaceReservedChars(String str) {
if (StringUtils.isBlank(str)) {
return str;
}
return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
}
private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
Struct struct = (Struct) entry;
if (!isTableHeaderSet) {
if (!isHeaderSet) {
for (String titleName : struct.getStructType().getFieldNames()) {
msg.append(titleName).append('\t');
msg.append(replaceReservedChars(titleName)).append(TAB);
}
msg.append('\n');
msg.append(NEWLINE);
}
for (String titleName : struct.getStructType().getFieldNames()) {
msg.append(struct.get(titleName)).append('\t');
msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB);
}
}
private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
PdxInstance pdxEntry = (PdxInstance) entry;
if (!isTableHeaderSet) {
if (!isHeaderSet) {
for (String titleName : pdxEntry.getFieldNames()) {
msg.append(titleName).append('\t');
msg.append(replaceReservedChars(titleName)).append(TAB);
}
msg.append('\n');
msg.append(NEWLINE);
}
for (String titleName : pdxEntry.getFieldNames()) {
msg.append(pdxEntry.getField(titleName)).append('\t');
msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB);
}
}
private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
if (!isTableHeaderSet) {
msg.append("Result").append('\n');
private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
if (!isHeaderSet) {
msg.append("Result").append(NEWLINE);
}
msg.append((Number) entry);
}
private void handleUnsupportedTypeEntry(boolean isTableHeaderSet,
Object entry, StringBuilder msg) {
if (!isTableHeaderSet) {
msg.append("Unsuppoted Type").append('\n');
private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) {
if (!isHeaderSet) {
msg.append("Unsuppoted Type").append(NEWLINE);
}
msg.append("" + entry);
}

View file

@ -48,7 +48,7 @@ public class GeodeOqlInterpreterTest {
@Test
public void oqlNumberResponse() throws Exception {
oqlTest(new ArrayList<Object>(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n");
testOql(new ArrayList<Object>(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n");
}
@Test
@ -56,15 +56,24 @@ public class GeodeOqlInterpreterTest {
String[] fields = new String[] {"field1", "field2"};
Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"});
oqlTest(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
testOql(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
"field1\tfield2\t\nval1\tval2\t\n");
}
@Test
public void oqlStructResponseWithReservedCharacters() throws Exception {
String[] fields = new String[] {"fi\teld1", "f\nield2"};
Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"});
testOql(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
"fi eld1\tf ield2\t\nv al 1\tval2\t\n");
}
@Test
public void oqlPdxInstanceResponse() throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes());
PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
oqlTest(new ArrayList<Object>(Arrays.asList(pdxInstance)).iterator(), "\n\n");
testOql(new ArrayList<Object>(Arrays.asList(pdxInstance)).iterator(), "\n\n");
}
private static class DummyUnspportedType {
@ -78,11 +87,11 @@ public class GeodeOqlInterpreterTest {
public void oqlUnsupportedTypeResponse() throws Exception {
DummyUnspportedType unspported1 = new DummyUnspportedType();
DummyUnspportedType unspported2 = new DummyUnspportedType();
oqlTest(new ArrayList<Object>(Arrays.asList(unspported1, unspported2)).iterator(),
testOql(new ArrayList<Object>(Arrays.asList(unspported1, unspported2)).iterator(),
"Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n");
}
private void oqlTest(Iterator<Object> queryResponseIterator, String expectedOutput)
private void testOql(Iterator<Object> queryResponseIterator, String expectedOutput)
throws Exception {
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
@ -124,12 +133,12 @@ public class GeodeOqlInterpreterTest {
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
when(spyGeodeOqlInterpreter.getQueryService())
.thenThrow(new RuntimeException("Test exception"));
.thenThrow(new RuntimeException("Expected Test Exception!"));
InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
assertEquals(Code.ERROR, interpreterResult.code());
assertEquals("Test exception", interpreterResult.message());
assertEquals("Expected Test Exception!", interpreterResult.message());
}
@Test