mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Added HazelcastJet interpreter utils.
This commit is contained in:
parent
da6941db63
commit
83377ecacb
4 changed files with 321 additions and 4 deletions
|
|
@ -24,13 +24,23 @@ limitations under the License.
|
|||
|
||||
<div id="toc"></div>
|
||||
|
||||
## How to use
|
||||
## Overview
|
||||
[Hazelcast Jet](https://jet.hazelcast.org) is an open source application embeddable, distributed computing engine for In-Memory Streaming and Fast Batch Processing built on top of Hazelcast In-Memory Data Grid (IMDG).
|
||||
With Hazelcast IMDG providing storage functionality, Hazelcast Jet performs parallel execution to enable data-intensive applications to operate in near real-time.
|
||||
|
||||
## Why Hazelcast Jet?
|
||||
There are plenty of solutions which can solve some of these issues, so why choose Hazelcast Jet?
|
||||
When speed and simplicity is important.
|
||||
|
||||
Hazelcast Jet gives you all the infrastructure you need to build a distributed data processing pipeline within one 10Mb Java JAR: processing, storage and clustering.
|
||||
|
||||
As it is built on top of Hazelcast IMDG, Hazelcast Jet comes with in-memory operational storage that’s available out-of-the box. This storage is partitioned, distributed and replicated across the Hazelcast Jet cluster for capacity and resiliency. It can be used as an input data buffer, to publish the results of a Hazelcast Jet computation, to connect multiple Hazelcast Jet jobs or as a lookup cache for data enrichment.
|
||||
|
||||
## How to use the Hazelcast Jet interpreter
|
||||
Basically, you can write normal java code. You should write the main method inside a class because the interpreter invoke this main to execute the code. Unlike Zeppelin normal pattern, each paragraph is considered as a separate job, there isn't any relation to any other paragraph. For example, a variable defined in one paragraph cannot be used in another one as each paragraph is a self contained java main class that is executed and the output returned to Zeppelin.
|
||||
|
||||
|
||||
The following is a demonstration of a word count example with the result represented as an Hazelcast IMDG IMap sink and displayed leveraging Zeppelin's built in visualization using the utility method `JavaInterpreterUtils.displayTableFromSimpleMap`.
|
||||
|
||||
|
||||
```java
|
||||
%hazelcastjet
|
||||
|
||||
|
|
@ -51,7 +61,7 @@ import static com.hazelcast.jet.Traversers.traverseArray;
|
|||
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
|
||||
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
|
||||
|
||||
public class HelloWorld {
|
||||
public class DisplayTableFromSimpleMapExample {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
|
|
@ -89,3 +99,42 @@ public class HelloWorld {
|
|||
}
|
||||
```
|
||||
|
||||
The following is a demonstration where the Hazelcast DAG (directed acyclic graph) is displayed as a graph leveraging Zeppelin's built in visualization using the utility method `HazelcastJetInterpreterUtils.displayNetworkFromDAG`.
|
||||
This is particularly useful to understand how the high level Pipeline is then converted to the Jet’s low-level Core API.
|
||||
|
||||
```java
|
||||
%hazelcastjet
|
||||
|
||||
import com.hazelcast.jet.pipeline.Pipeline;
|
||||
import com.hazelcast.jet.pipeline.Sinks;
|
||||
import com.hazelcast.jet.pipeline.Sources;
|
||||
|
||||
import org.apache.zeppelin.hazelcastjet.HazelcastJetInterpreterUtils;
|
||||
|
||||
import static com.hazelcast.jet.Traversers.traverseArray;
|
||||
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
|
||||
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
|
||||
|
||||
public class DisplayNetworkFromDAGExample {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
// Create the specification of the computation pipeline. Note
|
||||
// it's a pure POJO: no instance of Jet needed to create it.
|
||||
Pipeline p = Pipeline.create();
|
||||
p.drawFrom(Sources.<String>list("text"))
|
||||
.flatMap(word ->
|
||||
traverseArray(word.toLowerCase().split("\\W+"))).setName("flat traversing")
|
||||
.filter(word -> !word.isEmpty())
|
||||
.groupingKey(wholeItem())
|
||||
.aggregate(counting())
|
||||
.drainTo(Sinks.map("counts"));
|
||||
|
||||
// Diplay the results with Zeppelin %network
|
||||
System.out.println(HazelcastJetInterpreterUtils.displayNetworkFromDAG(p.toDag()));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
|||
|
|
@ -15,3 +15,5 @@ Current interpreter implementation supports the static REPL. It compiles the cod
|
|||
* If there is any error during compilation, it can catch and redirect to Zeppelin.
|
||||
|
||||
* `JavaInterpreterUtils` contains useful methods to print out Java collections and leverage Zeppelin's built in visualization.
|
||||
|
||||
* `HazelcastJetInterpreterUtils` contains useful methods to print out Hazelcast specific classes (such as DAG) and leverage Zeppelin's built in visualization.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.hazelcastjet;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.hazelcast.jet.core.DAG;
|
||||
import org.apache.zeppelin.interpreter.graph.GraphResult;
|
||||
import org.apache.zeppelin.tabledata.Node;
|
||||
import org.apache.zeppelin.tabledata.Relationship;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Hazelcast Jet interpreter utility methods
|
||||
*/
|
||||
public class HazelcastJetInterpreterUtils {
|
||||
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
/**
|
||||
* Convert an Hazelcast Jet DAG to %network display system
|
||||
* to leverage Zeppelin's built in visualization
|
||||
* @param dag DAG object to convert
|
||||
* @return Zeppelin %network
|
||||
*/
|
||||
public static String displayNetworkFromDAG(DAG dag){
|
||||
GraphResult.Graph graph = new GraphResult.Graph();
|
||||
graph.setDirected(true);
|
||||
|
||||
// Map between vertex name (from DAG) and node id (for graph)
|
||||
Map<String, Integer> nodeIds = new HashMap<>();
|
||||
|
||||
// Create graph nodes based on dag vertices
|
||||
List<Node> nodes = new ArrayList<>();
|
||||
AtomicInteger nodeCount = new AtomicInteger(1);
|
||||
dag.forEach(v -> {
|
||||
// Assign an index to the vertex name
|
||||
nodeIds.put(v.getName(), nodeCount.getAndIncrement());
|
||||
Node node = new Node();
|
||||
node.setId(nodeIds.get(v.getName()));
|
||||
// Define node label from vertex name
|
||||
if (v.getName().toLowerCase().contains("sink"))
|
||||
node.setLabel("Sink");
|
||||
else if (v.getName().toLowerCase().contains("source"))
|
||||
node.setLabel("Source");
|
||||
else
|
||||
node.setLabel("Transform");
|
||||
// Add node description
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("description", v.getName());
|
||||
node.setData(data);
|
||||
nodes.add(node);
|
||||
});
|
||||
graph.setNodes(nodes);
|
||||
|
||||
// Set labels colors
|
||||
Map<String, String> labels = new HashMap<>();
|
||||
labels.put("Source", "#00317c");
|
||||
labels.put("Transform", "#ff7600");
|
||||
labels.put("Sink", "#00317c");
|
||||
graph.setLabels(labels);
|
||||
|
||||
// Map between edge name (from DAG) and relationship id (for graph)
|
||||
Map<String, Integer> edgeIds = new HashMap<>();
|
||||
|
||||
// Create graph relationships
|
||||
List<Relationship> rels = new ArrayList<>();
|
||||
AtomicInteger relCount = new AtomicInteger(1);
|
||||
dag.forEach(v -> {
|
||||
dag.getInboundEdges(v.getName()).forEach(e -> {
|
||||
String edgeName = e.getSourceName() + " to " + e.getDestName();
|
||||
if (edgeIds.get(edgeName) == null) {
|
||||
// Assign an index to the edge name if not found
|
||||
edgeIds.put(edgeName, relCount.getAndIncrement());
|
||||
Relationship rel = new Relationship();
|
||||
rel.setId(edgeIds.get(edgeName));
|
||||
rel.setSource(nodeIds.get(e.getSourceName()));
|
||||
rel.setTarget(nodeIds.get(e.getDestName()));
|
||||
// Add rel data
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("routing", e.getRoutingPolicy().toString());
|
||||
data.put("priority", e.getPriority());
|
||||
data.put("distributed", e.isDistributed());
|
||||
rel.setData(data);
|
||||
rels.add(rel);
|
||||
}
|
||||
});
|
||||
dag.getOutboundEdges(v.getName()).forEach(e -> {
|
||||
String edgeName = e.getSourceName() + " to " + e.getDestName();
|
||||
if (edgeIds.get(edgeName) == null) {
|
||||
// Assign an index to the edge name if not found
|
||||
edgeIds.put(edgeName, relCount.getAndIncrement());
|
||||
Relationship rel = new Relationship();
|
||||
rel.setId(edgeIds.get(edgeName));
|
||||
rel.setSource(nodeIds.get(e.getSourceName()));
|
||||
rel.setTarget(nodeIds.get(e.getDestName()));
|
||||
// Add rel data
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("routing", e.getRoutingPolicy().toString());
|
||||
data.put("priority", e.getPriority());
|
||||
data.put("distributed", e.isDistributed());
|
||||
rel.setData(data);
|
||||
rels.add(rel);
|
||||
}
|
||||
});
|
||||
});
|
||||
graph.setEdges(rels);
|
||||
|
||||
return "%network " + gson.toJson(graph);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.hazelcastjet;
|
||||
|
||||
import com.hazelcast.jet.pipeline.Pipeline;
|
||||
import com.hazelcast.jet.pipeline.Sinks;
|
||||
import com.hazelcast.jet.pipeline.Sources;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.Properties;
|
||||
|
||||
import static com.hazelcast.jet.Traversers.traverseArray;
|
||||
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
|
||||
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class HazelcastJetInterpreterUtilsTest {
|
||||
|
||||
private static final String NETWORK_RESULT_1 = "%network " +
|
||||
"{\"nodes\":[" +
|
||||
"{\"id\":1,\"data\":{\"description\":\"listSource(text)\"},\"label\":\"Source\"}," +
|
||||
"{\"id\":2,\"data\":{\"description\":\"flat traversing\"},\"label\":\"Transform\"}," +
|
||||
"{\"id\":3,\"data\":{\"description\":\"filter\"},\"label\":\"Transform\"}," +
|
||||
"{\"id\":4,\"data\":{\"description\":\"group-and-aggregate-step1\"}," +
|
||||
"\"label\":\"Transform\"}," +
|
||||
"{\"id\":5,\"data\":{\"description\":\"group-and-aggregate-step2\"}," +
|
||||
"\"label\":\"Transform\"}," +
|
||||
"{\"id\":6,\"data\":{\"description\":\"mapSink(counts)\"},\"label\":\"Sink\"}]," +
|
||||
"\"edges\":[" +
|
||||
"{\"source\":1,\"target\":2,\"id\":1,\"data\":{\"routing\":\"UNICAST\"," +
|
||||
"\"distributed\":false,\"priority\":0}}," +
|
||||
"{\"source\":2,\"target\":3,\"id\":2,\"data\":{\"routing\":\"UNICAST\"," +
|
||||
"\"distributed\":false,\"priority\":0}}," +
|
||||
"{\"source\":3,\"target\":4,\"id\":3,\"data\":{\"routing\":\"PARTITIONED\"," +
|
||||
"\"distributed\":false,\"priority\":0}}," +
|
||||
"{\"source\":4,\"target\":5,\"id\":4,\"data\":{\"routing\":\"PARTITIONED\"," +
|
||||
"\"distributed\":true,\"priority\":0}}," +
|
||||
"{\"source\":5,\"target\":6,\"id\":5,\"data\":{\"routing\":\"UNICAST\"," +
|
||||
"\"distributed\":false,\"priority\":0}}]," +
|
||||
"\"labels\":{\"Sink\":\"#00317c\",\"Transform\":\"#ff7600\",\"Source\":\"#00317c\"}," +
|
||||
"\"directed\":true}";
|
||||
|
||||
private static HazelcastJetInterpreter jet;
|
||||
private static InterpreterContext context;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
Properties p = new Properties();
|
||||
jet = new HazelcastJetInterpreter(p);
|
||||
jet.open();
|
||||
context = InterpreterContext.builder().build();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
jet.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisplayNetworkFromDAGUtil() {
|
||||
|
||||
Pipeline p = Pipeline.create();
|
||||
p.drawFrom(Sources.<String>list("text"))
|
||||
.flatMap(word ->
|
||||
traverseArray(word.toLowerCase().split("\\W+"))).setName("flat traversing")
|
||||
.filter(word -> !word.isEmpty())
|
||||
.groupingKey(wholeItem())
|
||||
.aggregate(counting())
|
||||
.drainTo(Sinks.map("counts"));
|
||||
|
||||
assertEquals(
|
||||
NETWORK_RESULT_1,
|
||||
HazelcastJetInterpreterUtils.displayNetworkFromDAG(p.toDag())
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStaticReplWithdisplayNetworkFromDAGUtilReturnNetworkType() {
|
||||
|
||||
StringWriter writer = new StringWriter();
|
||||
PrintWriter out = new PrintWriter(writer);
|
||||
out.println("import com.hazelcast.jet.pipeline.Pipeline;");
|
||||
out.println("import com.hazelcast.jet.pipeline.Sinks;");
|
||||
out.println("import com.hazelcast.jet.pipeline.Sources;");
|
||||
out.println("import org.apache.zeppelin.hazelcastjet.HazelcastJetInterpreterUtils;");
|
||||
out.println("import static com.hazelcast.jet.Traversers.traverseArray;");
|
||||
out.println("import static com.hazelcast.jet.aggregate.AggregateOperations.counting;");
|
||||
out.println("import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;");
|
||||
out.println("public class HelloWorld {");
|
||||
out.println(" public static void main(String args[]) {");
|
||||
out.println(" Pipeline p = Pipeline.create();");
|
||||
out.println(" p.drawFrom(Sources.<String>list(\"text\"))");
|
||||
out.println(" .flatMap(word ->");
|
||||
out.println(" traverseArray(word.toLowerCase().split(\"\\\\W+\")))" +
|
||||
".setName(\"flat traversing\")");
|
||||
out.println(" .filter(word -> !word.isEmpty())");
|
||||
out.println(" .groupingKey(wholeItem())");
|
||||
out.println(" .aggregate(counting())");
|
||||
out.println(" .drainTo(Sinks.map(\"counts\"));");
|
||||
out.println(" System.out.println(HazelcastJetInterpreterUtils" +
|
||||
".displayNetworkFromDAG(p.toDag()));");
|
||||
out.println(" }");
|
||||
out.println("}");
|
||||
out.close();
|
||||
|
||||
InterpreterResult res = jet.interpret(writer.toString(), context);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, res.code());
|
||||
assertEquals(InterpreterResult.Type.NETWORK, res.message().get(0).getType());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue