3 changes: 3 additions & 0 deletions kvdb-rocksdb/CHANGELOG.md
@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog].

## [Unreleased]

## [0.20.1] - 2025-11-07
- Force compact the DB on startup and on heavy writes. [#949](https://github.com/paritytech/parity-common/pull/949)

## [0.20.0] - 2025-09-03
- Updated `rocksdb` to 0.24. [#935](https://github.com/paritytech/parity-common/pull/935)

2 changes: 1 addition & 1 deletion kvdb-rocksdb/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "kvdb-rocksdb"
version = "0.20.0"
version = "0.20.1"
description = "kvdb implementation backed by RocksDB"
rust-version = "1.71.1"
authors.workspace = true
51 changes: 47 additions & 4 deletions kvdb-rocksdb/src/lib.rs
@@ -14,10 +14,13 @@ use std::{
collections::HashMap,
error, io,
path::{Path, PathBuf},
time::{Duration, Instant},
};

use parking_lot::Mutex;
use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, CompactOptions, Options, ReadOptions, WriteBatch,
WriteOptions, DB,
};

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
@@ -268,6 +271,7 @@ pub struct Database {
read_opts: ReadOptions,
block_opts: BlockBasedOptions,
stats: stats::RunningDbStats,
last_compaction: Mutex<Instant>,
}

/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
@@ -350,15 +354,23 @@ impl Database {
Self::open_primary(&opts, path.as_ref(), config, column_names.as_slice(), &block_opts)?
};

Ok(Database {
let db = Database {
inner: DBAndColumns { db, column_names },
config: config.clone(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
last_compaction: Mutex::new(Instant::now()),
};

// After opening the DB, we want to compact it.
//
// This is done just in case the node crashed before, to ensure the DB stays fast.
db.force_compaction()?;

Ok(db)
}
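
For context, a minimal usage sketch (not part of this diff, and the path and column count are placeholders): with this change, simply opening a database runs a full compaction before `open` returns.

```rust
use kvdb_rocksdb::{Database, DatabaseConfig};

fn main() -> std::io::Result<()> {
	// One column; `with_columns` is the usual kvdb-rocksdb constructor.
	let config = DatabaseConfig::with_columns(1);

	// With this PR, `open` calls `force_compaction` internally, so startup
	// may take noticeably longer on a large, uncompacted database.
	let db = Database::open(&config, "./example-db")?;

	let mut tx = db.transaction();
	tx.put(0, b"key", b"value");
	db.write(tx)
}
```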

/// Internal api to open a database in primary mode.
@@ -460,7 +472,21 @@ impl Database {
}
self.stats.tally_bytes_written(stats_total_bytes as u64);

cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)
let res = cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)?;

// If we have written more data than we want stored in a single `sst` file, we force compaction.
// We also ensure that we only compact at most once per minute.
//
// Otherwise, RocksDB read performance degrades after e.g. a warp sync.
if stats_total_bytes > self.config.compaction.initial_file_size as usize &&
self.last_compaction.lock().elapsed() > Duration::from_secs(60)
Contributor: Any ideas how long compaction lasts? 60s sounds too often, but it is a pure guess from my side.

Contributor: It's only when we do a large write and 60s have passed. If not much happens, we will not compact every 60 seconds. But it also means that if changes to RocksDB trickle in, this branch will never be triggered. Maybe that's okay and RocksDB can handle that case by itself; at least we saw this problem mostly with large writes.

Member Author: I did not measure it, but it took less than 60s. Also, we are not really writing that much. Even if it takes longer, RocksDB internally hopefully prevents this.

Contributor: "hopefully" [x] doubts

Member Author: For whoever lands here in the future, sorry :)
{
self.force_compaction()?;

*self.last_compaction.lock() = Instant::now();
}

Ok(res)
}
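
The write-path trigger above combines a size threshold with a time throttle. As an isolated, hedged sketch of that pattern (the names below are illustrative, not from the crate):

```rust
use parking_lot::Mutex;
use std::time::{Duration, Instant};

/// Illustrative throttle: run an expensive action after a heavy write,
/// but at most once per minute.
struct CompactionThrottle {
	last_run: Mutex<Instant>,
}

impl CompactionThrottle {
	fn should_compact(&self, bytes_written: usize, threshold: usize) -> bool {
		// Both conditions must hold: the batch was large *and* enough
		// time has passed since the last forced compaction.
		bytes_written > threshold && self.last_run.lock().elapsed() > Duration::from_secs(60)
	}

	fn record_compaction(&self) {
		*self.last_run.lock() = Instant::now();
	}
}
```

As the review thread above notes, writes that trickle in below the threshold never take this branch; the bet is that RocksDB's background compaction handles that case on its own.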

/// Get value by key.
@@ -579,6 +605,23 @@ impl Database {
pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
self.inner.db.try_catch_up_with_primary().map_err(other_io_err)
}

/// Force compacting the entire db.
fn force_compaction(&self) -> io::Result<()> {
let mut compact_options = CompactOptions::default();
compact_options.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force);

// Don't ask me why we cannot just use `compact_range_opt`...
// We are forced to trigger compaction on every column. Actually we only need this for the `STATE`
// column, but we don't know which one that is here, so we just iterate over all of them.
for col in 0..self.inner.column_names.len() {
self.inner
.db
.compact_range_cf_opt(self.inner.cf(col)?, None::<Vec<u8>>, None::<Vec<u8>>, &compact_options);
}

Ok(())
}
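
Expressed directly against the `rocksdb` crate, the per-column call reduces to roughly the following hedged sketch (the column name and the handle lookup are assumptions for illustration):

```rust
use rocksdb::{BottommostLevelCompaction, CompactOptions, DB};

fn compact_one_column(db: &DB, cf_name: &str) {
	let mut opts = CompactOptions::default();
	// Force rewriting even the bottommost level so stale entries are dropped.
	opts.set_bottommost_level_compaction(BottommostLevelCompaction::Force);

	if let Some(cf) = db.cf_handle(cf_name) {
		// `None` bounds mean: compact this column's entire key range.
		db.compact_range_cf_opt(cf, None::<&[u8]>, None::<&[u8]>, &opts);
	}
}
```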
}

// duplicate declaration of methods here to avoid trait import in certain existing cases