Skip to content

Commit c986ebe

Browse files
authored
Online DDL - stage 1 (#80)
1 parent d6d7eb7 commit c986ebe

File tree

15 files changed

+688
-81
lines changed

15 files changed

+688
-81
lines changed

mvclient/src/lib.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub struct MultiVersionClientConfig {
4141
pub ns_key: String,
4242

4343
pub ns_key_hashproof: Option<String>,
44+
45+
pub lock_owner: Option<String>,
4446
}
4547

4648
impl MultiVersionClientConfig {
@@ -97,6 +99,8 @@ pub struct CommitGlobalInit<'a> {
9799
pub allow_skip_idempotency_check: bool,
98100

99101
pub num_namespaces: usize,
102+
103+
pub lock_owner: Option<&'a str>,
100104
}
101105

102106
#[derive(Serialize)]
@@ -183,6 +187,10 @@ impl MultiVersionClient {
183187
.append_pair("from_version", from_version);
184188
}
185189

190+
if let Some(lock_owner) = &self.config.lock_owner {
191+
url.query_pairs_mut().append_pair("lock_owner", lock_owner);
192+
}
193+
186194
let mut boff = RandomizedExponentialBackoff::default();
187195
let stat_res: StatResponse = loop {
188196
let resp = request_and_check(self.client.get(url.clone()).decorate(self)).await?;
@@ -259,6 +267,7 @@ impl MultiVersionClient {
259267
idempotency_key: &idempotency_key[..],
260268
allow_skip_idempotency_check,
261269
num_namespaces: intents.len(),
270+
lock_owner: self.config.lock_owner.as_deref(),
262271
};
263272
allow_skip_idempotency_check = false;
264273
let mut raw_request: Vec<u8> = Vec::new();
@@ -682,10 +691,15 @@ impl Transaction {
682691
}
683692
}
684693

694+
#[derive(Error, Debug)]
695+
#[error("status {0}")]
696+
pub struct StatusCodeError(pub StatusCode);
697+
685698
async fn request_and_check(r: RequestBuilder) -> Result<Option<(HeaderMap, Bytes)>> {
686699
request_and_check_returning_status(r)
687700
.await
688-
.map_err(|e| anyhow::anyhow!("status {}", e))
701+
.map_err(StatusCodeError)
702+
.map_err(anyhow::Error::from)
689703
}
690704

691705
async fn request_and_check_returning_status(

mvfs/src/vfs.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use std::{
1212
};
1313

1414
use mvclient::{
15-
CommitOutput, MultiVersionClient, MultiVersionClientConfig, TimeToVersionResponse, Transaction,
15+
CommitOutput, MultiVersionClient, MultiVersionClientConfig, StatusCodeError,
16+
TimeToVersionResponse, Transaction,
1617
};
1718

1819
use crate::types::LockKind;
@@ -30,6 +31,7 @@ pub struct MultiVersionVfs {
3031
pub sector_size: usize,
3132
pub http_client: reqwest::Client,
3233
pub db_name_map: Arc<HashMap<String, String>>,
34+
pub lock_owner: Option<String>,
3335
}
3436

3537
impl MultiVersionVfs {
@@ -93,6 +95,7 @@ impl MultiVersionVfs {
9395
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
9496
ns_key: ns_key.to_string(),
9597
ns_key_hashproof,
98+
lock_owner: self.lock_owner.clone(),
9699
},
97100
self.http_client.clone(),
98101
)?;
@@ -604,6 +607,18 @@ impl Connection {
604607
Ok(x) => x,
605608
Err(e) => {
606609
tracing::error!(ns_key = self.client.config().ns_key, error = %e, "transaction initialization failed");
610+
611+
if let Some(sc_err) =
612+
e.chain().find_map(|x| x.downcast_ref::<StatusCodeError>())
613+
{
614+
if sc_err.0.as_u16() == 410 {
615+
tracing::error!(
616+
"this client can no longer start transaction on this database"
617+
);
618+
return Err(std::io::Error::new(std::io::ErrorKind::Other, "error"));
619+
}
620+
}
621+
607622
return Ok(false);
608623
}
609624
};

mvsqlite-fuse/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ async fn main() -> Result<()> {
9696
sector_size: opt.sector_size,
9797
http_client: reqwest::Client::new(),
9898
db_name_map: Arc::new(Default::default()),
99+
lock_owner: None,
99100
};
100101
let fuse_fs = FuseFs {
101102
namespaces,

mvsqlite/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ fn init_with_options_impl(opts: InitOptions) {
101101
}
102102
let db_name_map = Arc::new(db_name_map);
103103

104+
let mut lock_owner: Option<String> = None;
105+
if let Ok(s) = std::env::var("MVSQLITE_LOCK_OWNER") {
106+
if !s.is_empty() {
107+
tracing::debug!(lock_owner = s, "configuring lock owner");
108+
lock_owner = Some(s);
109+
}
110+
}
111+
104112
let mut builder = reqwest::ClientBuilder::new();
105113
builder = builder.timeout(Duration::from_secs(timeout_secs));
106114
if force_http2 {
@@ -118,6 +126,7 @@ fn init_with_options_impl(opts: InitOptions) {
118126
sector_size,
119127
http_client: http_client.clone(),
120128
db_name_map: db_name_map.clone(),
129+
lock_owner: lock_owner.clone(),
121130
},
122131
};
123132

@@ -131,6 +140,7 @@ fn init_with_options_impl(opts: InitOptions) {
131140
sector_size,
132141
http_client: http_client.clone(),
133142
db_name_map: db_name_map.clone(),
143+
lock_owner: lock_owner.clone(),
134144
},
135145
};
136146
sqlite_vfs::register(&format!("{}-{}", VFS_NAME, sector_size), vfs, false)

mvstore-stress/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
7979
data_plane: vec![opt.data_plane.parse()?],
8080
ns_key: opt.ns_key.clone(),
8181
ns_key_hashproof: None,
82+
lock_owner: None,
8283
},
8384
reqwest::Client::new(),
8485
)?;

mvstore/src/commit.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use rand::RngCore;
1616
use crate::{
1717
delta::reader::DeltaReader,
1818
server::Server,
19-
util::{decode_version, generate_suffix_versionstamp_atomic_op},
19+
util::{decode_version, generate_suffix_versionstamp_atomic_op, GoneError},
2020
util::{get_txn_read_version_as_versionstamp, ContentIndex},
2121
write::{WriteApplier, WriteApplierContext, WriteRequest},
2222
};
@@ -39,6 +39,7 @@ pub struct CommitContext<'a> {
3939
pub idempotency_key: [u8; 16],
4040
pub allow_skip_idempotency_check: bool,
4141
pub namespaces: &'a [CommitNamespaceContext<'a>],
42+
pub lock_owner: Option<&'a str>,
4243
}
4344

4445
pub struct CommitNamespaceContext<'a> {
@@ -175,9 +176,31 @@ impl Server {
175176
// Phase 2 - content index insertion
176177

177178
for ns in ctx.namespaces {
178-
if let Some(md) = &ns.metadata {
179-
let metadata_key = self.key_codec.construct_nsmd_key(ns.ns_id);
180-
txn.set(&metadata_key, md.as_bytes());
179+
let metadata = self
180+
.ns_metadata_cache
181+
.get(&txn, &self.key_codec, ns.ns_id)
182+
.await?;
183+
184+
// Does the client own the lock, if any?
185+
if let Some(lock) = &metadata.lock {
186+
match ctx.lock_owner {
187+
Some(lock_owner) => {
188+
// Validate lock ownership and state
189+
if lock.owner.as_str() != lock_owner {
190+
return Err(GoneError("you no longer own the lock").into());
191+
}
192+
if lock.rolling_back {
193+
return Err(GoneError("rolling back").into());
194+
}
195+
}
196+
None => {
197+
return Ok(CommitResult::Conflict);
198+
}
199+
}
200+
} else {
201+
if ctx.lock_owner.is_some() {
202+
return Err(GoneError("you do not own the lock").into());
203+
}
181204
}
182205

183206
let mut written_pages: BTreeSet<u32> = BTreeSet::new();

mvstore/src/delta/reader.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,6 @@ impl<'a> DeltaReader<'a> {
3434
Some(x) => decode_version(x)?,
3535
None => [0xffu8; 10],
3636
};
37-
if let Some(rm) = self.replica_manager {
38-
let current_rv = rm.replica_version(self.txn).await?;
39-
let requested_rv = i64::from_be_bytes(page_version[0..8].try_into().unwrap());
40-
if current_rv < requested_rv {
41-
anyhow::bail!("this replica does not have the requested read version");
42-
}
43-
tracing::debug!(current_rv, requested_rv, "read_page_hash replica read");
44-
}
4537
let scan_end = self
4638
.key_codec
4739
.construct_page_key(self.ns_id, page_index, page_version);

mvstore/src/gc.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ use std::{
88

99
use anyhow::{Context, Result};
1010
use bloom::{BloomFilter, ASMS};
11-
use foundationdb::{
12-
future::FdbKeyValue,
13-
options::{ConflictRangeType, StreamingMode},
14-
RangeOption,
11+
use foundationdb::{future::FdbKeyValue, options::StreamingMode, RangeOption};
12+
13+
use crate::{
14+
fixed::FixedKeyVec,
15+
lock::DistributedLock,
16+
server::Server,
17+
util::{
18+
add_single_key_read_conflict_range, decode_version, extract_10_byte_suffix,
19+
get_txn_read_version_as_versionstamp, truncate_10_byte_suffix, ContentIndex,
20+
},
1521
};
1622

17-
use crate::{fixed::FixedKeyVec, lock::DistributedLock, server::Server, util::ContentIndex};
18-
1923
pub static GC_SCAN_BATCH_SIZE: AtomicUsize = AtomicUsize::new(5000);
2024
pub static GC_FRESH_PAGE_TTL_SECS: AtomicU64 = AtomicU64::new(3600);
2125

@@ -24,9 +28,32 @@ impl Server {
2428
self: Arc<Self>,
2529
dry_run: bool,
2630
ns_id: [u8; 10],
27-
before_version: [u8; 10],
31+
mut before_version: [u8; 10],
2832
mut progress_callback: impl FnMut(Option<u64>),
2933
) -> Result<()> {
34+
// Fix up `before_version` to be the minimum of the three:
35+
// - The supplied value
36+
// - The cluster's current read version as seen by this snapshot
37+
// - The NS lock version in the same snapshot
38+
{
39+
let txn = self.db.create_trx()?;
40+
41+
before_version = before_version.min(get_txn_read_version_as_versionstamp(&txn).await?);
42+
43+
let metadata = self
44+
.ns_metadata_cache
45+
.get(&txn, &self.key_codec, ns_id)
46+
.await?;
47+
if let Some(lock) = &metadata.lock {
48+
before_version = before_version.min(decode_version(&lock.snapshot_version)?);
49+
}
50+
}
51+
52+
tracing::info!(
53+
ns = hex::encode(&ns_id),
54+
before_version = hex::encode(&before_version),
55+
"starting version truncation"
56+
);
3057
let scan_start = self.key_codec.construct_page_key(ns_id, 0, [0u8; 10]);
3158
let scan_end = self
3259
.key_codec
@@ -409,15 +436,7 @@ impl Server {
409436
self.key_codec.construct_delta_referrer_key(ns_id, *hash);
410437

411438
// 3e. Add the CAM index of the remaining pages to the conflict set.
412-
txn.add_conflict_range(
413-
&ci_key,
414-
&ci_key
415-
.iter()
416-
.copied()
417-
.chain(std::iter::once(0u8))
418-
.collect::<Vec<u8>>(),
419-
ConflictRangeType::Read,
420-
)?;
439+
add_single_key_read_conflict_range(&txn, &ci_key)?;
421440

422441
// 3f. Delete the remaining pages from the CAM.
423442
txn.clear(&ci_key);
@@ -452,13 +471,3 @@ impl Server {
452471
Ok(())
453472
}
454473
}
455-
456-
fn truncate_10_byte_suffix(data: &[u8]) -> &[u8] {
457-
assert!(data.len() >= 10);
458-
&data[..data.len() - 10]
459-
}
460-
461-
fn extract_10_byte_suffix(data: &[u8]) -> [u8; 10] {
462-
assert!(data.len() >= 10);
463-
<[u8; 10]>::try_from(&data[data.len() - 10..]).unwrap()
464-
}

mvstore/src/keys.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ impl KeyCodec {
1515
key
1616
}
1717

18+
pub fn construct_nsrollbackcursor_key(&self, ns_id: [u8; 10]) -> Vec<u8> {
19+
let mut key = pack(&(self.metadata_prefix.as_str(), "nsrollbackcursor"));
20+
key.push(0x32);
21+
key.extend_from_slice(&ns_id);
22+
key
23+
}
24+
1825
pub fn construct_nstask_key(&self, ns_id: [u8; 10], task: &str) -> Vec<u8> {
1926
let mut key = pack(&(self.metadata_prefix.as_str(), "nstask"));
2027
key.push(0x32);

mvstore/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ mod fixed;
44
mod gc;
55
mod keys;
66
mod lock;
7+
mod metadata;
8+
mod nslock;
79
mod page;
810
mod replica;
911
mod server;

0 commit comments

Comments
 (0)