mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
refactor pig Interpreter
This commit is contained in:
parent
c28beb5889
commit
a09a7f78c6
14 changed files with 1263 additions and 357 deletions
|
|
@ -149,6 +149,24 @@ elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
|
|||
else
|
||||
echo "HBASE_HOME and HBASE_CONF_DIR are not set, configuration might not be loaded"
|
||||
fi
|
||||
elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
|
||||
# autodetect HADOOP_CONF_HOME by heuristic
|
||||
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
|
||||
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
|
||||
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
|
||||
elif [[ -d "/etc/hadoop/conf" ]]; then
|
||||
export HADOOP_CONF_DIR="/etc/hadoop/conf"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
|
||||
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
|
||||
fi
|
||||
|
||||
# autodetect TEZ_CONF_DIR
|
||||
TEZ_CONF_DIR = ${TEZ_CONF_DIR:=/etc/tez/conf}
|
||||
echo "TEZ_CONF_DIR:${TEZ_CONF_DIR}"
|
||||
ZEPPELIN_INTP_CLASSPATH+=":${TEZ_CONF_DIR}"
|
||||
fi
|
||||
|
||||
addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandr
|
|||
elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter
|
||||
file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter
|
||||
flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11
|
||||
pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter
|
||||
hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter
|
||||
ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite interpreter built with Scala 2.11
|
||||
jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter
|
||||
|
|
|
|||
|
|
@ -1,49 +0,0 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Pig Interpreter"
|
||||
description: ""
|
||||
group: manual
|
||||
---
|
||||
{% include JB/setup %}
|
||||
|
||||
|
||||
## Pig interpreter for Apache Zeppelin
|
||||
[Apache Pig](https://pig.apache.org/) is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets.
|
||||
|
||||
- Supported operations through Zeppelin
|
||||
- Play button: Run multiple lines of pig latin (delimited by semi-colon) entered into Zeppelin cell
|
||||
- Pause button: Cancel the exection of pig
|
||||
- Output: Output of jobs are displayed in Zeppelin (for both jobs that were sucessful and those that failed)
|
||||
|
||||
- Unsupported operations
|
||||
- Progress bar
|
||||
|
||||
### How to setup Pig
|
||||
Install Pig as you would normally do on the same node where Zeppelin is running - see [documentation](https://pig.apache.org/). If installing through Ambari, you just need to install the client on the Zeppelin node.
|
||||
|
||||
### How to configure interpreter
|
||||
At the "Interpreters" menu, you have to create a new Pig interpreter and provide next properties:
|
||||
|
||||
property | value | Description
|
||||
---------|----------|-----
|
||||
executable | pig | Path to pig executable. If pig is part of PATH, then just pig will work
|
||||
args | -useHCatalog -exectype local | Arguments to pass pig when starting. Launch 'pig -help' for list of supported options. For example, the options for exectype: local|mapreduce|tez
|
||||
timeout | 600000 | Time (in milliseconds) to wait for pig job before timing out
|
||||
|
||||
### How to test it's working
|
||||
|
||||
Run below example taken from [this tutorial](http://hortonworks.com/hadoop-tutorial/how-to-use-basic-pig-commands/)
|
||||
|
||||
```
|
||||
%sh
|
||||
rm -f infochimps_dataset_4778_download_16677-csv.zip
|
||||
wget https://s3.amazonaws.com/hw-sandbox/tutorial1/infochimps_dataset_4778_download_16677-csv.zip -O /tmp/infochimps_dataset_4778_download_16677-csv.zip
|
||||
unzip /tmp/infochimps_dataset_4778_download_16677-csv.zip -d /tmp
|
||||
```
|
||||
```
|
||||
%pig
|
||||
DIV_A = LOAD 'file:///tmp/infochimps_dataset_4778_download_16677/NYSE/NYSE_dividends_A.csv' using PigStorage(',') AS (exchange:chararray, symbol:chararray, date:chararray, dividend:float);
|
||||
B = FILTER DIV_A BY symbol=='AZZ';
|
||||
C = GROUP B BY dividend;
|
||||
dump C;
|
||||
```
|
||||
283
pig/pom.xml
283
pig/pom.xml
|
|
@ -16,134 +16,185 @@
|
|||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>zeppelin-pig</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: Apache Pig Interpreter</name>
|
||||
<description>Zeppelin interpreter for Apache Pig</description>
|
||||
<url>http://zeppelin.apache.org</url>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-pig</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Apache Pig Interpreter</name>
|
||||
<description>Zeppelin interprter for Apache Pig</description>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<pig.version>0.15.0</pig.version>
|
||||
</properties>
|
||||
<properties>
|
||||
<pig.version>0.16.0</pig.version>
|
||||
<hadoop.version>2.6.0</hadoop.version>
|
||||
<tez.version>0.7.0</tez.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pig</groupId>
|
||||
<artifactId>pig</artifactId>
|
||||
<version>${pig.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.pig</groupId>
|
||||
<artifactId>pig</artifactId>
|
||||
<classifier>h2</classifier>
|
||||
<version>${pig.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>1.3.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>2.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>1.9.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.mockrunner</groupId>
|
||||
<artifactId>mockrunner-jdbc</artifactId>
|
||||
<version>1.0.8</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-api</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-common</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-dag</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-runtime-library</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/pig</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/pig</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-runtime-internals</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
</plugins>
|
||||
</build>
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-mapreduce</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-yarn-timeline-history-with-acls</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.mockito</groupId>-->
|
||||
<!--<artifactId>mockito-all</artifactId>-->
|
||||
<!--<version>1.9.5</version>-->
|
||||
<!--<scope>test</scope>-->
|
||||
<!--</dependency>-->
|
||||
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>com.mockrunner</groupId>-->
|
||||
<!--<artifactId>mockrunner-jdbc</artifactId>-->
|
||||
<!--<version>1.0.8</version>-->
|
||||
<!--<scope>test</scope>-->
|
||||
<!--</dependency>-->
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/pig
|
||||
</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/pig
|
||||
</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.backend.BackendException;
|
||||
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
|
||||
import org.apache.pig.backend.hadoop.executionengine.Launcher;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class BasePigInterpreter extends Interpreter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(BasePigInterpreter.class);
|
||||
|
||||
protected ConcurrentHashMap<String, PigScriptListener> listenerMap = new ConcurrentHashMap<>();
|
||||
|
||||
public BasePigInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
LOGGER.info("Cancel paragraph:" + context.getParagraphId());
|
||||
PigScriptListener listener = listenerMap.get(context.getParagraphId());
|
||||
if (listener != null) {
|
||||
Set<String> jobIds = listener.getJobIds();
|
||||
if (jobIds.isEmpty()) {
|
||||
LOGGER.info("No job is started, so can not cancel paragraph:" + context.getParagraphId());
|
||||
}
|
||||
for (String jobId : jobIds) {
|
||||
LOGGER.info("Kill jobId:" + jobId);
|
||||
HExecutionEngine engine =
|
||||
(HExecutionEngine) getPigServer().getPigContext().getExecutionEngine();
|
||||
try {
|
||||
Field launcherField = HExecutionEngine.class.getDeclaredField("launcher");
|
||||
launcherField.setAccessible(true);
|
||||
Launcher launcher = (Launcher) launcherField.get(engine);
|
||||
// It doesn't work for Tez Engine due to PIG-5035
|
||||
launcher.killJob(jobId, new Configuration());
|
||||
} catch (NoSuchFieldException | BackendException | IllegalAccessException e) {
|
||||
LOGGER.error("Fail to cancel paragraph:" + context.getParagraphId(), e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("No PigScriptListener found, can not cancel paragraph:"
|
||||
+ context.getParagraphId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
PigScriptListener listener = listenerMap.get(context.getParagraphId());
|
||||
if (listener != null) {
|
||||
return listener.getProgress();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
PigInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
public abstract PigServer getPigServer();
|
||||
}
|
||||
|
|
@ -17,139 +17,120 @@
|
|||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Arrays;
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.impl.logicalLayer.FrontendException;
|
||||
import org.apache.pig.tools.pigstats.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Pig interpreter for Zeppelin.
|
||||
* Closely follows code for shell interpreter
|
||||
*/
|
||||
public class PigInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(PigInterpreter.class);
|
||||
public class PigInterpreter extends BasePigInterpreter {
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(PigInterpreter.class);
|
||||
|
||||
private static final String NEWLINE = "\n";
|
||||
|
||||
//Executable name used to start grunt shell
|
||||
public static final String PIG_START_EXE = "pig.executable";
|
||||
public static final String DEFAULT_START_EXE = "pig";
|
||||
|
||||
//Arguments to start pig with. More details available via 'pig -help'
|
||||
public static final String PIG_START_ARGS = "pig.start.args";
|
||||
public static final String DEFAULT_START_ARGS = "-useHCatalog -exectype local";
|
||||
|
||||
//How long to wait before timing out (ms)
|
||||
public static final String PIG_TIMEOUT_MS = "pig.timeout.ms";
|
||||
public static final String DEFAULT_TIMEOUT_MS = "600000";
|
||||
|
||||
DefaultExecutor executor = null;
|
||||
static {
|
||||
Interpreter.register(
|
||||
"pig",
|
||||
"pig",
|
||||
PigInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(PIG_START_EXE, DEFAULT_START_EXE, "Pig executable used to start grunt shell")
|
||||
.add(PIG_START_ARGS, DEFAULT_START_ARGS, "Starting arguments")
|
||||
.add(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS, "Timeout (ms)")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
private PigServer pigServer;
|
||||
private boolean includeJobStats = false;
|
||||
|
||||
public PigInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {}
|
||||
public void open() {
|
||||
String execType = getProperty("zeppelin.pig.execType");
|
||||
if (execType == null) {
|
||||
execType = "mapreduce";
|
||||
}
|
||||
String includeJobStats = getProperty("zeppelin.pig.includeJobStats");
|
||||
if (includeJobStats != null) {
|
||||
this.includeJobStats = Boolean.parseBoolean(includeJobStats);
|
||||
}
|
||||
try {
|
||||
pigServer = new PigServer(execType);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Fail to launch PigServer", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
public void close() {
|
||||
pigServer = null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
// use commandline to store string corresponding to pig shell command
|
||||
// start with pig exectable name (or full path if provided)...
|
||||
CommandLine cmdLine = CommandLine.parse(getProperty(PIG_START_EXE).trim());
|
||||
|
||||
// ...add any CLI arguments specified by user in interpreter settings
|
||||
String startArgs = getProperty(PIG_START_ARGS).trim();
|
||||
if (startArgs.length() > 0){
|
||||
logger.info("Start arguments passed to pig: " + startArgs);
|
||||
List<String> argList = Arrays.asList(startArgs.split("\\s+"));
|
||||
for (String arg : argList) {
|
||||
cmdLine.addArgument(arg, false);
|
||||
// remember the origial stdout, because we will redirect stdout to capture
|
||||
// the pig dump output.
|
||||
PrintStream originalStdOut = System.out;
|
||||
ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
|
||||
File tmpFile = null;
|
||||
try {
|
||||
tmpFile = PigUtils.createTempPigScript(cmd);
|
||||
System.setOut(new PrintStream(bytesOutput));
|
||||
// each thread should its own ScriptState & PigStats
|
||||
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
|
||||
// reset PigStats, otherwise you may get the PigStats of last job in the same thread
|
||||
// because PigStats is ThreadLocal variable
|
||||
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
|
||||
PigScriptListener scriptListener = new PigScriptListener();
|
||||
ScriptState.get().registerListener(scriptListener);
|
||||
listenerMap.put(contextInterpreter.getParagraphId(), scriptListener);
|
||||
pigServer.registerScript(tmpFile.getAbsolutePath());
|
||||
} catch (IOException e) {
|
||||
if (e instanceof FrontendException) {
|
||||
FrontendException fe = (FrontendException) e;
|
||||
if (!fe.getMessage().contains("Backend error :")) {
|
||||
// If the error message contains "Backend error :", that means the exception is from
|
||||
// backend.
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null) {
|
||||
String errorMsg = PigUtils.extactJobStats(stats);
|
||||
if (errorMsg != null) {
|
||||
LOGGER.debug("Error Message:" + errorMsg);
|
||||
return new InterpreterResult(Code.ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
} finally {
|
||||
System.setOut(originalStdOut);
|
||||
listenerMap.remove(contextInterpreter.getParagraphId());
|
||||
if (tmpFile != null) {
|
||||
tmpFile.delete();
|
||||
}
|
||||
}
|
||||
// ...finally add contents of pig cell after the -e flag
|
||||
logger.info("Run pig command '" + cmd + "'");
|
||||
long start = System.currentTimeMillis();
|
||||
cmdLine.addArgument("-e", false);
|
||||
cmdLine.addArgument(cmd, false);
|
||||
|
||||
// execute command and return success/failure based on its exit value
|
||||
executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream));
|
||||
|
||||
int commandTimeOut = Integer.parseInt(getProperty(PIG_TIMEOUT_MS));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
try {
|
||||
int exitValue = executor.execute(cmdLine);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString());
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage() + NEWLINE + outputStream.toString());
|
||||
StringBuilder outputBuilder = new StringBuilder();
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null && includeJobStats) {
|
||||
String jobStats = PigUtils.extactJobStats(stats);
|
||||
if (jobStats != null) {
|
||||
outputBuilder.append(jobStats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
if (executor != null) {
|
||||
executor.getWatchdog().destroyProcess();
|
||||
if (!outputBuilder.toString().isEmpty() || !bytesOutput.toString().isEmpty()) {
|
||||
outputBuilder.append("------------- Pig Output --------------\n");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
outputBuilder.append(bytesOutput.toString());
|
||||
return new InterpreterResult(Code.SUCCESS, outputBuilder.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
PigInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
public PigServer getPigServer() {
|
||||
return pigServer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.data.Tuple;
|
||||
import org.apache.pig.impl.logicalLayer.FrontendException;
|
||||
import org.apache.pig.impl.logicalLayer.schema.Schema;
|
||||
import org.apache.pig.tools.pigstats.PigStats;
|
||||
import org.apache.pig.tools.pigstats.ScriptState;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PigQueryInterpreter extends BasePigInterpreter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(PigQueryInterpreter.class);
|
||||
private PigServer pigServer;
|
||||
private int maxResult;
|
||||
|
||||
public PigQueryInterpreter(Properties properties) {
|
||||
super(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
pigServer = getPigInterpreter().getPigServer();
|
||||
maxResult = Integer.parseInt(getProperty("zeppelin.pig.maxResult"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String alias = "paragraph_" + context.getParagraphId().replace("-", "_");
|
||||
String[] lines = st.split("\n");
|
||||
List<String> queries = new ArrayList<String>();
|
||||
for (int i = 0; i < lines.length; ++i) {
|
||||
if (i == lines.length - 1) {
|
||||
lines[i] = alias + " = " + lines[i];
|
||||
}
|
||||
queries.add(lines[i]);
|
||||
}
|
||||
|
||||
StringBuilder resultBuilder = new StringBuilder("%table ");
|
||||
try {
|
||||
File tmpScriptFile = PigUtils.createTempPigScript(queries);
|
||||
// each thread should its own ScriptState & PigStats
|
||||
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
|
||||
// reset PigStats, otherwise you may get the PigStats of last job in the same thread
|
||||
// because PigStats is ThreadLocal variable
|
||||
PigStats.start(pigServer.getPigContext().getExecutionEngine().instantiatePigStats());
|
||||
PigScriptListener scriptListener = new PigScriptListener();
|
||||
ScriptState.get().registerListener(scriptListener);
|
||||
listenerMap.put(context.getParagraphId(), scriptListener);
|
||||
pigServer.registerScript(tmpScriptFile.getAbsolutePath());
|
||||
Schema schema = pigServer.dumpSchema(alias);
|
||||
boolean schemaKnown = (schema != null);
|
||||
if (schemaKnown) {
|
||||
for (int i = 0; i < schema.size(); ++i) {
|
||||
Schema.FieldSchema field = schema.getField(i);
|
||||
resultBuilder.append(field.alias);
|
||||
if (i != schema.size() - 1) {
|
||||
resultBuilder.append("\t");
|
||||
}
|
||||
}
|
||||
resultBuilder.append("\n");
|
||||
}
|
||||
Iterator<Tuple> iter = pigServer.openIterator(alias);
|
||||
boolean firstRow = true;
|
||||
int index = 0;
|
||||
while (iter.hasNext() && index <= maxResult) {
|
||||
index++;
|
||||
Tuple tuple = iter.next();
|
||||
if (firstRow && !schemaKnown) {
|
||||
for (int i = 0; i < tuple.size(); ++i) {
|
||||
resultBuilder.append("c_" + i + "\t");
|
||||
}
|
||||
resultBuilder.append("\n");
|
||||
firstRow = false;
|
||||
}
|
||||
resultBuilder.append(StringUtils.join(tuple, "\t"));
|
||||
resultBuilder.append("\n");
|
||||
}
|
||||
if (index >= maxResult && iter.hasNext()) {
|
||||
resultBuilder.append("\n<font color=red>Results are limited by " + maxResult + ".</font>");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Extract error in the following order
|
||||
// 1. catch FrontendException, FrontendException happens in the query compilation phase.
|
||||
// 2. PigStats, This is execution error
|
||||
// 3. Other errors.
|
||||
if (e instanceof FrontendException) {
|
||||
FrontendException fe = (FrontendException) e;
|
||||
if (!fe.getMessage().contains("Backend error :")) {
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null) {
|
||||
String errorMsg = PigUtils.extactJobStats(stats);
|
||||
if (errorMsg != null) {
|
||||
return new InterpreterResult(Code.ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
} finally {
|
||||
listenerMap.remove(context.getParagraphId());
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS, resultBuilder.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PigServer getPigServer() {
|
||||
return this.pigServer;
|
||||
}
|
||||
|
||||
private PigInterpreter getPigInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
PigInterpreter pig = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
pig = (PigInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return pig;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.pig.impl.plan.OperatorPlan;
|
||||
import org.apache.pig.tools.pigstats.JobStats;
|
||||
import org.apache.pig.tools.pigstats.OutputStats;
|
||||
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PigScriptListener implements PigProgressNotificationListener {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(PigScriptListener.class);
|
||||
|
||||
private Set<String> jobIds = new HashSet();
|
||||
private int progress;
|
||||
|
||||
@Override
|
||||
public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void jobStartedNotification(String scriptId, String assignedJobId) {
|
||||
this.jobIds.add(assignedJobId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void jobFinishedNotification(String scriptId, JobStats jobStats) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void jobFailedNotification(String scriptId, JobStats jobStats) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void progressUpdatedNotification(String scriptId, int progress) {
|
||||
LOGGER.debug("scriptId:" + scriptId + ", progress:" + progress);
|
||||
this.progress = progress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
|
||||
|
||||
}
|
||||
|
||||
public Set<String> getJobIds() {
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
public int getProgress() {
|
||||
return progress;
|
||||
}
|
||||
}
|
||||
290
pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
Normal file
290
pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java
Normal file
|
|
@ -0,0 +1,290 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigRunner;
|
||||
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
|
||||
import org.apache.pig.tools.pigstats.InputStats;
|
||||
import org.apache.pig.tools.pigstats.JobStats;
|
||||
import org.apache.pig.tools.pigstats.OutputStats;
|
||||
import org.apache.pig.tools.pigstats.PigStats;
|
||||
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
|
||||
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
|
||||
import org.apache.pig.tools.pigstats.tez.TezDAGStats;
|
||||
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PigUtils {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(PigUtils.class);
|
||||
|
||||
protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||
|
||||
public static File createTempPigScript(String content) throws IOException {
|
||||
File tmpFile = File.createTempFile("zeppelin", "pig");
|
||||
LOGGER.debug("Create pig script file:" + tmpFile.getAbsolutePath());
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
return tmpFile.getAbsoluteFile();
|
||||
}
|
||||
|
||||
public static File createTempPigScript(List<String> lines) throws IOException {
|
||||
return createTempPigScript(StringUtils.join(lines, "\n"));
|
||||
}
|
||||
|
||||
public static String extactJobStats(PigStats stats) {
|
||||
if (stats instanceof SimplePigStats) {
|
||||
return extractFromSimplePigStats((SimplePigStats) stats);
|
||||
} else if (stats instanceof TezPigScriptStats) {
|
||||
return extractFromTezPigStats((TezPigScriptStats) stats);
|
||||
} else {
|
||||
throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
public static String extractFromSimplePigStats(SimplePigStats stats) {
|
||||
|
||||
try {
|
||||
Field userIdField = PigStats.class.getDeclaredField("userId");
|
||||
userIdField.setAccessible(true);
|
||||
String userId = (String) (userIdField.get(stats));
|
||||
Field startTimeField = PigStats.class.getDeclaredField("startTime");
|
||||
startTimeField.setAccessible(true);
|
||||
long startTime = (Long) (startTimeField.get(stats));
|
||||
Field endTimeField = PigStats.class.getDeclaredField("endTime");
|
||||
endTimeField.setAccessible(true);
|
||||
long endTime = (Long) (endTimeField.get(stats));
|
||||
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
|
||||
LOGGER.warn("unknown return code, can't display the results");
|
||||
return null;
|
||||
}
|
||||
if (stats.getPigContext() == null) {
|
||||
LOGGER.warn("unknown exec type, don't display the results");
|
||||
return null;
|
||||
}
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
|
||||
sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
|
||||
.append(userId).append("\t")
|
||||
.append(sdf.format(new Date(startTime))).append("\t")
|
||||
.append(sdf.format(new Date(endTime))).append("\t")
|
||||
.append(stats.getFeatures()).append("\n");
|
||||
sb.append("\n");
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
|
||||
sb.append("Success!\n");
|
||||
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Some jobs have failed! Stop running all dependent jobs\n");
|
||||
} else {
|
||||
sb.append("Failed!\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
|
||||
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
|
||||
jobPlanField.setAccessible(true);
|
||||
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
|
||||
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Job Stats (time in seconds):\n");
|
||||
sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
|
||||
List<JobStats> arr = jobPlan.getSuccessfulJobs();
|
||||
for (JobStats js : arr) {
|
||||
sb.append(js.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Failed Jobs:\n");
|
||||
sb.append(MRJobStats.FAILURE_HEADER).append("\n");
|
||||
List<JobStats> arr = jobPlan.getFailedJobs();
|
||||
for (JobStats js : arr) {
|
||||
sb.append(js.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
sb.append("Input(s):\n");
|
||||
for (InputStats is : stats.getInputStats()) {
|
||||
sb.append(is.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
sb.append("Output(s):\n");
|
||||
for (OutputStats ds : stats.getOutputStats()) {
|
||||
sb.append(ds.getDisplayString());
|
||||
}
|
||||
|
||||
sb.append("\nCounters:\n");
|
||||
sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
|
||||
sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
|
||||
sb.append("Spillable Memory Manager spill count : "
|
||||
+ stats.getSMMSpillCount()).append("\n");
|
||||
sb.append("Total bags proactively spilled: "
|
||||
+ stats.getProactiveSpillCountObjects()).append("\n");
|
||||
sb.append("Total records proactively spilled: "
|
||||
+ stats.getProactiveSpillCountRecords()).append("\n");
|
||||
sb.append("\nJob DAG:\n").append(jobPlan.toString());
|
||||
|
||||
return "Script Statistics: \n" + sb.toString();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract message from SimplePigStats", e);
|
||||
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static String extractFromTezPigStats(TezPigScriptStats stats) {
|
||||
|
||||
try {
|
||||
Field userIdField = PigStats.class.getDeclaredField("userId");
|
||||
userIdField.setAccessible(true);
|
||||
String userId = (String) (userIdField.get(stats));
|
||||
Field startTimeField = PigStats.class.getDeclaredField("startTime");
|
||||
startTimeField.setAccessible(true);
|
||||
long startTime = (Long) (startTimeField.get(stats));
|
||||
Field endTimeField = PigStats.class.getDeclaredField("endTime");
|
||||
endTimeField.setAccessible(true);
|
||||
long endTime = (Long) (endTimeField.get(stats));
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\n");
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
|
||||
sb.append("\n");
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
|
||||
sb.append("Success!\n");
|
||||
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Some tasks have failed! Stop running all dependent tasks\n");
|
||||
} else {
|
||||
sb.append("Failed!\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
|
||||
// Print diagnostic info in case of failure
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
if (stats.getErrorMessage() != null) {
|
||||
String[] lines = stats.getErrorMessage().split("\n");
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
String s = lines[i].trim();
|
||||
if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
|
||||
}
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
}
|
||||
|
||||
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
|
||||
tezDAGStatsMapField.setAccessible(true);
|
||||
Map<String, TezDAGStats> tezDAGStatsMap =
|
||||
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
|
||||
int count = 0;
|
||||
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
|
||||
sb.append("\n");
|
||||
sb.append("DAG " + count++ + ":\n");
|
||||
sb.append(dagStats.getDisplayString());
|
||||
sb.append("\n");
|
||||
}
|
||||
|
||||
sb.append("Input(s):\n");
|
||||
for (InputStats is : stats.getInputStats()) {
|
||||
sb.append(is.getDisplayString().trim()).append("\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
sb.append("Output(s):\n");
|
||||
for (OutputStats os : stats.getOutputStats()) {
|
||||
sb.append(os.getDisplayString().trim()).append("\n");
|
||||
}
|
||||
return "Script Statistics:\n" + sb.toString();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract message from SimplePigStats", e);
|
||||
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIds(PigStats stat) {
|
||||
if (stat instanceof SimplePigStats) {
|
||||
return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
|
||||
} else if (stat instanceof TezPigScriptStats) {
|
||||
return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
|
||||
} else {
|
||||
throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
try {
|
||||
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
|
||||
jobPlanField.setAccessible(true);
|
||||
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
|
||||
List<JobStats> arr = jobPlan.getJobList();
|
||||
for (JobStats js : arr) {
|
||||
jobIds.add(js.getJobId());
|
||||
}
|
||||
return jobIds;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
try {
|
||||
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
|
||||
tezDAGStatsMapField.setAccessible(true);
|
||||
Map<String, TezDAGStats> tezDAGStatsMap =
|
||||
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
|
||||
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
|
||||
LOGGER.info("Tez JobId:" + dagStats.getJobId());
|
||||
jobIds.add(dagStats.getJobId());
|
||||
}
|
||||
return jobIds;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
46
pig/src/main/resources/interpreter-setting.json
Normal file
46
pig/src/main/resources/interpreter-setting.json
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
[
|
||||
{
|
||||
"group": "pig",
|
||||
"name": "script",
|
||||
"className": "org.apache.zeppelin.pig.PigInterpreter",
|
||||
"properties": {
|
||||
"zeppelin.pig.execType": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.pig.execType",
|
||||
"defaultValue": "mapreduce",
|
||||
"description": "local | mapreduce | tez"
|
||||
},
|
||||
"zeppelin.pig.includeJobStats": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.pig.includeJobStats",
|
||||
"defaultValue": "false",
|
||||
"description": "flag to include job stats in output"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
"language": "pig"
|
||||
}
|
||||
},
|
||||
{
|
||||
"group": "pig",
|
||||
"name": "query",
|
||||
"className": "org.apache.zeppelin.pig.PigQueryInterpreter",
|
||||
"properties": {
|
||||
"zeppelin.pig.execType": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.pig.execType",
|
||||
"defaultValue": "mapreduce",
|
||||
"description": "local | mapreduce | tez"
|
||||
},
|
||||
"zeppelin.pig.maxResult": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.pig.maxResult",
|
||||
"defaultValue": "20",
|
||||
"description": "max row number for %pig.query"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
"language": "pig"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
@ -6,120 +6,150 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import static org.apache.zeppelin.pig.PigInterpreter.*;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class PigInterpreterTest {
|
||||
|
||||
private static PigInterpreter pig;
|
||||
private static InterpreterContext context;
|
||||
private static final String PASSWD_FILE = "/tmp/tmp_zeppelin_dummypasswd";
|
||||
private static final String USERS_FILE = "/tmp/tmp_zeppelin_dummyusers";
|
||||
private PigInterpreter pigInterpreter;
|
||||
private InterpreterContext context;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties properties = new Properties();
|
||||
properties.put(PIG_START_EXE, DEFAULT_START_EXE);
|
||||
properties.put(PIG_START_ARGS, DEFAULT_START_ARGS);
|
||||
properties.put(PIG_TIMEOUT_MS, DEFAULT_TIMEOUT_MS);
|
||||
|
||||
pig = new PigInterpreter(properties);
|
||||
pig.open();
|
||||
|
||||
context = new InterpreterContext(null, null, null, null, null, null, null, null);
|
||||
properties.put("zeppelin.pig.execType", "local");
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigInterpreter.open();
|
||||
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
|
||||
null, null);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
pig.close();
|
||||
pig.destroy();
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.forceDelete(new File(PASSWD_FILE));
|
||||
org.apache.commons.io.FileUtils.forceDelete(new File(USERS_FILE));
|
||||
} catch (IOException e) {
|
||||
StringWriter sw = new StringWriter();
|
||||
e.printStackTrace(new PrintWriter(sw));
|
||||
fail("Unable to cleanup temp pig files:\n" + sw.toString());
|
||||
}
|
||||
@After
|
||||
public void tearDown() {
|
||||
pigInterpreter.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract users from a dummy passwd file
|
||||
* https://pig.apache.org/docs/r0.10.0/start.html#run
|
||||
*/
|
||||
@Test
|
||||
public void testExtractUsers() {
|
||||
PrintWriter out = null;
|
||||
StringWriter sw = new StringWriter();
|
||||
public void testBasics() throws IOException {
|
||||
String content = "1\tandy\n"
|
||||
+ "2\tpeter\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
try {
|
||||
out = new PrintWriter(new File(PASSWD_FILE));
|
||||
out.println("user1:pass1");
|
||||
out.println("user2:pass2");
|
||||
out.println("user3:pass3");
|
||||
out.println("user4:pass4");
|
||||
out.flush();
|
||||
} catch (FileNotFoundException e) {
|
||||
e.printStackTrace(new PrintWriter(sw));
|
||||
fail("Unable to write to "+PASSWD_FILE+":\n" + sw.toString());
|
||||
} finally {
|
||||
if (out != null){
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
// simple pig script using dump
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "dump a;";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
|
||||
|
||||
try {
|
||||
FileUtils.deleteDirectory(new File(USERS_FILE));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace(new PrintWriter(sw));
|
||||
fail("Unable to delete: "+USERS_FILE+". Error: " + sw.toString());
|
||||
}
|
||||
String pigScript = "A = load 'file://"+PASSWD_FILE+"' using PigStorage(':');";
|
||||
pigScript += "B = foreach A generate $0 as id;";
|
||||
pigScript += "store B into 'file://"+USERS_FILE+"';";
|
||||
// describe
|
||||
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
|
||||
|
||||
InterpreterResult result = pig.interpret(pigScript, context);
|
||||
|
||||
String readusers = null;
|
||||
try {
|
||||
readusers = new String(Files.readAllBytes(Paths.get(USERS_FILE+"/part-m-00000")));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace(new PrintWriter(sw));
|
||||
fail("Unable read output of pig job from: "+USERS_FILE+"/part-m-00000. Error:\n" + sw.toString());
|
||||
}
|
||||
assertEquals(readusers, "user1\nuser2\nuser3\nuser4\n");
|
||||
// syntax error (compilation error)
|
||||
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.ERROR, result.code());
|
||||
assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
|
||||
|
||||
// execution error
|
||||
pigscript = "a = load 'invalid_path';"
|
||||
+ "dump a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.ERROR, result.code());
|
||||
assertTrue(result.message().contains("Input path does not exist"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testIncludeJobStats() throws IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.put("zeppelin.pig.execType", "local");
|
||||
properties.put("zeppelin.pig.includeJobStats", "true");
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigInterpreter.open();
|
||||
|
||||
String content = "1\tandy\n"
|
||||
+ "2\tpeter\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
// simple pig script using dump
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "dump a;";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().contains("Counters:"));
|
||||
assertTrue(result.message().contains("(1,andy)\n(2,peter)"));
|
||||
|
||||
// describe
|
||||
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.SUCCESS, result.code());
|
||||
// no job is launched, so no jobStats
|
||||
assertTrue(!result.message().contains("Counters:"));
|
||||
assertTrue(result.message().contains("a: {id: int,name: bytearray}"));
|
||||
|
||||
// syntax error (compilation error)
|
||||
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.ERROR, result.code());
|
||||
// no job is launched, so no jobStats
|
||||
assertTrue(!result.message().contains("Counters:"));
|
||||
assertTrue(result.message().contains("Syntax error, unexpected symbol at or near 'a'"));
|
||||
|
||||
// execution error
|
||||
pigscript = "a = load 'invalid_path';"
|
||||
+ "dump a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(Type.TEXT, result.type());
|
||||
assertEquals(Code.ERROR, result.code());
|
||||
assertTrue(result.message().contains("Counters:"));
|
||||
assertTrue(result.message().contains("Input path does not exist"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,153 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PigQueryInterpreterTest {
|
||||
|
||||
private PigInterpreter pigInterpreter;
|
||||
private PigQueryInterpreter pigQueryInterpreter;
|
||||
private InterpreterContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties properties = new Properties();
|
||||
properties.put("zeppelin.pig.execType", "local");
|
||||
properties.put("zeppelin.pig.maxResult", "20");
|
||||
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigQueryInterpreter = new PigQueryInterpreter(properties);
|
||||
List<Interpreter> interpreters = new ArrayList();
|
||||
interpreters.add(pigInterpreter);
|
||||
interpreters.add(pigQueryInterpreter);
|
||||
InterpreterGroup group = new InterpreterGroup();
|
||||
group.put("note_id", interpreters);
|
||||
pigInterpreter.setInterpreterGroup(group);
|
||||
pigQueryInterpreter.setInterpreterGroup(group);
|
||||
pigInterpreter.open();
|
||||
pigQueryInterpreter.open();
|
||||
|
||||
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
|
||||
null, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
pigInterpreter.close();
|
||||
pigQueryInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasics() throws IOException {
|
||||
String content = "andy\tmale\t10\n"
|
||||
+ "peter\tmale\t20\n"
|
||||
+ "amy\tfemale\t14\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
// run script in PigInterpreter
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (name, gender, age);\n"
|
||||
+ "a2 = load 'invalid_path' as (name, gender, age);\n"
|
||||
+ "dump a;";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
|
||||
|
||||
// run single line query in PigQueryInterpreter
|
||||
String query = "foreach a generate name, age;";
|
||||
result = pigQueryInterpreter.interpret(query, context);
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message());
|
||||
|
||||
// run multiple line query in PigQueryInterpreter
|
||||
query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;";
|
||||
result = pigQueryInterpreter.interpret(query, context);
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
|
||||
|
||||
// syntax error in PigQueryInterpereter
|
||||
query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;";
|
||||
result = pigQueryInterpreter.interpret(query, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema"));
|
||||
|
||||
// execution error in PigQueryInterpreter
|
||||
query = "foreach a2 generate name, age;";
|
||||
result = pigQueryInterpreter.interpret(query, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().contains("Input path does not exist"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxResult() throws IOException {
|
||||
StringBuilder content = new StringBuilder();
|
||||
for (int i=0;i<30;++i) {
|
||||
content.append(i + "\tname_" + i + "\n");
|
||||
}
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
// run script in PigInterpreter
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
// empty output
|
||||
assertTrue(result.message().isEmpty());
|
||||
|
||||
// run single line query in PigQueryInterpreter
|
||||
String query = "foreach a generate id;";
|
||||
result = pigQueryInterpreter.interpret(query, context);
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().contains("id\n0\n1\n2"));
|
||||
assertTrue(result.message().contains("Results are limited by 20"));
|
||||
}
|
||||
}
|
||||
22
pig/src/test/resources/log4j.properties
Normal file
22
pig/src/test/resources/log4j.properties
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
log4j.rootLogger = INFO, stdout
|
||||
|
||||
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
|
||||
|
|
@ -545,7 +545,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
|
||||
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
|
||||
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
|
||||
+ "scalding,jdbc,hbase,bigquery,beam"),
|
||||
+ "scalding,jdbc,hbase,bigquery,beam,pig"),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
Loading…
Reference in a new issue