Search: backend indexing using Lucene added to zengine

This commit is contained in:
Alexander Bezzubov 2015-11-23 09:38:57 +09:00
parent 163a465e17
commit c2c2a52526
7 changed files with 232 additions and 53 deletions

View file

@ -197,6 +197,22 @@
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0-m10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
@ -240,23 +256,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0-m10</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>

View file

@ -40,14 +40,13 @@ import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.message.CronRequest;
import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind;
import org.apache.zeppelin.rest.message.NewNotebookRequest;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.server.JsonResponse;
import org.apache.zeppelin.socket.NotebookServer;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@ -57,17 +56,18 @@ import com.google.gson.reflect.TypeToken;
@Path("/notebook")
@Produces("application/json")
public class NotebookRestApi {
Logger logger = LoggerFactory.getLogger(NotebookRestApi.class);
private static final Logger LOG = LoggerFactory.getLogger(NotebookRestApi.class);
Gson gson = new Gson();
private Notebook notebook;
private NotebookServer notebookServer;
private SearchService notebookIndex;
public NotebookRestApi() {}
public NotebookRestApi(Notebook notebook, NotebookServer notebookServer) {
public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) {
this.notebook = notebook;
this.notebookServer = notebookServer;
this.notebookIndex = search;
}
/**
@ -141,7 +141,7 @@ public class NotebookRestApi {
@POST
@Path("/")
public Response createNote(String message) throws IOException {
logger.info("Create new notebook by JSON {}" , message);
LOG.info("Create new notebook by JSON {}" , message);
NewNotebookRequest request = gson.fromJson(message,
NewNotebookRequest.class);
Note note = notebook.createNote();
@ -166,7 +166,7 @@ public class NotebookRestApi {
@DELETE
@Path("{notebookId}")
public Response deleteNote(@PathParam("notebookId") String notebookId) throws IOException {
logger.info("Delete notebook {} ", notebookId);
LOG.info("Delete notebook {} ", notebookId);
if (!(notebookId.isEmpty())) {
Note note = notebook.getNote(notebookId);
if (note != null) {
@ -187,7 +187,7 @@ public class NotebookRestApi {
@Path("{notebookId}")
public Response cloneNote(@PathParam("notebookId") String notebookId, String message) throws
IOException, CloneNotSupportedException, IllegalArgumentException {
logger.info("clone notebook by JSON {}" , message);
LOG.info("clone notebook by JSON {}" , message);
NewNotebookRequest request = gson.fromJson(message,
NewNotebookRequest.class);
String newNoteName = request.getName();
@ -207,7 +207,7 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response runNoteJobs(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("run notebook jobs {} ", notebookId);
LOG.info("run notebook jobs {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
@ -227,7 +227,7 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response stopNoteJobs(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("stop notebook jobs {} ", notebookId);
LOG.info("stop notebook jobs {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
@ -251,7 +251,7 @@ public class NotebookRestApi {
@Path("job/{notebookId}")
public Response getNoteJobStatus(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("get notebook job status.");
LOG.info("get notebook job status.");
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
@ -271,7 +271,7 @@ public class NotebookRestApi {
public Response runParagraph(@PathParam("notebookId") String notebookId,
@PathParam("paragraphId") String paragraphId) throws
IOException, IllegalArgumentException {
logger.info("run paragraph job {} {} ", notebookId, paragraphId);
LOG.info("run paragraph job {} {} ", notebookId, paragraphId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
@ -296,7 +296,7 @@ public class NotebookRestApi {
public Response stopParagraph(@PathParam("notebookId") String notebookId,
@PathParam("paragraphId") String paragraphId) throws
IOException, IllegalArgumentException {
logger.info("stop paragraph job {} ", notebookId);
LOG.info("stop paragraph job {} ", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
return new JsonResponse(Status.NOT_FOUND, "note not found.").build();
@ -320,7 +320,7 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response registerCronJob(@PathParam("notebookId") String notebookId, String message) throws
IOException, IllegalArgumentException {
logger.info("Register cron job note={} request cron msg={}", notebookId, message);
LOG.info("Register cron job note={} request cron msg={}", notebookId, message);
CronRequest request = gson.fromJson(message,
CronRequest.class);
@ -352,7 +352,7 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response removeCronJob(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("Remove cron job note {}", notebookId);
LOG.info("Remove cron job note {}", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
@ -377,7 +377,7 @@ public class NotebookRestApi {
@Path("cron/{notebookId}")
public Response getCronJob(@PathParam("notebookId") String notebookId) throws
IOException, IllegalArgumentException {
logger.info("Get cron job note {}", notebookId);
LOG.info("Get cron job note {}", notebookId);
Note note = notebook.getNote(notebookId);
if (note == null) {
@ -393,18 +393,10 @@ public class NotebookRestApi {
@GET
@Path("search")
public Response search(@QueryParam("q") String query) {
logger.info("Searching notebooks for {}", query);
//List<Map<String, String>> notebooksFound = searchNotebooks(query);
ImmutableList<ImmutableMap<String, String>> foundNotebooks = ImmutableList.of(
ImmutableMap.of("id", "XXXX", "name", "Test Notebook"),
ImmutableMap.of("id", "YYYY", "name", "Another Notebook")
);
logger.info("Notbooks {} found", foundNotebooks.size());
return new JsonResponse<>(Status.OK, foundNotebooks).build();
LOG.info("Searching notebooks for {}", query);
List<Map<String, String>> notebooksFound = notebookIndex.search(query);
LOG.info("Notbooks {} found", notebooksFound.size());
return new JsonResponse<>(Status.OK, notebooksFound).build();
}
private List<Map<String, String>> searchNotebooks(String query) {
return null;
}
}

View file

@ -38,6 +38,7 @@ import org.apache.zeppelin.rest.InterpreterRestApi;
import org.apache.zeppelin.rest.NotebookRestApi;
import org.apache.zeppelin.rest.ZeppelinRestApi;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.socket.NotebookServer;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Handler;
@ -63,13 +64,14 @@ import org.slf4j.LoggerFactory;
public class ZeppelinServer extends Application {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
private SchedulerFactory schedulerFactory;
public static Notebook notebook;
public static NotebookServer notebookServer;
public static Server jettyServer;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
public static void main(String[] args) throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
@ -251,9 +253,10 @@ public class ZeppelinServer extends Application {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookServer);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new SearchService();
this.notebookRepo = new NotebookRepoSync(conf, notebookIndex);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
}
@ -271,7 +274,7 @@ public class ZeppelinServer extends Application {
ZeppelinRestApi root = new ZeppelinRestApi();
singletons.add(root);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer);
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer, notebookIndex);
singletons.add(notebookApi);
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);

View file

@ -123,6 +123,30 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>

View file

@ -30,6 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.search.SearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory;
* Notebook repository sync with remote storage
*/
public class NotebookRepoSync implements NotebookRepo {
private SearchService notebookIndex;
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class);
private static final int maxRepoNum = 2;
private static final String pushKey = "pushNoteIDs";
@ -46,11 +48,12 @@ public class NotebookRepoSync implements NotebookRepo {
private List<NotebookRepo> repos = new ArrayList<NotebookRepo>();
/**
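* @param notebookIndex search index refreshed on every sync; may be null to skip indexing
* @param conf Zeppelin configuration; its notebook storage setting names the repositories to use
* @throws Exception if the notebook repositories cannot be initialized (TODO confirm exact causes)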
*/
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
public NotebookRepoSync(ZeppelinConfiguration conf, SearchService notebookIndex) throws Exception {
this.notebookIndex = notebookIndex;
config = conf;
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
@ -137,13 +140,24 @@ public class NotebookRepoSync implements NotebookRepo {
* copy new/updated notes from source to destination storage
* @throws IOException
*/
void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
public void sync(int sourceRepoIndex, int destRepoIndex) throws IOException {
LOG.info("Sync started");
NotebookRepo sourceRepo = getRepo(sourceRepoIndex);
NotebookRepo destRepo = getRepo(destRepoIndex);
List <NoteInfo> sourceNotes = sourceRepo.list();
List <NoteInfo> destNotes = destRepo.list();
//TODO(bzz): find a better place
if (notebookIndex != null) {
List<Note> notebooks = new ArrayList<>();
for (NoteInfo i: sourceNotes) {
notebooks.add(sourceRepo.get(i.getId()));
}
LOG.info("Index started");
notebookIndex.index(notebooks);
LOG.info("Index ended");
}
Map<String, List<String>> noteIDs = notesCheckDiff(sourceNotes,
sourceRepo,
destNotes,

View file

@ -0,0 +1,147 @@
package org.apache.zeppelin.search;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
 * In-memory, Lucene-backed full-text index over notebook paragraphs.
 *
 * <p>Each paragraph with text becomes one Lucene document holding its id
 * ({@code <note-id>/paragraph/<paragraph-id>}), the owning note's name, a
 * modification timestamp and the paragraph text. {@link #index(List)} rebuilds
 * the whole index from scratch; {@link #search(String)} queries the most
 * recently built one.
 *
 * TODO(bzz): find a better name
 */
public class SearchService {
  private static final Logger LOG = LoggerFactory.getLogger(SearchService.class);

  /** Analyzed field holding the paragraph text; default field for queries. */
  static final String SEARCH_FIELD = "contents";
  /** Stored, un-analyzed field holding the paragraph id. Must differ from SEARCH_FIELD. */
  static final String ID_FIELD = "id";

  private static final String TITLE_FIELD = "title";
  private static final String MODIFIED_FIELD = "modified";
  private static final int MAX_HITS = 20;

  /** Most recently built index, or null until {@link #index(List)} has succeeded once. */
  Directory ramDirectory;

  /**
   * Runs a full-text query against the indexed paragraphs.
   *
   * @param queryStr query in Lucene classic query-parser syntax
   * @return up to 20 matches, each a map with the keys {@code "id"} and {@code "name"};
   *         an empty list if the query is blank or unparsable, nothing has been
   *         indexed yet, or the index cannot be read
   */
  public List<Map<String, String>> search(String queryStr) {
    if (queryStr == null || queryStr.trim().isEmpty()) {
      // Nothing to parse; the query parser would only fail on this input.
      return Collections.emptyList();
    }
    // Read the field once so a concurrent re-index cannot swap it mid-search.
    Directory directory = ramDirectory;
    if (directory == null) {
      LOG.warn("Search requested before any notebook was indexed");
      return Collections.emptyList();
    }

    List<Map<String, String>> result = Collections.emptyList();
    try (IndexReader indexReader = DirectoryReader.open(directory);
        Analyzer analyzer = new StandardAnalyzer()) {
      IndexSearcher indexSearcher = new IndexSearcher(indexReader);
      QueryParser parser = new QueryParser(SEARCH_FIELD, analyzer);

      Query query = parser.parse(queryStr);
      LOG.info("Searching for: {}", query.toString(SEARCH_FIELD));

      result = doSearch(indexSearcher, query);
    } catch (IOException e) {
      LOG.error("Failed to open index dir", e);
    } catch (ParseException e) {
      LOG.error("Failed to parse query {}", queryStr, e);
    }
    return result;
  }

  /**
   * Executes the query and converts the hits to id/name maps.
   *
   * @param searcher searcher over the current index
   * @param query parsed query
   * @return the hits collected so far; possibly partial if reading a document fails
   */
  private List<Map<String, String>> doSearch(IndexSearcher searcher, Query query) {
    List<Map<String, String>> matchingParagraphs = Lists.newArrayList();
    try {
      ScoreDoc[] hits = searcher.search(query, MAX_HITS).scoreDocs;
      for (int i = 0; i < hits.length; i++) {
        LOG.info("doc={} score={}", hits[i].doc, hits[i].score);

        Document doc = searcher.doc(hits[i].doc);
        String path = doc.get(ID_FIELD);
        if (path == null) {
          LOG.info("{}. No {} for this document", i + 1, ID_FIELD);
          continue;
        }

        LOG.info("{}. {}", i + 1, path);
        String title = doc.get(TITLE_FIELD);
        if (title != null) {
          LOG.info(" Title: {}", title);
        }
        // ImmutableMap rejects null values, so an untitled hit gets an empty name.
        matchingParagraphs.add(ImmutableMap.of("id", path, "name", title == null ? "" : title));
      }
    } catch (IOException e) {
      LOG.error("Failed to read search results", e);
    }
    return matchingParagraphs;
  }

  /**
   * Rebuilds the index from scratch out of the given notebooks.
   *
   * <p>The new index replaces the previous one only after it has been written
   * successfully; on failure the error is logged and the previous index (if
   * any) stays in place.
   *
   * @param notebooks notebooks whose paragraphs should become searchable
   */
  public void index(List<Note> notebooks) {
    long start = System.currentTimeMillis();
    Directory freshDirectory = new RAMDirectory();
    // Resources close in reverse order: the writer is committed before the analyzer is released.
    try (Analyzer analyzer = new StandardAnalyzer();
        IndexWriter writer = new IndexWriter(freshDirectory, new IndexWriterConfig(analyzer))) {
      indexDocs(writer, notebooks);
    } catch (Exception e) {
      // Deliberately best-effort: an indexing failure must not propagate to the caller.
      LOG.error("Failed to index all Notebooks", e);
      return;
    }
    // The previous directory is not closed here: a search may still be reading from it.
    ramDirectory = freshDirectory;
    LOG.info("{} total milliseconds", System.currentTimeMillis() - start);
  }

  /**
   * Indexes every paragraph with text from the given list of notebooks.
   *
   * @param writer writer to the index where the paragraphs will be stored
   * @param docs notebooks to index
   * @throws IOException if there is a low-level I/O error
   */
  void indexDocs(final IndexWriter writer, List<Note> docs) throws IOException {
    for (Note note : docs) {
      for (Paragraph paragraph : note.getParagraphs()) {
        if (paragraph.getText() == null) {
          LOG.info("Skipping empty paragraph");
          continue;
        }
        indexDoc(writer, note, paragraph);
      }
    }
  }

  /**
   * Indexes a single paragraph as one Lucene document.
   *
   * @param writer writer to add the document to
   * @param note notebook owning the paragraph; supplies the id prefix and the title
   * @param p paragraph to index; its text must not be null
   * @throws IOException if there is a low-level I/O error
   */
  void indexDoc(IndexWriter writer, Note note, Paragraph p) throws IOException {
    Document doc = new Document();

    // <note-id>/paragraph/<paragraph-id>
    String id = String.format("%s/paragraph/%s", note.getId(), p.getId());
    doc.add(new StringField(ID_FIELD, id, Field.Store.YES));

    // Lucene fields reject null values; a missing note name must not abort the whole build.
    String title = note.getName();
    if (title != null) {
      doc.add(new StringField(TITLE_FIELD, title, Field.Store.YES));
    }

    Date date = p.getDateStarted() != null ? p.getDateStarted() : p.getDateCreated();
    if (date != null) {
      doc.add(new LongField(MODIFIED_FIELD, date.getTime(), Field.Store.NO));
    }

    doc.add(new TextField(SEARCH_FIELD, p.getText(), Field.Store.YES));
    writer.addDocument(doc);
  }
}

View file

@ -86,7 +86,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebookRepoSync = new NotebookRepoSync(conf);
notebookRepoSync = new NotebookRepoSync(conf, null);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this);
}