Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions core/integration/tests/server/concurrent_addition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,10 @@ async fn matrix(
) {
// TODO: Need to do this, in order to avoid timeouts from QUIC connections during tests.
let mut extra_envs = std::collections::HashMap::new();
extra_envs.insert(
"IGGY_QUIC_MAX_IDLE_TIMEOUT".to_string(),
"500s".to_string(),
);
extra_envs.insert("IGGY_QUIC_MAX_IDLE_TIMEOUT".to_string(), "500s".to_string());
extra_envs.insert(
"IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
"15s".to_string(),
"15s".to_string(),
);
let mut test_server = TestServer::new(
Some(extra_envs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn run(client: &IggyClient) {
.send_messages(
&Identifier::named(stream_name).unwrap(),
&Identifier::named(topic_name).unwrap(),
&Partitioning::partition_id(1),
&Partitioning::partition_id(0),
&mut messages,
)
.await
Expand Down
1 change: 0 additions & 1 deletion core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ serde = { workspace = true }
serde_with = { workspace = true }
static-toml = "1.3.0"
slab = "0.4.10"
sharded_queue = "2.0.1"
strum = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ impl ServerCommandHandler for CreatePartitions {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

// Acquire partition lock to serialize filesystem operations
let _partition_guard = shard.fs_locks.partition_lock.lock().await;

let partitions = shard
.create_partitions2(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl ServerCommandHandler for CreateStream {
{
// Acquire stream lock to serialize filesystem operations
let _stream_guard = shard.fs_locks.stream_lock.lock().await;

let stream = shard.create_stream2(session, name).await?;
let created_stream_id = stream.id();

Expand Down
46 changes: 40 additions & 6 deletions core/server/src/binary/handlers/system/get_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::streaming::session::Session;
use error_set::ErrContext;
use iggy_common::IggyError;
use iggy_common::get_stats::GetStats;
use iggy_common::{Identifier, IggyError};
use std::rc::Rc;
use tracing::debug;

Expand All @@ -43,11 +47,41 @@ impl ServerCommandHandler for GetStats {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");

let stats = shard.get_stats().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get stats, session: {session}")
})?;
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
// Route GetStats to shard0 only
let request = ShardRequest {
stream_id: Identifier::default(),
topic_id: Identifier::default(),
partition_id: 0,
payload: ShardRequestPayload::GetStats {
user_id: session.get_user_id(),
},
};

let message = ShardMessage::Request(request);
match shard.send_request_to_shard_or_recoil(None, message).await? {
ShardSendRequestResult::Recoil(_) => {
let stats = shard.get_stats().await.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get stats, session: {session}"
)
})?;
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
}
ShardSendRequestResult::Response(response) => match response {
ShardResponse::GetStatsResponse(stats) => {
let bytes = mapper::map_stats(&stats);
sender.send_ok_response(&bytes).await?;
}
ShardResponse::ErrorResponse(err) => {
return Err(err);
}
_ => unreachable!(
"Expected a GetStatsResponse inside of GetStats handler, impossible state"
),
},
}

Ok(())
}
}
Expand Down
5 changes: 1 addition & 4 deletions core/server/src/binary/handlers/system/ping_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ impl ServerCommandHandler for Ping {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
if let Some(mut client) = shard
.client_manager
.try_get_client_mut(session.client_id)
{
if let Some(mut client) = shard.client_manager.try_get_client_mut(session.client_id) {
let now = IggyTimestamp::now();
client.last_heartbeat = now;
debug!("Updated last heartbeat to: {now} for session: {session}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ServerCommandHandler for CreateTopic {
{
// Acquire topic lock to serialize filesystem operations
let _topic_guard = shard.fs_locks.topic_lock.lock().await;

let topic = shard
.create_topic2(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ impl ServerCommandHandler for DeleteTopic {
// we end up in a state where the topic is deleted from the disk, but during state recreation it would be recreated,
// without it's segments.
debug!("session: {session}, command: {self}");

// Acquire topic lock to serialize filesystem operations
let _topic_guard = shard.fs_locks.topic_lock.lock().await;

let topic = shard
.delete_topic2(session, &self.stream_id, &self.topic_id)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion core/server/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub fn create_shard_connections(

// Create connectors with sequential IDs (0, 1, 2, ...) regardless of CPU core numbers
let connectors: Vec<ShardConnector<ShardFrame>> = (0..shards_count)
.map(|idx| ShardConnector::new(idx as u16, shards_count))
.map(|idx| ShardConnector::new(idx as u16))
.collect();

let shutdown_handles = connectors
Expand Down
12 changes: 6 additions & 6 deletions core/server/src/http/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ async fn delete_consumer_group(
// Get members from the deleted consumer group and make them leave
let slab = consumer_group.members().inner().shared_get();
for (_, member) in slab.iter() {
if let Err(err) = state
.shard
.shard()
.client_manager
.leave_consumer_group(member.client_id, stream_id_usize, topic_id_usize, cg_id)
{
if let Err(err) = state.shard.shard().client_manager.leave_consumer_group(
member.client_id,
stream_id_usize,
topic_id_usize,
cg_id,
) {
tracing::warn!(
"{COMPONENT} (error: {err}) - failed to make client leave consumer group for client ID: {}, group ID: {}",
member.client_id,
Expand Down
4 changes: 2 additions & 2 deletions core/server/src/quic/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async fn handle_stream(

trace!("Received a QUIC command: {command}, payload size: {length}");

match command.handle(&mut sender, length, &session, &shard).await {
match command.handle(&mut sender, length, session, &shard).await {
Ok(_) => {
trace!(
"Command was handled successfully, session: {:?}. QUIC response was sent.",
Expand All @@ -217,4 +217,4 @@ async fn handle_stream(
}
}
}
}
}
12 changes: 7 additions & 5 deletions core/server/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ impl Shard {

pub async fn send_request(&self, message: ShardMessage) -> Result<ShardResponse, IggyError> {
let (sender, receiver) = async_channel::bounded(1);
self.connection
let _ = self
.connection
.sender
.send(ShardFrame::new(message, Some(sender.clone()))); // Apparently sender needs to be cloned, otherwise channel will close...
//TODO: Fixme
Expand Down Expand Up @@ -312,10 +313,6 @@ impl IggyShard {
pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
let now: Instant = Instant::now();

// Workaround to ensure that the statistics are initialized before the server
// loads streams and starts accepting connections. This is necessary to
// have the correct statistics when the server starts.
self.get_stats().await?;
shard_info!(self.id, "Starting...");
self.init().await?;

Expand Down Expand Up @@ -636,6 +633,11 @@ impl IggyShard {

Ok(ShardResponse::CreateUserResponse(user))
}
ShardRequestPayload::GetStats { .. } => {
assert_eq!(self.id, 0, "GetStats should only be handled by shard0");
let stats = self.get_stats().await?;
Ok(ShardResponse::GetStatsResponse(stats))
}
}
}

Expand Down
7 changes: 2 additions & 5 deletions core/server/src/shard/system/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,8 @@ impl IggyShard {
);

// Clean up ClientManager state
self.client_manager.delete_consumer_group(
stream_id_value,
topic_id_value,
group_id_value,
);
self.client_manager
.delete_consumer_group(stream_id_value, topic_id_value, group_id_value);

cg
}
Expand Down
22 changes: 13 additions & 9 deletions core/server/src/shard/system/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,23 @@ use std::cell::RefCell;
use sysinfo::{Pid, ProcessesToUpdate, System as SysinfoSystem};

thread_local! {
static SYSINFO: RefCell<SysinfoSystem> = {
let mut sys = SysinfoSystem::new_all();
sys.refresh_all();
RefCell::new(sys)
};
static SYSINFO: RefCell<Option<SysinfoSystem>> = const { RefCell::new(None) };
}

impl IggyShard {
pub async fn get_stats(&self) -> Result<Stats, IggyError> {
SYSINFO.with(|sysinfo| {
let mut sys = sysinfo.borrow_mut();
assert_eq!(self.id, 0, "GetStats should only be called on shard0");

SYSINFO.with(|sysinfo_cell| {
let mut sysinfo_opt = sysinfo_cell.borrow_mut();

if sysinfo_opt.is_none() {
let mut sys = SysinfoSystem::new_all();
sys.refresh_all();
*sysinfo_opt = Some(sys);
}

let sys = sysinfo_opt.as_mut().unwrap();
let process_id = std::process::id();
sys.refresh_cpu_all();
sys.refresh_memory();
Expand Down Expand Up @@ -87,8 +93,6 @@ impl IggyShard {
stats.written_bytes = disk_usage.total_written_bytes.into();
}

drop(sys);

self.streams2.with_components(|stream_components| {
let (stream_roots, stream_stats) = stream_components.into_components();
// Iterate through all streams
Expand Down
2 changes: 1 addition & 1 deletion core/server/src/shard/system/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ mod tests {
Arc::new(PersisterKind::File(FilePersister)),
None,
);
let connections = vec![ShardConnector::new(0, 1)];
let connections = vec![ShardConnector::new(0)];

let builder = IggyShard::builder();
let shard = builder
Expand Down
13 changes: 8 additions & 5 deletions core/server/src/shard/tasks/continuous/message_pump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::shard::IggyShard;
use crate::shard::task_registry::ShutdownToken;
use crate::shard::transmission::frame::ShardFrame;
use crate::{shard_debug, shard_info};
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use std::rc::Rc;

pub fn spawn_message_pump(shard: Rc<IggyShard>) {
Expand All @@ -37,29 +37,32 @@ async fn message_pump(
shard: Rc<IggyShard>,
shutdown: ShutdownToken,
) -> Result<(), iggy_common::IggyError> {
let Some(mut messages_receiver) = shard.messages_receiver.take() else {
let Some(messages_receiver) = shard.messages_receiver.take() else {
shard_info!(shard.id, "Message receiver already taken; pump not started");
return Ok(());
};

shard_info!(shard.id, "Starting message passing task");

// Get the inner flume receiver directly
let receiver = messages_receiver.inner;

loop {
futures::select! {
_ = shutdown.wait().fuse() => {
shard_debug!(shard.id, "Message receiver shutting down");
break;
}
frame = messages_receiver.next().fuse() => {
frame = receiver.recv_async().fuse() => {
match frame {
Some(ShardFrame { message, response_sender }) => {
Ok(ShardFrame { message, response_sender }) => {
if let (Some(response), Some(tx)) =
(shard.handle_shard_message(message).await, response_sender)
{
let _ = tx.send(response).await;
}
}
None => {
Err(_) => {
shard_debug!(shard.id, "Message receiver closed; exiting pump");
break;
}
Expand Down
Loading