mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
BigQuery Interpreter for Apazhe Zeppelin
This commit is contained in:
parent
6bd4ede7e5
commit
2a2bedcf3f
3 changed files with 515 additions and 0 deletions
192
bigquery/pom.xml
Normal file
192
bigquery/pom.xml
Normal file
|
|
@ -0,0 +1,192 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-bigquery</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: BigQuery interpreter</name>
|
||||
<url>http://www.apache.org</url>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.apis</groupId>
|
||||
<artifactId>google-api-services-bigquery</artifactId>
|
||||
<version>v2-rev265-1.21.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.oauth-client</groupId>
|
||||
<artifactId>google-oauth-client</artifactId>
|
||||
<version>${project.oauth.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.http-client</groupId>
|
||||
<artifactId>google-http-client-jackson2</artifactId>
|
||||
<version>${project.http.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.oauth-client</groupId>
|
||||
<artifactId>google-oauth-client-jetty</artifactId>
|
||||
<version>${project.oauth.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>2.6</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>jline</groupId>
|
||||
<artifactId>jline</artifactId>
|
||||
<version>2.12.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>1.9.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.mockrunner</groupId>
|
||||
<artifactId>mockrunner-jdbc</artifactId>
|
||||
<version>1.0.8</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<project.http.version>1.21.0</project.http.version>
|
||||
<project.oauth.version>1.21.0</project.oauth.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/bqsql</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/bqsql</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>
|
||||
org.apache.zeppelin.bigquery.bigQueryInterpreter
|
||||
</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
<descriptorRef>jar-with-dependencies</descriptorRef>
|
||||
</descriptorRefs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,322 @@
|
|||
/**
|
||||
* Copyright 2016 Google Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.bigquery;
|
||||
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
|
||||
import com.google.api.client.http.HttpTransport;
|
||||
import com.google.api.client.http.javanet.NetHttpTransport;
|
||||
import com.google.api.client.json.JsonFactory;
|
||||
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
|
||||
import com.google.api.services.bigquery.Bigquery;
|
||||
import com.google.api.services.bigquery.BigqueryScopes;
|
||||
import com.google.api.client.json.GenericJson;
|
||||
import com.google.api.services.bigquery.Bigquery.Datasets;
|
||||
import com.google.api.services.bigquery.BigqueryRequest;
|
||||
import com.google.api.services.bigquery.model.DatasetList;
|
||||
import com.google.api.services.bigquery.model.Job;
|
||||
import com.google.api.services.bigquery.model.TableCell;
|
||||
import com.google.api.services.bigquery.model.TableFieldSchema;
|
||||
import com.google.api.services.bigquery.model.TableRow;
|
||||
import com.google.api.services.bigquery.model.TableSchema;
|
||||
import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults;
|
||||
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
|
||||
import com.google.api.services.bigquery.model.QueryRequest;
|
||||
import com.google.api.services.bigquery.model.QueryResponse;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Sets.SetView;
|
||||
import java.io.PrintStream;
|
||||
import java.io.Reader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
/**
|
||||
* BigQuery interpreter for Zeppelin.
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@code bigquery.project_id} - Project ID in GCP</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>
|
||||
* How to use: <br/>
|
||||
* {@code %bqsql.sql<br/>
|
||||
* {@code
|
||||
* SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
|
||||
* FROM [bigquery-samples:airline_ontime_data.flights]
|
||||
* group by departure_airport
|
||||
* order by 2 desc
|
||||
* limit 10
|
||||
* }
|
||||
* </p>
|
||||
*
|
||||
*/
|
||||
|
||||
|
||||
public class bigQueryInterpreter extends Interpreter {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(bigQueryInterpreter.class);
|
||||
private static final char NEWLINE = '\n';
|
||||
private static final char TAB = '\t';
|
||||
private static Bigquery service = null;
|
||||
//Mutex created to create the singleton in thread-safe fashion.
|
||||
private static Object serviceLock = new Object();
|
||||
|
||||
static final String PROJECT_ID = "bqsql.project_id";
|
||||
static final String DEFAULT_PROJECT_ID = "";
|
||||
static final String WAIT_TIME = "bqsql.query_wait_time";
|
||||
static final String DEFAULT_WAIT_TIME = "5000";
|
||||
|
||||
// Registering BigQuery Interpreter and defining attributes
|
||||
static {
|
||||
Interpreter.register(
|
||||
"sql",
|
||||
"bqsql",
|
||||
bigQueryInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(PROJECT_ID, DEFAULT_PROJECT_ID, "Google Project ID")
|
||||
.add(WAIT_TIME, DEFAULT_WAIT_TIME, "Query timeout in Milliseconds")
|
||||
.build());
|
||||
}
|
||||
|
||||
private static final List NO_COMPLETION = new ArrayList<>();
|
||||
private Exception exceptionOnConnect;
|
||||
|
||||
private static final Function<CharSequence, String> sequenceToStringTransformer =
|
||||
new Function<CharSequence, String>() {
|
||||
public String apply(CharSequence seq) {
|
||||
return seq.toString();
|
||||
}
|
||||
};
|
||||
|
||||
public bigQueryInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
|
||||
//Function to return valid BigQuery Service
|
||||
@Override
|
||||
public void open() {
|
||||
if (service == null) {
|
||||
synchronized (serviceLock) {
|
||||
if (service == null) {
|
||||
try {
|
||||
service = createAuthorizedClient();
|
||||
exceptionOnConnect = null;
|
||||
logger.info("Opened BigQuery SQL Connection");
|
||||
} catch (IOException e) {
|
||||
logger.error("Cannot open connection", e);
|
||||
exceptionOnConnect = e;
|
||||
close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Function that Creates an authorized client to Google Bigquery.
|
||||
private static Bigquery createAuthorizedClient() throws IOException {
|
||||
HttpTransport transport = new NetHttpTransport();
|
||||
JsonFactory jsonFactory = new JacksonFactory();
|
||||
GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
|
||||
|
||||
if (credential.createScopedRequired()) {
|
||||
Collection<String> bigqueryScopes = BigqueryScopes.all();
|
||||
credential = credential.createScoped(bigqueryScopes);
|
||||
}
|
||||
|
||||
return new Bigquery.Builder(transport, jsonFactory, credential)
|
||||
.setApplicationName("Zeppelin/1.0 (GPN:Apache Zeppelin;)").build();
|
||||
}
|
||||
|
||||
//Function that generates and returns the schema and the rows as string
|
||||
public static String printRows(final GetQueryResultsResponse response) {
|
||||
StringBuilder msg = null;
|
||||
msg = new StringBuilder();
|
||||
for (TableFieldSchema schem: response.getSchema().getFields()) {
|
||||
msg.append(schem.getName());
|
||||
msg.append(TAB);
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
for (TableRow row : response.getRows()) {
|
||||
for (TableCell field : row.getF()) {
|
||||
msg.append(field.getV().toString());
|
||||
msg.append(TAB);
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
}
|
||||
return msg.toString();
|
||||
}
|
||||
|
||||
//Function to poll a job for completion. Future use
|
||||
public static Job pollJob(final Bigquery.Jobs.Get request, final long interval)
|
||||
throws IOException, InterruptedException {
|
||||
Job job = request.execute();
|
||||
while (!job.getStatus().getState().equals("DONE")) {
|
||||
System.out.println("Job is "
|
||||
+ job.getStatus().getState()
|
||||
+ " waiting " + interval + " milliseconds...");
|
||||
Thread.sleep(interval);
|
||||
job = request.execute();
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
//Function to page through the results of an arbitrary bigQuery request
|
||||
public static <T extends GenericJson> Iterator<T> getPages(
|
||||
final BigqueryRequest<T> requestTemplate) {
|
||||
class PageIterator implements Iterator<T> {
|
||||
private BigqueryRequest<T> request;
|
||||
private boolean hasNext = true;
|
||||
public PageIterator(final BigqueryRequest<T> requestTemplate) {
|
||||
this.request = requestTemplate;
|
||||
}
|
||||
public boolean hasNext() {
|
||||
return hasNext;
|
||||
}
|
||||
public T next() {
|
||||
if (!hasNext) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
try {
|
||||
T response = request.execute();
|
||||
if (response.containsKey("pageToken")) {
|
||||
request = request.set("pageToken", response.get("pageToken"));
|
||||
}
|
||||
else {
|
||||
hasNext = false;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
this.next();
|
||||
}
|
||||
}
|
||||
|
||||
return new PageIterator(requestTemplate);
|
||||
}
|
||||
|
||||
//Function to call bigQuery to run SQL and return results to the Interpreter for output
|
||||
private InterpreterResult executeSql(String sql) {
|
||||
int counter = 0;
|
||||
StringBuilder finalmessage = null;
|
||||
finalmessage = new StringBuilder("%table ");
|
||||
String projId = getProperty(PROJECT_ID);
|
||||
long wTime = Long.parseLong(getProperty(WAIT_TIME));
|
||||
Iterator<GetQueryResultsResponse> pages = run(sql, projId, wTime);
|
||||
while (pages.hasNext()) {
|
||||
finalmessage.append(printRows(pages.next()));
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS, finalmessage.toString());
|
||||
}
|
||||
|
||||
//Function to run the SQL on bigQuery service
|
||||
public static Iterator<GetQueryResultsResponse> run(final String queryString,
|
||||
final String projId, final long wTime) {
|
||||
try {
|
||||
QueryResponse query = service.jobs().query(
|
||||
projId,
|
||||
new QueryRequest().setTimeoutMs(wTime).setQuery(queryString))
|
||||
.execute();
|
||||
GetQueryResults getRequest = service.jobs().getQueryResults(
|
||||
query.getJobReference().getProjectId(),
|
||||
query.getJobReference().getJobId());
|
||||
return getPages(getRequest);
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
logger.info("Close bqsql connection!");
|
||||
|
||||
service = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", sql);
|
||||
return executeSql(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
bigQueryInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
|
||||
logger.info("Cancel current query statement.");
|
||||
|
||||
if (service != null) {
|
||||
service = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor) {
|
||||
return NO_COMPLETION;
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -73,6 +73,7 @@
|
|||
<module>cassandra</module>
|
||||
<module>elasticsearch</module>
|
||||
<module>alluxio</module>
|
||||
<module>bigquery</module>
|
||||
<module>zeppelin-web</module>
|
||||
<module>zeppelin-server</module>
|
||||
<module>zeppelin-distribution</module>
|
||||
|
|
|
|||
Loading…
Reference in a new issue