diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 2e633a2000..7c0086c7ca 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -34,6 +34,8 @@ which in turns enables them to handle very large data sets. - MapReduce - Tez_Local (Only Tez 0.7 is supported) - Tez (Only Tez 0.7 is supported) + - Spark_Local (Only Spark 1.6.x is supported, by default it is Spark 1.6.3) + - Spark (Only Spark 1.6.x is supported, by default it is Spark 1.6.3) ## How to use @@ -55,6 +57,20 @@ which in turns enables them to handle very large data sets. HADOOP\_CONF\_DIR and TEZ\_CONF\_DIR needs to be specified in `ZEPPELIN_HOME/conf/zeppelin-env.sh`. +- Spark Local Mode + + Nothing needs to be done for spark local mode + +- Spark Mode + + For now, only yarn-client mode is supported. To enable it, you need to set property SPARK_MASTER to yarn-client + and set SPARK_JAR to the spark assembly file uploaded to hdfs. + +### How to choose custom Spark Version + +By default, Pig Interpreter would use Spark 1.6.3, if you want to use another Spark Version, +you need to rebuild the zeppelin by specifying the custom spark version via -Dpig.spark.version= in the maven build command. + ### How to configure interpreter At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default. @@ -71,7 +87,7 @@ So you can use that to find app running in YARN RM UI. zeppelin.pig.execType mapreduce - Execution mode for pig runtime. local | mapreduce | tez_local | tez + Execution mode for pig runtime. local | mapreduce | tez_local | tez | spark_local | spark zeppelin.pig.includeJobStats @@ -93,6 +109,16 @@ So you can use that to find app running in YARN RM UI. default queue name for mapreduce engine + + SPARK_MASTER + local + local | yarn-client + + + SPARK_JAR + + The spark assembly jar you uploaded to hdfs + ### Example diff --git a/pig/pom.xml b/pig/pom.xml index e58a62ac7c..7060c51efb 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -36,9 +36,11 @@ http://zeppelin.apache.org - 0.16.0 + 0.17.0 2.6.0 0.7.0 + 1.6.3 + 2.11 @@ -68,10 +70,28 @@ org.apache.pig pig - h2 ${pig.version} + + + javax.servlet + servlet-api + + + org.mortbay.jetty + servlet-api + + + org.mortbay.jetty + servlet-api-2.5 + + + + org.python + jython-standalone + 2.7.0 + org.apache.hadoop hadoop-client @@ -82,6 +102,12 @@ org.apache.tez tez-api ${tez.version} + + + javax.servlet + servlet-api + + @@ -94,6 +120,12 @@ org.apache.tez tez-dag ${tez.version} + + + javax.servlet + servlet-api + + @@ -120,6 +152,17 @@ ${tez.version} + + org.apache.spark + spark-core_${pig.scala.version} + ${pig.spark.version} + + + org.apache.spark + spark-yarn_${pig.scala.version} + ${pig.spark.version} + + junit junit diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 973397cead..e7c05e151c 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.PigServer; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.tools.pigscript.parser.ParseException; import org.apache.pig.tools.pigstats.*; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -107,9 +108,12 @@ public class PigInterpreter extends BasePigInterpreter { return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } + if (e.getCause() instanceof ParseException) { + return new InterpreterResult(Code.ERROR, e.getMessage()); + } PigStats stats = PigStats.get(); if (stats != null) { - String errorMsg = PigUtils.extactJobStats(stats); + String errorMsg = stats.getDisplayString(); if (errorMsg != null) { LOGGER.error("Fail to run pig script, " + errorMsg); return new InterpreterResult(Code.ERROR, errorMsg); @@ -127,7 +131,7 @@ public class PigInterpreter extends BasePigInterpreter { StringBuilder outputBuilder = new StringBuilder(); PigStats stats = PigStats.get(); if (stats != null && includeJobStats) { - String jobStats = PigUtils.extactJobStats(stats); + String jobStats = stats.getDisplayString(); if (jobStats != null) { outputBuilder.append(jobStats); } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index 385ff45c71..d510c38c92 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -25,6 +25,7 @@ import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.tools.pigscript.parser.ParseException; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.zeppelin.interpreter.*; @@ -134,9 +135,12 @@ public class PigQueryInterpreter extends BasePigInterpreter { return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } } + if (e.getCause() instanceof ParseException) { + return new InterpreterResult(Code.ERROR, e.getMessage()); + } PigStats stats = PigStats.get(); if (stats != null) { - String errorMsg = PigUtils.extactJobStats(stats); + String errorMsg = stats.getDisplayString(); if (errorMsg != null) { return new InterpreterResult(Code.ERROR, errorMsg); } diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java index 43687a5e84..8fc69ed401 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUtils.java @@ -22,6 +22,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.PigRunner; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; @@ -29,6 +30,9 @@ import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; +import org.apache.pig.tools.pigstats.spark.SparkJobStats; +import org.apache.pig.tools.pigstats.spark.SparkPigStats; +import org.apache.pig.tools.pigstats.spark.SparkScriptState; import org.apache.pig.tools.pigstats.tez.TezDAGStats; import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.slf4j.Logger; @@ -39,10 +43,7 @@ import java.io.FileWriter; import java.io.IOException; import java.lang.reflect.Field; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @@ -66,236 +67,4 @@ public class PigUtils { return createTempPigScript(StringUtils.join(lines, "\n")); } - public static String extactJobStats(PigStats stats) { - if (stats instanceof SimplePigStats) { - return extractFromSimplePigStats((SimplePigStats) stats); - } else if (stats instanceof TezPigScriptStats) { - return extractFromTezPigStats((TezPigScriptStats) stats); - } else { - throw new RuntimeException("Unrecognized stats type:" + stats.getClass().getSimpleName()); - } - } - - public static String extractFromSimplePigStats(SimplePigStats stats) { - - try { - Field userIdField = PigStats.class.getDeclaredField("userId"); - userIdField.setAccessible(true); - String userId = (String) (userIdField.get(stats)); - Field startTimeField = PigStats.class.getDeclaredField("startTime"); - startTimeField.setAccessible(true); - long startTime = (Long) (startTimeField.get(stats)); - Field endTimeField = PigStats.class.getDeclaredField("endTime"); - endTimeField.setAccessible(true); - long endTime = (Long) (endTimeField.get(stats)); - - if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { - LOGGER.warn("unknown return code, can't display the results"); - return null; - } - if (stats.getPigContext() == null) { - LOGGER.warn("unknown exec type, don't display the results"); - return null; - } - - SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); - StringBuilder sb = new StringBuilder(); - sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n"); - sb.append(stats.getHadoopVersion()).append("\t").append(stats.getPigVersion()).append("\t") - .append(userId).append("\t") - .append(sdf.format(new Date(startTime))).append("\t") - .append(sdf.format(new Date(endTime))).append("\t") - .append(stats.getFeatures()).append("\n"); - sb.append("\n"); - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { - sb.append("Success!\n"); - } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Some jobs have failed! Stop running all dependent jobs\n"); - } else { - sb.append("Failed!\n"); - } - sb.append("\n"); - - Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); - jobPlanField.setAccessible(true); - PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stats); - - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Job Stats (time in seconds):\n"); - sb.append(MRJobStats.SUCCESS_HEADER).append("\n"); - List arr = jobPlan.getSuccessfulJobs(); - for (JobStats js : arr) { - sb.append(js.getDisplayString()); - } - sb.append("\n"); - } - if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Failed Jobs:\n"); - sb.append(MRJobStats.FAILURE_HEADER).append("\n"); - List arr = jobPlan.getFailedJobs(); - for (JobStats js : arr) { - sb.append(js.getDisplayString()); - } - sb.append("\n"); - } - sb.append("Input(s):\n"); - for (InputStats is : stats.getInputStats()) { - sb.append(is.getDisplayString()); - } - sb.append("\n"); - sb.append("Output(s):\n"); - for (OutputStats ds : stats.getOutputStats()) { - sb.append(ds.getDisplayString()); - } - - sb.append("\nCounters:\n"); - sb.append("Total records written : " + stats.getRecordWritten()).append("\n"); - sb.append("Total bytes written : " + stats.getBytesWritten()).append("\n"); - sb.append("Spillable Memory Manager spill count : " - + stats.getSMMSpillCount()).append("\n"); - sb.append("Total bags proactively spilled: " - + stats.getProactiveSpillCountObjects()).append("\n"); - sb.append("Total records proactively spilled: " - + stats.getProactiveSpillCountRecords()).append("\n"); - sb.append("\nJob DAG:\n").append(jobPlan.toString()); - - return "Script Statistics: \n" + sb.toString(); - } catch (Exception e) { - LOGGER.error("Can not extract message from SimplePigStats", e); - return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); - } - } - - private static String extractFromTezPigStats(TezPigScriptStats stats) { - - try { - if (stats.getReturnCode() == PigRunner.ReturnCode.UNKNOWN) { - LOGGER.warn("unknown return code, can't display the results"); - return null; - } - if (stats.getPigContext() == null) { - LOGGER.warn("unknown exec type, don't display the results"); - return null; - } - - Field userIdField = PigStats.class.getDeclaredField("userId"); - userIdField.setAccessible(true); - String userId = (String) (userIdField.get(stats)); - Field startTimeField = PigStats.class.getDeclaredField("startTime"); - startTimeField.setAccessible(true); - long startTime = (Long) (startTimeField.get(stats)); - Field endTimeField = PigStats.class.getDeclaredField("endTime"); - endTimeField.setAccessible(true); - long endTime = (Long) (endTimeField.get(stats)); - - SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); - StringBuilder sb = new StringBuilder(); - sb.append("\n"); - sb.append(String.format("%1$20s: %2$-100s%n", "HadoopVersion", stats.getHadoopVersion())); - sb.append(String.format("%1$20s: %2$-100s%n", "PigVersion", stats.getPigVersion())); - sb.append(String.format("%1$20s: %2$-100s%n", "TezVersion", TezExecType.getTezVersion())); - sb.append(String.format("%1$20s: %2$-100s%n", "UserId", userId)); - sb.append(String.format("%1$20s: %2$-100s%n", "FileName", stats.getFileName())); - sb.append(String.format("%1$20s: %2$-100s%n", "StartedAt", sdf.format(new Date(startTime)))); - sb.append(String.format("%1$20s: %2$-100s%n", "FinishedAt", sdf.format(new Date(endTime)))); - sb.append(String.format("%1$20s: %2$-100s%n", "Features", stats.getFeatures())); - sb.append("\n"); - if (stats.getReturnCode() == PigRunner.ReturnCode.SUCCESS) { - sb.append("Success!\n"); - } else if (stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - sb.append("Some tasks have failed! Stop running all dependent tasks\n"); - } else { - sb.append("Failed!\n"); - } - sb.append("\n"); - - // Print diagnostic info in case of failure - if (stats.getReturnCode() == PigRunner.ReturnCode.FAILURE - || stats.getReturnCode() == PigRunner.ReturnCode.PARTIAL_FAILURE) { - if (stats.getErrorMessage() != null) { - String[] lines = stats.getErrorMessage().split("\n"); - for (int i = 0; i < lines.length; i++) { - String s = lines[i].trim(); - if (i == 0 || !org.apache.commons.lang.StringUtils.isEmpty(s)) { - sb.append(String.format("%1$20s: %2$-100s%n", i == 0 ? "ErrorMessage" : "", s)); - } - } - sb.append("\n"); - } - } - - Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); - tezDAGStatsMapField.setAccessible(true); - Map tezDAGStatsMap = - (Map) tezDAGStatsMapField.get(stats); - int count = 0; - for (TezDAGStats dagStats : tezDAGStatsMap.values()) { - sb.append("\n"); - sb.append("DAG " + count++ + ":\n"); - sb.append(dagStats.getDisplayString()); - sb.append("\n"); - } - - sb.append("Input(s):\n"); - for (InputStats is : stats.getInputStats()) { - sb.append(is.getDisplayString().trim()).append("\n"); - } - sb.append("\n"); - sb.append("Output(s):\n"); - for (OutputStats os : stats.getOutputStats()) { - sb.append(os.getDisplayString().trim()).append("\n"); - } - return "Script Statistics:\n" + sb.toString(); - } catch (Exception e) { - LOGGER.error("Can not extract message from SimplePigStats", e); - return "Can not extract message from SimpelPigStats," + ExceptionUtils.getStackTrace(e); - } - } - - public static List extractJobIds(PigStats stat) { - if (stat instanceof SimplePigStats) { - return extractJobIdsFromSimplePigStats((SimplePigStats) stat); - } else if (stat instanceof TezPigScriptStats) { - return extractJobIdsFromTezPigStats((TezPigScriptStats) stat); - } else { - throw new RuntimeException("Unrecognized stats type:" + stat.getClass().getSimpleName()); - } - } - - public static List extractJobIdsFromSimplePigStats(SimplePigStats stat) { - List jobIds = new ArrayList<>(); - try { - Field jobPlanField = PigStats.class.getDeclaredField("jobPlan"); - jobPlanField.setAccessible(true); - PigStats.JobGraph jobPlan = (PigStats.JobGraph) jobPlanField.get(stat); - List arr = jobPlan.getJobList(); - for (JobStats js : arr) { - jobIds.add(js.getJobId()); - } - return jobIds; - } catch (Exception e) { - LOGGER.error("Can not extract jobIds from SimpelPigStats", e); - throw new RuntimeException("Can not extract jobIds from SimpelPigStats", e); - } - } - - public static List extractJobIdsFromTezPigStats(TezPigScriptStats stat) { - List jobIds = new ArrayList<>(); - try { - Field tezDAGStatsMapField = TezPigScriptStats.class.getDeclaredField("tezDAGStatsMap"); - tezDAGStatsMapField.setAccessible(true); - Map tezDAGStatsMap = - (Map) tezDAGStatsMapField.get(stat); - for (TezDAGStats dagStats : tezDAGStatsMap.values()) { - LOGGER.debug("Tez JobId:" + dagStats.getJobId()); - jobIds.add(dagStats.getJobId()); - } - return jobIds; - } catch (Exception e) { - LOGGER.error("Can not extract jobIds from TezPigScriptStats", e); - throw new RuntimeException("Can not extract jobIds from TezPigScriptStats", e); - } - } } diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json index 583a1268ae..08ed5daa18 100644 --- a/pig/src/main/resources/interpreter-setting.json +++ b/pig/src/main/resources/interpreter-setting.json @@ -15,6 +15,12 @@ "propertyName": "zeppelin.pig.includeJobStats", "defaultValue": "false", "description": "flag to include job stats in output" + }, + "zeppelin.pig.execType": { + "envName": "SPARK_MASTER", + "propertyName": "SPARK_MASTER", + "defaultValue": "local", + "description": "local | yarn-client" } }, "editor": { diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java new file mode 100644 index 0000000000..e821bfea3d --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterSparkTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class PigInterpreterSparkTest { + private PigInterpreter pigInterpreter; + private InterpreterContext context; + + public void setUpSpark(boolean includeJobStats) { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "spark_local"); + properties.put("zeppelin.pig.includeJobStats", includeJobStats + ""); + pigInterpreter = new PigInterpreter(properties); + pigInterpreter.open(); + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null, + null, null); + + } + @After + public void tearDown() { + pigInterpreter.close(); + } + + @Test + public void testBasics() throws IOException { + setUpSpark(false); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'")); + + // syntax error + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "foreach a generate $0;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("expecting one of")); + } + + @Test + public void testIncludeJobStats() throws IOException { + setUpSpark(true); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using dump + String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "';" + + "dump a;"; + InterpreterResult result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("Spark Job")); + assertTrue(result.message().get(0).getData().contains("(1,andy)\n(2,peter)")); + + // describe + pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id: int, name: bytearray);" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // no job is launched, so no jobStats + assertTrue(result.message().get(0).getData().contains("a: {id: int,name: bytearray}")); + + // syntax error (compilation error) + pigscript = "a = loa '" + tmpFile.getAbsolutePath() + "';" + + "describe a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + // no job is launched, so no jobStats + assertTrue(result.message().get(0).getData().contains("Syntax error, unexpected symbol at or near 'a'")); + + // execution error + pigscript = "a = load 'invalid_path';" + + "dump a;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(result.message().get(0).getData().contains("Failed to read data from")); + } + +} + +