mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-116] Add Mahout Interpreters
This commit is contained in:
parent
c5ab10ddd4
commit
7e83832537
3 changed files with 1260 additions and 0 deletions
215
docs/interpreter/mahout.md
Normal file
215
docs/interpreter/mahout.md
Normal file
|
|
@ -0,0 +1,215 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Mahout Interpreter for Apache Zeppelin"
|
||||
description: "Apache Mahout provides a unified API (the R-Like Scala DSL) for quickly creating machine learning algorithms on a variety of engines."
|
||||
group: interpreter
|
||||
---
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
{% include JB/setup %}
|
||||
|
||||
# Apache Mahout Interpreter for Apache Zeppelin
|
||||
|
||||
<div id="toc"></div>
|
||||
|
||||
## Installation
|
||||
|
||||
Apache Mahout is a collection of packages that enable machine learning and matrix algebra on underlying engines such as Apache Flink or Apache Spark. A convenience script for creating and configuring two Mahout enabled interpreters exists. The `%sparkMahout` and `%flinkMahout` interpreters do not exist by default but can be easily created using this script.
|
||||
|
||||
### Easy Installation
|
||||
To quickly and easily get up and running using Apache Mahout, run the following command from the top-level directory of the Zeppelin install:
|
||||
```bash
|
||||
python scripts/mahout/add_mahout.py
|
||||
```
|
||||
|
||||
This will create the `%sparkMahout` and `%flinkMahout` interpreters, and restart Zeppelin.
|
||||
|
||||
### Advanced Installation
|
||||
|
||||
The `add_mahout.py` script contains several command line arguments for advanced users.
|
||||
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
<th>Argument</th>
|
||||
<th>Description</th>
|
||||
<th>Example</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>--zeppelin_home</td>
|
||||
<td>This is the path to the Zeppelin installation. This flag is not needed if the script is run from the top-level installation directory or from the `zeppelin/scripts/mahout` directory.</td>
|
||||
<td>/path/to/zeppelin</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>--mahout_home</td>
|
||||
<td>If the user has already installed Mahout, this flag can set the path to `MAHOUT_HOME`. If this is set, downloading Mahout will be skipped.</td>
|
||||
<td>/path/to/mahout_home</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>--restart_later</td>
|
||||
<td>Restarting is necessary for updates to take effect. By default the script will restart Zeppelin for you- restart will be skipped if this flag is set.</td>
|
||||
<td>NA</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>--force_download</td>
|
||||
<td>This flag will force the script to re-download the binary even if it already exists. This is useful for previously failed downloads.</td>
|
||||
<td>NA</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
__NOTE 1:__ Apache Mahout at this time only supports Spark 1.5 and Spark 1.6 and Scala 2.10. If the user is using another version of Spark (e.g. 2.0), the `%sparkMahout` will likely not work. The `%flinkMahout` interpreter will still work and the user is encouraged to develop with that engine as the code can be ported via copy and paste, as is evidenced by the tutorial notebook.
|
||||
|
||||
__NOTE 2:__ If using Apache Flink in cluster mode, the following libraries will also need to be coppied to `${FLINK_HOME}/lib`
|
||||
- mahout-math-0.12.2.jar
|
||||
- mahout-math-scala_2.10-0.12.2.jar
|
||||
- mahout-flink_2.10-0.12.2.jar
|
||||
- mahout-hdfs-0.12.2.jar
|
||||
- [com.google.guava:guava:14.0.1](http://central.maven.org/maven2/com/google/guava/guava/14.0.1/guava-14.0.1.jar)
|
||||
|
||||
## Overview
|
||||
|
||||
The [Apache Mahout](http://mahout.apache.org/)™ project's goal is to build an environment for quickly creating scalable performant machine learning applications.
|
||||
|
||||
Apache Mahout software provides three major features:
|
||||
|
||||
- A simple and extensible programming environment and framework for building scalable algorithms
|
||||
- A wide variety of premade algorithms for Scala + Apache Spark, H2O, Apache Flink
|
||||
- Samsara, a vector math experimentation environment with R-like syntax which works at scale
|
||||
|
||||
In other words:
|
||||
|
||||
*Apache Mahout provides a unified API for quickly creating machine learning algorithms on a variety of engines.*
|
||||
|
||||
## How to use
|
||||
|
||||
When starting a session with Apache Mahout, depending on which engine you are using (Spark or Flink), a few imports must be made and a _Distributed Context_ must be declared. Copy and paste the following code and run once to get started.
|
||||
|
||||
### Flink
|
||||
|
||||
```scala
|
||||
%flinkMahout
|
||||
|
||||
import org.apache.flink.api.scala._
|
||||
import org.apache.mahout.math.drm._
|
||||
import org.apache.mahout.math.drm.RLikeDrmOps._
|
||||
import org.apache.mahout.flinkbindings._
|
||||
import org.apache.mahout.math._
|
||||
import scalabindings._
|
||||
import RLikeOps._
|
||||
|
||||
implicit val ctx = new FlinkDistributedContext(benv)
|
||||
```
|
||||
|
||||
### Spark
|
||||
```scala
|
||||
%sparkMahout
|
||||
|
||||
import org.apache.mahout.math._
|
||||
import org.apache.mahout.math.scalabindings._
|
||||
import org.apache.mahout.math.drm._
|
||||
import org.apache.mahout.math.scalabindings.RLikeOps._
|
||||
import org.apache.mahout.math.drm.RLikeDrmOps._
|
||||
import org.apache.mahout.sparkbindings._
|
||||
|
||||
implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)
|
||||
```
|
||||
|
||||
### Same Code, Different Engines
|
||||
|
||||
After importing and setting up the distributed context, the Mahout R-Like DSL is consistent across engines. The following code will run in both `%flinkMahout` and `%sparkMahout`
|
||||
|
||||
```scala
|
||||
val drmData = drmParallelize(dense(
|
||||
(2, 2, 10.5, 10, 29.509541), // Apple Cinnamon Cheerios
|
||||
(1, 2, 12, 12, 18.042851), // Cap'n'Crunch
|
||||
(1, 1, 12, 13, 22.736446), // Cocoa Puffs
|
||||
(2, 1, 11, 13, 32.207582), // Froot Loops
|
||||
(1, 2, 12, 11, 21.871292), // Honey Graham Ohs
|
||||
(2, 1, 16, 8, 36.187559), // Wheaties Honey Gold
|
||||
(6, 2, 17, 1, 50.764999), // Cheerios
|
||||
(3, 2, 13, 7, 40.400208), // Clusters
|
||||
(3, 3, 13, 4, 45.811716)), numPartitions = 2)
|
||||
|
||||
drmData.collect(::, 0 until 4)
|
||||
|
||||
val drmX = drmData(::, 0 until 4)
|
||||
val y = drmData.collect(::, 4)
|
||||
val drmXtX = drmX.t %*% drmX
|
||||
val drmXty = drmX.t %*% y
|
||||
|
||||
|
||||
val XtX = drmXtX.collect
|
||||
val Xty = drmXty.collect(::, 0)
|
||||
val beta = solve(XtX, Xty)
|
||||
```
|
||||
|
||||
## Leveraging Resource Pools and R for Visualization
|
||||
|
||||
Resource Pools are a powerful Zeppelin feature that lets us share information between interpreters. A fun trick is to take the output of our work in Mahout and analyze it in other languages.
|
||||
|
||||
### Setting up a Resource Pool in Flink
|
||||
|
||||
In Spark based interpreters resource pools are accessed via the ZeppelinContext API. To put and get things from the resource pool one can be done simple
|
||||
```scala
|
||||
val myVal = 1
|
||||
z.put("foo", myVal)
|
||||
val myFetchedVal = z.get("foo")
|
||||
```
|
||||
|
||||
To add this functionality to a Flink based interpreter we declare the follwoing
|
||||
|
||||
```scala
|
||||
%flinkMahout
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext
|
||||
|
||||
val z = InterpreterContext.get().getResourcePool()
|
||||
```
|
||||
|
||||
Now we can access the resource pool in a consistent manner from the `%flinkMahout` interpreter.
|
||||
|
||||
|
||||
### Passing a variable from Mahout to R and Plotting
|
||||
|
||||
In this simple example, we use Mahout (on Flink or Spark, the code is the same) to create a random matrix and then take the Sin of each element. We then randomly sample the matrix and create a tab separated string. Finally we pass that string to R where it is read as a .tsv file, and a DataFrame is created and plotted using native R plotting libraries.
|
||||
|
||||
```scala
|
||||
val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
|
||||
val drmRand = drmParallelize(mxRnd)
|
||||
|
||||
|
||||
val drmSin = drmRand.mapBlock() {case (keys, block) =>
|
||||
val blockB = block.like()
|
||||
for (i <- 0 until block.nrow) {
|
||||
blockB(i, 0) = block(i, 0)
|
||||
blockB(i, 1) = Math.sin((block(i, 0) * 8))
|
||||
}
|
||||
keys -> blockB
|
||||
}
|
||||
|
||||
z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))
|
||||
```
|
||||
|
||||
And then in an R paragraph...
|
||||
|
||||
```r
|
||||
%spark.r {"imageWidth": "400px"}
|
||||
|
||||
library("ggplot2")
|
||||
|
||||
sinStr = z.get("flinkSinDrm")
|
||||
|
||||
data <- read.table(text= sinStr, sep="\t", header=FALSE)
|
||||
|
||||
plot(data, col="red")
|
||||
```
|
||||
772
notebook/2BYEZ5EVK/note.json
Normal file
772
notebook/2BYEZ5EVK/note.json
Normal file
File diff suppressed because one or more lines are too long
273
scripts/mahout/add_mahout.py
Normal file
273
scripts/mahout/add_mahout.py
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
# /**
|
||||
# * Licensed to the Apache Software Foundation (ASF) under one
|
||||
# * or more contributor license agreements. See the NOTICE file
|
||||
# * distributed with this work for additional information
|
||||
# * regarding copyright ownership. The ASF licenses this file
|
||||
# * to you under the Apache License, Version 2.0 (the
|
||||
# * "License"); you may not use this file except in compliance
|
||||
# * with the License. You may obtain a copy of the License at
|
||||
# *
|
||||
# * http://www.apache.org/licenses/LICENSE-2.0
|
||||
# *
|
||||
# * Unless required by applicable law or agreed to in writing, software
|
||||
# * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# * See the License for the specific language governing permissions and
|
||||
# * limitations under the License.
|
||||
# */
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
from os.path import isfile
|
||||
from os import getcwd
|
||||
|
||||
from subprocess import call
|
||||
|
||||
|
||||
#######################################################################################################################
|
||||
# I put these here so it will (hopeully) be easy(er) to bump versions / maintain
|
||||
# If there is demand, we could easily make parts or all comand line arguments as well
|
||||
#######################################################################################################################
|
||||
tar_name = "apache-mahout-distribution-0.12.2.tar.gz"
|
||||
mahout_bin_url = "http://apache.osuosl.org/mahout/0.12.2/%s" % tar_name
|
||||
mahout_version = "0.12.2"
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument("--force_download", help="force download Apache Mahout", action="store_true")
|
||||
parser.add_argument("--restart_later", help="force download Apache Mahout", action="store_true")
|
||||
parser.add_argument("--zeppelin_home", help="path to ZEPPELIN_HOME")
|
||||
parser.add_argument("--mahout_home", help="path to MAHOUT_HOME, use this if you have already installed Apache Mahout")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
|
||||
class ZeppelinTerpWrangler:
|
||||
def __init__(self, interpreter_json_path):
|
||||
self.interpreter_json_path = interpreter_json_path
|
||||
|
||||
def _getTerpID(self, terpName):
|
||||
terp_id = None
|
||||
for k, v in self.interpreter_json['interpreterSettings'].iteritems():
|
||||
if v['name'] == terpName:
|
||||
terp_id = k
|
||||
break
|
||||
|
||||
return terp_id
|
||||
|
||||
def createTerp(self, original_terp_name, new_terp_name ):
|
||||
|
||||
new_terp_id = new_terp_name
|
||||
orig_terp_id = self._getTerpID(original_terp_name)
|
||||
|
||||
from copy import deepcopy
|
||||
self.interpreter_json['interpreterSettings'][new_terp_id] = deepcopy(
|
||||
self.interpreter_json['interpreterSettings'][orig_terp_id])
|
||||
self.interpreter_json['interpreterSettings'][new_terp_id]['name'] = new_terp_name
|
||||
self.interpreter_json['interpreterSettings'][new_terp_id]['id'] = new_terp_id
|
||||
print "created new terp '%s' from terp '%s" % (new_terp_name, original_terp_name)
|
||||
|
||||
def _readTerpJson(self):
|
||||
with open(self.interpreter_json_path) as f:
|
||||
self.interpreter_json = json.load(f)
|
||||
|
||||
def _writeTerpJson(self):
|
||||
with open(self.interpreter_json_path, 'wb') as f:
|
||||
json.dump(self.interpreter_json, f, sort_keys=True, indent=4)
|
||||
|
||||
def _updateTerpProp(self, terpName, property, value):
|
||||
terp_id = self._getTerpID(terpName)
|
||||
self.interpreter_json['interpreterSettings'][terp_id]['properties'][property] = value
|
||||
|
||||
def _addTerpDep(self, terpName="", dep="", exclusions=None):
|
||||
if self.interpreter_json == {}:
|
||||
print "no interpreter.json loaded, reading last one downloaded"
|
||||
self._readTerpJson()
|
||||
terp_id = self._getTerpID(terpName)
|
||||
deps = self.interpreter_json['interpreterSettings'][terp_id]['dependencies']
|
||||
|
||||
dep_dict = {
|
||||
u'groupArtifactVersion': dep,
|
||||
u'local': False
|
||||
|
||||
}
|
||||
if exclusions != None:
|
||||
dep_dict["exclusions"] = exclusions
|
||||
deps.append(dep_dict)
|
||||
|
||||
## Remove Duplicate Dependencies
|
||||
seen = set()
|
||||
new_deps = list()
|
||||
for d in deps:
|
||||
t = d.items()
|
||||
if t[0] not in seen:
|
||||
seen.add(t[0])
|
||||
new_deps.append(d)
|
||||
|
||||
self.interpreter_json['interpreterSettings'][terp_id]['dependencies'] = new_deps
|
||||
|
||||
def addMahoutConfig(self, terpName, mahout_home, mahout_version = "0.12.2"):
|
||||
|
||||
print "updating '%s' with Apache Mahout dependencies and settings" % terpName
|
||||
|
||||
terpDeps = ["%s/mahout-math-%s.jar" % (mahout_home, mahout_version),
|
||||
"%s/mahout-math-scala_2.10-%s.jar" % (mahout_home, mahout_version)]
|
||||
|
||||
if "spark" in terpName.lower():
|
||||
configs = {
|
||||
"spark.kryo.referenceTracking": "false",
|
||||
"spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
|
||||
"spark.kryoserializer.buffer": "32k",
|
||||
"spark.kryoserializer.buffer.max": "600m",
|
||||
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
|
||||
}
|
||||
terpDeps.append('%s/mahout-spark_2.10-%s-dependency-reduced.jar' % (mahout_home, mahout_version))
|
||||
terpDeps.append("%s/mahout-spark_2.10-%s.jar" % (mahout_home, mahout_version))
|
||||
terpDeps.append("%s/mahout-spark-shell_2.10-%s.jar" % (mahout_home, mahout_version))
|
||||
|
||||
if "flink" in terpName.lower():
|
||||
configs = {
|
||||
"taskmanager.numberOfTaskSlots" : "12"
|
||||
}
|
||||
addlDeps = [
|
||||
"%s/mahout-flink_2.10-%s.jar" % (mahout_home, mahout_version),
|
||||
"%s/mahout-hdfs-%s.jar" % (mahout_home, mahout_version),
|
||||
"com.google.guava:guava:14.0.1"
|
||||
#"%s/guava-14.0.1.jar" % mahout_home ## reuired in lib dir if running against cluster
|
||||
]
|
||||
for t in addlDeps:
|
||||
terpDeps.append(t)
|
||||
|
||||
for k, v in configs.iteritems():
|
||||
self._updateTerpProp(terpName, k, v)
|
||||
|
||||
for t in terpDeps:
|
||||
self._addTerpDep(terpName, t)
|
||||
|
||||
|
||||
#######################################################################################################################
|
||||
# Need to be sure we know where Zeppelin Top directory is so we can edit conf files
|
||||
#
|
||||
#######################################################################################################################
|
||||
|
||||
def valid_zeppelin_home(path):
|
||||
return isfile(path + "/bin/zeppelin-daemon.sh")
|
||||
|
||||
if args.zeppelin_home == None:
|
||||
zeppelin_home = getcwd()
|
||||
if (zeppelin_home.split("/")[-1] == "bin") and (isfile("zeppelin-daemon.sh")):
|
||||
print "we're in the zeppelin/bin"
|
||||
zeppelin_home = "/".join(zeppelin_home.split("/")[:-1])
|
||||
print "--zeppelin_home not specified, using %s" % zeppelin_home
|
||||
else:
|
||||
zeppelin_home = args.zeppelin_home
|
||||
|
||||
|
||||
if not valid_zeppelin_home(zeppelin_home):
|
||||
print "%s does not appear to be a valid ZEPPELIN_HOME - e.g. the top level directory of the ZEPPELIN install" % zeppelin_home
|
||||
exit(1)
|
||||
else:
|
||||
print "ZEPPELIN_HOME validated"
|
||||
|
||||
interpreter_json_path = zeppelin_home + "/conf/interpreter.json"
|
||||
|
||||
if not isfile(interpreter_json_path):
|
||||
print "interpreter.json doesn't exist. Checking weather Zeppelin is running."
|
||||
status = call(["bin/zeppelin-daemon.sh", 'status'], cwd=zeppelin_home)
|
||||
if status == 1:
|
||||
print "Zeppelin doesn't appear to be running- it is possible that Zeppelin has never been run (interpreter.json is created when Zeppelin is run)"
|
||||
print "I'm going to try to start Zeppelin to create interpreter.json"
|
||||
call(["bin/zeppelin-daemon.sh", 'start'], cwd=zeppelin_home)
|
||||
from time import sleep
|
||||
sleep(3)
|
||||
else:
|
||||
print "We're in the correct top-level directory, Zeppelin appears to be running, but there is no 'interpreter.json'. \
|
||||
\nThis is a confusing case. Please try restarting Zeppelin, but if that doesn't work reach out on the mailing list."
|
||||
|
||||
if isfile(interpreter_json_path):
|
||||
z = ZeppelinTerpWrangler(interpreter_json_path)
|
||||
else:
|
||||
print "'interpreter.json' not found in %s/conf" % args.zeppelin_home
|
||||
exit(1)
|
||||
|
||||
#######################################################################################################################
|
||||
# If --mahout_home not set, download and untar Mahout in to ZEPPELIN_HOME
|
||||
# Set MAHOUT_HOME to ZEPPELIN_HOME/<mahout_untar_dir>
|
||||
#######################################################################################################################
|
||||
|
||||
def download_mahout():
|
||||
if args.force_download:
|
||||
print "--force_download: OK, deleting existing tar if it exists."
|
||||
call(["rm", "%s/%s" % (zeppelin_home, tar_name)])
|
||||
return True
|
||||
elif isfile("%s/%s" % (zeppelin_home, tar_name)):
|
||||
print "%s found, skipping download" % tar_name
|
||||
return False
|
||||
elif args.mahout_home:
|
||||
print "--mahout_home set, skipping download"
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
if download_mahout():
|
||||
call(['wget', mahout_bin_url], cwd= zeppelin_home)
|
||||
call(['tar', 'xzf', tar_name], cwd= zeppelin_home)
|
||||
|
||||
if args.mahout_home:
|
||||
mahout_home = args.mahout_home
|
||||
else:
|
||||
mahout_home = zeppelin_home + "/" + ".".join(tar_name.split(".")[:-2])
|
||||
|
||||
#######################################################################################################################
|
||||
# Create new interpreters
|
||||
#######################################################################################################################
|
||||
|
||||
z._readTerpJson()
|
||||
z.createTerp("spark", "sparkMahout")
|
||||
z.createTerp("flink", "flinkMahout")
|
||||
z.addMahoutConfig("sparkMahout", mahout_home, mahout_version)
|
||||
z.addMahoutConfig("flinkMahout", mahout_home, mahout_version)
|
||||
z._writeTerpJson()
|
||||
|
||||
#######################################################################################################################
|
||||
# Add "export MAHOUT_HOME=... to conf/zeppelin-env.sh
|
||||
# Create if doesn't exist.
|
||||
#######################################################################################################################
|
||||
|
||||
mahout_home_str = '\nexport MAHOUT_HOME=%s\n' % (mahout_home)
|
||||
|
||||
zeppelin_env_sh_path = '%s/conf/zeppelin-env.sh' % zeppelin_home
|
||||
if isfile(zeppelin_env_sh_path):
|
||||
with open(zeppelin_env_sh_path, 'rb') as f:
|
||||
zeppelin_env_sh = f.readlines()
|
||||
if any(["export MAHOUT_HOME=" in line for line in zeppelin_env_sh]):
|
||||
print "'export MAHOUT_HOME=...' already exists in zeppelin_env.sh, not appending"
|
||||
else:
|
||||
print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
|
||||
with open(zeppelin_env_sh_path, 'a') as f:
|
||||
f.write(mahout_home_str)
|
||||
else:
|
||||
print "appending '%s' to conf/zeppelin-env.sh" % mahout_home_str
|
||||
with open(zeppelin_env_sh_path, 'wb') as f:
|
||||
f.write(mahout_home_str)
|
||||
|
||||
|
||||
#######################################################################################################################
|
||||
# You have to restart Apache Zeppelin for new terps to show up... do this for user unless the specified otherwise
|
||||
#
|
||||
#######################################################################################################################
|
||||
if not args.restart_later:
|
||||
print "restarting Apache Zeppelin to load new interpreters..."
|
||||
call(["bin/zeppelin-daemon.sh", 'restart'], cwd= zeppelin_home)
|
||||
else:
|
||||
print "--restart_later flag detected: remember to restart Zeppelin to see new Mahout interpreters!!"
|
||||
|
||||
#######################################################################################################################
|
||||
# Good bye
|
||||
#######################################################################################################################
|
||||
|
||||
print "---------------------------------------------------------------------------------------------------------------"
|
||||
print "all done! Thanks for using Apache Mahout"
|
||||
print "bye"
|
||||
Loading…
Reference in a new issue