Skip to content

Commit 5687fee

Browse files
e1ijah1paomian
authored andcommitted
feat: enable caching when using object store (GreptimeTeam#928)
* feat: enable caching when using object store * feat: support file cache for object store * feat: maintaining the cached files with lru * fix: improve the code * empty commit * improve the code
1 parent 31f03b6 commit 5687fee

File tree

8 files changed

+300
-4
lines changed

8 files changed

+300
-4
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/datanode/src/datanode.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use crate::error::Result;
2525
use crate::instance::{Instance, InstanceRef};
2626
use crate::server::Services;
2727

28+
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
29+
2830
#[derive(Debug, Clone, Serialize, Deserialize)]
2931
#[serde(tag = "type")]
3032
pub enum ObjectStoreConfig {
@@ -48,6 +50,8 @@ pub struct S3Config {
4850
pub secret_access_key: String,
4951
pub endpoint: Option<String>,
5052
pub region: Option<String>,
53+
pub cache_path: Option<String>,
54+
pub cache_capacity: Option<ReadableSize>,
5155
}
5256

5357
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
@@ -58,6 +62,8 @@ pub struct OssConfig {
5862
pub access_key_id: String,
5963
pub access_key_secret: String,
6064
pub endpoint: String,
65+
pub cache_path: Option<String>,
66+
pub cache_capacity: Option<ReadableSize>,
6167
}
6268

6369
impl Default for ObjectStoreConfig {

src/datanode/src/instance.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::{fs, path};
1919
use backon::ExponentialBackoff;
2020
use catalog::remote::MetaKvBackend;
2121
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
22+
use common_base::readable_size::ReadableSize;
2223
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
2324
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
2425
use common_telemetry::logging::info;
@@ -28,7 +29,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
2829
use meta_client::MetaClientOpts;
2930
use mito::config::EngineConfig as TableEngineConfig;
3031
use mito::engine::MitoEngine;
31-
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
32+
use object_store::cache_policy::LruCachePolicy;
33+
use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
3234
use object_store::services::fs::Builder as FsBuilder;
3335
use object_store::services::oss::Builder as OSSBuilder;
3436
use object_store::services::s3::Builder as S3Builder;
@@ -42,7 +44,9 @@ use table::table::numbers::NumbersTable;
4244
use table::table::TableIdProviderRef;
4345
use table::Table;
4446

45-
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
47+
use crate::datanode::{
48+
DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
49+
};
4650
use crate::error::{
4751
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
4852
NewCatalogSnafu, OpenLogStoreSnafu, Result,
@@ -240,7 +244,44 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
240244
config: store_config.clone(),
241245
})?;
242246

243-
Ok(ObjectStore::new(accessor))
247+
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
248+
}
249+
250+
fn create_object_store_with_cache(
251+
object_store: ObjectStore,
252+
store_config: &ObjectStoreConfig,
253+
) -> Result<ObjectStore> {
254+
let (cache_path, cache_capacity) = match store_config {
255+
ObjectStoreConfig::S3(s3_config) => {
256+
let path = s3_config.cache_path.as_ref();
257+
let capacity = s3_config
258+
.cache_capacity
259+
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
260+
(path, capacity)
261+
}
262+
ObjectStoreConfig::Oss(oss_config) => {
263+
let path = oss_config.cache_path.as_ref();
264+
let capacity = oss_config
265+
.cache_capacity
266+
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
267+
(path, capacity)
268+
}
269+
_ => (None, ReadableSize(0)),
270+
};
271+
272+
if let Some(path) = cache_path {
273+
let cache_store =
274+
ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| {
275+
error::InitBackendSnafu {
276+
config: store_config.clone(),
277+
}
278+
})?);
279+
let policy = LruCachePolicy::new(cache_capacity.0 as usize);
280+
let cache_layer = CacheLayer::new(cache_store).with_policy(policy);
281+
Ok(object_store.layer(cache_layer))
282+
} else {
283+
Ok(object_store)
284+
}
244285
}
245286

246287
pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -273,7 +314,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
273314
config: store_config.clone(),
274315
})?;
275316

276-
Ok(ObjectStore::new(accessor))
317+
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
277318
}
278319

279320
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {

src/object-store/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ edition.workspace = true
55
license.workspace = true
66

77
[dependencies]
8+
lru = "0.9"
9+
async-trait = "0.1"
810
futures = { version = "0.3" }
911
opendal = { version = "0.25.1", features = [
1012
"layers-tracing",
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::num::NonZeroUsize;
16+
use std::ops::DerefMut;
17+
use std::sync::Arc;
18+
19+
use async_trait::async_trait;
20+
use futures::future::BoxFuture;
21+
use lru::LruCache;
22+
use opendal::layers::CachePolicy;
23+
use opendal::raw::output::Reader;
24+
use opendal::raw::{Accessor, RpDelete, RpRead};
25+
use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result};
26+
use tokio::sync::Mutex;
27+
28+
#[derive(Debug)]
29+
pub struct LruCachePolicy {
30+
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
31+
}
32+
33+
impl LruCachePolicy {
34+
pub fn new(capacity: usize) -> Self {
35+
Self {
36+
lru_cache: Arc::new(Mutex::new(LruCache::new(
37+
NonZeroUsize::new(capacity).unwrap(),
38+
))),
39+
}
40+
}
41+
42+
fn cache_path(&self, path: &str, args: &OpRead) -> String {
43+
format!("{}.cache-{}", path, args.range().to_header())
44+
}
45+
}
46+
47+
#[async_trait]
48+
impl CachePolicy for LruCachePolicy {
49+
fn on_read(
50+
&self,
51+
inner: Arc<dyn Accessor>,
52+
cache: Arc<dyn Accessor>,
53+
path: &str,
54+
args: OpRead,
55+
) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
56+
let path = path.to_string();
57+
let cache_path = self.cache_path(&path, &args);
58+
let lru_cache = self.lru_cache.clone();
59+
Box::pin(async move {
60+
match cache.read(&cache_path, OpRead::default()).await {
61+
Ok(v) => {
62+
// update lru when cache hit
63+
let mut lru_cache = lru_cache.lock().await;
64+
lru_cache.get_or_insert(cache_path.clone(), || ());
65+
Ok(v)
66+
}
67+
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
68+
let (rp, reader) = inner.read(&path, args.clone()).await?;
69+
let size = rp.clone().into_metadata().content_length();
70+
let _ = cache
71+
.write(&cache_path, OpWrite::new(size), Box::new(reader))
72+
.await?;
73+
match cache.read(&cache_path, OpRead::default()).await {
74+
Ok(v) => {
75+
let r = {
76+
// push new cache file name to lru
77+
let mut lru_cache = lru_cache.lock().await;
78+
lru_cache.push(cache_path.clone(), ())
79+
};
80+
// delete the evicted cache file
81+
if let Some((k, _v)) = r {
82+
let _ = cache.delete(&k, OpDelete::new()).await;
83+
}
84+
Ok(v)
85+
}
86+
Err(_) => return inner.read(&path, args).await,
87+
}
88+
}
89+
Err(_) => return inner.read(&path, args).await,
90+
}
91+
})
92+
}
93+
94+
fn on_delete(
95+
&self,
96+
inner: Arc<dyn Accessor>,
97+
cache: Arc<dyn Accessor>,
98+
path: &str,
99+
args: OpDelete,
100+
) -> BoxFuture<'static, Result<RpDelete>> {
101+
let path = path.to_string();
102+
let lru_cache = self.lru_cache.clone();
103+
Box::pin(async move {
104+
let cache_files: Vec<String> = {
105+
let mut guard = lru_cache.lock().await;
106+
let lru = guard.deref_mut();
107+
let cache_files = lru
108+
.iter()
109+
.filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
110+
.map(|(k, _v)| k.clone())
111+
.collect::<Vec<_>>();
112+
for k in &cache_files {
113+
lru.pop(k);
114+
}
115+
cache_files
116+
};
117+
for file in cache_files {
118+
let _ = cache.delete(&file, OpDelete::new()).await;
119+
}
120+
return inner.delete(&path, args).await;
121+
})
122+
}
123+
}

src/object-store/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@ pub use opendal::{
1717
Operator as ObjectStore, Result,
1818
};
1919
pub mod backend;
20+
pub mod cache_policy;
2021
pub mod test_util;
2122
pub mod util;

src/object-store/tests/object_store_test.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ use std::env;
1717
use anyhow::Result;
1818
use common_telemetry::logging;
1919
use object_store::backend::{fs, s3};
20+
use object_store::cache_policy::LruCachePolicy;
2021
use object_store::test_util::TempFolder;
2122
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
23+
use opendal::layers::CacheLayer;
2224
use opendal::services::oss;
25+
use opendal::Operator;
2326
use tempdir::TempDir;
2427

2528
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
@@ -160,3 +163,108 @@ async fn test_oss_backend() -> Result<()> {
160163

161164
Ok(())
162165
}
166+
167+
async fn assert_cache_files(
168+
store: &Operator,
169+
file_names: &[&str],
170+
file_contents: &[&str],
171+
) -> Result<()> {
172+
let o = store.object("/");
173+
let obs = o.list().await?;
174+
let objects = util::collect(obs).await?;
175+
176+
// compare the cache file with the expected cache file; ignore orders
177+
for o in objects {
178+
let position = file_names.iter().position(|&x| x == o.name());
179+
assert!(position.is_some(), "file not found: {}", o.name());
180+
181+
let position = position.unwrap();
182+
let bs = o.read().await?;
183+
assert_eq!(
184+
file_contents[position],
185+
String::from_utf8(bs.clone())?,
186+
"file content not match: {}",
187+
o.name()
188+
);
189+
}
190+
191+
Ok(())
192+
}
193+
194+
#[tokio::test]
195+
async fn test_object_store_cache_policy() -> Result<()> {
196+
// create file storage
197+
let root_dir = TempDir::new("test_fs_backend")?;
198+
let store = ObjectStore::new(
199+
fs::Builder::default()
200+
.root(&root_dir.path().to_string_lossy())
201+
.atomic_write_dir(&root_dir.path().to_string_lossy())
202+
.build()?,
203+
);
204+
205+
// create file cache layer
206+
let cache_dir = TempDir::new("test_fs_cache")?;
207+
let cache_op = ObjectStore::new(
208+
fs::Builder::default()
209+
.root(&cache_dir.path().to_string_lossy())
210+
.atomic_write_dir(&cache_dir.path().to_string_lossy())
211+
.build()?,
212+
);
213+
// create operator for cache dir to verify cache file
214+
let cache_store = ObjectStore::from(cache_op.inner());
215+
let policy = LruCachePolicy::new(3);
216+
let store = store.layer(CacheLayer::new(cache_op).with_policy(policy));
217+
218+
// create several object handler.
219+
let o1 = store.object("test_file1");
220+
let o2 = store.object("test_file2");
221+
222+
// write data into object;
223+
assert!(o1.write("Hello, object1!").await.is_ok());
224+
assert!(o2.write("Hello, object2!").await.is_ok());
225+
226+
// crate cache by read object
227+
o1.range_read(7..).await?;
228+
o1.read().await?;
229+
o2.range_read(7..).await?;
230+
o2.read().await?;
231+
232+
assert_cache_files(
233+
&cache_store,
234+
&[
235+
"test_file1.cache-bytes=0-",
236+
"test_file2.cache-bytes=7-",
237+
"test_file2.cache-bytes=0-",
238+
],
239+
&["Hello, object1!", "object2!", "Hello, object2!"],
240+
)
241+
.await?;
242+
243+
assert!(o2.delete().await.is_ok());
244+
245+
assert_cache_files(
246+
&cache_store,
247+
&["test_file1.cache-bytes=0-"],
248+
&["Hello, object1!"],
249+
)
250+
.await?;
251+
252+
let o3 = store.object("test_file3");
253+
assert!(o3.write("Hello, object3!").await.is_ok());
254+
255+
o3.read().await?;
256+
o3.range_read(0..5).await?;
257+
258+
assert_cache_files(
259+
&cache_store,
260+
&[
261+
"test_file1.cache-bytes=0-",
262+
"test_file3.cache-bytes=0-",
263+
"test_file3.cache-bytes=0-4",
264+
],
265+
&["Hello, object1!", "Hello, object3!", "Hello"],
266+
)
267+
.await?;
268+
269+
Ok(())
270+
}

0 commit comments

Comments
 (0)