Skip to content

Commit b83be3c

Browse files
committed
chore(cluster_mgr): introduce SlotRange class
Before: slot merging/splitting logic was mixed with business logic. Also, slots were represented as dictionary, which made the code less readable. Now, SlotRange handles the low-level logic, which makes the high-level code simpler to understand. Signed-off-by: Roman Gershman <[email protected]>
1 parent 555d6b5 commit b83be3c

File tree

1 file changed

+81
-52
lines changed

1 file changed

+81
-52
lines changed

tools/cluster_mgr.py

Lines changed: 81 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def start_node(node, dragonfly_bin, threads):
4646
"--dbfilename=",
4747
f"--logtostderr",
4848
"--proactor_affinity_mode=off",
49+
"--omit_basic_usage",
4950
],
5051
stderr=f,
5152
)
@@ -71,6 +72,55 @@ def send_command(node, command, print_errors=True):
7172
return Exception()
7273

7374

75+
class SlotRange:
76+
def __init__(self, start, end):
77+
assert start <= end
78+
self.start = start
79+
self.end = end
80+
81+
def to_dict(self):
82+
return {"start": self.start, "end": self.end}
83+
84+
@classmethod
85+
def from_dict(cls, d):
86+
return cls(d["start"], d["end"])
87+
88+
def __repr__(self):
89+
return f"({self.start}-{self.end})"
90+
91+
def merge(self, other):
92+
if self.end + 1 == other.start:
93+
self.end = other.end
94+
return True
95+
elif other.end + 1 == self.start:
96+
self.start = other.start
97+
return True
98+
return False
99+
100+
def contains(self, slot_id):
101+
return self.start <= slot_id <= self.end
102+
103+
def split(self, slot_id):
104+
assert self.contains(slot_id)
105+
106+
if self.start < self.end:
107+
if slot_id == self.start:
108+
return None, SlotRange(self.start + 1, self.end)
109+
elif slot_id == self.end:
110+
return SlotRange(self.start, self.end - 1), None
111+
elif self.start < slot_id < self.end:
112+
return SlotRange(self.start, slot_id - 1), SlotRange(slot_id + 1, self.end)
113+
return None, None
114+
115+
116+
# Custom JSON encoder to handle SlotRange objects
117+
class ClusterConfigEncoder(json.JSONEncoder):
118+
def default(self, obj):
119+
if isinstance(obj, SlotRange):
120+
return obj.to_dict()
121+
return super().default(obj)
122+
123+
74124
def build_node(node):
75125
return {"id": node.id, "ip": node.host, "port": node.port}
76126

@@ -81,15 +131,16 @@ def build_config_from_list(masters):
81131

82132
config = []
83133
for i, master in enumerate(masters):
134+
slot_range = SlotRange(i * slots_per_node, (i + 1) * slots_per_node - 1)
84135
c = {
85-
"slot_ranges": [{"start": i * slots_per_node, "end": (i + 1) * slots_per_node - 1}],
136+
"slot_ranges": [slot_range],
86137
"master": build_node(master.node),
87138
"replicas": [build_node(replica) for replica in master.replicas],
88139
}
89-
90140
config.append(c)
91141

92-
config[-1]["slot_ranges"][-1]["end"] += total_slots % len(masters)
142+
# Adjust the last slot range to include any remaining slots
143+
config[-1]["slot_ranges"][-1].end += total_slots % len(masters)
93144
return config
94145

95146

@@ -106,7 +157,10 @@ def get_nodes_from_config(config):
106157

107158
def push_config(config):
108159
def push_to_node(node, config):
109-
config_str = json.dumps(config, indent=2)
160+
# Use the custom encoder to convert SlotRange objects during serialization
161+
config_str = json.dumps(config, indent=2, cls=ClusterConfigEncoder)
162+
print(f"- Pushing to {node.port}: {config_str}")
163+
110164
response = send_command(node, ["dflycluster", "config", config_str])
111165
print(f"- Push to {node.port}: {response}")
112166

@@ -191,7 +245,7 @@ def build_node(node_list):
191245
def build_slots(slot_list):
192246
slots = []
193247
for i in range(0, len(slot_list), 2):
194-
slots.append({"start": slot_list[i], "end": slot_list[i + 1]})
248+
slots.append(SlotRange(slot_list[i], slot_list[i + 1]))
195249
return slots
196250

197251
client = redis.Redis(decode_responses=True, host=args.target_host, port=args.target_port)
@@ -308,76 +362,51 @@ def move(args):
308362
config = build_config_from_existing(args)
309363
new_owner = find_master(config, args.target_host, args.target_port)
310364

311-
def remove_slot(slot, from_range, from_shard):
312-
if from_range["start"] == slot:
313-
from_range["start"] += 1
314-
if from_range["start"] > from_range["end"]:
315-
from_shard["slot_ranges"].remove(from_range)
316-
elif from_range["end"] == slot:
317-
from_range["end"] -= 1
318-
if from_range["start"] > from_range["end"]:
319-
from_shard["slot_ranges"].remove(from_range)
320-
else:
321-
assert (
322-
slot > from_range["start"] and slot < from_range["end"]
323-
), f'{slot} {from_range["start"]} {from_range["end"]}'
324-
from_shard["slot_ranges"].append({"start": slot + 1, "end": from_range["end"]})
325-
from_range["end"] = slot - 1
365+
def remove_slot(slot, from_range: SlotRange, from_shard):
366+
left, right = from_range.split(slot)
367+
if left:
368+
from_shard["slot_ranges"].append(left)
369+
if right:
370+
from_shard["slot_ranges"].append(right)
371+
from_shard["slot_ranges"].remove(from_range)
326372

327373
def add_slot(slot, to_shard):
328-
for slot_range in to_shard["slot_ranges"]:
329-
if slot == slot_range["start"] - 1:
330-
slot_range["start"] -= 1
331-
return
332-
if slot == slot_range["end"] + 1:
333-
slot_range["end"] += 1
374+
slot_range = SlotRange(slot, slot)
375+
for existing_range in to_shard["slot_ranges"]:
376+
if existing_range.merge(slot_range):
334377
return
335-
to_shard["slot_ranges"].append({"start": slot, "end": slot})
378+
to_shard["slot_ranges"].append(slot_range)
336379

337380
def find_slot(slot, config):
338381
for shard in config:
339382
if shard == new_owner:
340383
continue
341384
for slot_range in shard["slot_ranges"]:
342-
if slot >= slot_range["start"] and slot <= slot_range["end"]:
385+
if slot_range.contains(slot):
343386
return shard, slot_range
344387
return None, None
345388

346389
def pack(slot_ranges):
347-
new_range = []
348-
while True:
349-
changed = False
350-
new_range = []
351-
slot_ranges.sort(key=lambda x: x["start"])
352-
for i, slot_range in enumerate(slot_ranges):
353-
added = False
354-
for j in range(i):
355-
prev_slot_range = slot_ranges[j]
356-
if prev_slot_range["end"] + 1 == slot_range["start"]:
357-
prev_slot_range["end"] = slot_range["end"]
358-
changed = True
359-
added = True
360-
break
361-
if not added:
362-
new_range.append(slot_range)
363-
slot_ranges = new_range
364-
if not changed:
365-
break
366-
return new_range
390+
slot_objects = sorted(slot_ranges, key=lambda x: x.start)
391+
packed = []
392+
for slot_range in slot_objects:
393+
if packed and packed[-1].merge(slot_range):
394+
continue
395+
packed.append(slot_range)
396+
return packed
367397

368398
for slot in range(args.slot_start, args.slot_end + 1):
369399
shard, slot_range = find_slot(slot, config)
370-
if shard == None:
371-
continue
372-
if shard == new_owner:
400+
if shard == None or shard == new_owner:
373401
continue
374402
remove_slot(slot, slot_range, shard)
375403
add_slot(slot, new_owner)
376404

377405
for shard in config:
378406
shard["slot_ranges"] = pack(shard["slot_ranges"])
379407

380-
print(f"Pushing new config:\n{json.dumps(config, indent=2)}\n")
408+
# Use the custom encoder for printing the JSON
409+
print(f"Pushing new config:\n{json.dumps(config, indent=2, cls=ClusterConfigEncoder)}\n")
381410
push_config(config)
382411

383412

0 commit comments

Comments
 (0)