ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin

This commit is contained in:
tzolov 2015-07-31 02:24:41 +02:00
parent 3a42a28b01
commit ef7defcaeb
5 changed files with 538 additions and 1 deletions

View file

@ -72,7 +72,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

156
geode/pom.xml Normal file
View file

@ -0,0 +1,156 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-geode</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Apache Geode Interpreter</name>
<url>http://geode.incubator.apache.org/</url>
<properties>
<gemfire.version>8.1.0</gemfire.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.gemstone.gemfire</groupId>
<artifactId>gemfire</artifactId>
<version>${gemfire.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/psql</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/psql</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-gemstone-releases</id>
<url>http://repo.springsource.org/gemstone-release-cache/</url>
</repository>
</repositories>
</project>

View file

@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zeppelin.geode;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.pdx.PdxInstance;
/**
* Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
*
* <ul>
* <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
* <li>{@code geode.locator.port} - The Geode Locator {@code <PORT>} to connect to.</li>
* </ul>
* <p>
* Sample usage: <br/>
* {@code %geode.oql} <br/>
* {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'}
* </p>
*
* The OQL spec and sample queries:
* http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html
*
* <p>
* Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query
* "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a
* PartitionedRegion.
* </p>
*/
public class GeodeOqlInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
int commandTimeOut = 600000;
private static final String TABLE_MAGIC = "%table ";
public static final String LOCATOR_HOST = "geode.locator.host";
public static final String LOCATOR_PORT = "geode.locator.port";
static {
Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(),
new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.")
.add(LOCATOR_PORT, "10334", "The Geode Locator Port").build());
}
private ClientCache clientCache = null;
private QueryService queryService = null;
private Exception exceptionOnConnect;
public GeodeOqlInterpreter(Properties property) {
super(property);
}
protected ClientCache getClientCache() {
String locatorHost = getProperty(LOCATOR_HOST);
int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
ClientCache clientCache =
new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
return clientCache;
}
@Override
public void open() {
logger.info("Geode open connection called!");
try {
clientCache = getClientCache();
queryService = clientCache.getQueryService();
exceptionOnConnect = null;
logger.info("Successfully created Geode connection");
} catch (Exception e) {
logger.error("Cannot open connection", e);
exceptionOnConnect = e;
}
}
@Override
public void close() {
try {
if (clientCache != null) {
clientCache.close();
}
if (queryService != null) {
queryService.closeCqs();
}
} catch (Exception e) {
logger.error("Cannot close connection", e);
} finally {
clientCache = null;
queryService = null;
exceptionOnConnect = null;
}
}
private InterpreterResult executeOql(String oql) {
try {
if (getExceptionOnConnect() != null) {
return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage());
}
@SuppressWarnings("unchecked")
SelectResults<Object> results =
(SelectResults<Object>) getQueryService().newQuery(oql).execute();
StringBuilder msg = new StringBuilder(TABLE_MAGIC);
boolean isTableHeaderSet = false;
Iterator<Object> iterator = results.iterator();
while (iterator.hasNext()) {
Object entry = iterator.next();
if (entry instanceof Number) {
handleNumberEntry(isTableHeaderSet, entry, msg);
} else if (entry instanceof Struct) {
handleStructEntry(isTableHeaderSet, entry, msg);
} else if (entry instanceof PdxInstance) {
handlePdxInstanceEntry(isTableHeaderSet, entry, msg);
} else {
handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg);
}
isTableHeaderSet = true;
msg.append('\n');
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
} catch (Exception ex) {
logger.error("Cannot run " + oql, ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
}
}
private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
Struct struct = (Struct) entry;
if (!isTableHeaderSet) {
for (String titleName : struct.getStructType().getFieldNames()) {
msg.append(titleName).append('\t');
}
msg.append('\n');
}
for (String titleName : struct.getStructType().getFieldNames()) {
msg.append(struct.get(titleName)).append('\t');
}
}
private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
PdxInstance pdxEntry = (PdxInstance) entry;
if (!isTableHeaderSet) {
for (String titleName : pdxEntry.getFieldNames()) {
msg.append(titleName).append('\t');
}
msg.append('\n');
}
for (String titleName : pdxEntry.getFieldNames()) {
msg.append(pdxEntry.getField(titleName)).append('\t');
}
}
private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) {
if (!isTableHeaderSet) {
msg.append("Result").append('\n');
}
msg.append((Number) entry);
}
private void handleUnsupportedTypeEntry(boolean isTableHeaderSet,
Object entry, StringBuilder msg) {
if (!isTableHeaderSet) {
msg.append("Unsuppoted Type").append('\n');
}
msg.append("" + entry);
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run OQL command '{}'", cmd);
return executeOql(cmd);
}
@Override
public void cancel(InterpreterContext context) {
// Do nothing
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
GeodeOqlInterpreter.class.getName() + this.hashCode());
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
// Test only
QueryService getQueryService() {
return this.queryService;
}
Exception getExceptionOnConnect() {
return this.exceptionOnConnect;
}
}

View file

@ -0,0 +1,125 @@
package org.apache.zeppelin.geode;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.Test;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.cache.query.internal.StructImpl;
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
import com.gemstone.gemfire.pdx.PdxInstance;
import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
import com.gemstone.gemfire.pdx.internal.PdxType;
public class GeodeOqlInterpreterTest {
private static final String OQL_QUERY = "select * from /region";
@Test
public void oqlNumberResponse() throws Exception {
oqlTest(new ArrayList<Object>(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n");
}
@Test
public void oqlStructResponse() throws Exception {
String[] fields = new String[] {"field1", "field2"};
Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"});
oqlTest(new ArrayList<Object>(Arrays.asList(struct)).iterator(),
"field1\tfield2\t\nval1\tval2\t\n");
}
@Test
public void oqlPdxInstanceResponse() throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes());
PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4);
oqlTest(new ArrayList<Object>(Arrays.asList(pdxInstance)).iterator(), "\n\n");
}
private static class DummyUnspportedType {
@Override
public String toString() {
return "Unsupported Indeed";
}
}
@Test
public void oqlUnsupportedTypeResponse() throws Exception {
DummyUnspportedType unspported1 = new DummyUnspportedType();
DummyUnspportedType unspported2 = new DummyUnspportedType();
oqlTest(new ArrayList<Object>(Arrays.asList(unspported1, unspported2)).iterator(),
"Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n");
}
private void oqlTest(Iterator<Object> queryResponseIterator, String expectedOutput)
throws Exception {
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS);
when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService);
@SuppressWarnings("unchecked")
SelectResults<Object> mockResults = mock(SelectResults.class);
when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults);
when(mockResults.iterator()).thenReturn(queryResponseIterator);
InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
assertEquals(Code.SUCCESS, interpreterResult.code());
assertEquals(expectedOutput, interpreterResult.message());
}
@Test
public void oqlWithQueryException() throws Exception {
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn(
new RuntimeException("Test Exception On Connect"));
InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
assertEquals(Code.ERROR, interpreterResult.code());
assertEquals("Test Exception On Connect", interpreterResult.message());
}
@Test
public void oqlWithExceptionOnConnect() throws Exception {
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties()));
when(spyGeodeOqlInterpreter.getQueryService())
.thenThrow(new RuntimeException("Test exception"));
InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null);
assertEquals(Code.ERROR, interpreterResult.code());
assertEquals("Test exception", interpreterResult.message());
}
@Test
public void testFormType() {
assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType());
}
}

View file

@ -91,6 +91,7 @@
<module>angular</module>
<module>shell</module>
<module>hive</module>
<module>geode</module>
<module>tajo</module>
<module>flink</module>
<module>ignite</module>