ZEPPELIN-2615. Upgrade pig to 0.17.0 to support spark engine

This commit is contained in:
Jeff Zhang 2017-06-22 20:30:51 +08:00
parent d3d6340bd7
commit 4b4e3dbd0d
7 changed files with 243 additions and 242 deletions

View file

@ -34,6 +34,8 @@ which in turns enables them to handle very large data sets.
- MapReduce
- Tez_Local (Only Tez 0.7 is supported)
- Tez (Only Tez 0.7 is supported)
- Spark_Local (Only Spark 1.6.x is supported, by default it is Spark 1.6.3)
- Spark (Only Spark 1.6.x is supported, by default it is Spark 1.6.3)
## How to use
@ -55,6 +57,20 @@ which in turns enables them to handle very large data sets.
HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`.
- Spark Local Mode
Nothing needs to be done for spark local mode
- Spark Mode
For now, only yarn-client mode is supported. To enable it, you need to set property SPARK_MASTER to yarn-client
and set SPARK_JAR to the spark assembly file uploaded to hdfs.
### How to choose custom Spark Version
By default, Pig Interpreter would use Spark 1.6.3, if you want to use another Spark Version,
you need to rebuild the zeppelin by specifying the custom spark version via -Dpig.spark.version=<custom_spark_version> in the maven build command.
### How to configure interpreter
At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default.
@ -71,7 +87,7 @@ So you can use that to find app running in YARN RM UI.
<tr>
<td>zeppelin.pig.execType</td>
<td>mapreduce</td>
<td>Execution mode for pig runtime. local | mapreduce | tez_local | tez </td>
<td>Execution mode for pig runtime. local | mapreduce | tez_local | tez | spark_local | spark </td>
</tr>
<tr>
<td>zeppelin.pig.includeJobStats</td>
@ -93,6 +109,16 @@ So you can use that to find app running in YARN RM UI.
<td>default</td>
<td>queue name for mapreduce engine</td>
</tr>
<tr>
<td>SPARK_MASTER</td>
<td>local</td>
<td>local | yarn-client</td>
</tr>
<tr>
<td>SPARK_JAR</td>
<td></td>
<td>The spark assembly jar you uploaded to hdfs</td>
</tr>
</table>
### Example

View file

@ -36,9 +36,11 @@
<url>http://zeppelin.apache.org</url>
<properties>
<pig.version>0.16.0</pig.version>
<pig.version>0.17.0</pig.version>
<hadoop.version>2.6.0</hadoop.version>
<tez.version>0.7.0</tez.version>
<pig.spark.version>1.6.3</pig.spark.version>
<pig.scala.version>2.11</pig.scala.version>
</properties>
<dependencies>
@ -68,10 +70,28 @@
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<classifier>h2</classifier>
<version>${pig.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api-2.5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -82,6 +102,12 @@
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -94,6 +120,12 @@
<groupId>org.apache.tez</groupId>
<artifactId>tez-dag</artifactId>
<version>${tez.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -120,6 +152,17 @@
<version>${tez.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${pig.scala.version}</artifactId>
<version>${pig.spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${pig.scala.version}</artifactId>
<version>${pig.spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View file

@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.*;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -107,9 +108,12 @@ public class PigInterpreter extends BasePigInterpreter {
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
}
}
if (e.getCause() instanceof ParseException) {
return new InterpreterResult(Code.ERROR, e.getMessage());
}
PigStats stats = PigStats.get();
if (stats != null) {
String errorMsg = PigUtils.extactJobStats(stats);
String errorMsg = stats.getDisplayString();
if (errorMsg != null) {
LOGGER.error("Fail to run pig script, " + errorMsg);
return new InterpreterResult(Code.ERROR, errorMsg);
@ -127,7 +131,7 @@ public class PigInterpreter extends BasePigInterpreter {
StringBuilder outputBuilder = new StringBuilder();
PigStats stats = PigStats.get();
if (stats != null && includeJobStats) {
String jobStats = PigUtils.extactJobStats(stats);
String jobStats = stats.getDisplayString();
if (jobStats != null) {
outputBuilder.append(jobStats);
}

View file

@ -25,6 +25,7 @@ import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.zeppelin.interpreter.*;
@ -134,9 +135,12 @@ public class PigQueryInterpreter extends BasePigInterpreter {
return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
}
}
if (e.getCause() instanceof ParseException) {
return new InterpreterResult(Code.ERROR, e.getMessage());
}
PigStats stats = PigStats.get();
if (stats != null) {
String errorMsg = PigUtils.extactJobStats(stats);
String errorMsg = stats.getDisplayString();
if (errorMsg != null) {
return new InterpreterResult(Code.ERROR, errorMsg);
}

View file

@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.PigRunner;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
@ -29,6 +30,9 @@ import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
import org.apache.pig.tools.pigstats.spark.SparkJobStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkScriptState;
import org.apache.pig.tools.pigstats.tez.TezDAGStats;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.slf4j.Logger;
@ -39,10 +43,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
*
@ -66,236 +67,4 @@ public class PigUtils {
return createTempPigScript(StringUtils.join(lines, "\n"));
}
public static String extactJobStats(PigStats stats) {
if (stats instanceof SimplePigStats) {
return extractFromSimplePigStats((SimplePigStats) stats);
} else if (stats instanceof TezPigScriptStats) {
return extractFromTezPigStats((TezPigScriptStats) stats);
} else {
throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName());
}
}
public static String extractFromSimplePigStats(SimplePigStats stats) {
try {
Field userIdField = PigStats.class.getDeclaredField("userId");
userIdField.setAccessible(true);
String userId = (String) (userIdField.get(stats));
Field startTimeField = PigStats.class.getDeclaredField("startTime");
startTimeField.setAccessible(true);
long startTime = (Long) (startTimeField.get(stats));
Field endTimeField = PigStats.class.getDeclaredField("endTime");
endTimeField.setAccessible(true);
long endTime = (Long) (endTimeField.get(stats));
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
LOGGER.warn("unknown return code, can't display the results");
return null;
}
if (stats.getPigContext() == null) {
LOGGER.warn("unknown exec type, don't display the results");
return null;
}
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t")
.append(userId).append("\t")
.append(sdf.format(new Date(startTime))).append("\t")
.append(sdf.format(new Date(endTime))).append("\t")
.append(stats.getFeatures()).append("\n");
sb.append("\n");
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
sb.append("Success!\n");
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Some jobs have failed! Stop running all dependent jobs\n");
} else {
sb.append("Failed!\n");
}
sb.append("\n");
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
jobPlanField.setAccessible(true);
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats);
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Job Stats (time in seconds):\n");
sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
List<JobStats> arr = jobPlan.getSuccessfulJobs();
for (JobStats js : arr) {
sb.append(js.getDisplayString());
}
sb.append("\n");
}
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Failed Jobs:\n");
sb.append(MRJobStats.FAILURE_HEADER).append("\n");
List<JobStats> arr = jobPlan.getFailedJobs();
for (JobStats js : arr) {
sb.append(js.getDisplayString());
}
sb.append("\n");
}
sb.append("Input(s):\n");
for (InputStats is : stats.getInputStats()) {
sb.append(is.getDisplayString());
}
sb.append("\n");
sb.append("Output(s):\n");
for (OutputStats ds : stats.getOutputStats()) {
sb.append(ds.getDisplayString());
}
sb.append("\nCounters:\n");
sb.append("Total records written : " + stats.getRecordWritten()).append("\n");
sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n");
sb.append("Spillable Memory Manager spill count : "
+ stats.getSMMSpillCount()).append("\n");
sb.append("Total bags proactively spilled: "
+ stats.getProactiveSpillCountObjects()).append("\n");
sb.append("Total records proactively spilled: "
+ stats.getProactiveSpillCountRecords()).append("\n");
sb.append("\nJob DAG:\n").append(jobPlan.toString());
return "Script Statistics: \n" + sb.toString();
} catch (Exception e) {
LOGGER.error("Can not extract message from SimplePigStats", e);
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
}
}
private static String extractFromTezPigStats(TezPigScriptStats stats) {
try {
if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) {
LOGGER.warn("unknown return code, can't display the results");
return null;
}
if (stats.getPigContext() == null) {
LOGGER.warn("unknown exec type, don't display the results");
return null;
}
Field userIdField = PigStats.class.getDeclaredField("userId");
userIdField.setAccessible(true);
String userId = (String) (userIdField.get(stats));
Field startTimeField = PigStats.class.getDeclaredField("startTime");
startTimeField.setAccessible(true);
long startTime = (Long) (startTimeField.get(stats));
Field endTimeField = PigStats.class.getDeclaredField("endTime");
endTimeField.setAccessible(true);
long endTime = (Long) (endTimeField.get(stats));
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\n");
sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion()));
sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId));
sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName()));
sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime))));
sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime))));
sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures()));
sb.append("\n");
if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) {
sb.append("Success!\n");
} else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
sb.append("Some tasks have failed! Stop running all dependent tasks\n");
} else {
sb.append("Failed!\n");
}
sb.append("\n");
// Print diagnostic info in case of failure
if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE
|| stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) {
if (stats.getErrorMessage() != null) {
String[] lines = stats.getErrorMessage().split("\n");
for (int i = 0; i < lines.length; i++) {
String s = lines[i].trim();
if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) {
sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s));
}
}
sb.append("\n");
}
}
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
tezDAGStatsMapField.setAccessible(true);
Map<String, TezDAGStats> tezDAGStatsMap =
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stats);
int count = 0;
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
sb.append("\n");
sb.append("DAG " + count++ + ":\n");
sb.append(dagStats.getDisplayString());
sb.append("\n");
}
sb.append("Input(s):\n");
for (InputStats is : stats.getInputStats()) {
sb.append(is.getDisplayString().trim()).append("\n");
}
sb.append("\n");
sb.append("Output(s):\n");
for (OutputStats os : stats.getOutputStats()) {
sb.append(os.getDisplayString().trim()).append("\n");
}
return "Script Statistics:\n" + sb.toString();
} catch (Exception e) {
LOGGER.error("Can not extract message from SimplePigStats", e);
return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e);
}
}
public static List<String> extractJobIds(PigStats stat) {
if (stat instanceof SimplePigStats) {
return extractJobIdsFromSimplePigStats((SimplePigStats) stat);
} else if (stat instanceof TezPigScriptStats) {
return extractJobIdsFromTezPigStats((TezPigScriptStats) stat);
} else {
throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName());
}
}
public static List<String> extractJobIdsFromSimplePigStats(SimplePigStats stat) {
List<String> jobIds = new ArrayList<>();
try {
Field jobPlanField = PigStats.class.getDeclaredField("jobPlan");
jobPlanField.setAccessible(true);
PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat);
List<JobStats> arr = jobPlan.getJobList();
for (JobStats js : arr) {
jobIds.add(js.getJobId());
}
return jobIds;
} catch (Exception e) {
LOGGER.error("Can not extract jobIds from SimpelPigStats", e);
throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e);
}
}
public static List<String> extractJobIdsFromTezPigStats(TezPigScriptStats stat) {
List<String> jobIds = new ArrayList<>();
try {
Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap");
tezDAGStatsMapField.setAccessible(true);
Map<String, TezDAGStats> tezDAGStatsMap =
(Map<String, TezDAGStats>) tezDAGStatsMapField.get(stat);
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
LOGGER.debug("Tez JobId:" + dagStats.getJobId());
jobIds.add(dagStats.getJobId());
}
return jobIds;
} catch (Exception e) {
LOGGER.error("Can not extract jobIds from TezPigScriptStats", e);
throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e);
}
}
}

View file

@ -15,6 +15,12 @@
"propertyName": "zeppelin.pig.includeJobStats",
"defaultValue": "false",
"description": "flag to include job stats in output"
},
"zeppelin.pig.execType": {
"envName": "SPARK_MASTER",
"propertyName": "SPARK_MASTER",
"defaultValue": "local",
"description": "local | yarn-client"
}
},
"editor": {

View file

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.pig;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PigInterpreterSparkTest {
private PigInterpreter pigInterpreter;
private InterpreterContext context;
public void setUpSpark(boolean includeJobStats) {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "spark_local");
properties.put("zeppelin.pig.includeJobStats", includeJobStats + "");
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
null, null);
}
@After
public void tearDown() {
pigInterpreter.close();
}
@Test
public void testBasics() throws IOException {
setUpSpark(false);
String content = "1\tandy\n"
+ "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
// syntax error
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ "foreach a generate $0;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("expecting one of"));
}
@Test
public void testIncludeJobStats() throws IOException {
setUpSpark(true);
String content = "1\tandy\n"
+ "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");
FileWriter writer = new FileWriter(tmpFile);
IOUtils.write(content, writer);
writer.close();
// simple pig script using dump
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("Spark Job"));
assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)"));
// describe
pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// no job is launched, so no jobStats
assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}"));
// syntax error (compilation error)
pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';"
+ "describe a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
// no job is launched, so no jobStats
assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'"));
// execution error
pigscript = "a = load 'invalid_path';"
+ "dump a;";
result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().get(0).getData().contains("Failed to read data from"));
}
}