@@ -45,334 +45,3 @@ impl StoreManager for DelegatingStoreManager {
45
45
None
46
46
}
47
47
}
48
-
49
- /// Wrap each `Store` produced by the inner `StoreManager` in an asynchronous,
50
- /// write-behind cache.
51
- ///
52
- /// This serves two purposes:
53
- ///
54
- /// - Improve performance with slow and/or distant stores
55
- ///
56
- /// - Provide a relaxed consistency guarantee vs. what a fully synchronous store
57
- /// provides
58
- ///
59
- /// The latter is intended to prevent guests from coming to rely on the
60
- /// synchronous consistency model of an existing implementation which may later
61
- /// be replaced with one providing a more relaxed, asynchronous (i.e.
62
- /// "eventual") consistency model. See also <https://www.hyrumslaw.com/> and
63
- /// <https://xkcd.com/1172/>.
64
- ///
65
- /// This implementation provides a "read-your-writes", asynchronous consistency
66
- /// model such that values are immediately available for reading as soon as they
67
- /// are written as long as the read(s) hit the same cache as the write(s).
68
- /// Reads and writes through separate caches (e.g. separate guest instances or
69
- /// separately-opened references to the same store within a single instance) are
70
- /// _not_ guaranteed to be consistent; not only is cross-cache consistency
71
- /// subject to scheduling and/or networking delays, a given tuple is never
72
- /// refreshed from the backing store once added to a cache since this
73
- /// implementation is intended for use only by short-lived guest instances.
74
- ///
75
- /// Note that, because writes are asynchronous and return immediately,
76
- /// durability is _not_ guaranteed. I/O errors may occur asynchronously after
77
- /// the write operation has returned control to the guest, which may result in
78
- /// the write being lost without the guest knowing. In the future, a separate
79
- /// `write-durable` function could be added to key-value.wit to provide either
80
- /// synchronous or asynchronous feedback on durability for guests which need it.
81
- pub struct CachingStoreManager < T > {
82
- capacity : NonZeroUsize ,
83
- inner : T ,
84
- }
85
-
86
- const DEFAULT_CACHE_SIZE : usize = 256 ;
87
-
88
- impl < T > CachingStoreManager < T > {
89
- pub fn new ( inner : T ) -> Self {
90
- Self :: new_with_capacity ( NonZeroUsize :: new ( DEFAULT_CACHE_SIZE ) . unwrap ( ) , inner)
91
- }
92
-
93
- pub fn new_with_capacity ( capacity : NonZeroUsize , inner : T ) -> Self {
94
- Self { capacity, inner }
95
- }
96
- }
97
-
98
- #[ async_trait]
99
- impl < T : StoreManager > StoreManager for CachingStoreManager < T > {
100
- async fn get ( & self , name : & str ) -> Result < Arc < dyn Store > , Error > {
101
- Ok ( Arc :: new ( CachingStore {
102
- inner : self . inner . get ( name) . await ?,
103
- state : Arc :: new ( AsyncMutex :: new ( CachingStoreState {
104
- cache : LruCache :: new ( self . capacity ) ,
105
- previous_task : None ,
106
- } ) ) ,
107
- } ) )
108
- }
109
-
110
- fn is_defined ( & self , store_name : & str ) -> bool {
111
- self . inner . is_defined ( store_name)
112
- }
113
-
114
- fn summary ( & self , store_name : & str ) -> Option < String > {
115
- self . inner . summary ( store_name)
116
- }
117
- }
118
-
119
- struct CachingStoreState {
120
- cache : LruCache < String , Option < Vec < u8 > > > ,
121
- previous_task : Option < JoinHandle < Result < ( ) , Error > > > ,
122
- }
123
-
124
- impl CachingStoreState {
125
- /// Wrap the specified task in an outer task which waits for `self.previous_task` before proceeding, and spawn
126
- /// the result. This ensures that write order is preserved.
127
- fn spawn ( & mut self , task : impl Future < Output = Result < ( ) , Error > > + Send + ' static ) {
128
- let previous_task = self . previous_task . take ( ) ;
129
- let task = async move {
130
- if let Some ( previous_task) = previous_task {
131
- previous_task
132
- . await
133
- . map_err ( |e| Error :: Other ( format ! ( "{e:?}" ) ) ) ??
134
- }
135
-
136
- task. await
137
- } ;
138
- self . previous_task = Some ( task:: spawn ( task. in_current_span ( ) ) )
139
- }
140
-
141
- async fn flush ( & mut self ) -> Result < ( ) , Error > {
142
- if let Some ( previous_task) = self . previous_task . take ( ) {
143
- previous_task
144
- . await
145
- . map_err ( |e| Error :: Other ( format ! ( "{e:?}" ) ) ) ??
146
- }
147
-
148
- Ok ( ( ) )
149
- }
150
- }
151
-
152
- struct CachingStore {
153
- inner : Arc < dyn Store > ,
154
- state : Arc < AsyncMutex < CachingStoreState > > ,
155
- }
156
-
157
- #[ async_trait]
158
- impl Store for CachingStore {
159
- async fn after_open ( & self ) -> Result < ( ) , Error > {
160
- self . inner . after_open ( ) . await
161
- }
162
-
163
- async fn get ( & self , key : & str ) -> Result < Option < Vec < u8 > > , Error > {
164
- // Retrieve the specified value from the cache, lazily populating the cache as necessary.
165
-
166
- let mut state = self . state . lock ( ) . await ;
167
-
168
- if let Some ( value) = state. cache . get ( key) . cloned ( ) {
169
- return Ok ( value) ;
170
- }
171
-
172
- // Flush any outstanding writes prior to reading from store. This is necessary because we need to
173
- // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
174
- // cache prior to their corresponding writes reaching the backing store.
175
- state. flush ( ) . await ?;
176
-
177
- let value = self . inner . get ( key) . await ?;
178
-
179
- state. cache . put ( key. to_owned ( ) , value. clone ( ) ) ;
180
-
181
- Ok ( value)
182
- }
183
-
184
- async fn set ( & self , key : & str , value : & [ u8 ] ) -> Result < ( ) , Error > {
185
- // Update the cache and spawn a task to update the backing store asynchronously.
186
-
187
- let mut state = self . state . lock ( ) . await ;
188
-
189
- state. cache . put ( key. to_owned ( ) , Some ( value. to_owned ( ) ) ) ;
190
-
191
- let inner = self . inner . clone ( ) ;
192
- let key = key. to_owned ( ) ;
193
- let value = value. to_owned ( ) ;
194
- state. spawn ( async move { inner. set ( & key, & value) . await } ) ;
195
-
196
- Ok ( ( ) )
197
- }
198
-
199
- async fn delete ( & self , key : & str ) -> Result < ( ) , Error > {
200
- // Update the cache and spawn a task to update the backing store asynchronously.
201
-
202
- let mut state = self . state . lock ( ) . await ;
203
-
204
- state. cache . put ( key. to_owned ( ) , None ) ;
205
-
206
- let inner = self . inner . clone ( ) ;
207
- let key = key. to_owned ( ) ;
208
- state. spawn ( async move { inner. delete ( & key) . await } ) ;
209
-
210
- Ok ( ( ) )
211
- }
212
-
213
- async fn exists ( & self , key : & str ) -> Result < bool , Error > {
214
- Ok ( self . get ( key) . await ?. is_some ( ) )
215
- }
216
-
217
- async fn get_keys ( & self ) -> Result < Vec < String > , Error > {
218
- // Get the keys from the backing store, remove any which are `None` in the cache, and add any which are
219
- // `Some` in the cache, returning the result.
220
- //
221
- // Note that we don't bother caching the result, since we expect this function won't be called more than
222
- // once for a given store in normal usage, and maintaining consistency would be complicated.
223
-
224
- let mut state = self . state . lock ( ) . await ;
225
-
226
- // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
227
- // to their corresponding writes reaching the backing store.
228
- state. flush ( ) . await ?;
229
-
230
- Ok ( self
231
- . inner
232
- . get_keys ( )
233
- . await ?
234
- . into_iter ( )
235
- . filter ( |k| {
236
- state
237
- . cache
238
- . peek ( k)
239
- . map ( |v| v. as_ref ( ) . is_some ( ) )
240
- . unwrap_or ( true )
241
- } )
242
- . chain (
243
- state
244
- . cache
245
- . iter ( )
246
- . filter_map ( |( k, v) | v. as_ref ( ) . map ( |_| k. to_owned ( ) ) ) ,
247
- )
248
- . collect :: < HashSet < _ > > ( )
249
- . into_iter ( )
250
- . collect ( ) )
251
- }
252
-
253
- async fn get_many (
254
- & self ,
255
- keys : Vec < String > ,
256
- ) -> anyhow:: Result < Vec < ( String , Option < Vec < u8 > > ) > , Error > {
257
- let mut state = self . state . lock ( ) . await ;
258
-
259
- // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
260
- // to their corresponding writes reaching the backing store.
261
- state. flush ( ) . await ?;
262
-
263
- let mut found: Vec < ( String , Option < Vec < u8 > > ) > = Vec :: new ( ) ;
264
- let mut not_found: Vec < String > = Vec :: new ( ) ;
265
- for key in keys {
266
- match state. cache . get ( key. as_str ( ) ) {
267
- Some ( Some ( value) ) => found. push ( ( key, Some ( value. clone ( ) ) ) ) ,
268
- _ => not_found. push ( key) ,
269
- }
270
- }
271
-
272
- if !not_found. is_empty ( ) {
273
- let keys_and_values = self . inner . get_many ( not_found) . await ?;
274
- for ( key, value) in keys_and_values {
275
- found. push ( ( key. clone ( ) , value. clone ( ) ) ) ;
276
- state. cache . put ( key, value) ;
277
- }
278
- }
279
-
280
- Ok ( found)
281
- }
282
-
283
- async fn set_many ( & self , key_values : Vec < ( String , Vec < u8 > ) > ) -> anyhow:: Result < ( ) , Error > {
284
- let mut state = self . state . lock ( ) . await ;
285
-
286
- for ( key, value) in key_values. clone ( ) {
287
- state. cache . put ( key, Some ( value) ) ;
288
- }
289
-
290
- self . inner . set_many ( key_values) . await
291
- }
292
-
293
- async fn delete_many ( & self , keys : Vec < String > ) -> anyhow:: Result < ( ) , Error > {
294
- let mut state = self . state . lock ( ) . await ;
295
-
296
- for key in keys. clone ( ) {
297
- state. cache . put ( key, None ) ;
298
- }
299
-
300
- self . inner . delete_many ( keys) . await
301
- }
302
-
303
- async fn increment ( & self , key : String , delta : i64 ) -> anyhow:: Result < i64 , Error > {
304
- let mut state = self . state . lock ( ) . await ;
305
- let counter = self . inner . increment ( key. clone ( ) , delta) . await ?;
306
- state
307
- . cache
308
- . put ( key, Some ( i64:: to_le_bytes ( counter) . to_vec ( ) ) ) ;
309
- Ok ( counter)
310
- }
311
-
312
- async fn new_compare_and_swap (
313
- & self ,
314
- bucket_rep : u32 ,
315
- key : & str ,
316
- ) -> anyhow:: Result < Arc < dyn Cas > , Error > {
317
- let inner = self . inner . new_compare_and_swap ( bucket_rep, key) . await ?;
318
- Ok ( Arc :: new ( CompareAndSwap {
319
- bucket_rep,
320
- state : self . state . clone ( ) ,
321
- key : key. to_string ( ) ,
322
- inner_cas : inner,
323
- } ) )
324
- }
325
- }
326
-
327
- struct CompareAndSwap {
328
- bucket_rep : u32 ,
329
- key : String ,
330
- state : Arc < AsyncMutex < CachingStoreState > > ,
331
- inner_cas : Arc < dyn Cas > ,
332
- }
333
-
334
- #[ async_trait]
335
- impl Cas for CompareAndSwap {
336
- async fn current ( & self ) -> anyhow:: Result < Option < Vec < u8 > > , Error > {
337
- let mut state = self . state . lock ( ) . await ;
338
- state. flush ( ) . await ?;
339
- let res = self . inner_cas . current ( ) . await ;
340
- match res. clone ( ) {
341
- Ok ( value) => {
342
- state. cache . put ( self . key . clone ( ) , value. clone ( ) ) ;
343
- state. flush ( ) . await ?;
344
- Ok ( value)
345
- }
346
- Err ( err) => Err ( err) ,
347
- } ?;
348
- res
349
- }
350
-
351
- async fn swap ( & self , value : Vec < u8 > ) -> anyhow:: Result < ( ) , SwapError > {
352
- let mut state = self . state . lock ( ) . await ;
353
- state
354
- . flush ( )
355
- . await
356
- . map_err ( |_e| SwapError :: Other ( "failed flushing" . to_string ( ) ) ) ?;
357
- let res = self . inner_cas . swap ( value. clone ( ) ) . await ;
358
- match res {
359
- Ok ( ( ) ) => {
360
- state. cache . put ( self . key . clone ( ) , Some ( value) ) ;
361
- state
362
- . flush ( )
363
- . await
364
- . map_err ( |_e| SwapError :: Other ( "failed flushing" . to_string ( ) ) ) ?;
365
- Ok ( ( ) )
366
- }
367
- Err ( err) => Err ( err) ,
368
- }
369
- }
370
-
371
- async fn bucket_rep ( & self ) -> u32 {
372
- self . bucket_rep
373
- }
374
-
375
- async fn key ( & self ) -> String {
376
- self . key . clone ( )
377
- }
378
- }
0 commit comments