Fix issue with id containing special chars (/, #)

This commit is contained in:
Bruno Bonnin 2017-01-19 15:57:44 +01:00
parent 4e9812e0f1
commit 6bcf36913e
4 changed files with 233 additions and 83 deletions

View file

@ -32,7 +32,7 @@ import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
import org.apache.zeppelin.elasticsearch.action.HitWrapper;
@ -56,8 +56,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.wnameless.json.flattener.JsonFlattener;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
@ -99,7 +97,6 @@ public class ElasticsearchInterpreter extends Interpreter {
public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
private final Gson gson = new GsonBuilder().setPrettyPrinting().create();
private ElasticsearchClient elsClient;
private int resultSize = 10;
@ -200,13 +197,13 @@ public class ElasticsearchInterpreter extends Interpreter {
try {
if ("get".equalsIgnoreCase(method)) {
return processGet(urlItems);
return processGet(urlItems, interpreterContext);
}
else if ("count".equalsIgnoreCase(method)) {
return processCount(urlItems, data);
return processCount(urlItems, data, interpreterContext);
}
else if ("search".equalsIgnoreCase(method)) {
return processSearch(urlItems, data, currentResultSize);
return processSearch(urlItems, data, currentResultSize, interpreterContext);
}
else if ("index".equalsIgnoreCase(method)) {
return processIndex(urlItems, data);
@ -249,6 +246,31 @@ public class ElasticsearchInterpreter extends Interpreter {
return suggestions;
}
private void addAngularObject(InterpreterContext interpreterContext, String prefix, Object obj) {
interpreterContext.getAngularObjectRegistry().add(
prefix + "_" + interpreterContext.getParagraphId().replace("-", "_"),
obj, null, null);
}
private String[] getIndexTypeId(String[] urlItems) {
if (urlItems.length < 3) {
return null;
}
final String index = urlItems[0];
final String type = urlItems[1];
final String id = String.join("/", Arrays.copyOfRange(urlItems, 2, urlItems.length));
if (StringUtils.isEmpty(index)
|| StringUtils.isEmpty(type)
|| StringUtils.isEmpty(id)) {
return null;
}
return new String[] { index, type, id };
}
private InterpreterResult processHelp(InterpreterResult.Code code, String additionalMessage) {
final StringBuffer buffer = new StringBuffer();
@ -265,22 +287,24 @@ public class ElasticsearchInterpreter extends Interpreter {
* Processes a "get" request.
*
* @param urlItems Items of the URL
* @param interpreterContext Instance of the context
* @return Result of the get request, it contains a JSON-formatted string
*/
private InterpreterResult processGet(String[] urlItems) {
private InterpreterResult processGet(String[] urlItems, InterpreterContext interpreterContext) {
if (urlItems.length != 3
|| StringUtils.isEmpty(urlItems[0])
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
final String[] indexTypeId = getIndexTypeId(urlItems);
if (indexTypeId == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
}
final ActionResponse response = elsClient.get(urlItems[0], urlItems[1], urlItems[2]);
final ActionResponse response = elsClient.get(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
if (response.isSucceeded()) {
final String json = gson.toJson(response.getHit().getSourceAsString());
final String json = response.getHit().getSourceAsString();
addAngularObject(interpreterContext, "get", json);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
@ -296,9 +320,11 @@ public class ElasticsearchInterpreter extends Interpreter {
*
* @param urlItems Items of the URL
* @param data May contains the JSON of the request
* @param interpreterContext Instance of the context
* @return Result of the count request, it contains the total hits
*/
private InterpreterResult processCount(String[] urlItems, String data) {
private InterpreterResult processCount(String[] urlItems, String data,
InterpreterContext interpreterContext) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -307,6 +333,8 @@ public class ElasticsearchInterpreter extends Interpreter {
final ActionResponse response = searchData(urlItems, data, 0);
addAngularObject(interpreterContext, "count", response.getTotalHits());
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
@ -319,9 +347,11 @@ public class ElasticsearchInterpreter extends Interpreter {
* @param urlItems Items of the URL
* @param data May contains the JSON of the request
* @param size Limit of result set
* @param interpreterContext Instance of the context
* @return Result of the search request, it contains a tab-formatted string of the matching hits
*/
private InterpreterResult processSearch(String[] urlItems, String data, int size) {
private InterpreterResult processSearch(String[] urlItems, String data, int size,
InterpreterContext interpreterContext) {
if (urlItems.length > 2) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -330,6 +360,10 @@ public class ElasticsearchInterpreter extends Interpreter {
final ActionResponse response = searchData(urlItems, data, size);
addAngularObject(interpreterContext, "search",
(response.getAggregations() != null && response.getAggregations().size() > 0) ?
response.getAggregations() : response.getHits());
return buildResponseMessage(response);
}
@ -364,15 +398,15 @@ public class ElasticsearchInterpreter extends Interpreter {
*/
private InterpreterResult processDelete(String[] urlItems) {
if (urlItems.length != 3
|| StringUtils.isEmpty(urlItems[0])
|| StringUtils.isEmpty(urlItems[1])
|| StringUtils.isEmpty(urlItems[2])) {
final String[] indexTypeId = getIndexTypeId(urlItems);
if (indexTypeId == null) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Bad URL (it should be /index/type/id)");
}
final ActionResponse response = elsClient.delete(urlItems[0], urlItems[1], urlItems[2]);
final ActionResponse response =
elsClient.delete(indexTypeId[0], indexTypeId[1], indexTypeId[2]);
if (response.isSucceeded()) {
return new InterpreterResult(

View file

@ -17,6 +17,8 @@
package org.apache.zeppelin.elasticsearch.client;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@ -83,7 +85,11 @@ public class HttpBasedClient implements ElasticsearchClient {
}
private String getFieldAsString(HttpResponse<JsonNode> response, String field) {
return response.getBody().getObject().get(field).toString();
return getFieldAsString(response.getBody(), field);
}
private String getFieldAsString(JsonNode json, String field) {
return json.getObject().get(field).toString();
}
private long getFieldAsLong(HttpResponse<JsonNode> response, String field) {
@ -92,47 +98,94 @@ public class HttpBasedClient implements ElasticsearchClient {
return obj.getLong(fields[fields.length - 1]);
}
private String getUrl(String index, String type, String id) {
final StringBuilder buffer = new StringBuilder();
buffer.append("http://").append(host).append(":").append(port).append("/");
if (StringUtils.isNotEmpty(index)) {
buffer.append(index);
private String getUrl(String index, String type, String id, boolean useSearch) {
try {
final StringBuilder buffer = new StringBuilder();
buffer.append("http://").append(host).append(":").append(port).append("/");
if (StringUtils.isNotEmpty(index)) {
buffer.append(index);
if (StringUtils.isNotEmpty(type)) {
buffer.append("/").append(type);
if (StringUtils.isNotEmpty(type)) {
buffer.append("/").append(type);
if (StringUtils.isNotEmpty(id)) {
buffer.append("/").append(id);
if (StringUtils.isNotEmpty(id)) {
if (useSearch) {
final String encodedId = URLEncoder.encode(id, "UTF-8");
if (id.equals(encodedId)) {
// No difference, use directly the id
buffer.append("/").append(id);
}
else {
// There are differences: to avoid problems with some special characters
// such as / and # in id, use a "terms" query
buffer.append("/_search?source=")
.append(URLEncoder
.encode("{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8"));
}
}
else {
buffer.append("/").append(id);
}
}
}
}
return buffer.toString();
}
catch (final UnsupportedEncodingException e) {
throw new ActionException(e);
}
return buffer.toString();
}
private String getUrl(String[] indices, String[] types) {
final String inds = indices == null ? null : Joiner.on(",").join(indices);
final String typs = types == null ? null : Joiner.on(",").join(types);
return getUrl(inds, typs, null);
return getUrl(inds, typs, null, false);
}
@Override
public ActionResponse get(String index, String type, String id) {
ActionResponse response = null;
try {
final HttpRequest request = Unirest.get(getUrl(index, type, id));
final HttpRequest request = Unirest.get(getUrl(index, type, id, true));
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
final HttpResponse<String> result = request.asString();
final boolean isSucceeded = isSucceeded(result);
response = new ActionResponse()
.succeeded(isSucceeded(result))
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
getFieldAsString(result, "_source")));
if (isSucceeded) {
final JsonNode body = new JsonNode(result.getBody());
if (body.getObject().has("_index")) {
response = new ActionResponse()
.succeeded(true)
.hit(new HitWrapper(
getFieldAsString(body, "_index"),
getFieldAsString(body, "_type"),
getFieldAsString(body, "_id"),
getFieldAsString(body, "_source")));
}
else {
final JSONArray hits = getFieldAsArray(body.getObject(), "hits/hits");
final JSONObject hit = (JSONObject) hits.iterator().next();
response = new ActionResponse()
.succeeded(true)
.hit(new HitWrapper(
hit.getString("_index"),
hit.getString("_type"),
hit.getString("_id"),
hit.opt("_source").toString()));
}
}
else {
if (result.getStatus() == 404) {
response = new ActionResponse()
.succeeded(false);
}
else {
throw new ActionException(result.getBody());
}
}
}
catch (final UnirestException e) {
throw new ActionException(e);
@ -144,20 +197,27 @@ public class HttpBasedClient implements ElasticsearchClient {
public ActionResponse delete(String index, String type, String id) {
ActionResponse response = null;
try {
final HttpRequest request = Unirest.delete(getUrl(index, type, id));
final HttpRequest request = Unirest.delete(getUrl(index, type, id, true));
if (StringUtils.isNotEmpty(username)) {
request.basicAuth(username, password);
}
final HttpResponse<JsonNode> result = request.asJson();
final HttpResponse<String> result = request.asString();
final boolean isSucceeded = isSucceeded(result);
response = new ActionResponse()
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
null))
.succeeded(isSucceeded(result));
if (isSucceeded) {
final JsonNode body = new JsonNode(result.getBody());
response = new ActionResponse()
.succeeded(true)
.hit(new HitWrapper(
getFieldAsString(body, "_index"),
getFieldAsString(body, "_type"),
getFieldAsString(body, "_id"),
null));
}
else {
throw new ActionException(result.getBody());
}
}
catch (final UnirestException e) {
throw new ActionException(e);
@ -171,10 +231,10 @@ public class HttpBasedClient implements ElasticsearchClient {
try {
HttpRequestWithBody request = null;
if (StringUtils.isEmpty(id)) {
request = Unirest.post(getUrl(index, type, id));
request = Unirest.post(getUrl(index, type, id, false));
}
else {
request = Unirest.put(getUrl(index, type, id));
request = Unirest.put(getUrl(index, type, id, false));
}
request
.header("Accept", "application/json")
@ -185,14 +245,20 @@ public class HttpBasedClient implements ElasticsearchClient {
}
final HttpResponse<JsonNode> result = request.asJson();
final boolean isSucceeded = isSucceeded(result);
response = new ActionResponse()
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
null))
.succeeded(isSucceeded(result));
if (isSucceeded) {
response = new ActionResponse()
.succeeded(true)
.hit(new HitWrapper(
getFieldAsString(result, "_index"),
getFieldAsString(result, "_type"),
getFieldAsString(result, "_id"),
null));
}
else {
throw new ActionException(result.getBody().toString());
}
}
catch (final UnirestException e) {
throw new ActionException(e);
@ -217,7 +283,6 @@ public class HttpBasedClient implements ElasticsearchClient {
}
try {
final HttpRequestWithBody request = Unirest
.post(getUrl(indices, types) + "/_search?size=" + size)
.header("Content-Type", "application/json");

View file

@ -28,7 +28,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter;
import org.apache.zeppelin.elasticsearch.action.ActionResponse;
import org.apache.zeppelin.elasticsearch.action.AggWrapper;
@ -58,7 +58,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
/**
* Elasticsearch client using the transport protocol.
@ -152,7 +152,7 @@ public class TransportBasedClient implements ElasticsearchClient {
final Map source = gson.fromJson(query, Map.class);
reqBuilder.setExtraSource(source);
}
catch (final JsonParseException e) {
catch (final JsonSyntaxException e) {
// This is not a JSON (or maybe not well formatted...)
reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true));
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.elasticsearch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.ArrayList;
@ -30,6 +31,8 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -64,7 +67,7 @@ public class ElasticsearchInterpreterTest {
private static final String ELS_HTTP_PORT = "10200";
private static final String ELS_PATH = "/tmp/els";
private static final AtomicInteger deleteId = new AtomicInteger(4);
private static final AtomicInteger deleteId = new AtomicInteger(2);
@BeforeClass
@ -89,7 +92,7 @@ public class ElasticsearchInterpreterTest {
.endObject()
.endObject().endObject().endObject()).get();
for (int i = 0; i < 50; i++) {
for (int i = 0; i < 48; i++) {
elsClient.prepareIndex("logs", "http", "" + i)
.setRefresh(true)
.setSource(jsonBuilder()
@ -106,6 +109,23 @@ public class ElasticsearchInterpreterTest {
.get();
}
for (int i = 1; i < 3; i++) {
elsClient.prepareIndex("logs", "http", "very/strange/id#" + i)
.setRefresh(true)
.setSource(jsonBuilder()
.startObject()
.field("date", new Date())
.startObject("request")
.field("method", METHODS[RandomUtils.nextInt(METHODS.length)])
.field("url", "/zeppelin/" + UUID.randomUUID().toString())
.field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
.endObject()
.field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
.field("content_length", RandomUtils.nextInt(2000))
)
.get();
}
final Properties props = new Properties();
props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
@ -132,7 +152,7 @@ public class ElasticsearchInterpreterTest {
}
if (elsClient != null) {
elsClient.admin().indices().delete(new DeleteIndexRequest("logs")).actionGet();
elsClient.admin().indices().delete(new DeleteIndexRequest("*")).actionGet();
elsClient.close();
}
@ -141,79 +161,104 @@ public class ElasticsearchInterpreterTest {
}
}
private InterpreterContext buildContext(String noteAndParagraphId) {
final AngularObjectRegistry angularObjReg = new AngularObjectRegistry("elasticsearch", null);
return new InterpreterContext(noteAndParagraphId, noteAndParagraphId, null, null, null, null, null,
null, angularObjReg , null, null, null);
}
@Theory
public void testCount(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("count /unknown", null);
final InterpreterContext ctx = buildContext("testCount");
InterpreterResult res = interpreter.interpret("count /unknown", ctx);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("count /logs", null);
res = interpreter.interpret("count /logs", ctx);
assertEquals(Code.SUCCESS, res.code());
assertEquals("50", res.message().get(0).getData());
assertNotNull(ctx.getAngularObjectRegistry().get("count_testCount", null, null));
assertEquals(50l, ctx.getAngularObjectRegistry().get("count_testCount", null, null).get());
res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
}
@Theory
public void testGet(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
final InterpreterContext ctx = buildContext("get");
InterpreterResult res = interpreter.interpret("get /logs/http/unknown", ctx);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("get /logs/http/4", null);
res = interpreter.interpret("get /logs/http/unknown/unknown", ctx);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("get /unknown/unknown/unknown", ctx);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("get /logs/http/very/strange/id#1", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("get /logs/_all/4", null);
res = interpreter.interpret("get /logs/http/4", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("get /logs/_all/4", ctx);
assertEquals(Code.SUCCESS, res.code());
}
@Theory
public void testSearch(ElasticsearchInterpreter interpreter) {
InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
final InterpreterContext ctx = buildContext("search");
InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs {{{hello}}}", null);
res = interpreter.interpret("search /logs {{{hello}}}", ctx);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs status:404", null);
res = interpreter.interpret("search /logs status:404", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", null);
res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
}
@Theory
public void testAgg(ElasticsearchInterpreter interpreter) {
final InterpreterContext ctx = buildContext("agg");
// Single-value metric
InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
" { \"cardinality\" : { \"field\" : \"status\" } } } }", null);
" { \"cardinality\" : { \"field\" : \"status\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Multi-value metric
res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " +
" { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null);
" { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Single bucket
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " +
" \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null);
" \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
// Multi-buckets
res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
" { \"terms\" : { \"field\" : \"status\" } } } }", null);
" { \"terms\" : { \"field\" : \"status\" } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
res = interpreter.interpret("search /logs { \"aggs\" : { " +
" \"length\" : { \"terms\": { \"field\": \"status\" }, " +
" \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
" \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", ctx);
assertEquals(Code.SUCCESS, res.code());
}
@ -223,6 +268,9 @@ public class ElasticsearchInterpreterTest {
InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("index /logs/http { bad ", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
assertEquals(Code.SUCCESS, res.code());
@ -236,6 +284,9 @@ public class ElasticsearchInterpreterTest {
InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
assertEquals(Code.ERROR, res.code());
res = interpreter.interpret("delete /unknown/unknown/unknown", null);
assertEquals(Code.ERROR, res.code());
final int testDeleteId = deleteId.decrementAndGet();
res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
assertEquals(Code.SUCCESS, res.code());