mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Search: refatoring, move SearchService from NotebookRepoSync -> Notebook
This commit is contained in:
parent
08fe806b0e
commit
b2b93c436a
8 changed files with 46 additions and 33 deletions
|
|
@ -255,9 +255,10 @@ public class ZeppelinServer extends Application {
|
|||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookServer);
|
||||
this.notebookIndex = new SearchService();
|
||||
this.notebookRepo = new NotebookRepoSync(conf, notebookIndex);
|
||||
this.notebookRepo = new NotebookRepoSync(conf);
|
||||
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory,
|
||||
notebookServer, notebookIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -372,6 +372,8 @@ public class Note implements Serializable, JobListener {
|
|||
}
|
||||
|
||||
public void persist() throws IOException {
|
||||
//TODO(bzz): update index
|
||||
//notebookIndex.
|
||||
snapshotAngularObjectRegistry();
|
||||
repo.save(this);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
|
|
@ -38,6 +39,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
import org.quartz.CronTrigger;
|
||||
import org.quartz.JobBuilder;
|
||||
|
|
@ -65,22 +67,45 @@ public class Notebook {
|
|||
private org.quartz.Scheduler quartzSched;
|
||||
private JobListenerFactory jobListenerFactory;
|
||||
private NotebookRepo notebookRepo;
|
||||
private SearchService notebookIndex;
|
||||
|
||||
/**
|
||||
* Main constructor \w manual Dependency Injection
|
||||
*
|
||||
* @param conf
|
||||
* @param notebookRepo
|
||||
* @param schedulerFactory
|
||||
* @param replFactory
|
||||
* @param jobListenerFactory
|
||||
* @param notebookIndex - (nullable) for indexing all notebooks on creating.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws SchedulerException
|
||||
*/
|
||||
public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo,
|
||||
SchedulerFactory schedulerFactory,
|
||||
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
|
||||
SchedulerException {
|
||||
InterpreterFactory replFactory, JobListenerFactory jobListenerFactory,
|
||||
SearchService notebookIndex) throws IOException, SchedulerException {
|
||||
this.conf = conf;
|
||||
this.notebookRepo = notebookRepo;
|
||||
this.schedulerFactory = schedulerFactory;
|
||||
this.replFactory = replFactory;
|
||||
this.jobListenerFactory = jobListenerFactory;
|
||||
this.notebookIndex = notebookIndex;
|
||||
quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
|
||||
quartzSched = quertzSchedFact.getScheduler();
|
||||
quartzSched.start();
|
||||
CronJob.notebook = this;
|
||||
|
||||
loadAllNotes();
|
||||
if (this.notebookIndex != null) {
|
||||
long start = System.nanoTime();
|
||||
logger.info("Notebook indexing started...");
|
||||
notebookIndex.index(notes.values());
|
||||
logger.info("Notebook indexing finished: {} indexed in {}s", notes.size(),
|
||||
TimeUnit.NANOSECONDS.toSeconds(start - System.nanoTime()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
|||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -38,7 +37,6 @@ import org.slf4j.LoggerFactory;
|
|||
* Notebook repository sync with remote storage
|
||||
*/
|
||||
public class NotebookRepoSync implements NotebookRepo {
|
||||
private SearchService notebookIndex;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class);
|
||||
private static final int maxRepoNum = 2;
|
||||
private static final String pushKey = "pushNoteIDs";
|
||||
|
|
@ -52,8 +50,7 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
* @param (conf)
|
||||
* @throws - Exception
|
||||
*/
|
||||
public NotebookRepoSync(ZeppelinConfiguration conf, SearchService noteIndex) throws Exception {
|
||||
this.notebookIndex = noteIndex;
|
||||
public NotebookRepoSync(ZeppelinConfiguration conf) throws Exception {
|
||||
config = conf;
|
||||
|
||||
String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim();
|
||||
|
|
@ -147,17 +144,6 @@ public class NotebookRepoSync implements NotebookRepo {
|
|||
List <NoteInfo> srcNotes = srcRepo.list();
|
||||
List <NoteInfo> dstNotes = dstRepo.list();
|
||||
|
||||
//TODO(bzz): find a better place
|
||||
if (notebookIndex != null) {
|
||||
List<Note> notebooks = new ArrayList<>();
|
||||
for (NoteInfo i: srcNotes) {
|
||||
notebooks.add(srcRepo.get(i.getId()));
|
||||
}
|
||||
LOG.info("Index started");
|
||||
notebookIndex.index(notebooks);
|
||||
LOG.info("Index ended");
|
||||
}
|
||||
|
||||
Map<String, List<String>> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo);
|
||||
List<String> pushNoteIDs = noteIDs.get(pushKey);
|
||||
List<String> pullNoteIDs = noteIDs.get(pullKey);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package org.apache.zeppelin.search;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
|
@ -126,7 +127,7 @@ public class SearchService {
|
|||
return matchingParagraphs;
|
||||
}
|
||||
|
||||
public void index(List<Note> notebooks) {
|
||||
public void index(Collection<Note> collection) {
|
||||
try {
|
||||
Date start = new Date();
|
||||
ramDirectory = new RAMDirectory();
|
||||
|
|
@ -134,7 +135,7 @@ public class SearchService {
|
|||
IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
|
||||
IndexWriter writer = new IndexWriter(ramDirectory, iwc);
|
||||
|
||||
indexDocs(writer, notebooks);
|
||||
indexDocs(writer, collection);
|
||||
|
||||
writer.close();
|
||||
Date end = new Date();
|
||||
|
|
@ -155,8 +156,8 @@ public class SearchService {
|
|||
* @throws IOException
|
||||
* If there is a low-level I/O error
|
||||
*/
|
||||
void indexDocs(final IndexWriter writer, List<Note> docs) throws IOException {
|
||||
for (Note note : docs) {
|
||||
void indexDocs(final IndexWriter writer, Collection<Note> notes) throws IOException {
|
||||
for (Note note : notes) {
|
||||
for (Paragraph doc : note.getParagraphs()) {
|
||||
if (doc.getText() == null) {
|
||||
LOG.info("Skipping empty paragraph");
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
|
||||
notebookRepo = new VFSNotebookRepo(conf);
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, null);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -170,7 +170,8 @@ public class NotebookTest implements JobListenerFactory{
|
|||
p1.setText("hello world");
|
||||
note.persist();
|
||||
|
||||
Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this);
|
||||
Notebook notebook2 = new Notebook(
|
||||
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null);
|
||||
assertEquals(1, notebook2.getAllNotes().size());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -86,8 +86,8 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
|
|||
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
|
||||
notebookRepoSync = new NotebookRepoSync(conf, null);
|
||||
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this);
|
||||
notebookRepoSync = new NotebookRepoSync(conf);
|
||||
notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, null);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -44,8 +44,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class VFSNotebookRepoTest implements JobListenerFactory{
|
||||
private static final Logger logger = LoggerFactory.getLogger(NotebookTest.class);
|
||||
|
||||
private ZeppelinConfiguration conf;
|
||||
private SchedulerFactory schedulerFactory;
|
||||
private Notebook notebook;
|
||||
|
|
@ -53,16 +51,15 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
|
|||
private InterpreterFactory factory;
|
||||
|
||||
private File mainZepDir;
|
||||
|
||||
private File mainNotebookDir;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
String zpath = System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis();
|
||||
String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis();
|
||||
mainZepDir = new File(zpath);
|
||||
mainZepDir.mkdirs();
|
||||
new File(mainZepDir, "conf").mkdirs();
|
||||
String mainNotePath = zpath+"/notebook";
|
||||
String mainNotePath = zpath + "/notebook";
|
||||
mainNotebookDir = new File(mainNotePath);
|
||||
mainNotebookDir.mkdirs();
|
||||
|
||||
|
|
@ -80,7 +77,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory{
|
|||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
|
||||
notebookRepo = new VFSNotebookRepo(conf);
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this);
|
||||
notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this, null);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
Loading…
Reference in a new issue