ZEPPELIN-1933. Set pig job name and allow to set pig property in pig interpreter setting

This commit is contained in:
Jeff Zhang 2017-01-11 15:58:18 +08:00
parent 8e6bfb45ea
commit 9cee380d44
5 changed files with 44 additions and 0 deletions

View file

@ -47,6 +47,8 @@ group: manual
### How to configure interpreter
At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default.
And you can set any pig properties here which will be passed to pig engine. (like tez.queue.name & mapred.job.queue.name).
Besides, we use paragraph title as job name if it exists, else use the last line of pig script. So you can use that to find app running in YARN RM UI.
<table class="table-configuration">
<tr>
@ -69,6 +71,16 @@ At the Interpreters menu, you have to create a new Pig interpreter. Pig interpre
<td>1000</td>
<td>max row number displayed in <code>%pig.query</code></td>
</tr>
<tr>
<td>tez.queue.name</td>
<td>default</td>
<td>queue name for tez engine</td>
</tr>
<tr>
<td>mapred.job.queue.name</td>
<td>default</td>
<td>queue name for mapreduce engine</td>
</tr>
</table>
### Example

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.pig;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.BackendException;
@ -97,4 +98,21 @@ public abstract class BasePigInterpreter extends Interpreter {
}
public abstract PigServer getPigServer();
/**
* Use paragraph title if it exists, else use the last line of pig script.
* @param cmd
* @param context
* @return
*/
protected String createJobName(String cmd, InterpreterContext context) {
String pTitle = context.getParagraphTitle();
if (!StringUtils.isEmpty(pTitle)) {
return pTitle;
} else {
// use the last line of pig script as the job name.
String[] lines = cmd.split("\n");
return lines[lines.length - 1];
}
}
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.pig;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
@ -58,6 +59,12 @@ public class PigInterpreter extends BasePigInterpreter {
}
try {
pigServer = new PigServer(execType);
for (Map.Entry entry : getProperty().entrySet()) {
if (!entry.getKey().toString().startsWith("zeppelin.")) {
pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(),
entry.getValue().toString());
}
}
} catch (IOException e) {
LOGGER.error("Fail to initialize PigServer", e);
throw new RuntimeException("Fail to initialize PigServer", e);
@ -78,6 +85,7 @@ public class PigInterpreter extends BasePigInterpreter {
ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
File tmpFile = null;
try {
pigServer.setJobName(createJobName(cmd, contextInterpreter));
tmpFile = PigUtils.createTempPigScript(cmd);
System.setOut(new PrintStream(bytesOutput));
// each thread should its own ScriptState & PigStats

View file

@ -78,6 +78,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
StringBuilder resultBuilder = new StringBuilder("%table ");
try {
pigServer.setJobName(createJobName(st, context));
File tmpScriptFile = PigUtils.createTempPigScript(queries);
// each thread should its own ScriptState & PigStats
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());

View file

@ -45,6 +45,7 @@ public class PigInterpreterTezTest {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "tez_local");
properties.put("zeppelin.pig.includeJobStats", includeJobStats + "");
properties.put("tez.queue.name", "test");
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
@ -60,6 +61,10 @@ public class PigInterpreterTezTest {
public void testBasics() throws IOException {
setUpTez(false);
assertEquals("test",
pigInterpreter.getPigServer().getPigContext().getProperties()
.getProperty("tez.queue.name"));
String content = "1\tandy\n"
+ "2\tpeter\n";
File tmpFile = File.createTempFile("zeppelin", "test");