Commit 0d72aa5
Speedup list_prefix and getsize_prefix when prefix is not empty (#1245)
* Speed up `list_prefix` and `getsize_prefix` when the prefix is not empty. These two operations were very inefficient in repositories with a large number of arrays. This PR implements pre-filtering of chunks, so we only need to open manifests for the arrays that match the prefix. Also in this PR: limit the number of preloaded array manifests to 50; making this number configurable is left to a future PR.
* Fail listing chunks if the prefix is not a node.
* Add the `X-Tigris-Consistent: true` header to the Tigris storage.
* Enable debug logs in the GHA test run.
* Back to old Tigris endpoint.
* Back to new Tigris endpoint.
1 parent cda7e3a commit 0d72aa5
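
The core idea of the speedup is simple to state: list the snapshot's nodes first, keep only arrays whose path can match the requested prefix, and open manifests for those alone. Below is a minimal, self-contained sketch of that pre-filtering step, using a hypothetical `Node` struct rather than Icechunk's actual types:

```rust
// Hypothetical `Node` type for illustration; not the Icechunk API.
struct Node {
    path: String,
    is_array: bool,
}

// Instead of streaming every chunk key and filtering afterwards, narrow
// the node list first: only arrays whose path starts with `prefix` can
// contain matching chunk keys, so only their manifests need fetching.
fn nodes_to_open<'a>(
    nodes: &'a [Node],
    prefix: &'a str,
) -> impl Iterator<Item = &'a Node> + 'a {
    nodes
        .iter()
        .filter(|n| n.is_array)
        .filter(move |n| n.path.strip_prefix(prefix).is_some())
}

fn main() {
    let nodes = vec![
        Node { path: "/group/array".to_string(), is_array: true },
        Node { path: "/group".to_string(), is_array: false },
        Node { path: "/other".to_string(), is_array: true },
    ];
    let matches: Vec<_> =
        nodes_to_open(&nodes, "/group").map(|n| n.path.as_str()).collect();
    assert_eq!(matches, vec!["/group/array"]);
}
```

In a repository with thousands of arrays and a narrow prefix, this turns "open every manifest" into "open a handful".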

5 files changed: +100 −39 lines

.github/workflows/rust-ci.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -90,6 +90,7 @@ jobs:
       - name: Run tests with Docker services
         if: matrix.os == 'ubuntu-latest' || matrix.os == 'ubuntu-24.04-arm'
         env:
+          RUST_LOG: trace
           R2_BUCKET: ${{ secrets.R2_BUCKET }}
           R2_ACCESS_KEY_ID: ${{ secrets.R2_ACCESS_KEY_ID }}
           R2_SECRET_ACCESS_KEY: ${{ secrets.R2_SECRET_ACCESS_KEY }}
```

icechunk/src/repository.rs

Lines changed: 9 additions & 2 deletions

```diff
@@ -13,6 +13,7 @@ use futures::{
     Stream, StreamExt, TryStreamExt,
     stream::{FuturesOrdered, FuturesUnordered},
 };
+use itertools::Itertools;
 use regex::bytes::Regex;
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
@@ -28,7 +29,8 @@ use crate::{
         IcechunkFormatError, IcechunkFormatErrorKind, ManifestId, NodeId, Path,
         SnapshotId,
         snapshot::{
-            ManifestFileInfo, NodeData, Snapshot, SnapshotInfo, SnapshotProperties,
+            ManifestFileInfo, NodeData, NodeType, Snapshot, SnapshotInfo,
+            SnapshotProperties,
         },
         transaction_log::{Diff, DiffBuilder},
     },
@@ -857,7 +859,12 @@ impl Repository {
         // TODO: unnest this code
         if let Ok(snap) = asset_manager.fetch_snapshot(&snapshot_id).await {
             let snap_c = Arc::clone(&snap);
-            for node in snap.iter_arc() {
+            for node in snap
+                .iter_arc()
+                .filter_ok(|node| node.node_type() == NodeType::Array)
+                // TODO: make configurable
+                .take(50)
+            {
                 match node {
                     Err(err) => {
                         error!(error=%err, "Error retrieving snapshot nodes");
```
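
The preload loop above leans on `filter_ok` from the newly imported `itertools` crate. On an iterator of `Result`s it applies the predicate to `Ok` values only and lets `Err` values flow through, which is why the loop body still matches on `node` and logs errors, and why the `take(50)` cap cannot silently swallow failures. A standalone example of that behavior:

```rust
use itertools::Itertools;

fn main() {
    let items: Vec<Result<i32, &str>> = vec![Ok(1), Err("boom"), Ok(2), Ok(3)];
    // The predicate sees only `Ok` values; `Err` values pass through
    // untouched, so downstream error handling still fires.
    let kept: Vec<Result<i32, &str>> =
        items.into_iter().filter_ok(|n| n % 2 == 1).collect();
    assert_eq!(kept, vec![Ok(1), Err("boom"), Ok(3)]);
}
```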

icechunk/src/session.rs

Lines changed: 16 additions & 2 deletions

```diff
@@ -863,6 +863,21 @@ impl Session {
         all_chunks(&self.asset_manager, &self.change_set, self.snapshot_id()).await
     }

+    #[instrument(skip(self))]
+    pub async fn all_node_chunks<'a>(
+        &'a self,
+        node: &'a NodeSnapshot,
+    ) -> impl Stream<Item = SessionResult<(Path, ChunkInfo)>> + 'a {
+        updated_node_chunks_iterator(
+            self.asset_manager.as_ref(),
+            &self.change_set,
+            &self.snapshot_id,
+            node.clone(),
+            ManifestExtents::ALL,
+        )
+        .await
+    }
+
     #[instrument(skip(self))]
     pub async fn chunk_coordinates<'a, 'b: 'a>(
         &'a self,
@@ -1423,8 +1438,7 @@ impl From<Session> for ChangeSet {
 }

 pub fn is_prefix_match(key: &str, prefix: &str) -> bool {
-    let tomatch =
-        if prefix != String::from('/') { key.strip_prefix(prefix) } else { Some(key) };
+    let tomatch = if prefix != "/" { key.strip_prefix(prefix) } else { Some(key) };
     match tomatch {
         None => false,
         Some(rest) => {
```
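
The `is_prefix_match` hunk is a micro-cleanup: the old code allocated a fresh `String` on every call just to compare the prefix against `"/"`; comparing the `&str` directly against the literal is equivalent and allocation free. A quick check of the equivalence:

```rust
fn main() {
    for prefix in ["/", "/group", ""] {
        // old form allocates a String; new form compares against a literal
        assert_eq!(prefix != String::from('/'), prefix != "/");
    }
}
```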

icechunk/src/storage/mod.rs

Lines changed: 9 additions & 5 deletions

```diff
@@ -907,22 +907,26 @@ pub fn new_tigris_storage(
         ),
         ..config
     };
-    let mut extra_write_headers = Vec::with_capacity(1);
-    let mut extra_read_headers = Vec::with_capacity(2);
+    let mut extra_write_headers = Vec::with_capacity(2);
+    let mut extra_read_headers = Vec::with_capacity(3);

     if !use_weak_consistency {
         // TODO: Tigris will need more than this to offer good eventually consistent behavior
         // For example: we should use no-cache for branches and config file
         if let Some(region) = config.region.as_ref() {
-            extra_write_headers.push(("X-Tigris-Region".to_string(), region.clone()));
-            extra_read_headers.push(("X-Tigris-Region".to_string(), region.clone()));
+            extra_write_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
+            extra_write_headers
+                .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
+
+            extra_read_headers.push(("X-Tigris-Regions".to_string(), region.clone()));
             extra_read_headers
                 .push(("Cache-Control".to_string(), "no-cache".to_string()));
+            extra_read_headers
+                .push(("X-Tigris-Consistent".to_string(), "true".to_string()));
         } else {
             return Err(StorageErrorKind::Other("Tigris storage requires a region to provide full consistency. Either set the region for the bucket or use the read-only, eventually consistent storage by passing `use_weak_consistency=True` (experts only)".to_string()).into());
         }
     }
-
     let st = S3Storage::new(
         config,
         bucket,
```
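
To make the resulting header sets concrete: with strong consistency requested and a configured region (here `iad`, a placeholder value for illustration), the hunk above produces the pairs below. This is a standalone sketch mirroring the diff, not the actual `new_tigris_storage` internals; note the header name also changed from `X-Tigris-Region` to `X-Tigris-Regions`.

```rust
// Hypothetical helper mirroring the diff; not the Icechunk API.
fn tigris_headers(region: &str) -> (Vec<(String, String)>, Vec<(String, String)>) {
    let write = vec![
        ("X-Tigris-Regions".to_string(), region.to_string()),
        ("X-Tigris-Consistent".to_string(), "true".to_string()),
    ];
    let read = vec![
        ("X-Tigris-Regions".to_string(), region.to_string()),
        ("Cache-Control".to_string(), "no-cache".to_string()),
        ("X-Tigris-Consistent".to_string(), "true".to_string()),
    ];
    (write, read)
}

fn main() {
    let (write, read) = tigris_headers("iad");
    assert_eq!(write.len(), 2); // matches Vec::with_capacity(2)
    assert_eq!(read.len(), 3); // matches Vec::with_capacity(3)
}
```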

icechunk/src/store.rs

Lines changed: 65 additions & 30 deletions

```diff
@@ -24,7 +24,7 @@ use crate::{
     format::{
         ByteRange, ChunkIndices, ChunkOffset, Path, PathError,
         manifest::{ChunkPayload, VirtualChunkRef},
-        snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot},
+        snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot, NodeType},
     },
     refs::{RefError, RefErrorKind},
     repository::{RepositoryError, RepositoryErrorKind},
@@ -82,8 +82,8 @@ pub enum StoreErrorKind {
     SerializationError(#[from] Box<rmp_serde::encode::Error>),
     #[error("store method `{0}` is not implemented by Icechunk")]
     Unimplemented(&'static str),
-    #[error("bad key prefix: `{0}`")]
-    BadKeyPrefix(String),
+    #[error("bad key prefix ({prefix}): {message}")]
+    BadKeyPrefix { prefix: String, message: String },
     #[error("error during parallel execution of get_partial_values")]
     PartialValuesPanic,
     #[error("cannot write to read-only store")]
@@ -442,11 +442,14 @@ impl Store {
         if self.read_only().await {
             return Err(StoreErrorKind::ReadOnly.into());
         }
-        let prefix = prefix.trim_start_matches("/").trim_end_matches("/");
+        let prefix = prefix.trim_start_matches('/').trim_end_matches('/');
         // TODO: Handling preceding "/" is ugly!
-        let path = format!("/{prefix}")
-            .try_into()
-            .map_err(|_| StoreErrorKind::BadKeyPrefix(prefix.to_owned()))?;
+        let path = format!("/{prefix}").try_into().map_err(|_| {
+            StoreErrorKind::BadKeyPrefix {
+                prefix: prefix.to_owned(),
+                message: "Cannot convert to a path".to_string(),
+            }
+        })?;

         let mut guard = self.session.write().await;
         let node = guard.get_node(&path).await;
@@ -557,8 +560,6 @@ impl Store {
         &self,
         prefix: &str,
     ) -> StoreResult<impl Stream<Item = StoreResult<String>> + Send + use<>> {
-        // TODO: this is inefficient because it filters based on the prefix, instead of only
-        // generating items that could potentially match
         let meta = self.list_metadata_prefix(prefix, false).await?;
         let chunks = self.list_chunks_prefix(prefix).await?;
         // FIXME: this is wrong, we are realizing all keys in memory
@@ -638,13 +639,13 @@ impl Store {
     {
         let trimmed = chunk_key
             .trim_start_matches(prefix)
-            .trim_start_matches("/");
+            .trim_start_matches('/');
         if trimmed.is_empty() {
             // we were provided with a prefix that is a path to a chunk key
             None
         } else if let Some((chunk_prefix, _)) =
             // if we can split it, this is a valid prefix to return
-            trimmed.split_once("/")
+            trimmed.split_once('/')
         {
             Some(ListDirItem::Prefix(
                 chunk_prefix.to_string(),
@@ -748,13 +749,13 @@ impl Store {
     ) -> StoreResult<impl Stream<Item = StoreResult<String>> + 'a + use<'a>> {
         let prefix = prefix.trim_end_matches('/');
         let res = try_stream! {
-            let repository = Arc::clone(&self.session).read_owned().await;
-            for node in repository.list_nodes().await? {
+            let session = Arc::clone(&self.session).read_owned().await;
+            for node in session.list_nodes().await? {
                 // TODO: handle non-utf8?
                 let meta_key = Key::Metadata { node_path: node?.path }.to_string();
                 if is_prefix_match(&meta_key, prefix) {
                     if strip_prefix {
-                        yield meta_key.trim_start_matches(prefix).trim_start_matches("/").to_string();
+                        yield meta_key.trim_start_matches(prefix).trim_start_matches('/').to_string();
                     } else {
                         yield meta_key;
                     }
@@ -770,19 +771,40 @@ impl Store {
     ) -> StoreResult<impl Stream<Item = StoreResult<String>> + 'a + use<'a>> {
         let prefix = prefix.trim_end_matches('/');
         let res = try_stream! {
-            let repository = Arc::clone(&self.session).read_owned().await;
-            // TODO: this is inefficient because it filters based on the prefix, instead of only
-            // generating items that could potentially match
-            for await maybe_path_chunk in repository.all_chunks().await.map_err(StoreError::from)? {
-                // FIXME: utf8 handling
-                match maybe_path_chunk {
-                    Ok((path, chunk)) => {
-                        let chunk_key = Key::Chunk { node_path: path, coords: chunk.coord }.to_string();
-                        if is_prefix_match(&chunk_key, prefix) {
-                            yield chunk_key;
-                        }
+            let session = Arc::clone(&self.session).read_owned().await;
+
+            // TODO: Handling preceding "/" is ugly!
+            let path = format!("/{}", prefix.trim_start_matches('/') ).try_into().map_err(|_| {
+                StoreErrorKind::BadKeyPrefix {
+                    prefix: prefix.to_owned(),
+                    message: "Cannot convert to a path".to_string(),
+                }
+            })?;
+
+            if path != Path::root() {
+                let _ = session.get_node(&path).await.map_err(|_| {
+                    StoreErrorKind::BadKeyPrefix {
+                        prefix: prefix.to_owned(),
+                        message: "Only prefixes pointing to a group or array are allowed".to_string(),
                     }
-                    Err(err) => Err(err)?
+                })?;
+            }
+
+            for node in session.list_nodes().await? {
+                let node = node?;
+                if node.node_type() == NodeType::Array &&
+                    // FIXME: utf8 handling
+                    // skip the initial / in the path
+                    is_prefix_match(&node.path.to_string()[1..], prefix) {
+                    for await maybe_path_chunk in session.all_node_chunks(&node).await {
+                        match maybe_path_chunk {
+                            Ok((path, chunk)) => {
+                                let chunk_key = Key::Chunk { node_path: path, coords: chunk.coord }.to_string();
+                                yield chunk_key;
+                            }
+                            Err(err) => Err(err)?
+                        }
+                    }
                 }
             }
         };
```
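
Besides the pre-filtering, `list_chunks_prefix` gains a behavioral change: a non-root prefix must now resolve to an existing group or array node, failing with `BadKeyPrefix` instead of quietly yielding an empty stream. A dependency-free illustration of the rule (hypothetical helper, not the Icechunk API):

```rust
// Hypothetical illustration of the new rule: after normalization, a
// non-root prefix must name an existing node, otherwise listing fails.
fn prefix_is_listable(existing_nodes: &[&str], prefix: &str) -> bool {
    let normalized = format!("/{}", prefix.trim_matches('/'));
    normalized == "/" || existing_nodes.contains(&normalized.as_str())
}

fn main() {
    let nodes = ["/group", "/group/array"];
    assert!(prefix_is_listable(&nodes, "group/array"));
    assert!(prefix_is_listable(&nodes, "")); // root is always allowed
    assert!(!prefix_is_listable(&nodes, "arr")); // partial names are rejected
}
```

The new test at the bottom of this file exercises exactly this with prefixes like "arr" and "array/c/0".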
The remaining hunks update the tests in the same file:

```diff
@@ -1227,10 +1249,7 @@ mod tests {
         Ok(version1)
     }

-    async fn keys(
-        store: &Store,
-        prefix: &str,
-    ) -> Result<Vec<String>, Box<dyn std::error::Error>> {
+    async fn keys(store: &Store, prefix: &str) -> Result<Vec<String>, StoreError> {
         let mut res = store.list_prefix(prefix).await?.try_collect::<Vec<_>>().await?;
         res.sort();
         Ok(res)
@@ -1685,6 +1704,7 @@ mod tests {
         assert!(!store.exists("zarr.json").await.unwrap());

         assert_eq!(all_keys(&store).await.unwrap(), Vec::<String>::new());
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
         store
             .set(
                 "zarr.json",
@@ -1695,6 +1715,7 @@ mod tests {
         assert!(!store.is_empty("").await.unwrap());
         assert!(store.exists("zarr.json").await.unwrap());
         assert_eq!(all_keys(&store).await.unwrap(), vec!["zarr.json".to_string()]);
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
         store
             .set(
                 "group/zarr.json",
@@ -1705,6 +1726,7 @@ mod tests {
             all_keys(&store).await.unwrap(),
             vec!["group/zarr.json".to_string(), "zarr.json".to_string()]
         );
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
         assert_eq!(
             keys(&store, "group/").await.unwrap(),
             vec!["group/zarr.json".to_string()]
@@ -1724,6 +1746,7 @@ mod tests {
                 "zarr.json".to_string()
             ]
         );
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
         assert_eq!(
             keys(&store, "group/").await.unwrap(),
             vec!["group/array/zarr.json".to_string(), "group/zarr.json".to_string()]
@@ -1797,6 +1820,7 @@ mod tests {
                 "zarr.json".to_string()
             ]
         );
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());

         session.write().await.commit("foo", None).await?;

@@ -1825,6 +1849,17 @@ mod tests {
                 "zarr.json".to_string()
             ]
         );
+        assert_eq!(all_keys(&store).await.unwrap(), keys(&store, "").await.unwrap());
+
+        for bad_prefix in ["arr", "/arr", "/arr/", "zarr", "array/c", "array/c/0"] {
+            assert!(matches!(
+                keys(&store, bad_prefix).await,
+                Err(StoreError {
+                    kind: StoreErrorKind::BadKeyPrefix { message, .. },
+                    ..
+                }) if message.contains("group or array")
+            ));
+        }

         Ok(())
     }
```