diff --git a/bigquery/pom.xml b/bigquery/pom.xml new file mode 100644 index 0000000000..29781ed7d6 --- /dev/null +++ b/bigquery/pom.xml @@ -0,0 +1,192 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + + + org.apache.zeppelin + zeppelin-bigquery + jar + 0.7.0-SNAPSHOT + Zeppelin: BigQuery interpreter + http://www.apache.org + + + + + com.google.apis + google-api-services-bigquery + v2-rev265-1.21.0 + + + com.google.oauth-client + google-oauth-client + ${project.oauth.version} + + + com.google.http-client + google-http-client-jackson2 + ${project.http.version} + + + com.google.oauth-client + google-oauth-client-jetty + ${project.oauth.version} + + + com.google.code.gson + gson + 2.6 + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + + com.google.guava + guava + + + + jline + jline + 2.12.1 + + + + junit + junit + test + + + + org.mockito + mockito-all + 1.9.5 + test + + + + com.mockrunner + mockrunner-jdbc + 1.0.8 + test + + + + + 1.21.0 + 1.21.0 + UTF-8 + + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/bqsql + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/bqsql + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + maven-assembly-plugin + + + + + org.apache.zeppelin.bigquery.bigQueryInterpreter + + + + + jar-with-dependencies + + + + + + diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/bigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/bigQueryInterpreter.java new file mode 100644 index 0000000000..2b24368152 --- /dev/null +++ 
b/bigquery/src/main/java/org/apache/zeppelin/bigquery/bigQueryInterpreter.java @@ -0,0 +1,322 @@ +/** + * Copyright 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.bigquery; + +import static org.apache.commons.lang.StringUtils.containsIgnoreCase; + +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.json.jackson2.JacksonFactory; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.BigqueryScopes; +import com.google.api.client.json.GenericJson; +import com.google.api.services.bigquery.Bigquery.Datasets; +import com.google.api.services.bigquery.BigqueryRequest; +import com.google.api.services.bigquery.model.DatasetList; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults; +import com.google.api.services.bigquery.model.GetQueryResultsResponse; +import com.google.api.services.bigquery.model.QueryRequest; +import 
com.google.api.services.bigquery.model.QueryResponse; +import com.google.gson.Gson; + +import java.io.IOException; +import java.util.Collection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Properties; +import java.util.Set; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; +import java.io.PrintStream; +import java.io.Reader; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * BigQuery interpreter for Zeppelin. + * + * + * + *

+ * How to use:
+ * {@code %bqsql.sql}
+ * {@code + * SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays + * FROM [bigquery-samples:airline_ontime_data.flights] + * group by departure_airport + * order by 2 desc + * limit 10 + * } + *

+ * + */ + + +public class bigQueryInterpreter extends Interpreter { + + private Logger logger = LoggerFactory.getLogger(bigQueryInterpreter.class); + private static final char NEWLINE = '\n'; + private static final char TAB = '\t'; + private static Bigquery service = null; + //Mutex created to create the singleton in thread-safe fashion. + private static Object serviceLock = new Object(); + + static final String PROJECT_ID = "bqsql.project_id"; + static final String DEFAULT_PROJECT_ID = ""; + static final String WAIT_TIME = "bqsql.query_wait_time"; + static final String DEFAULT_WAIT_TIME = "5000"; + + // Registering BigQuery Interpreter and defining attributes + static { + Interpreter.register( + "sql", + "bqsql", + bigQueryInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(PROJECT_ID, DEFAULT_PROJECT_ID, "Google Project ID") + .add(WAIT_TIME, DEFAULT_WAIT_TIME, "Query timeout in Milliseconds") + .build()); + } + + private static final List NO_COMPLETION = new ArrayList<>(); + private Exception exceptionOnConnect; + + private static final Function sequenceToStringTransformer = + new Function() { + public String apply(CharSequence seq) { + return seq.toString(); + } + }; + + public bigQueryInterpreter(Properties property) { + super(property); + } + + + //Function to return valid BigQuery Service + @Override + public void open() { + if (service == null) { + synchronized (serviceLock) { + if (service == null) { + try { + service = createAuthorizedClient(); + exceptionOnConnect = null; + logger.info("Opened BigQuery SQL Connection"); + } catch (IOException e) { + logger.error("Cannot open connection", e); + exceptionOnConnect = e; + close(); + } + } + } + } + } + + //Function that Creates an authorized client to Google Bigquery. 
+ private static Bigquery createAuthorizedClient() throws IOException { + HttpTransport transport = new NetHttpTransport(); + JsonFactory jsonFactory = new JacksonFactory(); + GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory); + + if (credential.createScopedRequired()) { + Collection bigqueryScopes = BigqueryScopes.all(); + credential = credential.createScoped(bigqueryScopes); + } + + return new Bigquery.Builder(transport, jsonFactory, credential) + .setApplicationName("Zeppelin/1.0 (GPN:Apache Zeppelin;)").build(); + } + + //Function that generates and returns the schema and the rows as string + public static String printRows(final GetQueryResultsResponse response) { + StringBuilder msg = null; + msg = new StringBuilder(); + for (TableFieldSchema schem: response.getSchema().getFields()) { + msg.append(schem.getName()); + msg.append(TAB); + } + msg.append(NEWLINE); + for (TableRow row : response.getRows()) { + for (TableCell field : row.getF()) { + msg.append(field.getV().toString()); + msg.append(TAB); + } + msg.append(NEWLINE); + } + return msg.toString(); + } + + //Function to poll a job for completion. 
Future use + public static Job pollJob(final Bigquery.Jobs.Get request, final long interval) + throws IOException, InterruptedException { + Job job = request.execute(); + while (!job.getStatus().getState().equals("DONE")) { + System.out.println("Job is " + + job.getStatus().getState() + + " waiting " + interval + " milliseconds..."); + Thread.sleep(interval); + job = request.execute(); + } + return job; + } + + //Function to page through the results of an arbitrary bigQuery request + public static Iterator getPages( + final BigqueryRequest requestTemplate) { + class PageIterator implements Iterator { + private BigqueryRequest request; + private boolean hasNext = true; + public PageIterator(final BigqueryRequest requestTemplate) { + this.request = requestTemplate; + } + public boolean hasNext() { + return hasNext; + } + public T next() { + if (!hasNext) { + throw new NoSuchElementException(); + } + try { + T response = request.execute(); + if (response.containsKey("pageToken")) { + request = request.set("pageToken", response.get("pageToken")); + } + else { + hasNext = false; + } + return response; + } + catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + public void remove() { + this.next(); + } + } + + return new PageIterator(requestTemplate); + } + + //Function to call bigQuery to run SQL and return results to the Interpreter for output + private InterpreterResult executeSql(String sql) { + int counter = 0; + StringBuilder finalmessage = null; + finalmessage = new StringBuilder("%table "); + String projId = getProperty(PROJECT_ID); + long wTime = Long.parseLong(getProperty(WAIT_TIME)); + Iterator pages = run(sql, projId, wTime); + while (pages.hasNext()) { + finalmessage.append(printRows(pages.next())); + } + return new InterpreterResult(Code.SUCCESS, finalmessage.toString()); + } + + //Function to run the SQL on bigQuery service + public static Iterator run(final String queryString, + final String projId, final long wTime) { + try { + 
QueryResponse query = service.jobs().query( + projId, + new QueryRequest().setTimeoutMs(wTime).setQuery(queryString)) + .execute(); + GetQueryResults getRequest = service.jobs().getQueryResults( + query.getJobReference().getProjectId(), + query.getJobReference().getJobId()); + return getPages(getRequest); + } + catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public void close() { + + logger.info("Close bqsql connection!"); + + service = null; + } + + @Override + public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) { + logger.info("Run SQL command '{}'", sql); + return executeSql(sql); + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + bigQueryInterpreter.class.getName() + this.hashCode()); + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public void cancel(InterpreterContext context) { + + logger.info("Cancel current query statement."); + + if (service != null) { + service = null; + } + } + + @Override + public List completion(String buf, int cursor) { + return NO_COMPLETION; + } +} diff --git a/pom.xml b/pom.xml index 607fce9187..064a18785d 100755 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ cassandra elasticsearch alluxio + bigquery zeppelin-web zeppelin-server zeppelin-distribution