Skip to content

Commit 5a2fe42

Browse files
authored
make zstd optional in sstable (#2633)
* make zstd truly optional * changelog notes * make sure we write * resolve comments * make this a default feature * remove changelog notes
1 parent 5379c99 commit 5a2fe42

File tree

5 files changed

+49
-28
lines changed

5 files changed

+49
-28
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,16 @@ debug-assertions = true
112112
overflow-checks = true
113113

114114
[features]
115-
default = ["mmap", "stopwords", "lz4-compression"]
115+
default = ["mmap", "stopwords", "lz4-compression", "columnar-zstd-compression"]
116116
mmap = ["fs4", "tempfile", "memmap2"]
117117
stopwords = []
118118

119119
lz4-compression = ["lz4_flex"]
120120
zstd-compression = ["zstd"]
121121

122+
# enable zstd-compression in columnar (and sstable)
123+
columnar-zstd-compression = ["columnar/zstd-compression"]
124+
122125
failpoints = ["fail", "fail/failpoints"]
123126
unstable = [] # useful for benches.
124127

columnar/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ harness = false
3333
name = "bench_access"
3434
harness = false
3535

36-
3736
[features]
3837
unstable = []
38+
zstd-compression = ["sstable/zstd-compression"]

sstable/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ itertools = "0.14.0"
1616
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
1717
tantivy-fst = "0.5"
1818
# experimental gives us access to Decompressor::upper_bound
19-
zstd = { version = "0.13", features = ["experimental"] }
19+
zstd = { version = "0.13", optional = true, features = ["experimental"] }
20+
21+
[features]
22+
zstd-compression = ["zstd"]
2023

2124
[dev-dependencies]
2225
proptest = "1"

sstable/src/block_reader.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::io::{self, Read};
22
use std::ops::Range;
33

44
use common::OwnedBytes;
5+
#[cfg(feature = "zstd-compression")]
56
use zstd::bulk::Decompressor;
67

78
pub struct BlockReader {
@@ -82,13 +83,23 @@ impl BlockReader {
8283
));
8384
}
8485
if compress == 1 {
85-
let required_capacity =
86-
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
87-
self.buffer.reserve(required_capacity);
88-
Decompressor::new()?
89-
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
86+
#[cfg(feature = "zstd-compression")]
87+
{
88+
let required_capacity =
89+
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
90+
self.buffer.reserve(required_capacity);
91+
Decompressor::new()?
92+
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
93+
94+
self.reader.advance(block_len);
95+
}
9096

91-
self.reader.advance(block_len);
97+
if cfg!(not(feature = "zstd-compression")) {
98+
return Err(io::Error::new(
99+
io::ErrorKind::Unsupported,
100+
"zstd-compression feature is not enabled",
101+
));
102+
}
92103
} else {
93104
self.buffer.resize(block_len, 0u8);
94105
self.reader.read_exact(&mut self.buffer[..])?;

sstable/src/delta.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::io::{self, BufWriter, Write};
22
use std::ops::Range;
33

44
use common::{CountingWriter, OwnedBytes};
5+
#[cfg(feature = "zstd-compression")]
56
use zstd::bulk::Compressor;
67

78
use super::value::ValueWriter;
@@ -53,25 +54,28 @@ where
5354

5455
let block_len = buffer.len() + self.block.len();
5556

56-
if block_len > 2048 {
57-
buffer.extend_from_slice(&self.block);
58-
self.block.clear();
59-
60-
let max_len = zstd::zstd_safe::compress_bound(buffer.len());
61-
self.block.reserve(max_len);
62-
Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?;
63-
64-
// verify compression had a positive impact
65-
if self.block.len() < buffer.len() {
66-
self.write
67-
.write_all(&(self.block.len() as u32 + 1).to_le_bytes())?;
68-
self.write.write_all(&[1])?;
69-
self.write.write_all(&self.block[..])?;
70-
} else {
71-
self.write
72-
.write_all(&(block_len as u32 + 1).to_le_bytes())?;
73-
self.write.write_all(&[0])?;
74-
self.write.write_all(&buffer[..])?;
57+
if cfg!(feature = "zstd-compression") && block_len > 2048 {
58+
#[cfg(feature = "zstd-compression")]
59+
{
60+
buffer.extend_from_slice(&self.block);
61+
self.block.clear();
62+
63+
let max_len = zstd::zstd_safe::compress_bound(buffer.len());
64+
self.block.reserve(max_len);
65+
Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?;
66+
67+
// verify compression had a positive impact
68+
if self.block.len() < buffer.len() {
69+
self.write
70+
.write_all(&(self.block.len() as u32 + 1).to_le_bytes())?;
71+
self.write.write_all(&[1])?;
72+
self.write.write_all(&self.block[..])?;
73+
} else {
74+
self.write
75+
.write_all(&(block_len as u32 + 1).to_le_bytes())?;
76+
self.write.write_all(&[0])?;
77+
self.write.write_all(&buffer[..])?;
78+
}
7579
}
7680
} else {
7781
self.write

0 commit comments

Comments
 (0)