Make easy example with imports ands some modifications

This commit is contained in:
mahmoudelgamal 2016-09-07 15:09:07 +02:00
parent 74c14ca9e2
commit f19f98d8e4
4 changed files with 377 additions and 328 deletions

View file

@ -10,298 +10,299 @@
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
License for the specific language governing permissions and ~ limitations
under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-beam</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Beam interpreter</name>
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.4.1</beam.spark.version>
<beam.beam.version>0.2.0-incubating</beam.beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${beam.spark.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>akka-actor_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-remote_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-slf4j_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${beam.spark.version}</version>
</dependency>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<dependency>
<groupId>com.thoughtworks.qdox</groupId>
<artifactId>qdox</artifactId>
<version>2.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>${beam.beam.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10-examples</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-beam</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Beam interpreter</name>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.4.1</beam.spark.version>
<beam.beam.version>0.2.0-incubating</beam.beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${beam.spark.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>akka-actor_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-remote_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-slf4j_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${beam.spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.thoughtworks.qdox</groupId>
<artifactId>qdox</artifactId>
<version>2.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>${beam.beam.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10-examples</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.beam;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
@ -30,12 +31,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Beam interpreter
*
*/
* Beam interpreter
*
*/
public class BeamInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(BeamInterpreter.class);
Logger logger = LoggerFactory.getLogger(BeamInterpreter.class);
public BeamInterpreter(Properties property) {
super(property);
@ -52,8 +53,9 @@ public class BeamInterpreter extends Interpreter {
// delete all .class files created while compilation process
for (int i = 0; i < dir.list().length; i++) {
File f = dir.listFiles()[i];
if (f.getAbsolutePath().contains(".class"))
if (f.getAbsolutePath().contains(".class")) {
f.delete();
}
}
}
@ -67,7 +69,7 @@ public class BeamInterpreter extends Interpreter {
String res = StaticRepl.execute(generatedClassName, code);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, res);
} catch (Exception e) {
LOGGER.error("Exception in Interpreter while interpret", e);
logger.error("Exception in Interpreter while interpret", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
@ -91,7 +93,7 @@ public class BeamInterpreter extends Interpreter {
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
return Collections.emptyList();
}
}

View file

@ -35,9 +35,7 @@ import com.thoughtworks.qdox.model.JavaSource;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
@ -46,11 +44,12 @@ import java.util.Arrays;
import java.util.List;
/**
*
* StaticRepl for compling the java code in memory
*
*/
public class StaticRepl {
static Logger LOGGER = LoggerFactory.getLogger(StaticRepl.class);
static Logger logger = LoggerFactory.getLogger(StaticRepl.class);
public static String execute(String generatedClassName, String code) throws Exception {
@ -70,22 +69,21 @@ public class StaticRepl {
boolean hasMain = false;
for (int j = 0; j < classes.get(i).getMethods().size(); j++) {
if (classes.get(i).getMethods().get(j).getName().equals("main")) {
mainClassName = classes.get(i).getName();
hasMain = true;
break;
}
}
if (hasMain == true)
if (hasMain == true) {
break;
}
}
// if there isn't Main method, will retuen error
if (mainClassName == null) {
LOGGER.error("Exception for Main method", "There isn't any class containing Main method.");
logger.error("Exception for Main method", "There isn't any class containing Main method.");
throw new Exception("There isn't any class containing Main method.");
}
@ -116,8 +114,9 @@ public class StaticRepl {
// if success is false will get error
if (!success) {
for (Diagnostic diagnostic : diagnostics.getDiagnostics()) {
if (diagnostic.getLineNumber() == -1)
if (diagnostic.getLineNumber() == -1) {
continue;
}
System.err.println("line " + diagnostic.getLineNumber() + " : "
+ diagnostic.getMessage(null));
}
@ -126,7 +125,7 @@ public class StaticRepl {
System.setOut(oldOut);
System.setErr(oldErr);
LOGGER.error("Exception in Interpreter while compilation", baosErr.toString());
logger.error("Exception in Interpreter while compilation", baosErr.toString());
throw new Exception(baosErr.toString());
} else {
try {
@ -149,25 +148,25 @@ public class StaticRepl {
return baosOut.toString();
} catch (ClassNotFoundException e) {
LOGGER.error("Exception in Interpreter while Class not found", e);
logger.error("Exception in Interpreter while Class not found", e);
System.err.println("Class not found: " + e);
e.printStackTrace(newErr);
throw new Exception(baosErr.toString());
} catch (NoSuchMethodException e) {
LOGGER.error("Exception in Interpreter while No such method", e);
logger.error("Exception in Interpreter while No such method", e);
System.err.println("No such method: " + e);
e.printStackTrace(newErr);
throw new Exception(baosErr.toString());
} catch (IllegalAccessException e) {
LOGGER.error("Exception in Interpreter while Illegal access", e);
logger.error("Exception in Interpreter while Illegal access", e);
System.err.println("Illegal access: " + e);
e.printStackTrace(newErr);
throw new Exception(baosErr.toString());
} catch (InvocationTargetException e) {
LOGGER.error("Exception in Interpreter while Invocation target", e);
logger.error("Exception in Interpreter while Invocation target", e);
System.err.println("Invocation target: " + e);
e.printStackTrace(newErr);
throw new Exception(baosErr.toString());

View file

@ -31,38 +31,85 @@ The execution of the pipeline is done by different Runners. Currently, Beam supp
## How to use
Basically, you can write normal Beam java code where you can determine the Runner. You should write the main method inside a class becuase the interpreter invoke this main to execute the pipeline. Unlike Zeppelin normal pattern, each paragraph is considered as a separate job, there isn't any relation to any other paragraph.
The following is a demonstration of a word count example
The following is a demonstration of a word count example with data represented in array of strings
But it can read data from files by replacing `Create.of(SENTENCES).withCoder(StringUtf8Coder.of())` with `TextIO.Read.from("path/to/filename.txt")`
```java
%beam
// imports are omitted to save space
// most used imports
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;
public class MinimalWordCount {
static List<String> s = new ArrayList<>();
static final String[] SENTENCES_ARRAY = new String[] {
"Hadoop is the Elephant King!",
"A yellow and elegant thing.",
"He never forgets",
"Useful data, or lets",
"An extraneous element cling!",
"A wonderful king is Hadoop.",
"The elephant plays well with Sqoop.",
"But what helps him to thrive",
"Are Impala, and Hive,",
"And HDFS in the group.",
"Hadoop is an elegant fellow.",
"An elephant gentle and mellow.",
"He never gets mad,",
"Or does anything bad,",
"Because, at his core, he is yellow",
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
Options options = PipelineOptionsFactory.create().as(Options.class);
options.setRunner(FlinkPipelineRunner.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("/home/admin/mahmoud/work/bigdata/beam/shakespeare/input/file1.txt"))
.apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
}
}
}));
p.run();
System.out.println("%table word\tcount");