Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,16 @@ size_t CalculateEvictionBytes() {
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);

// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;

/* If rss_oom_deny_ratio is set, we should evict depending on rss memory too */
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
Expand All @@ -247,6 +249,8 @@ size_t CalculateEvictionBytes() {
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/

return goal_bytes;
}

Expand Down
120 changes: 19 additions & 101 deletions tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,6 @@
from .instance import DflyInstance, DflyInstanceFactory


async def calculate_estimated_connection_memory(
async_client: aioredis.Redis, df_server: DflyInstance
):
memory_info = await async_client.info("memory")
already_used_rss_memory = memory_info["used_memory_rss"]

connections_number = 100
connections = []
for _ in range(connections_number):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)

await asyncio.sleep(1) # Wait RSS update

memory_info = await async_client.info("memory")
estimated_connections_memory = memory_info["used_memory_rss"] - already_used_rss_memory

# Close test connection
for conn in connections:
await conn.close()

return estimated_connections_memory // connections_number


@pytest.mark.opt_only
@pytest.mark.parametrize(
"type, keys, val_size, elements",
Expand Down Expand Up @@ -188,106 +163,49 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
assert rss_before_eval * 1.01 > info["used_memory_rss"]


@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio
@dfly_args(
{
"proactor_threads": 1,
"cache_mode": "true",
"maxmemory": "256mb",
"rss_oom_deny_ratio": 0.5,
"max_eviction_per_heartbeat": 1000,
"maxmemory": "5gb",
"rss_oom_deny_ratio": 0.8,
"max_eviction_per_heartbeat": 100,
}
)
async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis,
df_server: DflyInstance,
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
"""

max_memory = 256 * 1024 * 1024 # 256 MB
rss_max_memory = int(max_memory * 0.5) # 50% of max memory

data_fill_size = int(0.55 * rss_max_memory) # 55% of rss_max_memory
rss_increase_size = int(0.55 * rss_max_memory) # 55% of max rss_max_memory

key_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // key_size

await asyncio.sleep(1) # Wait for RSS update

estimated_connection_memory = await calculate_estimated_connection_memory(
async_client, df_server
)
num_connections = rss_increase_size // estimated_connection_memory

logging.info(
f"Estimated connection memory: {estimated_connection_memory}. Number of connections: {num_connections}."
)

# Fill data to 55% of rss max memory
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", key_size)
max_memory = 5 * 1024 * 1024 * 1024 # 5G
rss_max_memory = int(max_memory * 0.8)

await asyncio.sleep(1) # Wait for RSS heartbeat update
data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory

# First test that eviction is not triggered without connection creation
stats_info = await async_client.info("stats")
assert stats_info["evicted_keys"] == 0, "No eviction should start yet."
val_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // val_size

await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
# Test that used memory is less than 90% of max memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory."
assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9
memory_info["used_memory_rss"] > rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit.: "RSS memory should be higher than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."


# Disable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction false")

# Increase RSS memory by 55% of rss max memory
# We can simulate RSS increase by creating new connections
connections = []
for _ in range(num_connections):
conn = aioredis.Redis(port=df_server.port)
await conn.ping()
connections.append(conn)

await asyncio.sleep(1)

# Check that RSS memory is above rss limit
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory_rss"] >= rss_max_memory * 0.9
), "RSS memory should exceed 90% of the maximum RSS memory limit (max_memory * rss_oom_deny_ratio)."

# Enable heartbeat eviction
await async_client.execute_command("CONFIG SET enable_heartbeat_eviction true")

await asyncio.sleep(1) # Wait for RSS heartbeat update
await async_client.execute_command("MEMORY DECOMMIT")
await asyncio.sleep(1) # Wait for RSS update

# Get RSS memory after creating new connections
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")

logging.info(f'Evicted keys number: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')

assert (
memory_info["used_memory"] < data_fill_size
), "Used memory should be less than initial fill size due to eviction."

assert (
memory_info["used_memory_rss"] < rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio) after eviction."

# Check that eviction has occurred
assert (
stats_info["evicted_keys"] > 0
), "Eviction should have occurred due to rss memory pressure."

for conn in connections:
await conn.close()
while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
await asyncio.sleep(1)
memory_info = await async_client.info("memory")
logging.info(
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
)
stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add an assertion after this loop to verify that at least 5% of the num_keys have been evicted

Loading