Interpreter modification, License, doc changes

This commit is contained in:
Babu Prasad Elumalai 2016-07-15 04:46:54 +00:00
parent d85abd26c7
commit 764385c9bf
4 changed files with 87 additions and 41 deletions

View file

@ -40,6 +40,7 @@ import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.JobCancelResponse;
import com.google.gson.Gson;
import java.io.IOException;
@ -76,12 +77,14 @@ import java.util.NoSuchElementException;
* BigQuery interpreter for Zeppelin.
*
* <ul>
* <li>{@code bigquery.project_id} - Project ID in GCP</li>
* <li>{@code zeppelin.bigquery.project_id} - Project ID in GCP</li>
* <li>{@code zeppelin.bigquery.wait_time} - Query Timeout in ms</li>
* <li>{@code zeppelin.bigquery.max_no_of_rows} - Max Result size</li>
* </ul>
*
* <p>
* How to use: <br/>
* {@code %bqsql.sql<br/>
* {@code %bigquery.sql<br/>
* {@code
* SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
* FROM [bigquery-samples:airline_ontime_data.flights]
@ -103,22 +106,12 @@ public class bigQueryInterpreter extends Interpreter {
//Mutex created to create the singleton in thread-safe fashion.
private static Object serviceLock = new Object();
static final String PROJECT_ID = "bqsql.project_id";
static final String DEFAULT_PROJECT_ID = "";
static final String WAIT_TIME = "bqsql.query_wait_time";
static final String DEFAULT_WAIT_TIME = "5000";
static final String PROJECT_ID = "zeppelin.bigquery.project_id";
static final String WAIT_TIME = "zeppelin.bigquery.wait_time";
static final String MAX_ROWS = "zeppelin.bigquery.max_no_of_rows";
// Registering BigQuery Interpreter and defining attributes
static {
Interpreter.register(
"sql",
"bqsql",
bigQueryInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(PROJECT_ID, DEFAULT_PROJECT_ID, "Google Project ID")
.add(WAIT_TIME, DEFAULT_WAIT_TIME, "Query timeout in Milliseconds")
.build());
}
private static String jobId = null;
private static String projectId = null;
private static final List NO_COMPLETION = new ArrayList<>();
private Exception exceptionOnConnect;
@ -174,19 +167,24 @@ public class bigQueryInterpreter extends Interpreter {
public static String printRows(final GetQueryResultsResponse response) {
StringBuilder msg = null;
msg = new StringBuilder();
for (TableFieldSchema schem: response.getSchema().getFields()) {
msg.append(schem.getName());
msg.append(TAB);
}
msg.append(NEWLINE);
for (TableRow row : response.getRows()) {
for (TableCell field : row.getF()) {
msg.append(field.getV().toString());
try {
for (TableFieldSchema schem: response.getSchema().getFields()) {
msg.append(schem.getName());
msg.append(TAB);
}
}
msg.append(NEWLINE);
for (TableRow row : response.getRows()) {
for (TableCell field : row.getF()) {
msg.append(field.getV().toString());
msg.append(TAB);
}
msg.append(NEWLINE);
}
return msg.toString();
}
catch ( NullPointerException ex ) {
throw new NullPointerException("SQL Execution returned an error!");
}
return msg.toString();
}
//Function to poll a job for completion. Future use
@ -250,24 +248,32 @@ public class bigQueryInterpreter extends Interpreter {
finalmessage = new StringBuilder("%table ");
String projId = getProperty(PROJECT_ID);
long wTime = Long.parseLong(getProperty(WAIT_TIME));
Iterator<GetQueryResultsResponse> pages = run(sql, projId, wTime);
while (pages.hasNext()) {
finalmessage.append(printRows(pages.next()));
long maxRows = Long.parseLong(getProperty(MAX_ROWS));
Iterator<GetQueryResultsResponse> pages = run(sql, projId, wTime, maxRows);
try {
while (pages.hasNext()) {
finalmessage.append(printRows(pages.next()));
}
return new InterpreterResult(Code.SUCCESS, finalmessage.toString());
}
catch ( NullPointerException ex ) {
return new InterpreterResult(Code.ERROR, ex.getMessage());
}
return new InterpreterResult(Code.SUCCESS, finalmessage.toString());
}
//Function to run the SQL on bigQuery service
public static Iterator<GetQueryResultsResponse> run(final String queryString,
final String projId, final long wTime) {
final String projId, final long wTime, final long maxRows) {
try {
QueryResponse query = service.jobs().query(
projId,
new QueryRequest().setTimeoutMs(wTime).setQuery(queryString))
new QueryRequest().setTimeoutMs(wTime).setQuery(queryString).setMaxResults(maxRows))
.execute();
jobId = query.getJobReference().getJobId();
projectId = query.getJobReference().getProjectId();
GetQueryResults getRequest = service.jobs().getQueryResults(
query.getJobReference().getProjectId(),
query.getJobReference().getJobId());
projectId,
jobId);
return getPages(getRequest);
}
catch (IOException e) {
@ -309,10 +315,21 @@ public class bigQueryInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
logger.info("Cancel current query statement.");
logger.info("Trying to Cancel current query statement.");
if (service != null) {
service = null;
if (service != null && jobId != null && projectId != null) {
try {
Bigquery.Jobs.Cancel request = service.jobs().cancel(projectId, jobId);
JobCancelResponse response = request.execute();
jobId = null;
logger.info("Query Execution cancelled");
}
catch (IOException ex) {
logger.error("Could not cancel the SQL execution");
}
}
else {
logger.info("Query Execution was already cancelled");
}
}

View file

@ -66,6 +66,7 @@
<li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
<li><a href="{{BASE_PATH}}/interpreter/shell.html">Shell</a></li>
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
<li><a href="{{BASE_PATH}}/interpreter/bigquery.html">BigQuery</a></li>
</ul>
</li>
<li>
@ -113,4 +114,4 @@
</nav><!--/.navbar-collapse -->
</div>
</div>

View file

@ -12,6 +12,31 @@ group: interpreter
## Overview
[BigQuery](https://cloud.google.com/bigquery/what-is-bigquery) is a highly scalable no-ops data warehouse in the Google Cloud Platform. Querying massive datasets can be time consuming and expensive without the right hardware and infrastructure. Google BigQuery solves this problem by enabling super-fast SQL queries against append-only tables using the processing power of Google's infrastructure. Simply move your data into BigQuery and let us handle the hard work. You can control access to both the project and your data based on your business needs, such as giving others the ability to view or query your data.
## Configuration
<table class="table-configuration">
<tr>
<th>Name</th>
<th>Default Value</th>
<th>Description</th>
</tr>
<tr>
<td>zeppelin.bigquery.project_id</td>
<td> </td>
<td>Google Project Id</td>
</tr>
<tr>
<td>zeppelin.bigquery.wait_time</td>
<td>5000</td>
<td>Query Timeout in Milliseconds</td>
</tr>
<tr>
<td>zeppelin.bigquery.max_no_of_rows</td>
<td>100000</td>
<td>Max result set size</td>
</tr>
</table>
## BigQuery API
Zeppelin is built against BigQuery API version v2-rev265-1.21.0.

View file

@ -74,7 +74,7 @@ The following components are provided under Apache License.
(Apache 2.0) json-flattener (com.github.wnameless:json-flattener:0.1.6 - https://github.com/wnameless/json-flattener)
(Apache 2.0) Spatial4J (com.spatial4j:spatial4j:0.4.1 - https://github.com/spatial4j/spatial4j)
(Apache 2.0) T-Digest (com.tdunning:t-digest:3.0 - https://github.com/tdunning/t-digest)
(Apache 2.0) Netty (io.netty:netty:3.8.0.Final - http://netty.io/)
(Apache 2.0) Netty (io.netty:netty:3.10.5.Final - http://netty.io/)
(Apache 2.0) Lucene Common Analyzers (org.apache.lucene:lucene-analyzers-common:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-analyzers-common)
(Apache 2.0) Lucene Memory (org.apache.lucene:lucene-backward-codecs:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-backward-codecs)
(Apache 2.0) Lucene Core (org.apache.lucene:lucene-core:5.3.1 - http://lucene.apache.org/lucene-parent/lucene-core)
@ -101,8 +101,12 @@ The following components are provided under Apache License.
(Apache 2.0) Alluxio Underfs Local (org.alluxio:alluxio-underfs-local:1.0.0 - http://alluxio.org)
(Apache 2.0) Microsoft Azure Storage Library for Java (com.microsoft.azure:azure-storage:4.0.0 - https://github.com/Azure/azure-storage-java)
(Apache 2.0) Roboto Font (https://github.com/google/roboto/)
<<<<<<< HEAD
(Apache 2.0) stream (com.clearspring.analytics:stream:2.7.0) - https://github.com/addthis/stream-lib/blob/v2.7.0/LICENSE.txt
(Apache 2.0) io.dropwizard.metrics:3.1.2 - https://github.com/dropwizard/metrics/blob/v3.1.2/LICENSE
=======
(Apache 2.0) Google BigQuery API for Java (com.google.api.services.bigquery:v2-rev265-1.21.0 - https://cloud.google.com/bigquery/)
>>>>>>> Interpreter modification, License, doc changes
========================================================================
@ -172,7 +176,6 @@ The following components are provided under the BSD-style License.
(New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.1.1.201511131810-r - https://eclipse.org/jgit/)
(New BSD License) Kryo (com.esotericsoftware.kryo:kryo:3.0.3 - http://code.google.com/p/kryo/)
(New BSD License) leveldbjni (org.fusesource.leveldbjni:leveldbjni-all:1.8) - https://github.com/fusesource/leveldbjni/blob/leveldbjni-1.8/license.txt
(New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.3 - http://code.google.com/p/minlog/)
(New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/)
(BSD-like) Scala Library (org.scala-lang:scala-library:2.11.7 - http://www.scala-lang.org/)