mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-2615. Upgrade pig to 0.17.0 to support spark engine
This commit is contained in:
parent
d3d6340bd7
commit
4b4e3dbd0d
7 changed files with 243 additions and 242 deletions
|
|
@ -34,6 +34,8 @@ which in turns enables them to handle very large data sets.
|
|||
- MapReduce
|
||||
- Tez_Local (Only Tez 0.7 is supported)
|
||||
- Tez (Only Tez 0.7 is supported)
|
||||
- Spark_Local (Only Spark 1.6.x is supported, by default it is Spark 1.6.3)
|
||||
- Spark (Only Spark 1.6.x is supported, by default it is Spark 1.6.3)
|
||||
|
||||
## How to use
|
||||
|
||||
|
|
@ -55,6 +57,20 @@ which in turns enables them to handle very large data sets.
|
|||
|
||||
HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR need to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
|
||||
|
||||
- Spark Local Mode
|
||||
|
||||
Nothing needs to be done for Spark local mode.
|
||||
|
||||
- Spark Mode
|
||||
|
||||
For now, only yarn-client mode is supported. To enable it, you need to set the property `SPARK_MASTER` to `yarn-client`
|
||||
and set `SPARK_JAR` to the Spark assembly file uploaded to HDFS.
|
||||
|
||||
### How to choose a custom Spark version
|
||||
|
||||
By default, the Pig interpreter uses Spark 1.6.3. If you want to use another Spark version,
|
||||
you need to rebuild Zeppelin, specifying the custom Spark version via `-Dpig.spark.version=<custom_spark_version>` in the Maven build command.
|
||||
|
||||
### How to configure interpreter
|
||||
|
||||
In the Interpreters menu, you have to create a new Pig interpreter. The Pig interpreter has the following properties by default.
|
||||
|
|
@ -71,7 +87,7 @@ So you can use that to find app running in YARN RM UI.
|
|||
<tr>
|
||||
<td>zeppelin.pig.execType</td>
|
||||
<td>mapreduce</td>
|
||||
<td>Execution mode for pig runtime. local | mapreduce | tez_local | tez </td>
|
||||
<td>Execution mode for pig runtime. local | mapreduce | tez_local | tez | spark_local | spark </td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>zeppelin.pig.includeJobStats</td>
|
||||
|
|
@ -93,6 +109,16 @@ So you can use that to find app running in YARN RM UI.
|
|||
<td>default</td>
|
||||
<td>queue name for mapreduce engine</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>SPARK_MASTER</td>
|
||||
<td>local</td>
|
||||
<td>local | yarn-client</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>SPARK_JAR</td>
|
||||
<td></td>
|
||||
<td>The Spark assembly jar you uploaded to HDFS</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
### Example
|
||||
|
|
|
|||
47
pig/pom.xml
47
pig/pom.xml
|
|
@ -36,9 +36,11 @@
|
|||
<url>http://zeppelin.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<pig.version>0.16.0</pig.version>
|
||||
<pig.version>0.17.0</pig.version>
|
||||
<hadoop.version>2.6.0</hadoop.version>
|
||||
<tez.version>0.7.0</tez.version>
|
||||
<pig.spark.version>1.6.3</pig.spark.version>
|
||||
<pig.scala.version>2.11</pig.scala.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -68,10 +70,28 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.pig</groupId>
|
||||
<artifactId>pig</artifactId>
|
||||
<classifier>h2</classifier>
|
||||
<version>${pig.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mortbay.jetty</groupId>
|
||||
<artifactId>servlet-api-2.5</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.python</groupId>
|
||||
<artifactId>jython-standalone</artifactId>
|
||||
<version>2.7.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
|
|
@ -82,6 +102,12 @@
|
|||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-api</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
@ -94,6 +120,12 @@
|
|||
<groupId>org.apache.tez</groupId>
|
||||
<artifactId>tez-dag</artifactId>
|
||||
<version>${tez.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
@ -120,6 +152,17 @@
|
|||
<version>${tez.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_${pig.scala.version}</artifactId>
|
||||
<version>${pig.spark.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-yarn_${pig.scala.version}</artifactId>
|
||||
<version>${pig.spark.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.impl.logicalLayer.FrontendException;
|
||||
import org.apache.pig.tools.pigscript.parser.ParseException;
|
||||
import org.apache.pig.tools.pigstats.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
|
@ -107,9 +108,12 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
if (e.getCause() instanceof ParseException) {
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null) {
|
||||
String errorMsg = PigUtils.extactJobStats(stats);
|
||||
String errorMsg = stats.getDisplayString();
|
||||
if (errorMsg != null) {
|
||||
LOGGER.error("Fail to run pig script, " + errorMsg);
|
||||
return new InterpreterResult(Code.ERROR, errorMsg);
|
||||
|
|
@ -127,7 +131,7 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
StringBuilder outputBuilder = new StringBuilder();
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null && includeJobStats) {
|
||||
String jobStats = PigUtils.extactJobStats(stats);
|
||||
String jobStats = stats.getDisplayString();
|
||||
if (jobStats != null) {
|
||||
outputBuilder.append(jobStats);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.pig.PigServer;
|
|||
import org.apache.pig.data.Tuple;
|
||||
import org.apache.pig.impl.logicalLayer.FrontendException;
|
||||
import org.apache.pig.impl.logicalLayer.schema.Schema;
|
||||
import org.apache.pig.tools.pigscript.parser.ParseException;
|
||||
import org.apache.pig.tools.pigstats.PigStats;
|
||||
import org.apache.pig.tools.pigstats.ScriptState;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
|
|
@ -134,9 +135,12 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
if (e.getCause() instanceof ParseException) {
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
PigStats stats = PigStats.get();
|
||||
if (stats != null) {
|
||||
String errorMsg = PigUtils.extactJobStats(stats);
|
||||
String errorMsg = stats.getDisplayString();
|
||||
if (errorMsg != null) {
|
||||
return new InterpreterResult(Code.ERROR, errorMsg);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigRunner;
|
||||
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
|
||||
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
|
||||
import org.apache.pig.tools.pigstats.InputStats;
|
||||
import org.apache.pig.tools.pigstats.JobStats;
|
||||
|
|
@ -29,6 +30,9 @@ import org.apache.pig.tools.pigstats.OutputStats;
|
|||
import org.apache.pig.tools.pigstats.PigStats;
|
||||
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
|
||||
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
|
||||
import org.apache.pig.tools.pigstats.spark.SparkJobStats;
|
||||
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
|
||||
import org.apache.pig.tools.pigstats.spark.SparkScriptState;
|
||||
import org.apache.pig.tools.pigstats.tez.TezDAGStats;
|
||||
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -39,10 +43,7 @@ import java.io.FileWriter;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -66,236 +67,4 @@ public class PigUtils {
|
|||
return createTempPigScript(StringUtils.join(lines, "\n"));
|
||||
}
|
||||
|
||||
public static String extactJobStats(PigStats stats) {
|
||||
if (stats instanceof SimplePigStats) {
|
||||
return extractFromSimplePigStats((SimplePigStats) stats);
|
||||
} else if (stats instanceof TezPigScriptStats) {
|
||||
return extractFromTezPigStats((TezPigScriptStats) stats);
|
||||
} else {
|
||||
throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
public static String extractFromSimplePigStats(SimplePigStats stats) {
|
||||
|
||||
try {
|
||||
Field userIdField = PigStats.class.getDeclaredField("userId");
|
||||
userIdField.setAccessible(true);
|
||||
String userId = (String) (userIdField.get(stats));
|
||||
Field startTimeField = PigStats.class.getDeclaredField("startTime");
|
||||
startTimeField.setAccessible(true);
|
||||
long startTime = (Long) (startTimeField.get(stats));
|
||||
Field endTimeField = PigStats.class.getDeclaredField("endTime");
|
||||
endTimeField.setAccessible(true);
|
||||
long endTime = (Long) (endTimeField.get(stats));
|
||||
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
|
||||
LOGGER.warn("unknown return code, can't display the results");
|
||||
return null;
|
||||
}
|
||||
if (stats.getPigContext() == null) {
|
||||
LOGGER.warn("unknown exec type, don't display the results");
|
||||
return null;
|
||||
}
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
|
||||
sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
|
||||
.append(userId).append("\t")
|
||||
.append(sdf.format(new Date(startTime))).append("\t")
|
||||
.append(sdf.format(new Date(endTime))).append("\t")
|
||||
.append(stats.getFeatures()).append("\n");
|
||||
sb.append("\n");
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
|
||||
sb.append("Success!\n");
|
||||
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Some jobs have failed! Stop running all dependent jobs\n");
|
||||
} else {
|
||||
sb.append("Failed!\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
|
||||
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
|
||||
jobPlanField.setAccessible(true);
|
||||
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
|
||||
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Job Stats (time in seconds):\n");
|
||||
sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
|
||||
List<JobStats> arr = jobPlan.getSuccessfulJobs();
|
||||
for (JobStats js : arr) {
|
||||
sb.append(js.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Failed Jobs:\n");
|
||||
sb.append(MRJobStats.FAILURE_HEADER).append("\n");
|
||||
List<JobStats> arr = jobPlan.getFailedJobs();
|
||||
for (JobStats js : arr) {
|
||||
sb.append(js.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
sb.append("Input(s):\n");
|
||||
for (InputStats is : stats.getInputStats()) {
|
||||
sb.append(is.getDisplayString());
|
||||
}
|
||||
sb.append("\n");
|
||||
sb.append("Output(s):\n");
|
||||
for (OutputStats ds : stats.getOutputStats()) {
|
||||
sb.append(ds.getDisplayString());
|
||||
}
|
||||
|
||||
sb.append("\nCounters:\n");
|
||||
sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
|
||||
sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
|
||||
sb.append("Spillable Memory Manager spill count : "
|
||||
+ stats.getSMMSpillCount()).append("\n");
|
||||
sb.append("Total bags proactively spilled: "
|
||||
+ stats.getProactiveSpillCountObjects()).append("\n");
|
||||
sb.append("Total records proactively spilled: "
|
||||
+ stats.getProactiveSpillCountRecords()).append("\n");
|
||||
sb.append("\nJob DAG:\n").append(jobPlan.toString());
|
||||
|
||||
return "Script Statistics: \n" + sb.toString();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract message from SimplePigStats", e);
|
||||
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static String extractFromTezPigStats(TezPigScriptStats stats) {
|
||||
|
||||
try {
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
|
||||
LOGGER.warn("unknown return code, can't display the results");
|
||||
return null;
|
||||
}
|
||||
if (stats.getPigContext() == null) {
|
||||
LOGGER.warn("unknown exec type, don't display the results");
|
||||
return null;
|
||||
}
|
||||
|
||||
Field userIdField = PigStats.class.getDeclaredField("userId");
|
||||
userIdField.setAccessible(true);
|
||||
String userId = (String) (userIdField.get(stats));
|
||||
Field startTimeField = PigStats.class.getDeclaredField("startTime");
|
||||
startTimeField.setAccessible(true);
|
||||
long startTime = (Long) (startTimeField.get(stats));
|
||||
Field endTimeField = PigStats.class.getDeclaredField("endTime");
|
||||
endTimeField.setAccessible(true);
|
||||
long endTime = (Long) (endTimeField.get(stats));
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("\n");
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
|
||||
sb.append("\n");
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
|
||||
sb.append("Success!\n");
|
||||
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
sb.append("Some tasks have failed! Stop running all dependent tasks\n");
|
||||
} else {
|
||||
sb.append("Failed!\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
|
||||
// Print diagnostic info in case of failure
|
||||
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|
||||
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
|
||||
if (stats.getErrorMessage() != null) {
|
||||
String[] lines = stats.getErrorMessage().split("\n");
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
String s = lines[i].trim();
|
||||
if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
|
||||
sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
|
||||
}
|
||||
}
|
||||
sb.append("\n");
|
||||
}
|
||||
}
|
||||
|
||||
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
|
||||
tezDAGStatsMapField.setAccessible(true);
|
||||
Map<String, TezDAGStats> tezDAGStatsMap =
|
||||
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
|
||||
int count = 0;
|
||||
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
|
||||
sb.append("\n");
|
||||
sb.append("DAG " + count++ + ":\n");
|
||||
sb.append(dagStats.getDisplayString());
|
||||
sb.append("\n");
|
||||
}
|
||||
|
||||
sb.append("Input(s):\n");
|
||||
for (InputStats is : stats.getInputStats()) {
|
||||
sb.append(is.getDisplayString().trim()).append("\n");
|
||||
}
|
||||
sb.append("\n");
|
||||
sb.append("Output(s):\n");
|
||||
for (OutputStats os : stats.getOutputStats()) {
|
||||
sb.append(os.getDisplayString().trim()).append("\n");
|
||||
}
|
||||
return "Script Statistics:\n" + sb.toString();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract message from SimplePigStats", e);
|
||||
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIds(PigStats stat) {
|
||||
if (stat instanceof SimplePigStats) {
|
||||
return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
|
||||
} else if (stat instanceof TezPigScriptStats) {
|
||||
return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
|
||||
} else {
|
||||
throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
try {
|
||||
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
|
||||
jobPlanField.setAccessible(true);
|
||||
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
|
||||
List<JobStats> arr = jobPlan.getJobList();
|
||||
for (JobStats js : arr) {
|
||||
jobIds.add(js.getJobId());
|
||||
}
|
||||
return jobIds;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract jobIds from SimpelPigStats", e);
|
||||
throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
try {
|
||||
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
|
||||
tezDAGStatsMapField.setAccessible(true);
|
||||
Map<String, TezDAGStats> tezDAGStatsMap =
|
||||
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
|
||||
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
|
||||
LOGGER.debug("Tez JobId:" + dagStats.getJobId());
|
||||
jobIds.add(dagStats.getJobId());
|
||||
}
|
||||
return jobIds;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
|
||||
throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,12 @@
|
|||
"propertyName": "zeppelin.pig.includeJobStats",
|
||||
"defaultValue": "false",
|
||||
"description": "flag to include job stats in output"
|
||||
},
|
||||
"SPARK_MASTER": {
|
||||
"envName": "SPARK_MASTER",
|
||||
"propertyName": "SPARK_MASTER",
|
||||
"defaultValue": "local",
|
||||
"description": "local | yarn-client"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
||||
public class PigInterpreterSparkTest {
|
||||
private PigInterpreter pigInterpreter;
|
||||
private InterpreterContext context;
|
||||
|
||||
public void setUpSpark(boolean includeJobStats) {
|
||||
Properties properties = new Properties();
|
||||
properties.put("zeppelin.pig.execType", "spark_local");
|
||||
properties.put("zeppelin.pig.includeJobStats", includeJobStats + "");
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigInterpreter.open();
|
||||
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
|
||||
null, null);
|
||||
|
||||
}
|
||||
@After
|
||||
public void tearDown() {
|
||||
pigInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasics() throws IOException {
|
||||
setUpSpark(false);
|
||||
|
||||
String content = "1\tandy\n"
|
||||
+ "2\tpeter\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
// simple pig script using dump
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "dump a;";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
|
||||
|
||||
// describe
|
||||
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
|
||||
|
||||
// syntax error (compilation error)
|
||||
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
|
||||
|
||||
// syntax error
|
||||
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "foreach a generate $0;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("expecting one of"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncludeJobStats() throws IOException {
|
||||
setUpSpark(true);
|
||||
|
||||
String content = "1\tandy\n"
|
||||
+ "2\tpeter\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
FileWriter writer = new FileWriter(tmpFile);
|
||||
IOUtils.write(content, writer);
|
||||
writer.close();
|
||||
|
||||
// simple pig script using dump
|
||||
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "dump a;";
|
||||
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("Spark Job"));
|
||||
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
|
||||
|
||||
// describe
|
||||
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
// no job is launched, so no jobStats
|
||||
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
|
||||
|
||||
// syntax error (compilation error)
|
||||
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
|
||||
+ "describe a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
// no job is launched, so no jobStats
|
||||
assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
|
||||
|
||||
// execution error
|
||||
pigscript = "a = load 'invalid_path';"
|
||||
+ "dump a;";
|
||||
result = pigInterpreter.interpret(pigscript, context);
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertTrue(result.message().get(0).getData().contains("Failed to read data from"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in a new issue