upgrade to Apache Beam 2.0.0

This commit is contained in:
mingmxu 2017-08-15 13:15:17 -07:00
parent 340b326d47
commit 93b3e24db4
3 changed files with 13 additions and 16 deletions

View file

@ -8,12 +8,9 @@ Current interpreter implementation supports the static repl. It compiles the cod
You have to first build the Beam interpreter by enable the **beam** profile as follows:
```
mvn clean package -Pbeam -DskipTests
mvn clean package -Pbeam -DskipTests -Pscala-2.10
```
### Notice
- Flink runner comes with binary compiled for scala 2.10. So, currently we support only Scala 2.10
### Technical overview
* Upon starting an interpreter, an instance of `JavaCompiler` is created.

View file

@ -35,7 +35,7 @@
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.6.2</beam.spark.version>
<beam.beam.version>0.2.0-incubating</beam.beam.version>
<beam.beam.version>2.0.0</beam.beam.version>
<!-- library versions -->
<netty.version>4.1.1.Final</netty.version>
@ -211,6 +211,14 @@
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_${scala.binary.version}</artifactId>
<version>${beam.beam.version}</version>
<exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>

View file

@ -44,18 +44,10 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@ -89,12 +81,12 @@ public class MinimalWordCount {
};
static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
public static void main(String[] args) {
Options options = PipelineOptionsFactory.create().as(Options.class);
PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class);
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
@Override
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
if (!word.isEmpty()) {
@ -105,7 +97,7 @@ public class MinimalWordCount {
}))
.apply(Count.<String> perElement())
.apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
@Override
@ProcessElement
public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
throws Exception {
s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());