BigQuery Interpreter for Apache Zeppelin

This commit is contained in:
Babu Prasad Elumalai 2016-07-12 21:37:10 +00:00
parent 6bd4ede7e5
commit 2a2bedcf3f
3 changed files with 515 additions and 0 deletions

192
bigquery/pom.xml Normal file
View file

@ -0,0 +1,192 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-bigquery</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: BigQuery interpreter</name>
<url>http://www.apache.org</url>
<!-- Dependencies of the BigQuery interpreter module. -->
<dependencies>
<!-- Google BigQuery API client (com.google.api.services.bigquery.*) used by the interpreter. -->
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
<version>v2-rev265-1.21.0</version>
</dependency>
<!-- Google OAuth / HTTP client libraries; versions come from the project.oauth.version and
project.http.version properties declared in this POM. -->
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
<version>${project.oauth.version}</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
<version>${project.http.version}</version>
</dependency>
<dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client-jetty</artifactId>
<version>${project.oauth.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6</version>
</dependency>
<!-- Zeppelin interpreter API; scope "provided", so it is not copied by the runtime-scope
copy-dependencies execution in the build section. -->
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- No version is declared for the next three artifacts; presumably they are managed by the
parent POM (TODO confirm). -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- NOTE(review): bigQueryInterpreter does not import jline in this commit; confirm this
dependency is still required. -->
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.12.1</version>
</dependency>
<!-- Test-scope dependencies. junit has no version here; presumably managed by the parent POM
(TODO confirm). -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): JDBC mock library; the interpreter talks to the BigQuery REST client rather
than JDBC, so confirm this is actually used by tests. -->
<dependency>
<groupId>com.mockrunner</groupId>
<artifactId>mockrunner-jdbc</artifactId>
<version>1.0.8</version>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Version properties referenced by the Google HTTP/OAuth client dependencies in this POM,
plus the source file encoding used by the build. -->
<properties>
<project.http.version>1.21.0</project.http.version>
<project.oauth.version>1.21.0</project.oauth.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- Build configuration: packages the interpreter and stages it, together with its runtime
dependencies, under the interpreter/bqsql directory two levels above this module's build
directory. -->
<build>
<plugins>
<!-- The "enforce" execution is bound to phase "none", which is not a lifecycle phase, so the
execution is effectively switched off for this module. -->
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<!-- At "package": copy every runtime-scope dependency into interpreter/bqsql. -->
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/bqsql</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<!-- At "package": copy this module's own artifact (group/artifact/version/packaging of the
current project) into the same interpreter/bqsql directory. -->
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/bqsql</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- Assembly configuration for a jar-with-dependencies archive. No execution is declared in
this POM, so the configuration only applies if the plugin is invoked or bound elsewhere.
NOTE(review): the configured mainClass is the interpreter class, which declares no main
method in this commit; confirm the manifest entry is intended. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>
org.apache.zeppelin.bigquery.bigQueryInterpreter
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,322 @@
/**
* Copyright 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.bigquery;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.client.json.GenericJson;
import com.google.api.services.bigquery.Bigquery.Datasets;
import com.google.api.services.bigquery.BigqueryRequest;
import com.google.api.services.bigquery.model.DatasetList;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.Collection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import java.io.PrintStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
 * BigQuery interpreter for Zeppelin.
 *
 * <p>Interpreter properties:
 * <ul>
 * <li>{@code bqsql.project_id} - Google Cloud project ID the queries are issued against</li>
 * <li>{@code bqsql.query_wait_time} - query timeout in milliseconds (default {@code 5000})</li>
 * </ul>
 *
 * <p>How to use:
 * <pre>{@code
 * %bqsql.sql
 * SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
 * FROM [bigquery-samples:airline_ontime_data.flights]
 * group by departure_airport
 * order by 2 desc
 * limit 10
 * }</pre>
 *
 * <p>The BigQuery client is a single static instance shared by every interpreter instance in
 * the JVM; it is created lazily under {@code serviceLock}.
 */
public class bigQueryInterpreter extends Interpreter {
  private static final Logger logger = LoggerFactory.getLogger(bigQueryInterpreter.class);

  // Delimiters of Zeppelin's %table output: TAB between columns, NEWLINE between rows.
  private static final char NEWLINE = '\n';
  private static final char TAB = '\t';

  // Shared client. Declared volatile because open() reads it outside the lock before
  // re-checking it inside the lock.
  private static volatile Bigquery service = null;
  // Mutex guarding creation of the shared client.
  private static final Object serviceLock = new Object();

  static final String PROJECT_ID = "bqsql.project_id";
  static final String DEFAULT_PROJECT_ID = "";
  static final String WAIT_TIME = "bqsql.query_wait_time";
  static final String DEFAULT_WAIT_TIME = "5000";

  // Registering BigQuery Interpreter and defining attributes
  static {
    Interpreter.register(
        "sql",
        "bqsql",
        bigQueryInterpreter.class.getName(),
        new InterpreterPropertyBuilder()
            .add(PROJECT_ID, DEFAULT_PROJECT_ID, "Google Project ID")
            .add(WAIT_TIME, DEFAULT_WAIT_TIME, "Query timeout in Milliseconds")
            .build());
  }

  private static final List<InterpreterCompletion> NO_COMPLETION = new ArrayList<>();

  // Last failure seen by this instance while creating the client; reported by interpret().
  private Exception exceptionOnConnect;

  public bigQueryInterpreter(Properties property) {
    super(property);
  }

  /**
   * Creates the shared BigQuery client if it does not exist yet.
   *
   * <p>A failure is logged and remembered in {@code exceptionOnConnect} rather than thrown;
   * {@link #interpret} turns it into an error result.
   */
  @Override
  public void open() {
    if (service == null) {
      synchronized (serviceLock) {
        if (service == null) {
          try {
            service = createAuthorizedClient();
            exceptionOnConnect = null;
            logger.info("Opened BigQuery SQL Connection");
          } catch (IOException e) {
            logger.error("Cannot open connection", e);
            exceptionOnConnect = e;
            close();
          }
        }
      }
    }
  }

  /**
   * Builds a BigQuery client authorized with the application default credentials, scoped to
   * all BigQuery scopes when the credential requires scoping.
   *
   * @return a ready-to-use client
   * @throws IOException if the application default credentials cannot be loaded
   */
  private static Bigquery createAuthorizedClient() throws IOException {
    HttpTransport transport = new NetHttpTransport();
    JsonFactory jsonFactory = new JacksonFactory();
    GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
    if (credential.createScopedRequired()) {
      Collection<String> bigqueryScopes = BigqueryScopes.all();
      credential = credential.createScoped(bigqueryScopes);
    }
    return new Bigquery.Builder(transport, jsonFactory, credential)
        .setApplicationName("Zeppelin/1.0 (GPN:Apache Zeppelin;)").build();
  }

  /**
   * Renders one page of query results as tab-separated text: a header line with the column
   * names followed by one line per row.
   *
   * @param response a result page; {@code null}, a missing schema or missing rows are
   *     tolerated and simply contribute nothing
   * @return the rendered text, never {@code null}
   */
  public static String printRows(final GetQueryResultsResponse response) {
    StringBuilder msg = new StringBuilder();
    appendRows(msg, response, true);
    return msg.toString();
  }

  /**
   * Appends one result page to {@code out}.
   *
   * @param includeHeader whether to emit the column-name line; callers rendering several
   *     pages into one table pass {@code true} for the first page only
   */
  private static void appendRows(StringBuilder out, GetQueryResultsResponse response,
      boolean includeHeader) {
    if (response == null) {
      return;
    }
    TableSchema schema = response.getSchema();
    if (includeHeader && schema != null && schema.getFields() != null) {
      boolean firstColumn = true;
      for (TableFieldSchema column : schema.getFields()) {
        // Separator goes between columns only; a trailing TAB would read as an extra column.
        if (!firstColumn) {
          out.append(TAB);
        }
        out.append(cellText(column.getName()));
        firstColumn = false;
      }
      out.append(NEWLINE);
    }
    List<TableRow> rows = response.getRows();
    if (rows == null) {
      // No rows on this page (e.g. an empty result set).
      return;
    }
    for (TableRow row : rows) {
      boolean firstCell = true;
      for (TableCell cell : row.getF()) {
        if (!firstCell) {
          out.append(TAB);
        }
        out.append(cellText(cell.getV()));
        firstCell = false;
      }
      out.append(NEWLINE);
    }
  }

  /**
   * Converts a value to table-cell text, replacing the table delimiters (TAB, NEWLINE) with
   * spaces so a value cannot break the row/column layout. A {@code null} value is rendered
   * as {@code "null"}.
   */
  private static String cellText(Object value) {
    if (value == null) {
      return "null";
    }
    return value.toString().replace(TAB, ' ').replace(NEWLINE, ' ');
  }

  /**
   * Polls a job until its state is {@code "DONE"}. Future use.
   *
   * @param request the job lookup to execute repeatedly
   * @param interval sleep time between two lookups, in milliseconds
   * @return the job as returned by the lookup that reported {@code "DONE"}
   * @throws IOException if a lookup fails
   * @throws InterruptedException if the thread is interrupted while sleeping
   */
  public static Job pollJob(final Bigquery.Jobs.Get request, final long interval)
      throws IOException, InterruptedException {
    Job job = request.execute();
    while (!"DONE".equals(job.getStatus().getState())) {
      logger.info("Job is {} waiting {} milliseconds...", job.getStatus().getState(), interval);
      Thread.sleep(interval);
      job = request.execute();
    }
    return job;
  }

  /**
   * Pages through the results of an arbitrary BigQuery request.
   *
   * <p>Each call to {@code next()} executes the request; when the response carries a
   * {@code pageToken} it is set on the request for the following call, otherwise iteration
   * ends.
   *
   * @param requestTemplate the request to execute for the first page
   * @return an iterator over the response pages; its {@code next()} throws
   *     {@link IllegalStateException} (with the {@link IOException} as cause) when a page
   *     cannot be fetched, and its {@code remove()} is unsupported
   */
  public static <T extends GenericJson> Iterator<T> getPages(
      final BigqueryRequest<T> requestTemplate) {
    class PageIterator implements Iterator<T> {
      private BigqueryRequest<T> request;
      private boolean hasNext = true;

      PageIterator(final BigqueryRequest<T> firstRequest) {
        this.request = firstRequest;
      }

      @Override
      public boolean hasNext() {
        return hasNext;
      }

      @Override
      public T next() {
        if (!hasNext) {
          throw new NoSuchElementException();
        }
        try {
          T response = request.execute();
          if (response.containsKey("pageToken")) {
            request = request.set("pageToken", response.get("pageToken"));
          } else {
            hasNext = false;
          }
          return response;
        } catch (IOException e) {
          // Stop iterating: handing back null while hasNext stays true would make callers
          // either dereference null or retry the failing request forever.
          hasNext = false;
          throw new IllegalStateException("Failed to fetch next page of BigQuery results", e);
        }
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Result pages cannot be removed");
      }
    }
    return new PageIterator(requestTemplate);
  }

  /**
   * Runs {@code sql} on BigQuery and renders every result page into a single {@code %table}
   * result. Any failure is returned as an {@code ERROR} result instead of being thrown.
   */
  private InterpreterResult executeSql(String sql) {
    String projId = getProperty(PROJECT_ID);
    long wTime = parseWaitTime(getProperty(WAIT_TIME));

    Iterator<GetQueryResultsResponse> pages;
    try {
      pages = submitQuery(sql, projId, wTime);
    } catch (IOException e) {
      logger.error("BigQuery query failed", e);
      return new InterpreterResult(Code.ERROR, describe(e));
    }

    StringBuilder finalmessage = new StringBuilder("%table ");
    boolean firstPage = true;
    try {
      while (pages.hasNext()) {
        GetQueryResultsResponse page = pages.next();
        if (Boolean.FALSE.equals(page.getJobComplete())) {
          return new InterpreterResult(Code.ERROR,
              "Query did not complete within " + wTime + " ms; increase " + WAIT_TIME);
        }
        // The header belongs to the table once, not once per page.
        appendRows(finalmessage, page, firstPage);
        firstPage = false;
      }
    } catch (IllegalStateException e) {
      logger.error("Failed to read BigQuery results", e);
      Throwable cause = e.getCause() == null ? e : e.getCause();
      return new InterpreterResult(Code.ERROR, describe(cause));
    }
    return new InterpreterResult(Code.SUCCESS, finalmessage.toString());
  }

  /**
   * Parses the configured wait time, falling back to {@code DEFAULT_WAIT_TIME} when the
   * property is missing or not a number.
   */
  private static long parseWaitTime(String configured) {
    if (configured != null) {
      try {
        return Long.parseLong(configured.trim());
      } catch (NumberFormatException e) {
        logger.warn("Invalid {} value '{}', falling back to {}",
            WAIT_TIME, configured, DEFAULT_WAIT_TIME);
      }
    }
    return Long.parseLong(DEFAULT_WAIT_TIME);
  }

  /** Returns the exception message, or its string form when it has no message. */
  private static String describe(Throwable error) {
    String message = error.getMessage();
    return message == null ? error.toString() : message;
  }

  /**
   * Submits the query and returns a pager over its results.
   *
   * @throws IOException if no client is available or the query request fails
   */
  private static Iterator<GetQueryResultsResponse> submitQuery(final String queryString,
      final String projId, final long wTime) throws IOException {
    // Snapshot the shared client so a concurrent close() cannot null it between the calls.
    Bigquery client = service;
    if (client == null) {
      throw new IOException("BigQuery service is not connected");
    }
    QueryResponse query = client.jobs().query(
        projId,
        new QueryRequest().setTimeoutMs(wTime).setQuery(queryString))
        .execute();
    GetQueryResults getRequest = client.jobs().getQueryResults(
        query.getJobReference().getProjectId(),
        query.getJobReference().getJobId());
    return getPages(getRequest);
  }

  /**
   * Runs the SQL on the BigQuery service.
   *
   * @param queryString the SQL text
   * @param projId project the query job is created in
   * @param wTime query timeout in milliseconds
   * @return a pager over the result pages, or {@code null} if the query could not be
   *     submitted (the failure is logged)
   */
  public static Iterator<GetQueryResultsResponse> run(final String queryString,
      final String projId, final long wTime) {
    try {
      return submitQuery(queryString, projId, wTime);
    } catch (IOException e) {
      logger.error("BigQuery query failed", e);
      return null;
    }
  }

  /** Drops the shared client; the next {@link #open} or {@link #interpret} recreates it. */
  @Override
  public void close() {
    logger.info("Close bqsql connection!");
    service = null;
  }

  /**
   * Executes one paragraph of SQL.
   *
   * @return a {@code %table} result on success, or an {@code ERROR} result describing the
   *     connection or query failure
   */
  @Override
  public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) {
    logger.info("Run SQL command '{}'", sql);
    // The client may be missing because open() failed or close() ran on any instance sharing
    // it; open() is a no-op when the client already exists.
    open();
    if (service == null) {
      String reason = exceptionOnConnect == null ? "unknown error" : describe(exceptionOnConnect);
      return new InterpreterResult(Code.ERROR, "Cannot connect to BigQuery: " + reason);
    }
    return executeSql(sql);
  }

  @Override
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
        bigQueryInterpreter.class.getName() + this.hashCode());
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  /** Progress is not tracked; always reports 0. */
  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  /**
   * Logs the cancel request. The job id of the running query is not tracked, so nothing is
   * cancelled on the BigQuery side. The shared client is deliberately left in place:
   * discarding it would not stop the query and would only disturb other users of the client.
   */
  @Override
  public void cancel(InterpreterContext context) {
    logger.info("Cancel current query statement.");
  }

  /** Code completion is not supported; always returns an empty list. */
  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor) {
    return NO_COMPLETION;
  }
}

View file

@ -73,6 +73,7 @@
<module>cassandra</module>
<module>elasticsearch</module>
<module>alluxio</module>
<module>bigquery</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>