mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Initial implementation of interpreter for Apache Flink
This commit is contained in:
parent
109b0807fc
commit
f2a66df49e
9 changed files with 1178 additions and 2 deletions
|
|
@ -66,7 +66,7 @@
|
|||
|
||||
<property>
|
||||
<name>zeppelin.interpreters</name>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter</value>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value>
|
||||
<description>Comma separated interpreter configurations. First interpreter become a default</description>
|
||||
</property>
|
||||
|
||||
|
|
|
|||
390
flink/pom.xml
Normal file
390
flink/pom.xml
Normal file
|
|
@ -0,0 +1,390 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.5.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-flink</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.5.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Flink</name>
|
||||
<description>Zeppelin flink support</description>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<flink.version>0.9.0-milestone-1</flink.version>
|
||||
<flink.akka.version>2.3.7</flink.akka.version>
|
||||
<flink.scala.binary.version>2.10</flink.scala.binary.version>
|
||||
<flink.scala.version>2.10.4</flink.scala.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-core</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-clients</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-runtime</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-actor_2.10</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-remote_2.10</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-slf4j_2.10</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-scala</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-actor_${flink.scala.binary.version}</artifactId>
|
||||
<version>${flink.akka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-remote_${flink.scala.binary.version}</artifactId>
|
||||
<version>${flink.akka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-slf4j_${flink.scala.binary.version}</artifactId>
|
||||
<version>${flink.akka.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-testkit_${flink.scala.binary.version}</artifactId>
|
||||
<version>${flink.akka.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>${flink.scala.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-compiler</artifactId>
|
||||
<version>${flink.scala.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-reflect</artifactId>
|
||||
<version>${flink.scala.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>**/.idea/</exclude>
|
||||
<exclude>**/*.iml</exclude>
|
||||
<exclude>.gitignore</exclude>
|
||||
<exclude>**/.settings/*</exclude>
|
||||
<exclude>**/.classpath</exclude>
|
||||
<exclude>**/.project</exclude>
|
||||
<exclude>**/target/**</exclude>
|
||||
<exclude>**/README.md</exclude>
|
||||
<exclude>dependency-reduced-pom.xml</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- Scala Compiler -->
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
<artifactId>scala-maven-plugin</artifactId>
|
||||
<version>3.1.4</version>
|
||||
<executions>
|
||||
<!-- Run scala compiler in the process-resources phase, so that dependencies on
|
||||
scala classes can be resolved later in the (Java) compile phase -->
|
||||
<execution>
|
||||
<id>scala-compile-first</id>
|
||||
<phase>process-resources</phase>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
|
||||
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
|
||||
scala classes can be resolved later in the (Java) test-compile phase -->
|
||||
<execution>
|
||||
<id>scala-test-compile</id>
|
||||
<phase>process-test-resources</phase>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<jvmArgs>
|
||||
<jvmArg>-Xms128m</jvmArg>
|
||||
<jvmArg>-Xmx512m</jvmArg>
|
||||
</jvmArgs>
|
||||
<compilerPlugins combine.children="append">
|
||||
<compilerPlugin>
|
||||
<groupId>org.scalamacros</groupId>
|
||||
<artifactId>paradise_${scala.version}</artifactId>
|
||||
<version>${scala.macros.version}</version>
|
||||
</compilerPlugin>
|
||||
</compilerPlugins>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- Eclipse Integration -->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-eclipse-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<configuration>
|
||||
<downloadSources>true</downloadSources>
|
||||
<projectnatures>
|
||||
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
|
||||
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
|
||||
</projectnatures>
|
||||
<buildcommands>
|
||||
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
|
||||
</buildcommands>
|
||||
<classpathContainers>
|
||||
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
|
||||
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
|
||||
</classpathContainers>
|
||||
<!-- excludes>
|
||||
<exclude>org.scala-lang:scala-library</exclude>
|
||||
<exclude>org.scala-lang:scala-compiler</exclude>
|
||||
</excludes -->
|
||||
<sourceIncludes>
|
||||
<sourceInclude>**/*.scala</sourceInclude>
|
||||
<sourceInclude>**/*.java</sourceInclude>
|
||||
</sourceIncludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<!-- Adding scala source directories to build path -->
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>build-helper-maven-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
<executions>
|
||||
<!-- Add src/main/scala to eclipse build path -->
|
||||
<execution>
|
||||
<id>add-source</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>src/main/scala</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
<!-- Add src/test/scala to eclipse build path -->
|
||||
<execution>
|
||||
<id>add-test-source</id>
|
||||
<phase>generate-test-sources</phase>
|
||||
<goals>
|
||||
<goal>add-test-source</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sources>
|
||||
<source>src/test/scala</source>
|
||||
</sources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.scalastyle</groupId>
|
||||
<artifactId>scalastyle-maven-plugin</artifactId>
|
||||
<version>0.5.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<verbose>false</verbose>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
<failOnWarning>false</failOnWarning>
|
||||
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
|
||||
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
|
||||
<configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation>
|
||||
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
|
||||
<outputEncoding>UTF-8</outputEncoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<configuration>
|
||||
<forkCount>1</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.flink;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.flink.api.common.JobExecutionResult;
|
||||
import org.apache.flink.api.common.PlanExecutor;
|
||||
import org.apache.flink.api.java.ExecutionEnvironment;
|
||||
import org.apache.flink.api.java.operators.translation.JavaPlan;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class FlinkEnvironment extends ExecutionEnvironment {
|
||||
Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
|
||||
|
||||
private String host;
|
||||
private int port;
|
||||
|
||||
private FlinkIMain imain;
|
||||
|
||||
public FlinkEnvironment(String host, int port, FlinkIMain imain) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.imain = imain;
|
||||
|
||||
logger.info("jobManager host={}, port={}", host, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobExecutionResult execute(String jobName) throws Exception {
|
||||
JavaPlan plan = createProgramPlan(jobName);
|
||||
|
||||
File jarFile = imain.jar();
|
||||
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
|
||||
jarFile.getAbsolutePath());
|
||||
|
||||
JobExecutionResult result = executor.executePlan(plan);
|
||||
|
||||
if (jarFile.isFile()) {
|
||||
jarFile.delete();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExecutionPlan() throws Exception {
|
||||
JavaPlan plan = createProgramPlan("unnamed", false);
|
||||
plan.setDefaultParallelism(getParallelism());
|
||||
registerCachedFilesWithPlan(plan);
|
||||
|
||||
File jarFile = imain.jar();
|
||||
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
|
||||
jarFile.getAbsolutePath());
|
||||
String jsonPlan = executor.getOptimizerPlanAsJSON(plan);
|
||||
|
||||
if (jarFile != null && jarFile.isFile()) {
|
||||
jarFile.delete();
|
||||
}
|
||||
|
||||
return jsonPlan;
|
||||
}
|
||||
|
||||
/*
|
||||
private File createJar() throws IOException {
|
||||
// create execution environment
|
||||
File jarFile = new File(System.getProperty("java.io.tmpdir")
|
||||
+ "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar");
|
||||
|
||||
|
||||
File[] classFiles = classDir.listFiles();
|
||||
if (classFiles == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
byte buffer[] = new byte[10240];
|
||||
// Open archive file
|
||||
FileOutputStream stream = new FileOutputStream(jarFile);
|
||||
JarOutputStream out = new JarOutputStream(stream, new Manifest());
|
||||
|
||||
for (int i = 0; i < classFiles.length; i++) {
|
||||
File classFile = classFiles[i];
|
||||
if (classFiles == null || !classFile.exists()
|
||||
|| classFile.isDirectory())
|
||||
continue;
|
||||
|
||||
|
||||
// Add class
|
||||
JarEntry jarAdd = new JarEntry(classFile.getName());
|
||||
jarAdd.setTime(classFile.lastModified());
|
||||
out.putNextEntry(jarAdd);
|
||||
logger.info("add class {} into jar", classFile);
|
||||
|
||||
// Write file to archive
|
||||
FileInputStream in = new FileInputStream(classFile);
|
||||
while (true) {
|
||||
int nRead = in.read(buffer, 0, buffer.length);
|
||||
if (nRead <= 0)
|
||||
break;
|
||||
out.write(buffer, 0, nRead);
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
out.close();
|
||||
stream.close();
|
||||
return jarFile;
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
package org.apache.zeppelin.flink;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import scala.collection.Iterator;
|
||||
import scala.reflect.io.AbstractFile;
|
||||
import scala.reflect.io.VirtualDirectory;
|
||||
import scala.tools.nsc.Settings;
|
||||
import scala.tools.nsc.interpreter.IMain;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class FlinkIMain extends IMain {
|
||||
Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
|
||||
|
||||
public FlinkIMain(Settings setting, PrintWriter out) {
|
||||
super(setting, out);
|
||||
}
|
||||
|
||||
public File jar() throws IOException {
|
||||
VirtualDirectory classDir = virtualDirectory();
|
||||
// create execution environment
|
||||
File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
|
||||
+ "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
|
||||
jarBuildDir.mkdirs();
|
||||
|
||||
File jarFile = new File(System.getProperty("java.io.tmpdir")
|
||||
+ "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
|
||||
|
||||
|
||||
Iterator<AbstractFile> vdIt = classDir.iterator();
|
||||
while (vdIt.hasNext()) {
|
||||
AbstractFile fi = vdIt.next();
|
||||
if (fi.isDirectory()) {
|
||||
Iterator<AbstractFile> fiIt = fi.iterator();
|
||||
while (fiIt.hasNext()) {
|
||||
AbstractFile f = fiIt.next();
|
||||
|
||||
// directory for compiled line
|
||||
File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name());
|
||||
lineDir.mkdirs();
|
||||
|
||||
// compiled classes for commands from shell
|
||||
File writeFile = new File(lineDir.getAbsolutePath(), f.name());
|
||||
FileOutputStream outputStream = new FileOutputStream(writeFile);
|
||||
InputStream inputStream = f.input();
|
||||
|
||||
// copy file contents
|
||||
org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
|
||||
|
||||
inputStream.close();
|
||||
outputStream.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// jarr up
|
||||
JarHelper jh = new JarHelper();
|
||||
jh.jarDir(jarBuildDir, jarFile);
|
||||
|
||||
FileUtils.deleteDirectory(jarBuildDir);
|
||||
return jarFile;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,317 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.flink;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.flink.api.java.ExecutionEnvironment;
|
||||
import org.apache.flink.client.program.Client;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import scala.Console;
|
||||
import scala.None;
|
||||
import scala.Some;
|
||||
import scala.tools.nsc.Settings;
|
||||
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
|
||||
import scala.tools.nsc.settings.MutableSettings.PathSetting;
|
||||
|
||||
/**
|
||||
* Interpreter for Apache Flink (http://flink.apache.org)
|
||||
*/
|
||||
public class FlinkInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
|
||||
private Settings settings;
|
||||
private ByteArrayOutputStream out;
|
||||
private FlinkIMain imain;
|
||||
private File classDir;
|
||||
private Map<String, Object> binder;
|
||||
private ExecutionEnvironment env;
|
||||
private Configuration flinkConf;
|
||||
private LocalFlinkMiniCluster localFlinkCluster;
|
||||
private Client client;
|
||||
|
||||
public FlinkInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"flink",
|
||||
"flink",
|
||||
FlinkInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("local", "true", "Run flink locally")
|
||||
.add("jobmanager.rpc.address", "localhost", "Flink cluster")
|
||||
.add("jobmanager.rpc.port", "6123", "Flink cluster")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
URL[] urls = getClassloaderUrls();
|
||||
this.settings = new Settings();
|
||||
|
||||
// set classpath
|
||||
PathSetting pathSettings = settings.classpath();
|
||||
String classpath = "";
|
||||
List<File> paths = currentClassPath();
|
||||
for (File f : paths) {
|
||||
if (classpath.length() > 0) {
|
||||
classpath += File.pathSeparator;
|
||||
}
|
||||
classpath += f.getAbsolutePath();
|
||||
}
|
||||
|
||||
if (urls != null) {
|
||||
for (URL u : urls) {
|
||||
if (classpath.length() > 0) {
|
||||
classpath += File.pathSeparator;
|
||||
}
|
||||
classpath += u.getFile();
|
||||
}
|
||||
}
|
||||
|
||||
pathSettings.v_$eq(classpath);
|
||||
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
|
||||
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
|
||||
.getContextClassLoader()));
|
||||
BooleanSetting b = (BooleanSetting) settings.usejavacp();
|
||||
b.v_$eq(true);
|
||||
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
|
||||
|
||||
out = new ByteArrayOutputStream();
|
||||
imain = new FlinkIMain(settings, new PrintWriter(out));
|
||||
|
||||
initializeFlinkEnv();
|
||||
}
|
||||
|
||||
private boolean localMode() {
|
||||
return Boolean.parseBoolean(getProperty("local"));
|
||||
}
|
||||
|
||||
private String getRpcAddress() {
|
||||
if (localMode()) {
|
||||
return "localhost";
|
||||
} else {
|
||||
return getProperty("jobmanager.rpc.address");
|
||||
}
|
||||
}
|
||||
|
||||
private int getRpcPort() {
|
||||
if (localMode()) {
|
||||
return localFlinkCluster.getJobManagerRPCPort();
|
||||
} else {
|
||||
return Integer.parseInt(getProperty("jobmanager.rpc.port"));
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeFlinkEnv() {
|
||||
// prepare bindings
|
||||
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
|
||||
binder = (Map<String, Object>) getValue("_binder");
|
||||
|
||||
flinkConf = new org.apache.flink.configuration.Configuration();
|
||||
Properties intpProperty = getProperty();
|
||||
for (Object k : intpProperty.keySet()) {
|
||||
String key = (String) k;
|
||||
String val = toString(intpProperty.get(key));
|
||||
flinkConf.setString(key, val);
|
||||
}
|
||||
|
||||
if (localMode()) {
|
||||
startFlinkMiniCluster();
|
||||
}
|
||||
|
||||
env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
|
||||
binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env));
|
||||
|
||||
// do import and create val
|
||||
imain.interpret("@transient val env = "
|
||||
+ "_binder.get(\"env\")"
|
||||
+ ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
|
||||
|
||||
imain.interpret("import org.apache.flink.api.scala._");
|
||||
}
|
||||
|
||||
|
||||
private List<File> currentClassPath() {
|
||||
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
|
||||
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
|
||||
if (cps != null) {
|
||||
for (String cp : cps) {
|
||||
paths.add(new File(cp));
|
||||
}
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
|
||||
private List<File> classPath(ClassLoader cl) {
|
||||
List<File> paths = new LinkedList<File>();
|
||||
if (cl == null) {
|
||||
return paths;
|
||||
}
|
||||
|
||||
if (cl instanceof URLClassLoader) {
|
||||
URLClassLoader ucl = (URLClassLoader) cl;
|
||||
URL[] urls = ucl.getURLs();
|
||||
if (urls != null) {
|
||||
for (URL url : urls) {
|
||||
paths.add(new File(url.getFile()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
|
||||
public Object getValue(String name) {
|
||||
Object ret = imain.valueOfTerm(name);
|
||||
if (ret instanceof None) {
|
||||
return null;
|
||||
} else if (ret instanceof Some) {
|
||||
return ((Some) ret).get();
|
||||
} else {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
imain.close();
|
||||
|
||||
if (localMode()) {
|
||||
stopFlinkMiniCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
|
||||
InterpreterResult result = interpret(line.split("\n"), context);
|
||||
return result;
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
|
||||
Console.setOut(out);
|
||||
System.setOut(new PrintStream(out));
|
||||
out.reset();
|
||||
Code r = null;
|
||||
|
||||
String incomplete = "";
|
||||
for (String s : linesToRun) {
|
||||
scala.tools.nsc.interpreter.Results.Result res = null;
|
||||
try {
|
||||
res = imain.interpret(incomplete + s);
|
||||
} catch (Exception e) {
|
||||
logger.info("Interpreter exception", e);
|
||||
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
|
||||
r = getResultCode(res);
|
||||
|
||||
if (r == Code.ERROR) {
|
||||
return new InterpreterResult(r, out.toString());
|
||||
} else if (r == Code.INCOMPLETE) {
|
||||
incomplete += s + "\n";
|
||||
} else {
|
||||
incomplete = "";
|
||||
}
|
||||
}
|
||||
|
||||
if (r == Code.INCOMPLETE) {
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
return new InterpreterResult(r, out.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
|
||||
if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
|
||||
return Code.SUCCESS;
|
||||
} else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
|
||||
return Code.INCOMPLETE;
|
||||
} else {
|
||||
return Code.ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return new LinkedList<String>();
|
||||
}
|
||||
|
||||
private void startFlinkMiniCluster() {
|
||||
localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
|
||||
localFlinkCluster.waitForTaskManagersToBeRegistered();
|
||||
}
|
||||
|
||||
private void stopFlinkMiniCluster() {
|
||||
if (localFlinkCluster != null) {
|
||||
localFlinkCluster.shutdown();
|
||||
localFlinkCluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
static final String toString(Object o) {
|
||||
return (o instanceof String) ? (String) o : "";
|
||||
}
|
||||
|
||||
}
|
||||
199
flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
Normal file
199
flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
package org.apache.zeppelin.flink;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarInputStream;
|
||||
import java.util.jar.JarOutputStream;
|
||||
|
||||
/**
|
||||
* Provides utility services for jarring and unjarring files and directories.
|
||||
* Note that a given instance of JarHelper is not threadsafe with respect to
|
||||
* multiple jar operations.
|
||||
*
|
||||
* Copied from
|
||||
* http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
|
||||
* /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
|
||||
*
|
||||
* @author Patrick Calahan <pcal@bea.com>
|
||||
*/
|
||||
public class JarHelper {
|
||||
// ========================================================================
|
||||
// Constants
|
||||
|
||||
private static final int BUFFER_SIZE = 2156;
|
||||
|
||||
// ========================================================================
|
||||
// Variables
|
||||
|
||||
private byte[] mBuffer = new byte[BUFFER_SIZE];
|
||||
private int mByteCount = 0;
|
||||
private boolean mVerbose = false;
|
||||
private String mDestJarName = "";
|
||||
|
||||
// ========================================================================
|
||||
// Constructor
|
||||
|
||||
/**
|
||||
* Instantiates a new JarHelper.
|
||||
*/
|
||||
public JarHelper() {
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Public methods
|
||||
|
||||
/**
|
||||
* Jars a given directory or single file into a JarOutputStream.
|
||||
*/
|
||||
public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
|
||||
|
||||
if (dirOrFile2Jar == null || destJar == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
mDestJarName = destJar.getCanonicalPath();
|
||||
FileOutputStream fout = new FileOutputStream(destJar);
|
||||
JarOutputStream jout = new JarOutputStream(fout);
|
||||
// jout.setLevel(0);
|
||||
try {
|
||||
jarDir(dirOrFile2Jar, jout, null);
|
||||
} catch (IOException ioe) {
|
||||
throw ioe;
|
||||
} finally {
|
||||
jout.close();
|
||||
fout.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unjars a given jar file into a given directory.
|
||||
*/
|
||||
public void unjarDir(File jarFile, File destDir) throws IOException {
|
||||
BufferedOutputStream dest = null;
|
||||
FileInputStream fis = new FileInputStream(jarFile);
|
||||
unjar(fis, destDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an InputStream on a jar file, unjars the contents into the given
|
||||
* directory.
|
||||
*/
|
||||
public void unjar(InputStream in, File destDir) throws IOException {
|
||||
BufferedOutputStream dest = null;
|
||||
JarInputStream jis = new JarInputStream(in);
|
||||
JarEntry entry;
|
||||
while ((entry = jis.getNextJarEntry()) != null) {
|
||||
if (entry.isDirectory()) {
|
||||
File dir = new File(destDir, entry.getName());
|
||||
dir.mkdir();
|
||||
if (entry.getTime() != -1) {
|
||||
dir.setLastModified(entry.getTime());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
int count;
|
||||
byte[] data = new byte[BUFFER_SIZE];
|
||||
File destFile = new File(destDir, entry.getName());
|
||||
if (mVerbose) {
|
||||
System.out
|
||||
.println("unjarring " + destFile + " from " + entry.getName());
|
||||
}
|
||||
FileOutputStream fos = new FileOutputStream(destFile);
|
||||
dest = new BufferedOutputStream(fos, BUFFER_SIZE);
|
||||
while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
|
||||
dest.write(data, 0, count);
|
||||
}
|
||||
dest.flush();
|
||||
dest.close();
|
||||
if (entry.getTime() != -1) {
|
||||
destFile.setLastModified(entry.getTime());
|
||||
}
|
||||
}
|
||||
jis.close();
|
||||
}
|
||||
|
||||
public void setVerbose(boolean b) {
|
||||
mVerbose = b;
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Private methods
|
||||
|
||||
private static final char SEP = '/';
|
||||
|
||||
/**
|
||||
* Recursively jars up the given path under the given directory.
|
||||
*/
|
||||
private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
|
||||
throws IOException {
|
||||
if (mVerbose) {
|
||||
System.out.println("checking " + dirOrFile2jar);
|
||||
}
|
||||
if (dirOrFile2jar.isDirectory()) {
|
||||
String[] dirList = dirOrFile2jar.list();
|
||||
String subPath = (path == null) ? ""
|
||||
: (path + dirOrFile2jar.getName() + SEP);
|
||||
if (path != null) {
|
||||
JarEntry je = new JarEntry(subPath);
|
||||
je.setTime(dirOrFile2jar.lastModified());
|
||||
jos.putNextEntry(je);
|
||||
jos.flush();
|
||||
jos.closeEntry();
|
||||
}
|
||||
for (int i = 0; i < dirList.length; i++) {
|
||||
File f = new File(dirOrFile2jar, dirList[i]);
|
||||
jarDir(f, jos, subPath);
|
||||
}
|
||||
} else {
|
||||
if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
|
||||
if (mVerbose) {
|
||||
System.out.println("skipping " + dirOrFile2jar.getPath());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (mVerbose) {
|
||||
System.out.println("adding " + dirOrFile2jar.getPath());
|
||||
}
|
||||
FileInputStream fis = new FileInputStream(dirOrFile2jar);
|
||||
try {
|
||||
JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
|
||||
entry.setTime(dirOrFile2jar.lastModified());
|
||||
jos.putNextEntry(entry);
|
||||
while ((mByteCount = fis.read(mBuffer)) != -1) {
|
||||
jos.write(mBuffer, 0, mByteCount);
|
||||
if (mVerbose) {
|
||||
System.out.println("wrote " + mByteCount + " bytes");
|
||||
}
|
||||
}
|
||||
jos.flush();
|
||||
jos.closeEntry();
|
||||
} catch (IOException ioe) {
|
||||
throw ioe;
|
||||
} finally {
|
||||
fis.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// for debugging
|
||||
public static void main(String[] args) throws IOException {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JarHelper jarname.jar directory");
|
||||
return;
|
||||
}
|
||||
|
||||
JarHelper jarHelper = new JarHelper();
|
||||
jarHelper.mVerbose = true;
|
||||
|
||||
File destJar = new File(args[0]);
|
||||
File dirOrFile2Jar = new File(args[1]);
|
||||
|
||||
jarHelper.jarDir(dirOrFile2Jar, destJar);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.flink;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FlinkInterpreterTest {
|
||||
|
||||
private FlinkInterpreter flink;
|
||||
private InterpreterContext context;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Properties p = new Properties();
|
||||
flink = new FlinkInterpreter(p);
|
||||
flink.open();
|
||||
context = new InterpreterContext(null, null, null, null, null, null, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
flink.close();
|
||||
flink.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleStatement() {
|
||||
InterpreterResult result = flink.interpret("val a=1", context);
|
||||
result = flink.interpret("print(a)", context);
|
||||
assertEquals("1", result.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWordCount() {
|
||||
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
|
||||
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
|
||||
flink.interpret("counts.print()", context);
|
||||
InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
|
||||
assertEquals("", result.message());
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -92,6 +92,7 @@
|
|||
<module>shell</module>
|
||||
<module>hive</module>
|
||||
<module>tajo</module>
|
||||
<module>flink</module>
|
||||
<module>zeppelin-web</module>
|
||||
<module>zeppelin-server</module>
|
||||
<module>zeppelin-distribution</module>
|
||||
|
|
|
|||
|
|
@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.angular.AngularInterpreter,"
|
||||
+ "org.apache.zeppelin.shell.ShellInterpreter,"
|
||||
+ "org.apache.zeppelin.hive.HiveInterpreter,"
|
||||
+ "org.apache.zeppelin.tajo.TajoInterpreter"),
|
||||
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
|
||||
+ "org.apache.zeppelin.flink.FlinkInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
|
|
|
|||
Loading…
Reference in a new issue