Skip to content

Commit 5faf5a1

Browse files
committed
feat(nrt): add soft-commit API, IndexWriter::nrt_searcher, and NrtDirectory overlay for near real-time search
1 parent bf560ac commit 5faf5a1

File tree

5 files changed

+217
-0
lines changed

5 files changed

+217
-0
lines changed

src/directory/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod file_watcher;
99
pub mod footer;
1010
mod managed_directory;
1111
mod ram_directory;
12+
mod nrt_directory;
1213
mod watch_event_router;
1314

1415
/// Errors specific to the directory module.
@@ -48,6 +49,7 @@ pub use memmap2::Advice;
4849
pub use self::managed_directory::ManagedDirectory;
4950
#[cfg(feature = "mmap")]
5051
pub use self::mmap_directory::MmapDirectory;
52+
pub use self::nrt_directory::NrtDirectory;
5153

5254
/// Write object for Directory.
5355
///

src/directory/nrt_directory.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
use std::collections::HashSet;
2+
use std::fmt;
3+
use std::io::{self, Write};
4+
use std::path::{Path, PathBuf};
5+
use std::sync::{Arc, RwLock};
6+
7+
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
8+
use crate::directory::{Directory, DirectoryLock, FileHandle, FileSlice, Lock, TerminatingWrite,
9+
WatchCallback, WatchHandle, WritePtr};
10+
use crate::directory::RamDirectory;
11+
12+
/// A Directory that overlays a `RamDirectory` on top of a base `Directory`.
13+
///
14+
/// - Writes (open_write and atomic_write) go to the in-memory overlay.
15+
/// - Reads first check the overlay, then fallback to the base directory.
16+
/// - sync_directory() persists overlay files that do not yet exist in the base directory.
17+
#[derive(Clone)]
18+
pub struct NrtDirectory {
19+
base: Box<dyn Directory>,
20+
overlay: RamDirectory,
21+
/// Tracks files written into the overlay to decide what to persist on sync.
22+
overlay_paths: Arc<RwLock<HashSet<PathBuf>>>,
23+
}
24+
25+
impl NrtDirectory {
26+
/// Wraps a base directory with an NRT overlay.
27+
pub fn wrap(base: Box<dyn Directory>) -> NrtDirectory {
28+
NrtDirectory {
29+
base,
30+
overlay: RamDirectory::default(),
31+
overlay_paths: Arc::new(RwLock::new(HashSet::new())),
32+
}
33+
}
34+
35+
/// Persist overlay files into the base directory if missing there.
36+
fn persist_overlay_into_base(&self) -> crate::Result<()> {
37+
let snapshot_paths: Vec<PathBuf> = {
38+
let guard = self.overlay_paths.read().unwrap();
39+
guard.iter().cloned().collect()
40+
};
41+
for path in snapshot_paths {
42+
// Skip if base already has the file
43+
if self.base.exists(&path).unwrap_or(false) {
44+
continue;
45+
}
46+
// Read bytes from overlay
47+
let file_slice: FileSlice = match self.overlay.open_read(&path) {
48+
Ok(slice) => slice,
49+
Err(OpenReadError::FileDoesNotExist(_)) => continue, // was removed meanwhile
50+
Err(e) => return Err(e.into()),
51+
};
52+
let bytes = file_slice
53+
.read_bytes()
54+
.map_err(|io_err| OpenReadError::IoError {
55+
io_error: Arc::new(io_err),
56+
filepath: path.clone(),
57+
})?;
58+
// Write to base
59+
let mut dest_wrt: WritePtr = self.base.open_write(&path)?;
60+
dest_wrt.write_all(bytes.as_slice())?;
61+
dest_wrt.terminate()?;
62+
}
63+
Ok(())
64+
}
65+
}
66+
67+
impl fmt::Debug for NrtDirectory {
68+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69+
write!(f, "NrtDirectory")
70+
}
71+
}
72+
73+
impl Directory for NrtDirectory {
74+
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
75+
if self.overlay.exists(path).unwrap_or(false) {
76+
return self.overlay.get_file_handle(path);
77+
}
78+
self.base.get_file_handle(path)
79+
}
80+
81+
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
82+
if self.overlay.exists(path).unwrap_or(false) {
83+
return self.overlay.open_read(path);
84+
}
85+
self.base.open_read(path)
86+
}
87+
88+
fn delete(&self, path: &Path) -> Result<(), DeleteError> {
89+
let _ = self.overlay.delete(path); // best-effort
90+
self.base.delete(path)
91+
}
92+
93+
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
94+
if self.overlay.exists(path).unwrap_or(false) {
95+
return Ok(true);
96+
}
97+
self.base.exists(path)
98+
}
99+
100+
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
101+
{
102+
let mut guard = self.overlay_paths.write().unwrap();
103+
guard.insert(path.to_path_buf());
104+
}
105+
self.overlay.open_write(path)
106+
}
107+
108+
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
109+
if self.overlay.exists(path).unwrap_or(false) {
110+
return self.overlay.atomic_read(path);
111+
}
112+
self.base.atomic_read(path)
113+
}
114+
115+
fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
116+
{
117+
let mut guard = self.overlay_paths.write().unwrap();
118+
guard.insert(path.to_path_buf());
119+
}
120+
self.overlay.atomic_write(path, data)
121+
}
122+
123+
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, crate::directory::error::LockError> {
124+
self.base.acquire_lock(lock)
125+
}
126+
127+
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
128+
// Watch meta.json changes on the base directory
129+
self.base.watch(watch_callback)
130+
}
131+
132+
fn sync_directory(&self) -> io::Result<()> {
133+
// Best effort: persist overlay, then sync base directory
134+
if let Err(err) = self.persist_overlay_into_base() {
135+
return Err(io::Error::new(io::ErrorKind::Other, format!("{err}")));
136+
}
137+
self.base.sync_directory()
138+
}
139+
}
140+
141+

src/indexer/index_writer.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,41 @@ impl<D: Document> IndexWriter<D> {
349349
&self.index
350350
}
351351

352+
/// Creates a near-real-time `Searcher` reflecting the latest `soft_commit()` state.
353+
///
354+
/// This builds a `Searcher` directly from the IndexWriter's in-memory committed segments,
355+
/// without relying on `meta.json` having been persisted.
356+
pub fn nrt_searcher(&self) -> crate::Result<crate::Searcher> {
357+
use crate::core::searcher::{SearcherGeneration, SearcherInner};
358+
use crate::directory::{Directory as _, META_LOCK};
359+
use crate::store::DOCSTORE_CACHE_CAPACITY;
360+
use crate::Inventory;
361+
362+
// Prevent GC from removing files while we open readers
363+
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
364+
365+
// Load the current in-memory committed segment metas
366+
let metas = self.segment_updater().committed_segment_metas();
367+
let mut segment_readers = Vec::with_capacity(metas.len());
368+
for meta in metas {
369+
let segment = self.index.segment(meta);
370+
let seg_reader = SegmentReader::open(&segment)?;
371+
segment_readers.push(seg_reader);
372+
}
373+
374+
// Track generation using a fresh Inventory
375+
let inventory: Inventory<SearcherGeneration> = Inventory::default();
376+
let generation = inventory.track(SearcherGeneration::from_segment_readers(&segment_readers, 0));
377+
let searcher_inner = SearcherInner::new(
378+
self.index.schema(),
379+
self.index.clone(),
380+
segment_readers,
381+
generation,
382+
DOCSTORE_CACHE_CAPACITY,
383+
)?;
384+
Ok(Arc::new(searcher_inner).into())
385+
}
386+
352387
/// If there are some merging threads, blocks until they all finish their work and
353388
/// then drop the `IndexWriter`.
354389
pub fn wait_merging_threads(mut self) -> crate::Result<()> {
@@ -665,6 +700,13 @@ impl<D: Document> IndexWriter<D> {
665700
self.prepare_commit()?.commit()
666701
}
667702

703+
/// Soft-commit the current changes: publishes them to the in-memory committed set so
704+
/// they become visible to a near real-time reader, without persisting `meta.json`.
705+
pub fn soft_commit(&mut self) -> crate::Result<Opstamp> {
706+
let prepared = self.prepare_commit()?;
707+
prepared.soft_commit()
708+
}
709+
668710
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
669711
&self.segment_updater
670712
}

src/indexer/prepared_commit.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ impl<'a, D: Document> PreparedCommit<'a, D> {
3939
self.commit_future().wait()
4040
}
4141

42+
/// Soft-commit: publish segments in-memory for near real-time search without persisting
43+
/// `meta.json`. This does not call gc or directory sync.
44+
pub fn soft_commit(self) -> crate::Result<Opstamp> {
45+
self.index_writer
46+
.segment_updater()
47+
.schedule_soft_commit(self.opstamp, self.payload)
48+
.wait()
49+
}
50+
4251
/// Proceeds to commit.
4352
///
4453
/// Unfortunately, contrary to what `PreparedCommit` may suggest,

src/indexer/segment_updater.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,29 @@ impl SegmentUpdater {
454454
})
455455
}
456456

457+
/// Perform a soft commit: publish current uncommitted segments as committed in the
458+
/// in-memory state, but do not write `meta.json`.
459+
pub(crate) fn schedule_soft_commit(
460+
&self,
461+
opstamp: Opstamp,
462+
_payload: Option<String>,
463+
) -> FutureResult<Opstamp> {
464+
let segment_updater: SegmentUpdater = self.clone();
465+
self.schedule_task(move || {
466+
let segment_entries = segment_updater.purge_deletes(opstamp)?;
467+
// Move them to committed in memory only
468+
segment_updater.segment_manager.commit(segment_entries);
469+
// Do not persist meta.json. Keep active_index_meta opstamp as-is.
470+
segment_updater.consider_merge_options();
471+
Ok(opstamp)
472+
})
473+
}
474+
475+
/// Returns the current list of committed segment metas from the in-memory state.
476+
pub(crate) fn committed_segment_metas(&self) -> Vec<SegmentMeta> {
477+
self.segment_manager.committed_segment_metas()
478+
}
479+
457480
fn store_meta(&self, index_meta: &IndexMeta) {
458481
*self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone());
459482
}

0 commit comments

Comments
 (0)