diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 13e4d1dca3..f48a960fab 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -72,7 +72,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/geode/pom.xml b/geode/pom.xml new file mode 100644 index 0000000000..806429e213 --- /dev/null +++ b/geode/pom.xml @@ -0,0 +1,156 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-geode + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: Apache Geode Interpreter + http://geode.incubator.apache.org/ + + + 8.1.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + com.gemstone.gemfire + gemfire + ${gemfire.version} + + + + org.apache.commons + commons-exec + 1.1 + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + junit + junit + test + + + + org.mockito + mockito-all + 1.9.5 + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/psql + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/psql + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + + spring-gemstone-releases + http://repo.springsource.org/gemstone-release-cache/ + + + diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java new file mode 100644 index 0000000000..9fb9599743 --- /dev/null +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.zeppelin.geode; + +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.ClientCacheFactory; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.pdx.PdxInstance; + +/** + * Apache Geode OQL Interpreter (http://geode.incubator.apache.org) + * + * + *

+ * Sample usage:
+ * {@code %geode.oql}
+ * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'} + *

+ * + * The OQL spec and sample queries: + * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html + * + *

+ * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query + * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a + * PartitionedRegion. + *

+ */ +public class GeodeOqlInterpreter extends Interpreter { + + Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); + int commandTimeOut = 600000; + + private static final String TABLE_MAGIC = "%table "; + + public static final String LOCATOR_HOST = "geode.locator.host"; + public static final String LOCATOR_PORT = "geode.locator.port"; + + static { + Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(), + new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.") + .add(LOCATOR_PORT, "10334", "The Geode Locator Port").build()); + } + + private ClientCache clientCache = null; + private QueryService queryService = null; + private Exception exceptionOnConnect; + + public GeodeOqlInterpreter(Properties property) { + super(property); + } + + protected ClientCache getClientCache() { + + String locatorHost = getProperty(LOCATOR_HOST); + int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT)); + + ClientCache clientCache = + new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create(); + + return clientCache; + } + + @Override + public void open() { + logger.info("Geode open connection called!"); + try { + clientCache = getClientCache(); + queryService = clientCache.getQueryService(); + exceptionOnConnect = null; + logger.info("Successfully created Geode connection"); + } catch (Exception e) { + logger.error("Cannot open connection", e); + exceptionOnConnect = e; + } + } + + @Override + public void close() { + try { + if (clientCache != null) { + clientCache.close(); + } + + if (queryService != null) { + queryService.closeCqs(); + } + + } catch (Exception e) { + logger.error("Cannot close connection", e); + } finally { + clientCache = null; + queryService = null; + exceptionOnConnect = null; + } + } + + + private InterpreterResult executeOql(String oql) { + try { + + if (getExceptionOnConnect() != null) { + return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage()); + } + + @SuppressWarnings("unchecked") + SelectResults results = + (SelectResults) getQueryService().newQuery(oql).execute(); + + StringBuilder msg = new StringBuilder(TABLE_MAGIC); + boolean isTableHeaderSet = false; + + Iterator iterator = results.iterator(); + while (iterator.hasNext()) { + + Object entry = iterator.next(); + + if (entry instanceof Number) { + handleNumberEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof Struct) { + handleStructEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof PdxInstance) { + handlePdxInstanceEntry(isTableHeaderSet, entry, msg); + } else { + handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg); + } + + isTableHeaderSet = true; + msg.append('\n'); + } + + return new InterpreterResult(Code.SUCCESS, msg.toString()); + + } catch (Exception ex) { + logger.error("Cannot run " + oql, ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } + } + + private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + Struct struct = (Struct) entry; + if (!isTableHeaderSet) { + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(titleName).append('\t'); + } + msg.append('\n'); + } + + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(struct.get(titleName)).append('\t'); + } + } + + private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + PdxInstance pdxEntry = (PdxInstance) entry; + if (!isTableHeaderSet) { + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(titleName).append('\t'); + } + msg.append('\n'); + } + + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(pdxEntry.getField(titleName)).append('\t'); + } + } + + private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + if (!isTableHeaderSet) { + msg.append("Result").append('\n'); + } + msg.append((Number) entry); + } + + private void handleUnsupportedTypeEntry(boolean isTableHeaderSet, + Object entry, StringBuilder msg) { + if (!isTableHeaderSet) { + msg.append("Unsuppoted Type").append('\n'); + } + msg.append("" + entry); + } + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Run OQL command '{}'", cmd); + return executeOql(cmd); + } + + @Override + public void cancel(InterpreterContext context) { + // Do nothing + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + GeodeOqlInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + + // Test only + QueryService getQueryService() { + return this.queryService; + } + + Exception getExceptionOnConnect() { + return this.exceptionOnConnect; + } +} diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java new file mode 100644 index 0000000000..70a5cb3513 --- /dev/null +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -0,0 +1,125 @@ +package org.apache.zeppelin.geode; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.Test; + +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.cache.query.internal.StructImpl; +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; +import com.gemstone.gemfire.pdx.PdxInstance; +import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl; +import com.gemstone.gemfire.pdx.internal.PdxType; + +public class GeodeOqlInterpreterTest { + + private static final String OQL_QUERY = "select * from /region"; + + @Test + public void oqlNumberResponse() throws Exception { + oqlTest(new ArrayList(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n"); + } + + @Test + public void oqlStructResponse() throws Exception { + String[] fields = new String[] {"field1", "field2"}; + Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"}); + + oqlTest(new ArrayList(Arrays.asList(struct)).iterator(), + "field1\tfield2\t\nval1\tval2\t\n"); + } + + @Test + public void oqlPdxInstanceResponse() throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes()); + PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + oqlTest(new ArrayList(Arrays.asList(pdxInstance)).iterator(), "\n\n"); + } + + private static class DummyUnspportedType { + @Override + public String toString() { + return "Unsupported Indeed"; + } + } + + @Test + public void oqlUnsupportedTypeResponse() throws Exception { + DummyUnspportedType unspported1 = new DummyUnspportedType(); + DummyUnspportedType unspported2 = new DummyUnspportedType(); + oqlTest(new ArrayList(Arrays.asList(unspported1, unspported2)).iterator(), + "Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n"); + } + + private void oqlTest(Iterator queryResponseIterator, String expectedOutput) + throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS); + + when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService); + + @SuppressWarnings("unchecked") + SelectResults mockResults = mock(SelectResults.class); + + when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults); + + when(mockResults.iterator()).thenReturn(queryResponseIterator); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.SUCCESS, interpreterResult.code()); + assertEquals(expectedOutput, interpreterResult.message()); + } + + @Test + public void oqlWithQueryException() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn( + new RuntimeException("Test Exception On Connect")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Test Exception On Connect", interpreterResult.message()); + } + + @Test + public void oqlWithExceptionOnConnect() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getQueryService()) + .thenThrow(new RuntimeException("Test exception")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Test exception", interpreterResult.message()); + } + + @Test + public void testFormType() { + assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType()); + } +} diff --git a/pom.xml b/pom.xml index 9e5f54e700..ecdebddd75 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ angular shell hive + geode tajo flink ignite