mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1933. Set pig job name and allow to set pig property in pig interpreter setting
This commit is contained in:
parent
8e6bfb45ea
commit
9cee380d44
5 changed files with 44 additions and 0 deletions
|
|
@ -47,6 +47,8 @@ group: manual
|
|||
### How to configure interpreter
|
||||
|
||||
At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default.
|
||||
And you can set any pig properties here which will be passed to pig engine. (like tez.queue.name & mapred.job.queue.name).
|
||||
Besides, we use paragraph title as job name if it exists, else use the last line of pig script. So you can use that to find app running in YARN RM UI.
|
||||
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
|
|
@ -69,6 +71,16 @@ At the Interpreters menu, you have to create a new Pig interpreter. Pig interpre
|
|||
<td>1000</td>
|
||||
<td>max row number displayed in <code>%pig.query</code></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>tez.queue.name</td>
|
||||
<td>default</td>
|
||||
<td>queue name for tez engine</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>mapred.job.queue.name</td>
|
||||
<td>default</td>
|
||||
<td>queue name for mapreduce engine</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
### Example
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.backend.BackendException;
|
||||
|
|
@ -97,4 +98,21 @@ public abstract class BasePigInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
public abstract PigServer getPigServer();
|
||||
|
||||
/**
|
||||
* Use paragraph title if it exists, else use the last line of pig script.
|
||||
* @param cmd
|
||||
* @param context
|
||||
* @return
|
||||
*/
|
||||
protected String createJobName(String cmd, InterpreterContext context) {
|
||||
String pTitle = context.getParagraphTitle();
|
||||
if (!StringUtils.isEmpty(pTitle)) {
|
||||
return pTitle;
|
||||
} else {
|
||||
// use the last line of pig script as the job name.
|
||||
String[] lines = cmd.split("\n");
|
||||
return lines[lines.length - 1];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.pig;
|
||||
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.pig.PigServer;
|
||||
import org.apache.pig.impl.logicalLayer.FrontendException;
|
||||
|
|
@ -58,6 +59,12 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
}
|
||||
try {
|
||||
pigServer = new PigServer(execType);
|
||||
for (Map.Entry entry : getProperty().entrySet()) {
|
||||
if (!entry.getKey().toString().startsWith("zeppelin.")) {
|
||||
pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(),
|
||||
entry.getValue().toString());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Fail to initialize PigServer", e);
|
||||
throw new RuntimeException("Fail to initialize PigServer", e);
|
||||
|
|
@ -78,6 +85,7 @@ public class PigInterpreter extends BasePigInterpreter {
|
|||
ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream();
|
||||
File tmpFile = null;
|
||||
try {
|
||||
pigServer.setJobName(createJobName(cmd, contextInterpreter));
|
||||
tmpFile = PigUtils.createTempPigScript(cmd);
|
||||
System.setOut(new PrintStream(bytesOutput));
|
||||
// each thread should its own ScriptState & PigStats
|
||||
|
|
|
|||
|
|
@ -78,6 +78,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
|
|||
|
||||
StringBuilder resultBuilder = new StringBuilder("%table ");
|
||||
try {
|
||||
pigServer.setJobName(createJobName(st, context));
|
||||
File tmpScriptFile = PigUtils.createTempPigScript(queries);
|
||||
// each thread should its own ScriptState & PigStats
|
||||
ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState());
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ public class PigInterpreterTezTest {
|
|||
Properties properties = new Properties();
|
||||
properties.put("zeppelin.pig.execType", "tez_local");
|
||||
properties.put("zeppelin.pig.includeJobStats", includeJobStats + "");
|
||||
properties.put("tez.queue.name", "test");
|
||||
pigInterpreter = new PigInterpreter(properties);
|
||||
pigInterpreter.open();
|
||||
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
|
||||
|
|
@ -60,6 +61,10 @@ public class PigInterpreterTezTest {
|
|||
public void testBasics() throws IOException {
|
||||
setUpTez(false);
|
||||
|
||||
assertEquals("test",
|
||||
pigInterpreter.getPigServer().getPigContext().getProperties()
|
||||
.getProperty("tez.queue.name"));
|
||||
|
||||
String content = "1\tandy\n"
|
||||
+ "2\tpeter\n";
|
||||
File tmpFile = File.createTempFile("zeppelin", "test");
|
||||
|
|
|
|||
Loading…
Reference in a new issue