zeppelin/docs/interpreter/beam.md
mahmoudelgamal 403c8c4e79 [ZEPPELIN-682] New interpreter for Apache Beam (incubating)/DataFlow
## What is this PR for?
This PR adds an interpreter for [Apache Beam](http://beam.incubator.apache.org), an open source unified platform for data processing pipelines. A pipeline can be built using one of the Beam SDKs.
Pipelines are executed by Runners. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.

### What type of PR is it?
- Feature

### Todos
* Test case
* Review Comments
* Documentation

### What is the Jira issue?
* [ZEPPELIN-682]

### How should this be tested?
- Start the Zeppelin server
- Start a paragraph with the interpreter prefix `%beam`, then write your code with the required imports and the runner

### Screenshots (if appropriate)
![](https://s9.postimg.org/s6eiwrbxb/beam_interpreter.png)
![](https://s9.postimg.org/eq3h8wsrz/visualisation_with_table.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: mahmoudelgamal <mahmoudf.elgamal@gmail.com>
Author: mfelgamal <mahmoudf.elgamal@gmail.com>
Author: Fouad <fuad.assayadi@gmail.com>

Closes #1334 from mfelgamal/beam-interpreter-static-repl-7 and squashes the following commits:

da66c27 [mahmoudelgamal] Modify condition of checking static modifier
55c1322 [mahmoudelgamal] set spark version to 1.6.2 and throw original exception
27d7690 [mahmoudelgamal] set spark version to 1.6.1 and some modifications
750041c [mahmoudelgamal] Add readme file and modify pom file and travis.yml
ca88f94 [mahmoudelgamal] edit pom file and .travis.yml
3d65427 [mahmoudelgamal] update .travis.yml file
f19f98d [mahmoudelgamal] Make easy example with imports ands some modifications
74c14ca [mahmoudelgamal] Update the licenses
acc7afb [mahmoudelgamal] Change beam to version 0.2.0
e821614 [mahmoudelgamal] Removing hadoop-core and print stack trace to failure
5cb7c7b [mahmoudelgamal] Add some changes to doc and pom file
75fc4f7 [mahmoudelgamal] add interpreter to navigation.html and remove extra spaces and lines
9b1b385 [mahmoudelgamal] put beam in alphabetical order
9c1e25d [mahmoudelgamal] Adding changes like logging and conventions and license
2aa6d65 [mahmoudelgamal] changing class name to StaticRepl and adding some modifications
7cf25fb [mahmoudelgamal] Adding some tests
3c5038f [mahmoudelgamal] Modifying the documentation
5695077 [mahmoudelgamal] Modifying pom file and Making documentation
26fc59b [mahmoudelgamal] Refactoring of the code
3a2bd85 [mahmoudelgamal] Adding the beam to zeppelin 7
ab7ee2d [mahmoudelgamal] beam interpreter
85957ff [mfelgamal] Merge pull request #10 from apache/master
852c3d3 [mfelgamal] Merge pull request #9 from apache/master
a4bcc0d [mfelgamal] Merge pull request #8 from apache/master
858f1e1 [mfelgamal] Merge pull request #7 from apache/master
03a1e80 [mfelgamal] Merge pull request #4 from apache/master
2586651 [Fouad] Merge pull request #2 from apache/master
2016-09-26 20:11:42 -07:00


| layout | title | description | group |
| --- | --- | --- | --- |
| page | Beam interpreter in Apache Zeppelin | Apache Beam is an open source, unified programming model that you can use to create a data processing pipeline. | interpreter |

{% include JB/setup %}

# Beam interpreter for Apache Zeppelin

## Overview

Apache Beam is an open source unified platform for data processing pipelines. A pipeline can be built using one of the Beam SDKs. Pipelines are executed by Runners. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.

## How to use

You can write ordinary Beam Java code and choose the Runner in that code. Put the `main` method inside a class, because the interpreter invokes this `main` method to execute the pipeline. Unlike the usual Zeppelin pattern, each paragraph is treated as a separate job and has no relation to any other paragraph.
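
As a rough illustration of why the `main` method matters, the sketch below shows one way a host program can locate and call a class's static `main` through reflection. It is not Zeppelin's actual implementation: `MainInvoker` and `ParagraphJob` are hypothetical names invented for this example, and the compilation step that a real interpreter would need is omitted.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical helper for illustration only; not part of Zeppelin or Beam.
class MainInvoker {
  /** Looks up public main(String[]) on the class, checks that it is static, and calls it. */
  static void invokeMain(Class<?> clazz) {
    try {
      Method main = clazz.getMethod("main", String[].class);
      if (!Modifier.isStatic(main.getModifiers())) {
        throw new IllegalArgumentException(clazz.getName() + ".main must be static");
      }
      // The cast passes the array as the single String[] argument rather than as varargs.
      main.invoke(null, (Object) new String[0]);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not run main of " + clazz.getName(), e);
    }
  }

  public static void main(String[] args) {
    invokeMain(ParagraphJob.class);
  }
}

// Stand-in for the class a user would write in a %beam paragraph.
class ParagraphJob {
  static boolean ran = false;

  public static void main(String[] args) {
    ran = true;
    System.out.println("pipeline would be built and run here");
  }
}
```

Because only `main` is called, any state the paragraph needs has to be created inside the class itself, which fits the rule that paragraphs do not share state.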

The following demonstrates a word count example with the data held in an array of strings. It can also read data from files by replacing `Create.of(SENTENCES).withCoder(StringUtf8Coder.of())` with `TextIO.Read.from("path/to/filename.txt")`.

%beam

// Commonly used imports (this example does not need all of them)
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.SparkContext;
import org.apache.beam.runners.direct.*;
import org.apache.beam.sdk.runners.*;
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.spark.*;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.flink.*;
import org.apache.beam.runners.flink.examples.WordCount.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.PipelineOptions;

public class MinimalWordCount {
  // Collects the formatted rows produced by the FormatResults step.
  static List<String> s = new ArrayList<>();

  static final String[] SENTENCES_ARRAY = new String[] {
    "Hadoop is the Elephant King!",
    "A yellow and elegant thing.",
    "He never forgets",
    "Useful data, or lets",
    "An extraneous element cling!",
    "A wonderful king is Hadoop.",
    "The elephant plays well with Sqoop.",
    "But what helps him to thrive",
    "Are Impala, and Hive,",
    "And HDFS in the group.",
    "Hadoop is an elegant fellow.",
    "An elephant gentle and mellow.",
    "He never gets mad,",
    "Or does anything bad,",
    "Because, at his core, he is yellow",
  };
  static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.create().as(Options.class);
    options.setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);
    // Build the pipeline: sentences -> words -> per-word counts -> formatted rows.
    p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
        .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) {
            // Split on anything that is not a letter or apostrophe and skip empty tokens.
            for (String word : c.element().split("[^a-zA-Z']+")) {
              if (!word.isEmpty()) {
                c.output(word);
              }
            }
          }
        }))
        .apply(Count.<String>perElement())
        .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
          @Override
          public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
              throws Exception {
            // Store each "word<TAB>count" row in the static list so main can print it.
            s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
          }
        }));
    p.run();
    // Output that starts with %table is rendered by Zeppelin as a table.
    System.out.println("%table word\tcount");
    for (int i = 0; i < s.size(); i++) {
      System.out.print(s.get(i));
    }
  }
}
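
To sanity-check what the pipeline above computes without a Beam runner, the following plain-Java sketch applies the same splitting regex and produces the same `%table` layout. It is only an approximation for local experimentation: `LocalWordCount`, `countWords`, and `toTable` are hypothetical names, and a `TreeMap` is used here so that rows come out in a fixed order, which the Beam pipeline does not guarantee.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone equivalent of the word count logic; does not use Beam.
class LocalWordCount {
  /** Splits each sentence with the same regex as ExtractWords and counts each distinct word. */
  static Map<String, Long> countWords(List<String> sentences) {
    Map<String, Long> counts = new TreeMap<>();
    for (String sentence : sentences) {
      for (String word : sentence.split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1L, Long::sum);
        }
      }
    }
    return counts;
  }

  /** Formats counts as %table text: a header row, then one tab-separated row per word. */
  static String toTable(Map<String, Long> counts) {
    StringBuilder sb = new StringBuilder("%table word\tcount");
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      sb.append('\n').append(e.getKey()).append('\t').append(e.getValue());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> sentences = Arrays.asList(
        "Hadoop is the Elephant King!",
        "A wonderful king is Hadoop.");
    System.out.println(toTable(countWords(sentences)));
  }
}
```

Counting is case-sensitive, as in the pipeline, so `King` and `king` are reported as separate words.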