Skip to content

Commit 72fb256

Browse files
authored
chore(cluster_mgr): introduce SlotRange class (#4814)
Before: slot merging/splitting logic was mixed with business logic. Also, slots were represented as dictionary, which made the code less readable. Now, SlotRange handles the low-level logic, which makes the high-level code simpler to understand. Signed-off-by: Roman Gershman <[email protected]>
1 parent c4be633 commit 72fb256

File tree

1 file changed

+83
-55
lines changed

1 file changed

+83
-55
lines changed

tools/cluster_mgr.py

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from argparse import RawTextHelpFormatter
44
import json
55
import math
6+
from typing import Iterable
67
import redis
78
import subprocess
89
import time
@@ -46,6 +47,7 @@ def start_node(node, dragonfly_bin, threads):
4647
"--dbfilename=",
4748
f"--logtostderr",
4849
"--proactor_affinity_mode=off",
50+
"--omit_basic_usage",
4951
],
5052
stderr=f,
5153
)
@@ -71,6 +73,55 @@ def send_command(node, command, print_errors=True):
7173
return Exception()
7274

7375

76+
class SlotRange:
77+
def __init__(self, start, end):
78+
assert start <= end
79+
self.start = start
80+
self.end = end
81+
82+
def to_dict(self):
83+
return {"start": self.start, "end": self.end}
84+
85+
@classmethod
86+
def from_dict(cls, d):
87+
return cls(d["start"], d["end"])
88+
89+
def __repr__(self):
90+
return f"({self.start}-{self.end})"
91+
92+
def merge(self, other):
93+
if self.end + 1 == other.start:
94+
self.end = other.end
95+
return True
96+
elif other.end + 1 == self.start:
97+
self.start = other.start
98+
return True
99+
return False
100+
101+
def contains(self, slot_id):
102+
return self.start <= slot_id <= self.end
103+
104+
def split(self, slot_id):
105+
assert self.contains(slot_id)
106+
107+
if self.start < self.end:
108+
if slot_id == self.start:
109+
return None, SlotRange(self.start + 1, self.end)
110+
elif slot_id == self.end:
111+
return SlotRange(self.start, self.end - 1), None
112+
elif self.start < slot_id < self.end:
113+
return SlotRange(self.start, slot_id - 1), SlotRange(slot_id + 1, self.end)
114+
return None, None
115+
116+
117+
# Custom JSON encoder to handle SlotRange objects
118+
class ClusterConfigEncoder(json.JSONEncoder):
119+
def default(self, obj):
120+
if isinstance(obj, SlotRange):
121+
return obj.to_dict()
122+
return super().default(obj)
123+
124+
74125
def build_node(node):
75126
return {"id": node.id, "ip": node.host, "port": node.port}
76127

@@ -81,15 +132,16 @@ def build_config_from_list(masters):
81132

82133
config = []
83134
for i, master in enumerate(masters):
135+
slot_range = SlotRange(i * slots_per_node, (i + 1) * slots_per_node - 1)
84136
c = {
85-
"slot_ranges": [{"start": i * slots_per_node, "end": (i + 1) * slots_per_node - 1}],
137+
"slot_ranges": [slot_range],
86138
"master": build_node(master.node),
87139
"replicas": [build_node(replica) for replica in master.replicas],
88140
}
89-
90141
config.append(c)
91142

92-
config[-1]["slot_ranges"][-1]["end"] += total_slots % len(masters)
143+
# Adjust the last slot range to include any remaining slots
144+
config[-1]["slot_ranges"][-1].end += total_slots % len(masters)
93145
return config
94146

95147

@@ -106,7 +158,8 @@ def get_nodes_from_config(config):
106158

107159
def push_config(config):
108160
def push_to_node(node, config):
109-
config_str = json.dumps(config, indent=2)
161+
# Use the custom encoder to convert SlotRange objects during serialization
162+
config_str = json.dumps(config, indent=2, cls=ClusterConfigEncoder)
110163
response = send_command(node, ["dflycluster", "config", config_str])
111164
print(f"- Push to {node.port}: {response}")
112165

@@ -191,7 +244,7 @@ def build_node(node_list):
191244
def build_slots(slot_list):
192245
slots = []
193246
for i in range(0, len(slot_list), 2):
194-
slots.append({"start": slot_list[i], "end": slot_list[i + 1]})
247+
slots.append(SlotRange(slot_list[i], slot_list[i + 1]))
195248
return slots
196249

197250
client = redis.Redis(decode_responses=True, host=args.target_host, port=args.target_port)
@@ -308,76 +361,51 @@ def move(args):
308361
config = build_config_from_existing(args)
309362
new_owner = find_master(config, args.target_host, args.target_port)
310363

311-
def remove_slot(slot, from_range, from_shard):
312-
if from_range["start"] == slot:
313-
from_range["start"] += 1
314-
if from_range["start"] > from_range["end"]:
315-
from_shard["slot_ranges"].remove(from_range)
316-
elif from_range["end"] == slot:
317-
from_range["end"] -= 1
318-
if from_range["start"] > from_range["end"]:
319-
from_shard["slot_ranges"].remove(from_range)
320-
else:
321-
assert (
322-
slot > from_range["start"] and slot < from_range["end"]
323-
), f'{slot} {from_range["start"]} {from_range["end"]}'
324-
from_shard["slot_ranges"].append({"start": slot + 1, "end": from_range["end"]})
325-
from_range["end"] = slot - 1
364+
def remove_slot(slot, from_range: SlotRange, from_shard):
365+
left, right = from_range.split(slot)
366+
if left:
367+
from_shard["slot_ranges"].append(left)
368+
if right:
369+
from_shard["slot_ranges"].append(right)
370+
from_shard["slot_ranges"].remove(from_range)
326371

327372
def add_slot(slot, to_shard):
328-
for slot_range in to_shard["slot_ranges"]:
329-
if slot == slot_range["start"] - 1:
330-
slot_range["start"] -= 1
331-
return
332-
if slot == slot_range["end"] + 1:
333-
slot_range["end"] += 1
373+
slot_range = SlotRange(slot, slot)
374+
for existing_range in to_shard["slot_ranges"]:
375+
if existing_range.merge(slot_range):
334376
return
335-
to_shard["slot_ranges"].append({"start": slot, "end": slot})
377+
to_shard["slot_ranges"].append(slot_range)
336378

337379
def find_slot(slot, config):
338380
for shard in config:
339381
if shard == new_owner:
340382
continue
341383
for slot_range in shard["slot_ranges"]:
342-
if slot >= slot_range["start"] and slot <= slot_range["end"]:
384+
if slot_range.contains(slot):
343385
return shard, slot_range
344386
return None, None
345387

346388
def pack(slot_ranges):
347-
new_range = []
348-
while True:
349-
changed = False
350-
new_range = []
351-
slot_ranges.sort(key=lambda x: x["start"])
352-
for i, slot_range in enumerate(slot_ranges):
353-
added = False
354-
for j in range(i):
355-
prev_slot_range = slot_ranges[j]
356-
if prev_slot_range["end"] + 1 == slot_range["start"]:
357-
prev_slot_range["end"] = slot_range["end"]
358-
changed = True
359-
added = True
360-
break
361-
if not added:
362-
new_range.append(slot_range)
363-
slot_ranges = new_range
364-
if not changed:
365-
break
366-
return new_range
389+
slot_objects = sorted(slot_ranges, key=lambda x: x.start)
390+
packed = []
391+
for slot_range in slot_objects:
392+
if packed and packed[-1].merge(slot_range):
393+
continue
394+
packed.append(slot_range)
395+
return packed
367396

368397
for slot in range(args.slot_start, args.slot_end + 1):
369398
shard, slot_range = find_slot(slot, config)
370-
if shard == None:
371-
continue
372-
if shard == new_owner:
399+
if shard == None or shard == new_owner:
373400
continue
374401
remove_slot(slot, slot_range, shard)
375402
add_slot(slot, new_owner)
376403

377404
for shard in config:
378405
shard["slot_ranges"] = pack(shard["slot_ranges"])
379406

380-
print(f"Pushing new config:\n{json.dumps(config, indent=2)}\n")
407+
# Use the custom encoder for printing the JSON
408+
print(f"Pushing new config:\n{json.dumps(config, indent=2, cls=ClusterConfigEncoder)}\n")
381409
push_config(config)
382410

383411

@@ -390,9 +418,9 @@ def migrate(args):
390418
# Find source node
391419
source = None
392420
for node in config:
393-
slots = node["slot_ranges"]
421+
slots: Iterable[SlotRange] = node["slot_ranges"]
394422
for slot in slots:
395-
if slot["start"] <= args.slot_start and slot["end"] >= args.slot_end:
423+
if slot.start <= args.slot_start and slot.end >= args.slot_end:
396424
source = node
397425
break
398426
if source == None:
@@ -428,7 +456,7 @@ def migrate(args):
428456

429457
def print_config(args):
430458
config = build_config_from_existing(args)
431-
print(json.dumps(config, indent=2))
459+
print(json.dumps(config, indent=2, cls=ClusterConfigEncoder))
432460

433461

434462
def shutdown(args):

0 commit comments

Comments
 (0)