HTTP-based Elasticsearch client

This commit is contained in:
Bruno Bonnin 2017-01-02 17:48:30 +01:00
parent a9788ff580
commit f4c5ac39fe
11 changed files with 1070 additions and 188 deletions

View file

@ -26,16 +26,16 @@
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-elasticsearch</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Elasticsearch interpreter</name>
<properties>
<elasticsearch.version>2.3.3</elasticsearch.version>
<httpasyncclient.version>4.0.2</httpasyncclient.version>
<guava.version>18.0</guava.version>
<json-flattener.version>0.1.6</json-flattener.version>
<unirest.version>1.4.9</unirest.version>
</properties>
<dependencies>
@ -51,6 +51,12 @@
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>${httpasyncclient.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@ -64,6 +70,12 @@
<version>${json-flattener.version}</version>
</dependency>
<dependency>
<groupId>com.mashape.unirest</groupId>
<artifactId>unirest-java</artifactId>
<version>${unirest.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View file

@ -18,7 +18,6 @@
package org.apache.zeppelin.elasticsearch;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -34,26 +33,19 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
import org.apache.zeppelin.elasticsearch.client.ElasticsearchClient;
import org.apache.zeppelin.elasticsearch.client.HttpBasedClient;
import org.apache.zeppelin.elasticsearch.client.TransportBasedClient;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -66,7 +58,6 @@ import org.slf4j.LoggerFactory;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
/**
@ -77,75 +68,82 @@ public class ElasticsearchInterpreter extends Interpreter {
private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class);
private static final String HELP = "Elasticsearch interpreter:\n"
+ "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
+ " - indices: list of indices separated by commas (depends on the command)\n"
+ " - types: list of document types separated by commas (depends on the command)\n"
+ "Commands:\n"
+ " - search /indices/types <query>\n"
+ " . indices and types can be omitted (at least, you have to provide '/')\n"
+ " . a query is either a JSON-formatted query, nor a lucene query\n"
+ " - size <value>\n"
+ " . defines the size of the result set (default value is in the config)\n"
+ " . if used, this command must be declared before a search command\n"
+ " - count /indices/types <query>\n"
+ " . same comments as for the search\n"
+ " - get /index/type/id\n"
+ " - delete /index/type/id\n"
+ " - index /ndex/type/id <json-formatted document>\n"
+ " . the id can be omitted, elasticsearch will generate one";
+ "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
+ " - indices: list of indices separated by commas (depends on the command)\n"
+ " - types: list of document types separated by commas (depends on the command)\n"
+ "Commands:\n"
+ " - search /indices/types <query>\n"
+ " . indices and types can be omitted (at least, you have to provide '/')\n"
+ " . a query is either a JSON-formatted query, nor a lucene query\n"
+ " - size <value>\n"
+ " . defines the size of the result set (default value is in the config)\n"
+ " . if used, this command must be declared before a search command\n"
+ " - count /indices/types <query>\n"
+ " . same comments as for the search\n"
+ " - get /index/type/id\n"
+ " - delete /index/type/id\n"
+ " - index /ndex/type/id <json-formatted document>\n"
+ " . the id can be omitted, elasticsearch will generate one";
protected static final List<String> COMMANDS = Arrays.asList(
"count", "delete", "get", "help", "index", "search");
"count", "delete", "get", "help", "index", "search");
private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)");
public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private Client client;
private String host = "localhost";
private int port = 9300;
private String clusterName = "elasticsearch";
private ElasticsearchClient elsClient;
private int resultSize = 10;
public ElasticsearchInterpreter(Properties property) {
super(property);
this.host = getProperty(ELASTICSEARCH_HOST);
this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT));
this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME);
try {
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
} catch (NumberFormatException e) {
this.resultSize = 10;
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
property.get(ELASTICSEARCH_RESULT_SIZE), e);
}
}
@Override
public void open() {
logger.info("Properties: {}", getProperty());
String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
clientType = clientType == null ? null : clientType.toLowerCase();
try {
logger.info("prop={}", getProperty());
final Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put(getProperty())
.build();
client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
}
catch (IOException e) {
catch (final NumberFormatException e) {
this.resultSize = 10;
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
property.get(ELASTICSEARCH_RESULT_SIZE), e);
}
try {
if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
elsClient = new TransportBasedClient(getProperty());
}
else if ("http".equals(clientType)) {
elsClient = new HttpBasedClient(getProperty());
}
else {
logger.error("Unknown type of Elasticsearch client: " + clientType);
}
}
catch (final IOException e) {
logger.error("Open connection with Elasticsearch", e);
}
}
@Override
public void close() {
if (client != null) {
client.close();
if (elsClient != null) {
elsClient.close();
}
}
@ -159,7 +157,7 @@ public class ElasticsearchInterpreter extends Interpreter {
int currentResultSize = resultSize;
if (client == null) {
if (elsClient == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Problem with the Elasticsearch client, please check your configuration (host, port,...)");
}
@ -178,7 +176,7 @@ public class ElasticsearchInterpreter extends Interpreter {
if (lines.length < 2) {
return processHelp(InterpreterResult.Code.ERROR,
"Size cmd must be followed by a search");
"Size cmd must be followed by a search");
}
final String[] sizeLine = StringUtils.split(lines[0], " ", 2);
@ -219,7 +217,7 @@ public class ElasticsearchInterpreter extends Interpreter {
return processHelp(InterpreterResult.Code.ERROR, "Unknown command");
}
catch (Exception e) {
catch (final Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage());
}
}
@ -243,7 +241,7 @@ public class ElasticsearchInterpreter extends Interpreter {
public List<InterpreterCompletion> completion(String s, int i) {
final List suggestions = new ArrayList<>();
for (String cmd : COMMANDS) {
for (final String cmd : COMMANDS) {
if (cmd.toLowerCase().contains(s)) {
suggestions.add(new InterpreterCompletion(cmd, cmd));
}
@ -276,19 +274,18 @@ public class ElasticsearchInterpreter extends Interpreter {
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
"Bad URL (it should be /index/type/id)");
}
final GetResponse response = client
.prepareGet(urlItems[0], urlItems[1], urlItems[2])
.get();
if (response.isExists()) {
final String json = gson.toJson(response.getSource());
final ActionResponse response = elsClient.get(urlItems[0], urlItems[1], urlItems[2]);
if (response.isSucceeded()) {
final String json = gson.toJson(response.getHit().getSourceAsString());
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
json);
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
json);
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
@ -305,15 +302,15 @@ public class ElasticsearchInterpreter extends Interpreter {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
final SearchResponse response = searchData(urlItems, data, 0);
final ActionResponse response = searchData(urlItems, data, 0);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
"" + response.getHits().getTotalHits());
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
"" + response.getTotalHits());
}
/**
@ -328,10 +325,10 @@ public class ElasticsearchInterpreter extends Interpreter {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
}
final SearchResponse response = searchData(urlItems, data, size);
final ActionResponse response = searchData(urlItems, data, size);
return buildResponseMessage(response);
}
@ -347,18 +344,16 @@ public class ElasticsearchInterpreter extends Interpreter {
if (urlItems.length < 2 || urlItems.length > 3) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type or /index/type/id)");
"Bad URL (it should be /index/type or /index/type/id)");
}
final IndexResponse response = client
.prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2])
.setSource(data)
.get();
final ActionResponse response = elsClient.index(
urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2], data);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getId());
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getHit().getId());
}
/**
@ -374,54 +369,34 @@ public class ElasticsearchInterpreter extends Interpreter {
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
"Bad URL (it should be /index/type/id)");
}
final DeleteResponse response = client
.prepareDelete(urlItems[0], urlItems[1], urlItems[2])
.get();
final ActionResponse response = elsClient.delete(urlItems[0], urlItems[1], urlItems[2]);
if (response.isFound()) {
if (response.isSucceeded()) {
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getId());
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
response.getHit().getId());
}
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
}
private SearchResponse searchData(String[] urlItems, String query, int size) {
private ActionResponse searchData(String[] urlItems, String query, int size) {
final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
client, SearchAction.INSTANCE);
reqBuilder.setIndices();
String[] indices = null;
String[] types = null;
if (urlItems.length >= 1) {
reqBuilder.setIndices(StringUtils.split(urlItems[0], ","));
indices = StringUtils.split(urlItems[0], ",");
}
if (urlItems.length > 1) {
reqBuilder.setTypes(StringUtils.split(urlItems[1], ","));
types = StringUtils.split(urlItems[1], ",");
}
if (!StringUtils.isEmpty(query)) {
// The query can be either JSON-formatted, nor a Lucene query
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
try {
final Map source = gson.fromJson(query, Map.class);
reqBuilder.setExtraSource(source);
}
catch (JsonParseException e) {
// This is not a JSON (or maybe not well formatted...)
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
}
}
reqBuilder.setSize(size);
final SearchResponse response = reqBuilder.get();
return response;
return elsClient.search(indices, types, query, size);
}
private InterpreterResult buildAggResponseMessage(Aggregations aggregations) {
@ -442,8 +417,8 @@ public class ElasticsearchInterpreter extends Interpreter {
final Set<String> headerKeys = new HashSet<>();
final List<Map<String, Object>> buckets = new LinkedList<>();
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
try {
final XContentBuilder builder = XContentFactory.jsonBuilder();
bucket.toXContent(builder, null);
@ -451,22 +426,22 @@ public class ElasticsearchInterpreter extends Interpreter {
headerKeys.addAll(bucketMap.keySet());
buckets.add(bucketMap);
}
catch (IOException e) {
catch (final IOException e) {
logger.error("Processing bucket: " + e.getMessage(), e);
}
}
final StringBuffer buffer = new StringBuffer();
final String[] keys = headerKeys.toArray(new String[0]);
for (String key: keys) {
for (final String key: keys) {
buffer.append("\t" + key);
}
buffer.deleteCharAt(0);
for (Map<String, Object> bucket : buckets) {
for (final Map<String, Object> bucket : buckets) {
buffer.append("\n");
for (String key: keys) {
for (final String key: keys) {
buffer.append(bucket.get(key)).append("\t");
}
buffer.deleteCharAt(buffer.length() - 1);
@ -479,38 +454,64 @@ public class ElasticsearchInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
}
private String buildSearchHitsResponseMessage(SearchHit[] hits) {
private InterpreterResult buildAggResponseMessage(List<AggWrapper> aggregations) {
if (hits == null || hits.length == 0) {
final InterpreterResult.Type resType = InterpreterResult.Type.TABLE;
String resMsg = "";
final Set<String> headerKeys = new HashSet<>();
final List<Map<String, Object>> buckets = new LinkedList<>();
for (final AggWrapper aggregation: aggregations) {
final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(aggregation.getResult());
headerKeys.addAll(bucketMap.keySet());
buckets.add(bucketMap);
}
final StringBuffer buffer = new StringBuffer();
final String[] keys = headerKeys.toArray(new String[0]);
for (final String key: keys) {
buffer.append("\t" + key);
}
buffer.deleteCharAt(0);
for (final Map<String, Object> bucket : buckets) {
buffer.append("\n");
for (final String key: keys) {
buffer.append(bucket.get(key)).append("\t");
}
buffer.deleteCharAt(buffer.length() - 1);
}
resMsg = buffer.toString();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
}
private String buildSearchHitsResponseMessage(ActionResponse response) {
if (response.getHits() == null || response.getHits().size() == 0) {
return "";
}
//First : get all the keys in order to build an ordered list of the values for each hit
//
final Map<String, Object> hitFields = new HashMap<>();
final List<Map<String, Object>> flattenHits = new LinkedList<>();
final Set<String> keys = new TreeSet<>();
for (SearchHit hit : hits) {
// Fields can be found either in _source, or in fields (it depends on the query)
//
String json = hit.getSourceAsString();
if (json == null) {
hitFields.clear();
for (SearchHitField hitField : hit.getFields().values()) {
hitFields.put(hitField.getName(), hitField.getValues());
}
json = gson.toJson(hitFields);
}
for (final HitWrapper hit : response.getHits()) {
final String json = hit.getSourceAsString();
final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json);
final Map<String, Object> flattenMap = new HashMap<>();
for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
for (final Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
// Replace keys that match a format like that : [\"keyname\"][0]
final String fieldName = iter.next();
final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName);
if (fieldNameMatcher.matches()) {
flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2),
flattenJsonMap.get(fieldName));
flattenJsonMap.get(fieldName));
}
else {
flattenMap.put(fieldName, flattenJsonMap.get(fieldName));
@ -518,7 +519,7 @@ public class ElasticsearchInterpreter extends Interpreter {
}
flattenHits.add(flattenMap);
for (String key : flattenMap.keySet()) {
for (final String key : flattenMap.keySet()) {
keys.add(key);
}
}
@ -526,15 +527,15 @@ public class ElasticsearchInterpreter extends Interpreter {
// Next : build the header of the table
//
final StringBuffer buffer = new StringBuffer();
for (String key : keys) {
for (final String key : keys) {
buffer.append(key).append('\t');
}
buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
// Finally : build the result by using the key set
//
for (Map<String, Object> hit : flattenHits) {
for (String key : keys) {
for (final Map<String, Object> hit : flattenHits) {
for (final String key : keys) {
final Object val = hit.get(key);
if (val != null) {
buffer.append(val);
@ -547,17 +548,17 @@ public class ElasticsearchInterpreter extends Interpreter {
return buffer.toString();
}
private InterpreterResult buildResponseMessage(SearchResponse response) {
private InterpreterResult buildResponseMessage(ActionResponse response) {
final Aggregations aggregations = response.getAggregations();
final List<AggWrapper> aggregations = response.getAggregations();
if (aggregations != null && aggregations.asList().size() > 0) {
if (aggregations != null && aggregations.size() > 0) {
return buildAggResponseMessage(aggregations);
}
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE,
buildSearchHitsResponseMessage(response.getHits().getHits()));
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE,
buildSearchHitsResponseMessage(response));
}
}

View file

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.action;
/**
* Runtime exception thrown when there is a problem during an action (search, get, ...).
*/
public class ActionException extends RuntimeException {
public ActionException(String message) {
super(message);
}
public ActionException(Throwable cause) {
super(cause);
}
}

View file

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.action;
import java.util.LinkedList;
import java.util.List;
/**
* Contains the result of an action (hits, aggregations, ...).
*/
public class ActionResponse {
private boolean succeeded;
private long totalHits;
private final List<HitWrapper> hits = new LinkedList<>();
private final List<AggWrapper> aggregations = new LinkedList<>();
// public ActionResponse source(String source) {
// this.source = source;
// return this;
// }
public ActionResponse succeeded(boolean succeeded) {
this.succeeded = succeeded;
return this;
}
public boolean isSucceeded() {
return succeeded;
}
// public String getSource() {
// return source;
// }
// public String getId() {
// return id;
// }
// public ActionResponse id(String id) {
// this.id = id;
// return this;
// }
public ActionResponse totalHits(long totalHits) {
this.totalHits = totalHits;
return this;
}
public long getTotalHits() {
return totalHits;
}
public List<HitWrapper> getHits() {
return hits;
}
public ActionResponse addHit(HitWrapper hit) {
this.hits.add(hit);
return this;
}
public List<AggWrapper> getAggregations() {
return aggregations;
}
public ActionResponse addAggregation(AggWrapper aggregation) {
this.aggregations.add(aggregation);
return this;
}
public ActionResponse hit(HitWrapper hit) {
this.addHit(hit);
return this;
}
public HitWrapper getHit() {
return this.hits.get(0);
}
}

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.action;
/**
* Contains the result of an aggregation.
*/
public class AggWrapper {
/** Type of an aggregation (to know if there are buckets or not) */
public enum AggregationType { SIMPLE, MULTI_BUCKETS };
private final AggregationType type;
private final String result;
public AggWrapper(AggregationType type, String result) {
this.type = type;
this.result = result;
}
public AggregationType getType() {
return type;
}
public String getResult() {
return result;
}
}

View file

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.action;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
/**
* Contains the data of a hit.
*/
public class HitWrapper {
private final JsonParser parser = new JsonParser();
private final String index;
private final String type;
private final String id;
private final String source;
public HitWrapper(String index, String type, String id, String source) {
this.index = index;
this.type = type;
this.id = id;
this.source = source;
}
public HitWrapper(String source) {
this(null, null, null, source);
}
public String getSourceAsString() {
return source;
}
public JsonObject getSourceAsJsonObject() {
final JsonElement element = parser.parse(source);
return element.getAsJsonObject();
}
public String getIndex() {
return index;
}
public String getType() {
return type;
}
public String getId() {
return id;
}
}

View file

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.client;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
/**
* Interface that must be implemented by any kind of Elasticsearch client (transport, ...).
*/
public interface ElasticsearchClient {
ActionResponse get(String index, String type, String id);
ActionResponse index(String index, String type, String id, String data);
ActionResponse delete(String index, String type, String id);
ActionResponse search(String[] indices, String[] types, String query, int size);
void close();
}

View file

@ -0,0 +1,310 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.client;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
import org.apache.zeppelin.elasticsearch.action.ActionException;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType;
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
import org.json.JSONArray;
import org.json.JSONObject;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import com.mashape.unirest.request.HttpRequest;
import com.mashape.unirest.request.HttpRequestWithBody;
/**
* Elasticsearch client using the HTTP API.
*/
public class HttpBasedClient implements ElasticsearchClient {
private static final String QUERY_STRING_TEMPLATE =
"{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }";
private final String host;
private final int port;
private final String username;
private final String password;
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
// private HttpHost elasticHost;
// private HttpClientContext context;
// private CredentialsProvider credsProvider;
public HttpBasedClient(Properties props) {
this.host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
this.port = Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
this.username = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_USERNAME);
this.password = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_PASSWORD);
}
private boolean isSucceeded(HttpResponse response) {
return response.getStatus() >= 200 && response.getStatus() < 300;
}
private JSONObject getParentField(JSONObject parent, String[] fields) {
JSONObject obj = parent;
for (int i = 0; i < fields.length - 1; i++) {
obj = obj.getJSONObject(fields[i]);
}
return obj;
}
private JSONArray getFieldAsArray(JSONObject obj, String field) {
final String[] fields = field.split("/");
final JSONObject parent = getParentField(obj, fields);
return parent.getJSONArray(fields[fields.length - 1]);
}
private String getFieldAsString(HttpResponse<JsonNode> response, String field) {
return response.getBody().getObject().get(field).toString();
}
private long getFieldAsLong(HttpResponse<JsonNode> response, String field) {
final String[] fields = field.split("/");
final JSONObject obj = getParentField(response.getBody().getObject(), fields);
return obj.getLong(fields[fields.length - 1]);
}
private String getUrl(String index, String type, String id) {
final StringBuilder buffer = new StringBuilder();
buffer.append("http://").append(host).append(":").append(port).append("/");
if (StringUtils.isNotEmpty(index)) {
buffer.append(index);
if (StringUtils.isNotEmpty(type)) {
buffer.append("/").append(type);
if (StringUtils.isNotEmpty(id)) {
buffer.append("/").append(id);
}
}
}
return buffer.toString();
}
private String getUrl(String[] indices, String[] types) {
final String inds = indices == null ? null : Joiner.on(",").join(indices);
final String typs = types == null ? null : Joiner.on(",").join(types);
return getUrl(inds, typs, null);
}
@Override
public ActionResponse get(String index, String type, String id) {
ActionResponse response = null;
try {
final HttpRequest request = Unirest.get(getUrl(index, type, id));
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
response = new ActionResponse()
.succeeded(isSucceeded(result))
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
getFieldAsString(result, "_source")));
}
catch (final UnirestException e) {
throw new ActionException(e);
}
return response;
}
@Override
public ActionResponse delete(String index, String type, String id) {
ActionResponse response = null;
try {
final HttpRequest request = Unirest.delete(getUrl(index, type, id));
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
response = new ActionResponse()
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
null))
.succeeded(isSucceeded(result));
}
catch (final UnirestException e) {
throw new ActionException(e);
}
return response;
}
@Override
public ActionResponse index(String index, String type, String id, String data) {
ActionResponse response = null;
try {
HttpRequestWithBody request = null;
if (StringUtils.isEmpty(id)) {
request = Unirest.post(getUrl(index, type, id));
}
else {
request = Unirest.put(getUrl(index, type, id));
}
request
.header("Accept", "application/json")
.header("Content-Type", "application/json")
.body(data).getHttpRequest();
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
response = new ActionResponse()
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
null))
.succeeded(isSucceeded(result));
}
catch (final UnirestException e) {
throw new ActionException(e);
}
return response;
}
@Override
public ActionResponse search(String[] indices, String[] types, String query, int size) {
ActionResponse response = null;
if (!StringUtils.isEmpty(query)) {
// The query can be either JSON-formatted, nor a Lucene query
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
try {
gson.fromJson(query, Map.class);
}
catch (final JsonParseException e) {
// This is not a JSON (or maybe not well formatted...)
query = QUERY_STRING_TEMPLATE.replace("_Q_", query);
}
}
try {
final HttpRequestWithBody request = Unirest
.post(getUrl(indices, types) + "/_search?size=" + size)
.header("Content-Type", "application/json");
if (StringUtils.isNoneEmpty(query)) {
request.header("Accept", "application/json").body(query);
}
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
final JSONObject body = result.getBody() != null ? result.getBody().getObject() : null;
if (isSucceeded(result)) {
final long total = getFieldAsLong(result, "hits/total");
response = new ActionResponse()
.succeeded(true)
.totalHits(total);
if (containsAggs(result)) {
JSONObject aggregationsMap = body.getJSONObject("aggregations");
if (aggregationsMap == null) {
aggregationsMap = body.getJSONObject("aggs");
}
for (final String key: aggregationsMap.keySet()) {
final JSONObject aggResult = aggregationsMap.getJSONObject(key);
if (aggResult.has("buckets")) {
// Multi-bucket aggregations
final Iterator<Object> buckets = aggResult.getJSONArray("buckets").iterator();
while (buckets.hasNext()) {
response.addAggregation(
new AggWrapper(AggregationType.MULTI_BUCKETS, buckets.next().toString()));
}
}
else {
response.addAggregation(
new AggWrapper(AggregationType.SIMPLE, aggregationsMap.toString()));
}
break; // Keep only one aggregation
}
}
else if (size > 0 && total > 0) {
final JSONArray hits = getFieldAsArray(body, "hits/hits");
final Iterator<Object> iter = hits.iterator();
while (iter.hasNext()) {
final JSONObject hit = (JSONObject) iter.next();
final Object data =
hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields");
response.addHit(new HitWrapper(
hit.getString("_index"),
hit.getString("_type"),
hit.getString("_id"),
data.toString()));
}
}
}
else {
throw new ActionException(body.get("error").toString());
}
}
catch (final UnirestException e) {
throw new ActionException(e);
}
return response;
}
private boolean containsAggs(HttpResponse<JsonNode> result) {
return result.getBody() != null &&
(result.getBody().getObject().has("aggregations") ||
result.getBody().getObject().has("aggs"));
}
@Override
public void close() {
}
@Override
public String toString() {
return "HttpBasedClient [host=" + host + ", port=" + port + ", username=" + username + "]";
}
}

View file

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.elasticsearch.client;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
/**
* Elasticsearch client using the transport protocol.
*/
public class TransportBasedClient implements ElasticsearchClient {
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private final Client client;
public TransportBasedClient(Properties props) throws UnknownHostException {
final String host =
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
final int port = Integer.parseInt(
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
final String clusterName =
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME);
final Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put(props)
.build();
client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
}
@Override
public ActionResponse get(String index, String type, String id) {
final GetResponse getResp = client
.prepareGet(index, type, id)
.get();
return new ActionResponse()
.succeeded(getResp.isExists())
.hit(new HitWrapper(
getResp.getIndex(),
getResp.getType(),
getResp.getId(),
getResp.getSourceAsString()));
}
@Override
public ActionResponse delete(String index, String type, String id) {
final DeleteResponse delResp = client
.prepareDelete(index, type, id)
.get();
return new ActionResponse()
.succeeded(delResp.isFound())
.hit(new HitWrapper(
delResp.getIndex(),
delResp.getType(),
delResp.getId(),
null));
}
@Override
public ActionResponse index(String index, String type, String id, String data) {
final IndexResponse idxResp = client
.prepareIndex(index, type, id)
.setSource(data)
.get();
return new ActionResponse()
.succeeded(idxResp.isCreated())
.hit(new HitWrapper(
idxResp.getIndex(),
idxResp.getType(),
idxResp.getId(),
null));
}
@Override
public ActionResponse search(String[] indices, String[] types, String query, int size) {
final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
client, SearchAction.INSTANCE);
reqBuilder.setIndices();
if (indices != null) {
reqBuilder.setIndices(indices);
}
if (types != null) {
reqBuilder.setTypes(types);
}
if (!StringUtils.isEmpty(query)) {
// The query can be either JSON-formatted, nor a Lucene query
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
try {
@SuppressWarnings("rawtypes")
final Map source = gson.fromJson(query, Map.class);
reqBuilder.setExtraSource(source);
}
catch (final JsonParseException e) {
// This is not a JSON (or maybe not well formatted...)
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
}
}
reqBuilder.setSize(size);
final SearchResponse searchResp = reqBuilder.get();
final ActionResponse actionResp = new ActionResponse()
.succeeded(true)
.totalHits(searchResp.getHits().getTotalHits());
if (searchResp.getAggregations() != null) {
setAggregations(searchResp.getAggregations(), actionResp);
}
else {
for (final SearchHit hit: searchResp.getHits()) {
// Fields can be found either in _source, or in fields (it depends on the query)
// => specific for elasticsearch's version < 5
//
String src = hit.getSourceAsString();
if (src == null) {
final Map<String, Object> hitFields = new HashMap<>();
for (final SearchHitField hitField : hit.getFields().values()) {
hitFields.put(hitField.getName(), hitField.getValues());
}
src = gson.toJson(hitFields);
}
actionResp.addHit(new HitWrapper(hit.getIndex(), hit.getType(), hit.getId(), src));
}
}
return actionResp;
}
private void setAggregations(Aggregations aggregations, ActionResponse actionResp) {
// Only the result of the first aggregation is returned
//
final Aggregation agg = aggregations.asList().get(0);
if (agg instanceof InternalMetricsAggregation) {
actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
XContentHelper.toString((InternalMetricsAggregation) agg).toString()));
}
else if (agg instanceof InternalSingleBucketAggregation) {
actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
XContentHelper.toString((InternalSingleBucketAggregation) agg).toString()));
}
else if (agg instanceof InternalMultiBucketAggregation) {
final Set<String> headerKeys = new HashSet<>();
final List<Map<String, Object>> buckets = new LinkedList<>();
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
try {
final XContentBuilder builder = XContentFactory.jsonBuilder();
bucket.toXContent(builder, null);
actionResp.addAggregation(
new AggWrapper(AggWrapper.AggregationType.MULTI_BUCKETS, builder.string()));
}
catch (final IOException e) {
// Ignored
}
}
}
}
@Override
public void close() {
if (client != null) {
client.close();
}
}
@Override
public String toString() {
return "TransportBasedClient []";
}
}

View file

@ -16,6 +16,12 @@
"defaultValue": "9300",
"description": "The port for Elasticsearch"
},
"elasticsearch.client.type": {
"envName": "ELASTICSEARCH_CLIENT_TYPE",
"propertyName": "elasticsearch.client.type",
"defaultValue": "transport",
"description": "The type of client for Elasticsearch (transport or http)"
},
"elasticsearch.cluster.name": {
"envName": "ELASTICSEARCH_CLUSTER_NAME",
"propertyName": "elasticsearch.cluster.name",
@ -27,6 +33,18 @@
"propertyName": "elasticsearch.result.size",
"defaultValue": "10",
"description": "The size of the result set of a search query"
},
"elasticsearch.basicauth.username": {
"envName": "ELASTICSEARCH_BASIC_AUTH_USERNAME",
"propertyName": "elasticsearch.basicauth.username",
"defaultValue": "",
"description": "Username for a basic authentication"
},
"elasticsearch.basicauth.password": {
"envName": "ELASTICSEARCH_BASIC_AUTH_PASSWORD",
"propertyName": "elasticsearch.basicauth.password",
"defaultValue": "",
"description": "Password for a basic authentication"
}
},
"editor": {

View file

@ -27,6 +27,7 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -40,13 +41,19 @@ import org.elasticsearch.node.NodeBuilder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
@RunWith(Theories.class)
public class ElasticsearchInterpreterTest {
@DataPoint public static ElasticsearchInterpreter transportInterpreter;
@DataPoint public static ElasticsearchInterpreter httpInterpreter;
private static Client elsClient;
private static Node elsNode;
private static ElasticsearchInterpreter interpreter;
private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
private static final int[] STATUS = { 200, 404, 500, 403 };
@ -57,6 +64,8 @@ public class ElasticsearchInterpreterTest {
private static final String ELS_HTTP_PORT = "10200";
private static final String ELS_PATH = "/tmp/els";
private static final AtomicInteger deleteId = new AtomicInteger(4);
@BeforeClass
public static void populate() throws IOException {
@ -99,16 +108,27 @@ public class ElasticsearchInterpreterTest {
final Properties props = new Properties();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
interpreter = new ElasticsearchInterpreter(props);
interpreter.open();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "transport");
transportInterpreter = new ElasticsearchInterpreter(props);
transportInterpreter.open();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_HTTP_PORT);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "http");
httpInterpreter = new ElasticsearchInterpreter(props);
httpInterpreter.open();
}
@AfterClass
public static void clean() {
if (interpreter != null) {
interpreter.close();
if (transportInterpreter != null) {
transportInterpreter.close();
}
if (httpInterpreter != null) {
httpInterpreter.close();
}
if (elsClient != null) {
@ -121,28 +141,35 @@ public class ElasticsearchInterpreterTest {
}
}
@Test
public void testCount() {
@Theory
public void testCount(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("count /unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("count /logs", null);
assertEquals(Code.SUCCESS, res.code());
assertEquals("50", res.message().get(0).getData());
res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testGet() {
@Theory
public void testGet(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("get /logs/http/10", null);
res = interpreter.interpret("get /logs/http/4", null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("get /logs/_all/4", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testSearch() {
@Theory
public void testSearch(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
assertEquals(Code.SUCCESS, res.code());
@ -160,8 +187,8 @@ public class ElasticsearchInterpreterTest {
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testAgg() {
@Theory
public void testAgg(ElasticsearchInterpreter interpreter) {
// Single-value metric
InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
@ -183,35 +210,40 @@ public class ElasticsearchInterpreterTest {
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
" { \"terms\" : { \"field\" : \"status\" } } } }", null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"length\" : { \"terms\": { \"field\": \"status\" }, " +
" \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testIndex() {
@Theory
public void testIndex(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("index /logs/http/1000 { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testDelete() {
@Theory
public void testDelete(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("delete /logs/http/11", null);
assertEquals("11", res.message().get(0).getData());
final int testDeleteId = deleteId.decrementAndGet();
res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
assertEquals(Code.SUCCESS, res.code());
assertEquals("" + testDeleteId, res.message().get(0).getData());
}
@Test
public void testMisc() {
@Theory
public void testMisc(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret(null, null);
assertEquals(Code.SUCCESS, res.code());
@ -220,23 +252,23 @@ public class ElasticsearchInterpreterTest {
assertEquals(Code.SUCCESS, res.code());
}
@Test
public void testCompletion() {
List expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
List expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
@Theory
public void testCompletion(ElasticsearchInterpreter interpreter) {
final List<InterpreterCompletion> expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
final List<InterpreterCompletion> expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
final List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
Assert.assertEquals(expectedResultOne, resultOne);
Assert.assertEquals(expectedResultTwo, resultTwo);
List allCompletionList = new ArrayList<>();
for (InterpreterCompletion ic : resultAll) {
final List<String> allCompletionList = new ArrayList<>();
for (final InterpreterCompletion ic : resultAll) {
allCompletionList.add(ic.getName());
}
Assert.assertEquals(interpreter.COMMANDS, allCompletionList);
Assert.assertEquals(ElasticsearchInterpreter.COMMANDS, allCompletionList);
}
}