mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
HTTP-based Elasticsearch client
This commit is contained in:
parent
a9788ff580
commit
f4c5ac39fe
11 changed files with 1070 additions and 188 deletions
|
|
@ -26,16 +26,16 @@
|
|||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-elasticsearch</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: Elasticsearch interpreter</name>
|
||||
|
||||
<properties>
|
||||
<elasticsearch.version>2.3.3</elasticsearch.version>
|
||||
<httpasyncclient.version>4.0.2</httpasyncclient.version>
|
||||
<guava.version>18.0</guava.version>
|
||||
<json-flattener.version>0.1.6</json-flattener.version>
|
||||
<unirest.version>1.4.9</unirest.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -51,6 +51,12 @@
|
|||
<artifactId>elasticsearch</artifactId>
|
||||
<version>${elasticsearch.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpasyncclient</artifactId>
|
||||
<version>${httpasyncclient.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
|
|
@ -64,6 +70,12 @@
|
|||
<version>${json-flattener.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.mashape.unirest</groupId>
|
||||
<artifactId>unirest-java</artifactId>
|
||||
<version>${unirest.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.zeppelin.elasticsearch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -34,26 +33,19 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
|
||||
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
|
||||
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
|
||||
import org.apache.zeppelin.elasticsearch.client.ElasticsearchClient;
|
||||
import org.apache.zeppelin.elasticsearch.client.HttpBasedClient;
|
||||
import org.apache.zeppelin.elasticsearch.client.TransportBasedClient;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHitField;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
|
|
@ -66,7 +58,6 @@ import org.slf4j.LoggerFactory;
|
|||
import com.github.wnameless.json.flattener.JsonFlattener;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonParseException;
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -77,75 +68,82 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class);
|
||||
|
||||
private static final String HELP = "Elasticsearch interpreter:\n"
|
||||
+ "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
|
||||
+ " - indices: list of indices separated by commas (depends on the command)\n"
|
||||
+ " - types: list of document types separated by commas (depends on the command)\n"
|
||||
+ "Commands:\n"
|
||||
+ " - search /indices/types <query>\n"
|
||||
+ " . indices and types can be omitted (at least, you have to provide '/')\n"
|
||||
+ " . a query is either a JSON-formatted query, nor a lucene query\n"
|
||||
+ " - size <value>\n"
|
||||
+ " . defines the size of the result set (default value is in the config)\n"
|
||||
+ " . if used, this command must be declared before a search command\n"
|
||||
+ " - count /indices/types <query>\n"
|
||||
+ " . same comments as for the search\n"
|
||||
+ " - get /index/type/id\n"
|
||||
+ " - delete /index/type/id\n"
|
||||
+ " - index /ndex/type/id <json-formatted document>\n"
|
||||
+ " . the id can be omitted, elasticsearch will generate one";
|
||||
+ "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n"
|
||||
+ " - indices: list of indices separated by commas (depends on the command)\n"
|
||||
+ " - types: list of document types separated by commas (depends on the command)\n"
|
||||
+ "Commands:\n"
|
||||
+ " - search /indices/types <query>\n"
|
||||
+ " . indices and types can be omitted (at least, you have to provide '/')\n"
|
||||
+ " . a query is either a JSON-formatted query, nor a lucene query\n"
|
||||
+ " - size <value>\n"
|
||||
+ " . defines the size of the result set (default value is in the config)\n"
|
||||
+ " . if used, this command must be declared before a search command\n"
|
||||
+ " - count /indices/types <query>\n"
|
||||
+ " . same comments as for the search\n"
|
||||
+ " - get /index/type/id\n"
|
||||
+ " - delete /index/type/id\n"
|
||||
+ " - index /ndex/type/id <json-formatted document>\n"
|
||||
+ " . the id can be omitted, elasticsearch will generate one";
|
||||
|
||||
protected static final List<String> COMMANDS = Arrays.asList(
|
||||
"count", "delete", "get", "help", "index", "search");
|
||||
"count", "delete", "get", "help", "index", "search");
|
||||
|
||||
private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)");
|
||||
|
||||
|
||||
public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
|
||||
public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
|
||||
public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
|
||||
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
|
||||
public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
|
||||
public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
|
||||
public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
|
||||
|
||||
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
private Client client;
|
||||
private String host = "localhost";
|
||||
private int port = 9300;
|
||||
private String clusterName = "elasticsearch";
|
||||
private ElasticsearchClient elsClient;
|
||||
private int resultSize = 10;
|
||||
|
||||
public ElasticsearchInterpreter(Properties property) {
|
||||
super(property);
|
||||
this.host = getProperty(ELASTICSEARCH_HOST);
|
||||
this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT));
|
||||
this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME);
|
||||
try {
|
||||
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
|
||||
} catch (NumberFormatException e) {
|
||||
this.resultSize = 10;
|
||||
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
|
||||
property.get(ELASTICSEARCH_RESULT_SIZE), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
logger.info("Properties: {}", getProperty());
|
||||
|
||||
String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
|
||||
clientType = clientType == null ? null : clientType.toLowerCase();
|
||||
|
||||
try {
|
||||
logger.info("prop={}", getProperty());
|
||||
final Settings settings = Settings.settingsBuilder()
|
||||
.put("cluster.name", clusterName)
|
||||
.put(getProperty())
|
||||
.build();
|
||||
client = TransportClient.builder().settings(settings).build()
|
||||
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
|
||||
this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE));
|
||||
}
|
||||
catch (IOException e) {
|
||||
catch (final NumberFormatException e) {
|
||||
this.resultSize = 10;
|
||||
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
|
||||
property.get(ELASTICSEARCH_RESULT_SIZE), e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
|
||||
elsClient = new TransportBasedClient(getProperty());
|
||||
}
|
||||
else if ("http".equals(clientType)) {
|
||||
elsClient = new HttpBasedClient(getProperty());
|
||||
}
|
||||
else {
|
||||
logger.error("Unknown type of Elasticsearch client: " + clientType);
|
||||
}
|
||||
}
|
||||
catch (final IOException e) {
|
||||
logger.error("Open connection with Elasticsearch", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
if (elsClient != null) {
|
||||
elsClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -159,7 +157,7 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
int currentResultSize = resultSize;
|
||||
|
||||
if (client == null) {
|
||||
if (elsClient == null) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Problem with the Elasticsearch client, please check your configuration (host, port,...)");
|
||||
}
|
||||
|
|
@ -178,7 +176,7 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
if (lines.length < 2) {
|
||||
return processHelp(InterpreterResult.Code.ERROR,
|
||||
"Size cmd must be followed by a search");
|
||||
"Size cmd must be followed by a search");
|
||||
}
|
||||
|
||||
final String[] sizeLine = StringUtils.split(lines[0], " ", 2);
|
||||
|
|
@ -219,7 +217,7 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
return processHelp(InterpreterResult.Code.ERROR, "Unknown command");
|
||||
}
|
||||
catch (Exception e) {
|
||||
catch (final Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -243,7 +241,7 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
public List<InterpreterCompletion> completion(String s, int i) {
|
||||
final List suggestions = new ArrayList<>();
|
||||
|
||||
for (String cmd : COMMANDS) {
|
||||
for (final String cmd : COMMANDS) {
|
||||
if (cmd.toLowerCase().contains(s)) {
|
||||
suggestions.add(new InterpreterCompletion(cmd, cmd));
|
||||
}
|
||||
|
|
@ -276,19 +274,18 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|| StringUtils.isEmpty(urlItems[1])
|
||||
|| StringUtils.isEmpty(urlItems[2])) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Bad URL (it should be /index/type/id)");
|
||||
"Bad URL (it should be /index/type/id)");
|
||||
}
|
||||
|
||||
final GetResponse response = client
|
||||
.prepareGet(urlItems[0], urlItems[1], urlItems[2])
|
||||
.get();
|
||||
if (response.isExists()) {
|
||||
final String json = gson.toJson(response.getSource());
|
||||
final ActionResponse response = elsClient.get(urlItems[0], urlItems[1], urlItems[2]);
|
||||
|
||||
if (response.isSucceeded()) {
|
||||
final String json = gson.toJson(response.getHit().getSourceAsString());
|
||||
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
json);
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
json);
|
||||
}
|
||||
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
|
||||
|
|
@ -305,15 +302,15 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
if (urlItems.length > 2) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
|
||||
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
|
||||
}
|
||||
|
||||
final SearchResponse response = searchData(urlItems, data, 0);
|
||||
final ActionResponse response = searchData(urlItems, data, 0);
|
||||
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
"" + response.getHits().getTotalHits());
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
"" + response.getTotalHits());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -328,10 +325,10 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
if (urlItems.length > 2) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
|
||||
"Bad URL (it should be /index1,index2,.../type1,type2,...)");
|
||||
}
|
||||
|
||||
final SearchResponse response = searchData(urlItems, data, size);
|
||||
final ActionResponse response = searchData(urlItems, data, size);
|
||||
|
||||
return buildResponseMessage(response);
|
||||
}
|
||||
|
|
@ -347,18 +344,16 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|
||||
if (urlItems.length < 2 || urlItems.length > 3) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Bad URL (it should be /index/type or /index/type/id)");
|
||||
"Bad URL (it should be /index/type or /index/type/id)");
|
||||
}
|
||||
|
||||
final IndexResponse response = client
|
||||
.prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2])
|
||||
.setSource(data)
|
||||
.get();
|
||||
final ActionResponse response = elsClient.index(
|
||||
urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2], data);
|
||||
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
response.getId());
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
response.getHit().getId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -374,54 +369,34 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
|| StringUtils.isEmpty(urlItems[1])
|
||||
|| StringUtils.isEmpty(urlItems[2])) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Bad URL (it should be /index/type/id)");
|
||||
"Bad URL (it should be /index/type/id)");
|
||||
}
|
||||
|
||||
final DeleteResponse response = client
|
||||
.prepareDelete(urlItems[0], urlItems[1], urlItems[2])
|
||||
.get();
|
||||
final ActionResponse response = elsClient.delete(urlItems[0], urlItems[1], urlItems[2]);
|
||||
|
||||
if (response.isFound()) {
|
||||
if (response.isSucceeded()) {
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
response.getId());
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TEXT,
|
||||
response.getHit().getId());
|
||||
}
|
||||
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found");
|
||||
}
|
||||
|
||||
private SearchResponse searchData(String[] urlItems, String query, int size) {
|
||||
private ActionResponse searchData(String[] urlItems, String query, int size) {
|
||||
|
||||
final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
|
||||
client, SearchAction.INSTANCE);
|
||||
reqBuilder.setIndices();
|
||||
String[] indices = null;
|
||||
String[] types = null;
|
||||
|
||||
if (urlItems.length >= 1) {
|
||||
reqBuilder.setIndices(StringUtils.split(urlItems[0], ","));
|
||||
indices = StringUtils.split(urlItems[0], ",");
|
||||
}
|
||||
if (urlItems.length > 1) {
|
||||
reqBuilder.setTypes(StringUtils.split(urlItems[1], ","));
|
||||
types = StringUtils.split(urlItems[1], ",");
|
||||
}
|
||||
|
||||
if (!StringUtils.isEmpty(query)) {
|
||||
// The query can be either JSON-formatted, nor a Lucene query
|
||||
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
|
||||
try {
|
||||
final Map source = gson.fromJson(query, Map.class);
|
||||
reqBuilder.setExtraSource(source);
|
||||
}
|
||||
catch (JsonParseException e) {
|
||||
// This is not a JSON (or maybe not well formatted...)
|
||||
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
|
||||
}
|
||||
}
|
||||
|
||||
reqBuilder.setSize(size);
|
||||
|
||||
final SearchResponse response = reqBuilder.get();
|
||||
|
||||
return response;
|
||||
return elsClient.search(indices, types, query, size);
|
||||
}
|
||||
|
||||
private InterpreterResult buildAggResponseMessage(Aggregations aggregations) {
|
||||
|
|
@ -442,8 +417,8 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
final Set<String> headerKeys = new HashSet<>();
|
||||
final List<Map<String, Object>> buckets = new LinkedList<>();
|
||||
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
|
||||
|
||||
for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
|
||||
|
||||
for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
|
||||
try {
|
||||
final XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
bucket.toXContent(builder, null);
|
||||
|
|
@ -451,22 +426,22 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
headerKeys.addAll(bucketMap.keySet());
|
||||
buckets.add(bucketMap);
|
||||
}
|
||||
catch (IOException e) {
|
||||
catch (final IOException e) {
|
||||
logger.error("Processing bucket: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
final StringBuffer buffer = new StringBuffer();
|
||||
final String[] keys = headerKeys.toArray(new String[0]);
|
||||
for (String key: keys) {
|
||||
for (final String key: keys) {
|
||||
buffer.append("\t" + key);
|
||||
}
|
||||
buffer.deleteCharAt(0);
|
||||
|
||||
for (Map<String, Object> bucket : buckets) {
|
||||
|
||||
for (final Map<String, Object> bucket : buckets) {
|
||||
buffer.append("\n");
|
||||
|
||||
for (String key: keys) {
|
||||
|
||||
for (final String key: keys) {
|
||||
buffer.append(bucket.get(key)).append("\t");
|
||||
}
|
||||
buffer.deleteCharAt(buffer.length() - 1);
|
||||
|
|
@ -479,38 +454,64 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
|
||||
}
|
||||
|
||||
private String buildSearchHitsResponseMessage(SearchHit[] hits) {
|
||||
private InterpreterResult buildAggResponseMessage(List<AggWrapper> aggregations) {
|
||||
|
||||
if (hits == null || hits.length == 0) {
|
||||
final InterpreterResult.Type resType = InterpreterResult.Type.TABLE;
|
||||
String resMsg = "";
|
||||
|
||||
final Set<String> headerKeys = new HashSet<>();
|
||||
final List<Map<String, Object>> buckets = new LinkedList<>();
|
||||
|
||||
for (final AggWrapper aggregation: aggregations) {
|
||||
final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(aggregation.getResult());
|
||||
headerKeys.addAll(bucketMap.keySet());
|
||||
buckets.add(bucketMap);
|
||||
}
|
||||
|
||||
final StringBuffer buffer = new StringBuffer();
|
||||
final String[] keys = headerKeys.toArray(new String[0]);
|
||||
for (final String key: keys) {
|
||||
buffer.append("\t" + key);
|
||||
}
|
||||
buffer.deleteCharAt(0);
|
||||
|
||||
for (final Map<String, Object> bucket : buckets) {
|
||||
buffer.append("\n");
|
||||
|
||||
for (final String key: keys) {
|
||||
buffer.append(bucket.get(key)).append("\t");
|
||||
}
|
||||
buffer.deleteCharAt(buffer.length() - 1);
|
||||
}
|
||||
|
||||
resMsg = buffer.toString();
|
||||
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg);
|
||||
}
|
||||
|
||||
private String buildSearchHitsResponseMessage(ActionResponse response) {
|
||||
|
||||
if (response.getHits() == null || response.getHits().size() == 0) {
|
||||
return "";
|
||||
}
|
||||
|
||||
//First : get all the keys in order to build an ordered list of the values for each hit
|
||||
//
|
||||
final Map<String, Object> hitFields = new HashMap<>();
|
||||
final List<Map<String, Object>> flattenHits = new LinkedList<>();
|
||||
final Set<String> keys = new TreeSet<>();
|
||||
for (SearchHit hit : hits) {
|
||||
// Fields can be found either in _source, or in fields (it depends on the query)
|
||||
//
|
||||
String json = hit.getSourceAsString();
|
||||
if (json == null) {
|
||||
hitFields.clear();
|
||||
for (SearchHitField hitField : hit.getFields().values()) {
|
||||
hitFields.put(hitField.getName(), hitField.getValues());
|
||||
}
|
||||
json = gson.toJson(hitFields);
|
||||
}
|
||||
for (final HitWrapper hit : response.getHits()) {
|
||||
|
||||
final String json = hit.getSourceAsString();
|
||||
|
||||
final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json);
|
||||
final Map<String, Object> flattenMap = new HashMap<>();
|
||||
for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
|
||||
for (final Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) {
|
||||
// Replace keys that match a format like that : [\"keyname\"][0]
|
||||
final String fieldName = iter.next();
|
||||
final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName);
|
||||
if (fieldNameMatcher.matches()) {
|
||||
flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2),
|
||||
flattenJsonMap.get(fieldName));
|
||||
flattenJsonMap.get(fieldName));
|
||||
}
|
||||
else {
|
||||
flattenMap.put(fieldName, flattenJsonMap.get(fieldName));
|
||||
|
|
@ -518,7 +519,7 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
}
|
||||
flattenHits.add(flattenMap);
|
||||
|
||||
for (String key : flattenMap.keySet()) {
|
||||
for (final String key : flattenMap.keySet()) {
|
||||
keys.add(key);
|
||||
}
|
||||
}
|
||||
|
|
@ -526,15 +527,15 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
// Next : build the header of the table
|
||||
//
|
||||
final StringBuffer buffer = new StringBuffer();
|
||||
for (String key : keys) {
|
||||
for (final String key : keys) {
|
||||
buffer.append(key).append('\t');
|
||||
}
|
||||
buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n");
|
||||
|
||||
// Finally : build the result by using the key set
|
||||
//
|
||||
for (Map<String, Object> hit : flattenHits) {
|
||||
for (String key : keys) {
|
||||
for (final Map<String, Object> hit : flattenHits) {
|
||||
for (final String key : keys) {
|
||||
final Object val = hit.get(key);
|
||||
if (val != null) {
|
||||
buffer.append(val);
|
||||
|
|
@ -547,17 +548,17 @@ public class ElasticsearchInterpreter extends Interpreter {
|
|||
return buffer.toString();
|
||||
}
|
||||
|
||||
private InterpreterResult buildResponseMessage(SearchResponse response) {
|
||||
private InterpreterResult buildResponseMessage(ActionResponse response) {
|
||||
|
||||
final Aggregations aggregations = response.getAggregations();
|
||||
final List<AggWrapper> aggregations = response.getAggregations();
|
||||
|
||||
if (aggregations != null && aggregations.asList().size() > 0) {
|
||||
if (aggregations != null && aggregations.size() > 0) {
|
||||
return buildAggResponseMessage(aggregations);
|
||||
}
|
||||
|
||||
return new InterpreterResult(
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TABLE,
|
||||
buildSearchHitsResponseMessage(response.getHits().getHits()));
|
||||
InterpreterResult.Code.SUCCESS,
|
||||
InterpreterResult.Type.TABLE,
|
||||
buildSearchHitsResponseMessage(response));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.action;
|
||||
|
||||
/**
|
||||
* Runtime exception thrown when there is a problem during an action (search, get, ...).
|
||||
*/
|
||||
public class ActionException extends RuntimeException {
|
||||
|
||||
public ActionException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public ActionException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.action;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Contains the result of an action (hits, aggregations, ...).
|
||||
*/
|
||||
public class ActionResponse {
|
||||
|
||||
private boolean succeeded;
|
||||
private long totalHits;
|
||||
private final List<HitWrapper> hits = new LinkedList<>();
|
||||
private final List<AggWrapper> aggregations = new LinkedList<>();
|
||||
|
||||
|
||||
// public ActionResponse source(String source) {
|
||||
// this.source = source;
|
||||
// return this;
|
||||
// }
|
||||
|
||||
public ActionResponse succeeded(boolean succeeded) {
|
||||
this.succeeded = succeeded;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isSucceeded() {
|
||||
return succeeded;
|
||||
}
|
||||
|
||||
// public String getSource() {
|
||||
// return source;
|
||||
// }
|
||||
|
||||
// public String getId() {
|
||||
// return id;
|
||||
// }
|
||||
|
||||
// public ActionResponse id(String id) {
|
||||
// this.id = id;
|
||||
// return this;
|
||||
// }
|
||||
|
||||
public ActionResponse totalHits(long totalHits) {
|
||||
this.totalHits = totalHits;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getTotalHits() {
|
||||
return totalHits;
|
||||
}
|
||||
|
||||
public List<HitWrapper> getHits() {
|
||||
return hits;
|
||||
}
|
||||
|
||||
public ActionResponse addHit(HitWrapper hit) {
|
||||
this.hits.add(hit);
|
||||
return this;
|
||||
}
|
||||
|
||||
public List<AggWrapper> getAggregations() {
|
||||
return aggregations;
|
||||
}
|
||||
|
||||
public ActionResponse addAggregation(AggWrapper aggregation) {
|
||||
this.aggregations.add(aggregation);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ActionResponse hit(HitWrapper hit) {
|
||||
this.addHit(hit);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HitWrapper getHit() {
|
||||
return this.hits.get(0);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.action;
|
||||
|
||||
/**
|
||||
* Contains the result of an aggregation.
|
||||
*/
|
||||
public class AggWrapper {
|
||||
|
||||
/** Type of an aggregation (to know if there are buckets or not) */
|
||||
public enum AggregationType { SIMPLE, MULTI_BUCKETS };
|
||||
|
||||
private final AggregationType type;
|
||||
private final String result;
|
||||
|
||||
public AggWrapper(AggregationType type, String result) {
|
||||
this.type = type;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public AggregationType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getResult() {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.action;
|
||||
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
|
||||
/**
|
||||
* Contains the data of a hit.
|
||||
*/
|
||||
public class HitWrapper {
|
||||
|
||||
private final JsonParser parser = new JsonParser();
|
||||
|
||||
private final String index;
|
||||
private final String type;
|
||||
private final String id;
|
||||
private final String source;
|
||||
|
||||
public HitWrapper(String index, String type, String id, String source) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
public HitWrapper(String source) {
|
||||
this(null, null, null, source);
|
||||
}
|
||||
|
||||
public String getSourceAsString() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public JsonObject getSourceAsJsonObject() {
|
||||
final JsonElement element = parser.parse(source);
|
||||
return element.getAsJsonObject();
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.client;
|
||||
|
||||
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
|
||||
|
||||
/**
|
||||
* Interface that must be implemented by any kind of Elasticsearch client (transport, ...).
|
||||
*/
|
||||
public interface ElasticsearchClient {
|
||||
|
||||
ActionResponse get(String index, String type, String id);
|
||||
|
||||
ActionResponse index(String index, String type, String id, String data);
|
||||
|
||||
ActionResponse delete(String index, String type, String id);
|
||||
|
||||
ActionResponse search(String[] indices, String[] types, String query, int size);
|
||||
|
||||
void close();
|
||||
}
|
||||
|
|
@ -0,0 +1,310 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.client;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
|
||||
import org.apache.zeppelin.elasticsearch.action.ActionException;
|
||||
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
|
||||
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
|
||||
import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType;
|
||||
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonParseException;
|
||||
import com.mashape.unirest.http.HttpResponse;
|
||||
import com.mashape.unirest.http.JsonNode;
|
||||
import com.mashape.unirest.http.Unirest;
|
||||
import com.mashape.unirest.http.exceptions.UnirestException;
|
||||
import com.mashape.unirest.request.HttpRequest;
|
||||
import com.mashape.unirest.request.HttpRequestWithBody;
|
||||
|
||||
/**
|
||||
* Elasticsearch client using the HTTP API.
|
||||
*/
|
||||
public class HttpBasedClient implements ElasticsearchClient {
|
||||
|
||||
private static final String QUERY_STRING_TEMPLATE =
|
||||
"{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }";
|
||||
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final String username;
|
||||
private final String password;
|
||||
|
||||
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
// private HttpHost elasticHost;
|
||||
// private HttpClientContext context;
|
||||
// private CredentialsProvider credsProvider;
|
||||
|
||||
public HttpBasedClient(Properties props) {
|
||||
this.host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
|
||||
this.port = Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
|
||||
this.username = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_USERNAME);
|
||||
this.password = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_PASSWORD);
|
||||
}
|
||||
|
||||
private boolean isSucceeded(HttpResponse response) {
|
||||
return response.getStatus() >= 200 && response.getStatus() < 300;
|
||||
}
|
||||
|
||||
private JSONObject getParentField(JSONObject parent, String[] fields) {
|
||||
JSONObject obj = parent;
|
||||
for (int i = 0; i < fields.length - 1; i++) {
|
||||
obj = obj.getJSONObject(fields[i]);
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
private JSONArray getFieldAsArray(JSONObject obj, String field) {
|
||||
final String[] fields = field.split("/");
|
||||
final JSONObject parent = getParentField(obj, fields);
|
||||
return parent.getJSONArray(fields[fields.length - 1]);
|
||||
}
|
||||
|
||||
private String getFieldAsString(HttpResponse<JsonNode> response, String field) {
|
||||
return response.getBody().getObject().get(field).toString();
|
||||
}
|
||||
|
||||
private long getFieldAsLong(HttpResponse<JsonNode> response, String field) {
|
||||
final String[] fields = field.split("/");
|
||||
final JSONObject obj = getParentField(response.getBody().getObject(), fields);
|
||||
return obj.getLong(fields[fields.length - 1]);
|
||||
}
|
||||
|
||||
private String getUrl(String index, String type, String id) {
|
||||
final StringBuilder buffer = new StringBuilder();
|
||||
buffer.append("http://").append(host).append(":").append(port).append("/");
|
||||
if (StringUtils.isNotEmpty(index)) {
|
||||
buffer.append(index);
|
||||
|
||||
if (StringUtils.isNotEmpty(type)) {
|
||||
buffer.append("/").append(type);
|
||||
|
||||
if (StringUtils.isNotEmpty(id)) {
|
||||
buffer.append("/").append(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return buffer.toString();
|
||||
}
|
||||
|
||||
private String getUrl(String[] indices, String[] types) {
|
||||
final String inds = indices == null ? null : Joiner.on(",").join(indices);
|
||||
final String typs = types == null ? null : Joiner.on(",").join(types);
|
||||
return getUrl(inds, typs, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse get(String index, String type, String id) {
|
||||
ActionResponse response = null;
|
||||
try {
|
||||
final HttpRequest request = Unirest.get(getUrl(index, type, id));
|
||||
if (StringUtils.isNotEmpty(username)) {
|
||||
request.basicAuth(username, password);
|
||||
}
|
||||
|
||||
final HttpResponse<JsonNode> result = request.asJson();
|
||||
|
||||
response = new ActionResponse()
|
||||
.succeeded(isSucceeded(result))
|
||||
.hit(new HitWrapper(
|
||||
getFieldAsString(result, "_index"),
|
||||
getFieldAsString(result, "_type"),
|
||||
getFieldAsString(result, "_id"),
|
||||
getFieldAsString(result, "_source")));
|
||||
}
|
||||
catch (final UnirestException e) {
|
||||
throw new ActionException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse delete(String index, String type, String id) {
|
||||
ActionResponse response = null;
|
||||
try {
|
||||
final HttpRequest request = Unirest.delete(getUrl(index, type, id));
|
||||
if (StringUtils.isNotEmpty(username)) {
|
||||
request.basicAuth(username, password);
|
||||
}
|
||||
|
||||
final HttpResponse<JsonNode> result = request.asJson();
|
||||
|
||||
response = new ActionResponse()
|
||||
.hit(new HitWrapper(
|
||||
getFieldAsString(result, "_index"),
|
||||
getFieldAsString(result, "_type"),
|
||||
getFieldAsString(result, "_id"),
|
||||
null))
|
||||
.succeeded(isSucceeded(result));
|
||||
}
|
||||
catch (final UnirestException e) {
|
||||
throw new ActionException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse index(String index, String type, String id, String data) {
|
||||
ActionResponse response = null;
|
||||
try {
|
||||
HttpRequestWithBody request = null;
|
||||
if (StringUtils.isEmpty(id)) {
|
||||
request = Unirest.post(getUrl(index, type, id));
|
||||
}
|
||||
else {
|
||||
request = Unirest.put(getUrl(index, type, id));
|
||||
}
|
||||
request
|
||||
.header("Accept", "application/json")
|
||||
.header("Content-Type", "application/json")
|
||||
.body(data).getHttpRequest();
|
||||
if (StringUtils.isNotEmpty(username)) {
|
||||
request.basicAuth(username, password);
|
||||
}
|
||||
|
||||
final HttpResponse<JsonNode> result = request.asJson();
|
||||
|
||||
response = new ActionResponse()
|
||||
.hit(new HitWrapper(
|
||||
getFieldAsString(result, "_index"),
|
||||
getFieldAsString(result, "_type"),
|
||||
getFieldAsString(result, "_id"),
|
||||
null))
|
||||
.succeeded(isSucceeded(result));
|
||||
}
|
||||
catch (final UnirestException e) {
|
||||
throw new ActionException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse search(String[] indices, String[] types, String query, int size) {
|
||||
ActionResponse response = null;
|
||||
|
||||
if (!StringUtils.isEmpty(query)) {
|
||||
// The query can be either JSON-formatted, nor a Lucene query
|
||||
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
|
||||
try {
|
||||
gson.fromJson(query, Map.class);
|
||||
}
|
||||
catch (final JsonParseException e) {
|
||||
// This is not a JSON (or maybe not well formatted...)
|
||||
query = QUERY_STRING_TEMPLATE.replace("_Q_", query);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
final HttpRequestWithBody request = Unirest
|
||||
.post(getUrl(indices, types) + "/_search?size=" + size)
|
||||
.header("Content-Type", "application/json");
|
||||
|
||||
if (StringUtils.isNoneEmpty(query)) {
|
||||
request.header("Accept", "application/json").body(query);
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(username)) {
|
||||
request.basicAuth(username, password);
|
||||
}
|
||||
|
||||
final HttpResponse<JsonNode> result = request.asJson();
|
||||
final JSONObject body = result.getBody() != null ? result.getBody().getObject() : null;
|
||||
|
||||
if (isSucceeded(result)) {
|
||||
final long total = getFieldAsLong(result, "hits/total");
|
||||
|
||||
response = new ActionResponse()
|
||||
.succeeded(true)
|
||||
.totalHits(total);
|
||||
|
||||
if (containsAggs(result)) {
|
||||
JSONObject aggregationsMap = body.getJSONObject("aggregations");
|
||||
if (aggregationsMap == null) {
|
||||
aggregationsMap = body.getJSONObject("aggs");
|
||||
}
|
||||
|
||||
for (final String key: aggregationsMap.keySet()) {
|
||||
final JSONObject aggResult = aggregationsMap.getJSONObject(key);
|
||||
if (aggResult.has("buckets")) {
|
||||
// Multi-bucket aggregations
|
||||
final Iterator<Object> buckets = aggResult.getJSONArray("buckets").iterator();
|
||||
while (buckets.hasNext()) {
|
||||
response.addAggregation(
|
||||
new AggWrapper(AggregationType.MULTI_BUCKETS, buckets.next().toString()));
|
||||
}
|
||||
}
|
||||
else {
|
||||
response.addAggregation(
|
||||
new AggWrapper(AggregationType.SIMPLE, aggregationsMap.toString()));
|
||||
}
|
||||
break; // Keep only one aggregation
|
||||
}
|
||||
}
|
||||
else if (size > 0 && total > 0) {
|
||||
final JSONArray hits = getFieldAsArray(body, "hits/hits");
|
||||
final Iterator<Object> iter = hits.iterator();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
final JSONObject hit = (JSONObject) iter.next();
|
||||
final Object data =
|
||||
hit.opt("_source") != null ? hit.opt("_source") : hit.opt("fields");
|
||||
response.addHit(new HitWrapper(
|
||||
hit.getString("_index"),
|
||||
hit.getString("_type"),
|
||||
hit.getString("_id"),
|
||||
data.toString()));
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw new ActionException(body.get("error").toString());
|
||||
}
|
||||
}
|
||||
catch (final UnirestException e) {
|
||||
throw new ActionException(e);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private boolean containsAggs(HttpResponse<JsonNode> result) {
|
||||
return result.getBody() != null &&
|
||||
(result.getBody().getObject().has("aggregations") ||
|
||||
result.getBody().getObject().has("aggs"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HttpBasedClient [host=" + host + ", port=" + port + ", username=" + username + "]";
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,235 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.elasticsearch.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
|
||||
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
|
||||
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
|
||||
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHitField;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonParseException;
|
||||
|
||||
/**
|
||||
* Elasticsearch client using the transport protocol.
|
||||
*/
|
||||
public class TransportBasedClient implements ElasticsearchClient {
|
||||
|
||||
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
private final Client client;
|
||||
|
||||
public TransportBasedClient(Properties props) throws UnknownHostException {
|
||||
final String host =
|
||||
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST);
|
||||
final int port = Integer.parseInt(
|
||||
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT));
|
||||
final String clusterName =
|
||||
props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME);
|
||||
|
||||
final Settings settings = Settings.settingsBuilder()
|
||||
.put("cluster.name", clusterName)
|
||||
.put(props)
|
||||
.build();
|
||||
|
||||
client = TransportClient.builder().settings(settings).build()
|
||||
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse get(String index, String type, String id) {
|
||||
final GetResponse getResp = client
|
||||
.prepareGet(index, type, id)
|
||||
.get();
|
||||
|
||||
return new ActionResponse()
|
||||
.succeeded(getResp.isExists())
|
||||
.hit(new HitWrapper(
|
||||
getResp.getIndex(),
|
||||
getResp.getType(),
|
||||
getResp.getId(),
|
||||
getResp.getSourceAsString()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse delete(String index, String type, String id) {
|
||||
final DeleteResponse delResp = client
|
||||
.prepareDelete(index, type, id)
|
||||
.get();
|
||||
|
||||
return new ActionResponse()
|
||||
.succeeded(delResp.isFound())
|
||||
.hit(new HitWrapper(
|
||||
delResp.getIndex(),
|
||||
delResp.getType(),
|
||||
delResp.getId(),
|
||||
null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse index(String index, String type, String id, String data) {
|
||||
final IndexResponse idxResp = client
|
||||
.prepareIndex(index, type, id)
|
||||
.setSource(data)
|
||||
.get();
|
||||
|
||||
return new ActionResponse()
|
||||
.succeeded(idxResp.isCreated())
|
||||
.hit(new HitWrapper(
|
||||
idxResp.getIndex(),
|
||||
idxResp.getType(),
|
||||
idxResp.getId(),
|
||||
null));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionResponse search(String[] indices, String[] types, String query, int size) {
|
||||
final SearchRequestBuilder reqBuilder = new SearchRequestBuilder(
|
||||
client, SearchAction.INSTANCE);
|
||||
reqBuilder.setIndices();
|
||||
|
||||
if (indices != null) {
|
||||
reqBuilder.setIndices(indices);
|
||||
}
|
||||
if (types != null) {
|
||||
reqBuilder.setTypes(types);
|
||||
}
|
||||
|
||||
if (!StringUtils.isEmpty(query)) {
|
||||
// The query can be either JSON-formatted, nor a Lucene query
|
||||
// So, try to parse as a JSON => if there is an error, consider the query a Lucene one
|
||||
try {
|
||||
@SuppressWarnings("rawtypes")
|
||||
final Map source = gson.fromJson(query, Map.class);
|
||||
reqBuilder.setExtraSource(source);
|
||||
}
|
||||
catch (final JsonParseException e) {
|
||||
// This is not a JSON (or maybe not well formatted...)
|
||||
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
|
||||
}
|
||||
}
|
||||
|
||||
reqBuilder.setSize(size);
|
||||
|
||||
final SearchResponse searchResp = reqBuilder.get();
|
||||
|
||||
final ActionResponse actionResp = new ActionResponse()
|
||||
.succeeded(true)
|
||||
.totalHits(searchResp.getHits().getTotalHits());
|
||||
|
||||
if (searchResp.getAggregations() != null) {
|
||||
setAggregations(searchResp.getAggregations(), actionResp);
|
||||
}
|
||||
else {
|
||||
for (final SearchHit hit: searchResp.getHits()) {
|
||||
// Fields can be found either in _source, or in fields (it depends on the query)
|
||||
// => specific for elasticsearch's version < 5
|
||||
//
|
||||
String src = hit.getSourceAsString();
|
||||
if (src == null) {
|
||||
final Map<String, Object> hitFields = new HashMap<>();
|
||||
for (final SearchHitField hitField : hit.getFields().values()) {
|
||||
hitFields.put(hitField.getName(), hitField.getValues());
|
||||
}
|
||||
src = gson.toJson(hitFields);
|
||||
}
|
||||
actionResp.addHit(new HitWrapper(hit.getIndex(), hit.getType(), hit.getId(), src));
|
||||
}
|
||||
}
|
||||
|
||||
return actionResp;
|
||||
}
|
||||
|
||||
private void setAggregations(Aggregations aggregations, ActionResponse actionResp) {
|
||||
// Only the result of the first aggregation is returned
|
||||
//
|
||||
final Aggregation agg = aggregations.asList().get(0);
|
||||
|
||||
if (agg instanceof InternalMetricsAggregation) {
|
||||
actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
|
||||
XContentHelper.toString((InternalMetricsAggregation) agg).toString()));
|
||||
}
|
||||
else if (agg instanceof InternalSingleBucketAggregation) {
|
||||
actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE,
|
||||
XContentHelper.toString((InternalSingleBucketAggregation) agg).toString()));
|
||||
}
|
||||
else if (agg instanceof InternalMultiBucketAggregation) {
|
||||
final Set<String> headerKeys = new HashSet<>();
|
||||
final List<Map<String, Object>> buckets = new LinkedList<>();
|
||||
final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg;
|
||||
|
||||
for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) {
|
||||
try {
|
||||
final XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
bucket.toXContent(builder, null);
|
||||
actionResp.addAggregation(
|
||||
new AggWrapper(AggWrapper.AggregationType.MULTI_BUCKETS, builder.string()));
|
||||
}
|
||||
catch (final IOException e) {
|
||||
// Ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TransportBasedClient []";
|
||||
}
|
||||
}
|
||||
|
|
@ -16,6 +16,12 @@
|
|||
"defaultValue": "9300",
|
||||
"description": "The port for Elasticsearch"
|
||||
},
|
||||
"elasticsearch.client.type": {
|
||||
"envName": "ELASTICSEARCH_CLIENT_TYPE",
|
||||
"propertyName": "elasticsearch.client.type",
|
||||
"defaultValue": "transport",
|
||||
"description": "The type of client for Elasticsearch (transport or http)"
|
||||
},
|
||||
"elasticsearch.cluster.name": {
|
||||
"envName": "ELASTICSEARCH_CLUSTER_NAME",
|
||||
"propertyName": "elasticsearch.cluster.name",
|
||||
|
|
@ -27,6 +33,18 @@
|
|||
"propertyName": "elasticsearch.result.size",
|
||||
"defaultValue": "10",
|
||||
"description": "The size of the result set of a search query"
|
||||
},
|
||||
"elasticsearch.basicauth.username": {
|
||||
"envName": "ELASTICSEARCH_BASIC_AUTH_USERNAME",
|
||||
"propertyName": "elasticsearch.basicauth.username",
|
||||
"defaultValue": "",
|
||||
"description": "Username for a basic authentication"
|
||||
},
|
||||
"elasticsearch.basicauth.password": {
|
||||
"envName": "ELASTICSEARCH_BASIC_AUTH_PASSWORD",
|
||||
"propertyName": "elasticsearch.basicauth.password",
|
||||
"defaultValue": "",
|
||||
"description": "Password for a basic authentication"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import java.util.Date;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.lang.math.RandomUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
|
@ -40,13 +41,19 @@ import org.elasticsearch.node.NodeBuilder;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.theories.DataPoint;
|
||||
import org.junit.experimental.theories.Theories;
|
||||
import org.junit.experimental.theories.Theory;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
@RunWith(Theories.class)
|
||||
public class ElasticsearchInterpreterTest {
|
||||
|
||||
@DataPoint public static ElasticsearchInterpreter transportInterpreter;
|
||||
@DataPoint public static ElasticsearchInterpreter httpInterpreter;
|
||||
|
||||
private static Client elsClient;
|
||||
private static Node elsNode;
|
||||
private static ElasticsearchInterpreter interpreter;
|
||||
|
||||
private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
|
||||
private static final int[] STATUS = { 200, 404, 500, 403 };
|
||||
|
|
@ -57,6 +64,8 @@ public class ElasticsearchInterpreterTest {
|
|||
private static final String ELS_HTTP_PORT = "10200";
|
||||
private static final String ELS_PATH = "/tmp/els";
|
||||
|
||||
private static final AtomicInteger deleteId = new AtomicInteger(4);
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void populate() throws IOException {
|
||||
|
|
@ -99,16 +108,27 @@ public class ElasticsearchInterpreterTest {
|
|||
|
||||
final Properties props = new Properties();
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
|
||||
interpreter = new ElasticsearchInterpreter(props);
|
||||
interpreter.open();
|
||||
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "transport");
|
||||
transportInterpreter = new ElasticsearchInterpreter(props);
|
||||
transportInterpreter.open();
|
||||
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_HTTP_PORT);
|
||||
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "http");
|
||||
httpInterpreter = new ElasticsearchInterpreter(props);
|
||||
httpInterpreter.open();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void clean() {
|
||||
if (interpreter != null) {
|
||||
interpreter.close();
|
||||
if (transportInterpreter != null) {
|
||||
transportInterpreter.close();
|
||||
}
|
||||
|
||||
if (httpInterpreter != null) {
|
||||
httpInterpreter.close();
|
||||
}
|
||||
|
||||
if (elsClient != null) {
|
||||
|
|
@ -121,28 +141,35 @@ public class ElasticsearchInterpreterTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCount() {
|
||||
@Theory
|
||||
public void testCount(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret("count /unknown", null);
|
||||
assertEquals(Code.ERROR, res.code());
|
||||
|
||||
res = interpreter.interpret("count /logs", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
assertEquals("50", res.message().get(0).getData());
|
||||
|
||||
res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() {
|
||||
@Theory
|
||||
public void testGet(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
|
||||
assertEquals(Code.ERROR, res.code());
|
||||
|
||||
res = interpreter.interpret("get /logs/http/10", null);
|
||||
res = interpreter.interpret("get /logs/http/4", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
|
||||
res = interpreter.interpret("get /logs/_all/4", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSearch() {
|
||||
@Theory
|
||||
public void testSearch(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
|
|
@ -160,8 +187,8 @@ public class ElasticsearchInterpreterTest {
|
|||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAgg() {
|
||||
@Theory
|
||||
public void testAgg(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
// Single-value metric
|
||||
InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
|
||||
|
|
@ -183,35 +210,40 @@ public class ElasticsearchInterpreterTest {
|
|||
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
|
||||
" { \"terms\" : { \"field\" : \"status\" } } } }", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
|
||||
|
||||
res = interpreter.interpret("search /logs { \"aggs\" : { " +
|
||||
" \"length\" : { \"terms\": { \"field\": \"status\" }, " +
|
||||
" \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndex() {
|
||||
@Theory
|
||||
public void testIndex(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
|
||||
assertEquals(Code.ERROR, res.code());
|
||||
|
||||
res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
|
||||
res = interpreter.interpret("index /logs/http/1000 { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() {
|
||||
@Theory
|
||||
public void testDelete(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
|
||||
assertEquals(Code.ERROR, res.code());
|
||||
|
||||
res = interpreter.interpret("delete /logs/http/11", null);
|
||||
assertEquals("11", res.message().get(0).getData());
|
||||
final int testDeleteId = deleteId.decrementAndGet();
|
||||
res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
assertEquals("" + testDeleteId, res.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMisc() {
|
||||
@Theory
|
||||
public void testMisc(ElasticsearchInterpreter interpreter) {
|
||||
|
||||
InterpreterResult res = interpreter.interpret(null, null);
|
||||
assertEquals(Code.SUCCESS, res.code());
|
||||
|
|
@ -220,23 +252,23 @@ public class ElasticsearchInterpreterTest {
|
|||
assertEquals(Code.SUCCESS, res.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompletion() {
|
||||
List expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
|
||||
List expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
|
||||
@Theory
|
||||
public void testCompletion(ElasticsearchInterpreter interpreter) {
|
||||
final List<InterpreterCompletion> expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
|
||||
final List<InterpreterCompletion> expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
|
||||
|
||||
List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
|
||||
List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
|
||||
List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
|
||||
final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
|
||||
final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
|
||||
final List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
|
||||
|
||||
Assert.assertEquals(expectedResultOne, resultOne);
|
||||
Assert.assertEquals(expectedResultTwo, resultTwo);
|
||||
|
||||
List allCompletionList = new ArrayList<>();
|
||||
for (InterpreterCompletion ic : resultAll) {
|
||||
final List<String> allCompletionList = new ArrayList<>();
|
||||
for (final InterpreterCompletion ic : resultAll) {
|
||||
allCompletionList.add(ic.getName());
|
||||
}
|
||||
Assert.assertEquals(interpreter.COMMANDS, allCompletionList);
|
||||
Assert.assertEquals(ElasticsearchInterpreter.COMMANDS, allCompletionList);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue