Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions core/integration/tests/server/concurrent_addition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,10 @@ async fn matrix(
) {
// TODO: Need to do this, in order to avoid timeouts from QUIC connections during tests.
let mut extra_envs = std::collections::HashMap::new();
extra_envs.insert(
"IGGY_QUIC_MAX_IDLE_TIMEOUT".to_string(),
"500s".to_string(),
);
extra_envs.insert("IGGY_QUIC_MAX_IDLE_TIMEOUT".to_string(), "500s".to_string());
extra_envs.insert(
"IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
"15s".to_string(),
"15s".to_string(),
);
let mut test_server = TestServer::new(
Some(extra_envs),
Expand Down
38 changes: 19 additions & 19 deletions core/integration/tests/server/scenarios/encryption_scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
login_root(&client).await;

// 3. Create test stream and topic
let stream_id = 1001;
let topic_id = 1;
let stream_name = "test-stream-api";
let topic_name = "test-topic-api";
let partition_count = 1;

client.create_stream("test-stream-api").await.unwrap();
client.create_stream(stream_name).await.unwrap();
client
.create_topic(
&Identifier::numeric(stream_id).unwrap(),
"test-topic-api",
&Identifier::named(stream_name).unwrap(),
topic_name,
partition_count,
CompressionAlgorithm::default(),
None,
Expand Down Expand Up @@ -128,8 +128,8 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp

client
.send_messages(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
&Partitioning::partition_id(0),
&mut messages_batch_1,
)
Expand All @@ -139,9 +139,9 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
// 5. Flush and get initial stats
client
.flush_unsaved_buffer(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
1,
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
0,
true, // Force flush
)
.await
Expand All @@ -158,8 +158,8 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
let consumer = Consumer::default();
let polled = client
.poll_messages(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
Some(0),
&consumer,
&PollingStrategy::offset(0),
Expand Down Expand Up @@ -273,8 +273,8 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp

client
.send_messages(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
&Partitioning::partition_id(0), // Use specific partition for testing
&mut messages_batch_2,
)
Expand All @@ -284,9 +284,9 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
// Flush the buffer after sending second batch
client
.flush_unsaved_buffer(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
1,
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
0,
true, // Force flush
)
.await
Expand All @@ -298,8 +298,8 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
// 10. Poll all messages (both batches) and verify
let polled = client
.poll_messages(
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
Some(0),
&consumer,
&PollingStrategy::offset(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn run(client: &IggyClient) {
.send_messages(
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
&Partitioning::partition_id(1),
&Partitioning::partition_id(0),
&mut messages,
)
.await
Expand Down
1 change: 0 additions & 1 deletion core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ serde = { workspace = true }
serde_with = { workspace = true }
static-toml = "1.3.0"
slab = "0.4.10"
sharded_queue = "2.0.1"
strum = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ impl ServerCommandHandler for CreatePartitions {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

// Acquire partition lock to serialize filesystem operations
let _partition_guard = shard.fs_locks.partition_lock.lock().await;

let partitions = shard
.create_partitions2(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl ServerCommandHandler for CreateStream {
{
// Acquire stream lock to serialize filesystem operations
let _stream_guard = shard.fs_locks.stream_lock.lock().await;

let stream = shard.create_stream2(session, name).await?;
let created_stream_id = stream.id();

Expand Down
46 changes: 40 additions & 6 deletions core/server/src/binary/handlers/system/get_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::streaming::session::Session;
use error_set::ErrContext;
use iggy_common::IggyError;
use iggy_common::get_stats::GetStats;
use iggy_common::{Identifier, IggyError};
use std::rc::Rc;
use tracing::debug;

Expand All @@ -43,11 +47,41 @@ impl ServerCommandHandler for GetStats {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

let stats = shard.get_stats().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get stats, session: {session}")
})?;
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
// Route GetStats to shard0 only
let request = ShardRequest {
stream_id: Identifier::default(),
topic_id: Identifier::default(),
partition_id: 0,
payload: ShardRequestPayload::GetStats {
user_id: session.get_user_id(),
},
};

let message = ShardMessage::Request(request);
match shard.send_request_to_shard_or_recoil(None, message).await? {
ShardSendRequestResult::Recoil(_) => {
let stats = shard.get_stats().await.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get stats, session: {session}"
)
})?;
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
}
ShardSendRequestResult::Response(response) => match response {
ShardResponse::GetStatsResponse(stats) => {
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
}
ShardResponse::ErrorResponse(err) => {
return Err(err);
}
_ => unreachable!(
"Expected a GetStatsResponse inside of GetStats handler, impossible state"
),
},
}

Ok(())
}
}
Expand Down
5 changes: 1 addition & 4 deletions core/server/src/binary/handlers/system/ping_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ impl ServerCommandHandler for Ping {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
if let Some(mut client) = shard
.client_manager
.try_get_client_mut(session.client_id)
{
if let Some(mut client) = shard.client_manager.try_get_client_mut(session.client_id) {
let now = IggyTimestamp::now();
client.last_heartbeat = now;
debug!("Updated last heartbeat to: {now} for session: {session}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ServerCommandHandler for CreateTopic {
{
// Acquire topic lock to serialize filesystem operations
let _topic_guard = shard.fs_locks.topic_lock.lock().await;

let topic = shard
.create_topic2(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ impl ServerCommandHandler for DeleteTopic {
// we end up in a state where the topic is deleted from the disk, but during state recreation it would be recreated,
// without it's segments.
debug!("session: {session}, command: {self}");

// Acquire topic lock to serialize filesystem operations
let _topic_guard = shard.fs_locks.topic_lock.lock().await;

let topic = shard
.delete_topic2(session, &self.stream_id, &self.topic_id)
.await?;
Expand Down
14 changes: 10 additions & 4 deletions core/server/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub fn create_shard_connections(

// Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU core numbers
let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
.map(|idx| ShardConnector::new(idx as u16, shards_count))
.map(|idx| ShardConnector::new(idx as u16))
.collect();

let shutdown_handles = connectors
Expand Down Expand Up @@ -648,9 +648,15 @@ async fn load_partition(

let start_offset = log_file_name.parse::<u64>().unwrap();

let messages_file_path =
format!("{}/{}.{}", partition_path, start_offset, LOG_EXTENSION);
std::fs::metadata(&messages_file_path).is_ok_and(|metadata| metadata.len() > 0)
let messages_file_path = config.get_messages_file_path(
stream_id,
topic_id,
partition_id as usize,
start_offset,
);
let metadata = std::fs::metadata(&messages_file_path)
.expect("failed to get metadata for first segment in log");
metadata.len() > 0
})
.unwrap_or_else(|| false);

Expand Down
12 changes: 6 additions & 6 deletions core/server/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ async fn delete_consumer_group(
// Get members from the deleted consumer group and make them leave
let slab = consumer_group.members().inner().shared_get();
for (_, member) in slab.iter() {
if let Err(err) = state
.shard
.shard()
.client_manager
.leave_consumer_group(member.client_id, stream_id_usize, topic_id_usize, cg_id)
{
if let Err(err) = state.shard.shard().client_manager.leave_consumer_group(
member.client_id,
stream_id_usize,
topic_id_usize,
cg_id,
) {
tracing::warn!(
"{COMPONENT} (error: {err}) - failed to make client leave consumer group for client ID: {}, group ID: {}",
member.client_id,
Expand Down
8 changes: 8 additions & 0 deletions core/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use server::slab::traits_ext::{
};
use server::state::file::FileState;
use server::state::system::SystemState;
use server::streaming::clients::client_manager::{Client, ClientManager};
use server::streaming::diagnostics::metrics::Metrics;
use server::streaming::storage::SystemStorage;
use server::streaming::utils::MemoryPool;
Expand Down Expand Up @@ -255,6 +256,11 @@ async fn main() -> Result<(), ServerError> {
let shards_table = Box::leak(shards_table);
let shards_table: EternalPtr<DashMap<IggyNamespace, ShardInfo>> = shards_table.into();

let client_manager = Box::new(DashMap::new());
let client_manager = Box::leak(client_manager);
let client_manager: EternalPtr<DashMap<u32, Client>> = client_manager.into();
let client_manager = ClientManager::new(client_manager);

streams.with_components(|components| {
let (root, ..) = components.into_components();
for (_, stream) in root.iter() {
Expand Down Expand Up @@ -297,6 +303,7 @@ async fn main() -> Result<(), ServerError> {
state_persister,
encryptor.clone(),
);
let client_manager = client_manager.clone();

// TODO: Explore decoupling the `Log` from `Partition` entity.
// Ergh... I knew this will backfire to include `Log` as part of the `Partition` entity,
Expand Down Expand Up @@ -335,6 +342,7 @@ async fn main() -> Result<(), ServerError> {
.users(users)
.shards_table(shards_table)
.connections(connections)
.clients_manager(client_manager)
.config(config)
.encryptor(encryptor)
.version(current_version)
Expand Down
4 changes: 2 additions & 2 deletions core/server/src/quic/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async fn handle_stream(

trace!("Received a QUIC command: {command}, payload size: {length}");

match command.handle(&mut sender, length, &session, &shard).await {
match command.handle(&mut sender, length, session, &shard).await {
Ok(_) => {
trace!(
"Command was handled successfully, session: {:?}. QUIC response was sent.",
Expand All @@ -217,4 +217,4 @@ async fn handle_stream(
}
}
}
}
}
Loading