Search: refatoring, move SearchService from NotebookRepoSync -> Notebook

This commit is contained in:
Alexander Bezzubov 2015-12-11 17:30:39 +09:00
parent 08fe806b0e
commit b2b93c436a
8 changed files with 46 additions and 33 deletions

View file

@ -255,9 +255,10 @@ public class ZeppelinServer extends Application {
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookServer);
this.notebookIndex = new SearchService();
this.notebookRepo = new NotebookRepoSync(conf, notebookIndex);
this.notebookRepo = new NotebookRepoSync(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory,
notebookServer, notebookIndex);
}
@Override

View file

@ -372,6 +372,8 @@ public class Note implements Serializable, JobListener {
}
public void persist() throws IOException {
//TODO(bzz): update index
//notebookIndex.
snapshotAngularObjectRegistry();
repo.save(this);
}

View file

@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@ -38,6 +39,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@ -65,22 +67,45 @@ public class Notebook {
private org.quartz.Scheduler quartzSched;
private JobListenerFactory jobListenerFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
/**
* Main constructor \w manual Dependency Injection
*
* @param conf
* @param notebookRepo
* @param schedulerFactory
* @param replFactory
* @param jobListenerFactory
* @param notebookIndex - (nullable) for indexing all notebooks on creating.
*
* @throws IOException
* @throws SchedulerException
*/
public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo,
SchedulerFactory schedulerFactory,
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
SchedulerException {
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory,
SearchService notebookIndex) throws IOException, SchedulerException {
this.conf = conf;
this.notebookRepo = notebookRepo;
this.schedulerFactory = schedulerFactory;
this.replFactory = replFactory;
this.jobListenerFactory = jobListenerFactory;
this.notebookIndex = notebookIndex;
quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
quartzSched = quertzSchedFact.getScheduler();
quartzSched.start();
CronJob.notebook = this;
loadAllNotes();
if (this.notebookIndex != null) {
long start = System.nanoTime();
logger.info("Notebook indexing started...");
notebookIndex.index(notes.values());
logger.info("Notebook indexing finished: {} indexed in {}s", notes.size(),
TimeUnit.NANOSECONDS.toSeconds(start - System.nanoTime()));
}
}
/**

View file

@ -30,7 +30,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.search.SearchService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
* Notebook repository sync with remote storage
*/
public class NotebookRepoSync implements NotebookRepo {
private SearchService notebookIndex;
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class);
private static final int maxRepoNum = 2;
private static final String pushKey = "pushNoteIDs";
@ -52,8 +50,7 @@ public class NotebookRepoSync implements NotebookRepo {
* @param (conf)
* @throws - Exception
*/
public NotebookRepoSync(ZeppelinConfiguration conf, SearchService noteIndex) throws Exception {
this.notebookIndex = noteIndex;
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
config = conf;
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
@ -147,17 +144,6 @@ public class NotebookRepoSync implements NotebookRepo {
List <NoteInfo> srcNotes = srcRepo.list();
List <NoteInfo> dstNotes = dstRepo.list();
//TODO(bzz): find a better place
if (notebookIndex != null) {
List<Note> notebooks = new ArrayList<>();
for (NoteInfo i: srcNotes) {
notebooks.add(srcRepo.get(i.getId()));
}
LOG.info("Index started");
notebookIndex.index(notebooks);
LOG.info("Index ended");
}
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
List<String> pushNoteIDs = noteIDs.get(pushKey);
List<String> pullNoteIDs = noteIDs.get(pullKey);

View file

@ -1,6 +1,7 @@
package org.apache.zeppelin.search;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -126,7 +127,7 @@ public class SearchService {
return matchingParagraphs;
}
public void index(List<Note> notebooks) {
public void index(Collection<Note> collection) {
try {
Date start = new Date();
ramDirectory = new RAMDirectory();
@ -134,7 +135,7 @@ public class SearchService {
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
IndexWriter writer = new IndexWriter(ramDirectory, iwc);
indexDocs(writer, notebooks);
indexDocs(writer, collection);
writer.close();
Date end = new Date();
@ -155,8 +156,8 @@ public class SearchService {
* @throws IOException
* If there is a low-level I/O error
*/
void indexDocs(final IndexWriter writer, List<Note> docs) throws IOException {
for (Note note : docs) {
void indexDocs(final IndexWriter writer, Collection<Note> notes) throws IOException {
for (Note note : notes) {
for (Paragraph doc : note.getParagraphs()) {
if (doc.getText() == null) {
LOG.info("Skipping empty paragraph");

View file

@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, null);
}
@After
@ -170,7 +170,8 @@ public class NotebookTest implements JobListenerFactory{
p1.setText("hello world");
note.persist();
Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null);
assertEquals(1, notebook2.getAllNotes().size());
}

View file

@ -86,8 +86,8 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebookRepoSync = new NotebookRepoSync(conf, null);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this);
notebookRepoSync = new NotebookRepoSync(conf);
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, null);
}
@After

View file

@ -44,8 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VFSNotebookRepoTest implements JobListenerFactory{
private static final Logger logger = LoggerFactory.getLogger(NotebookTest.class);
private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
private Notebook notebook;
@ -53,16 +51,15 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
private InterpreterFactory factory;
private File mainZepDir;
private File mainNotebookDir;
@Before
public void setUp() throws Exception {
String zpath = System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis();
String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis();
mainZepDir = new File(zpath);
mainZepDir.mkdirs();
new File(mainZepDir, "conf").mkdirs();
String mainNotePath = zpath+"/notebook";
String mainNotePath = zpath + "/notebook";
mainNotebookDir = new File(mainNotePath);
mainNotebookDir.mkdirs();
@ -80,7 +77,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
notebookRepo = new VFSNotebookRepo(conf);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, null);
}
@After