Skip to content

Commit 014734d

Browse files
committed
IGNITE-22410 Implement rebalance triggers for zone based partitions
1 parent 66a3d69 commit 014734d

File tree

10 files changed

+2125
-111
lines changed

10 files changed

+2125
-111
lines changed

modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public class DistributionZoneRebalanceEngine {
9999
/** Executor for scheduling rebalances. */
100100
private final ScheduledExecutorService rebalanceScheduler;
101101

102+
/** Zone rebalance manager. */
103+
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class will replace DistributionZoneRebalanceEngine
104+
// TODO: after switching to zone-based replication
105+
private final DistributionZoneRebalanceEngineV2 distributionZoneRebalanceEngineV2;
106+
102107
/**
103108
* Constructor.
104109
*
@@ -122,6 +127,12 @@ public DistributionZoneRebalanceEngine(
122127
this.dataNodesListener = createDistributionZonesDataNodesListener();
123128
this.partitionsCounterListener = createPartitionsCounterListener();
124129
this.rebalanceScheduler = rebalanceScheduler;
130+
this.distributionZoneRebalanceEngineV2 = new DistributionZoneRebalanceEngineV2(
131+
busyLock,
132+
metaStorageManager,
133+
distributionZoneManager,
134+
catalogService
135+
);
125136
}
126137

127138
/**
@@ -148,7 +159,7 @@ protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters para
148159

149160
long recoveryRevision = recoveryFinishFuture.join();
150161

151-
return rebalanceTriggersRecovery(recoveryRevision);
162+
return rebalanceTriggersRecovery(recoveryRevision).thenCompose(v -> distributionZoneRebalanceEngineV2.start());
152163
});
153164
}
154165

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.distributionzones.rebalance;
19+
20+
import static java.util.concurrent.CompletableFuture.allOf;
21+
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
22+
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
23+
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
24+
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
25+
import static org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZoneIdDataNodes;
26+
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
27+
28+
import java.util.Set;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import org.apache.ignite.internal.affinity.AffinityUtils;
33+
import org.apache.ignite.internal.affinity.Assignment;
34+
import org.apache.ignite.internal.catalog.CatalogManager;
35+
import org.apache.ignite.internal.catalog.CatalogService;
36+
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
37+
import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
38+
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
39+
import org.apache.ignite.internal.distributionzones.Node;
40+
import org.apache.ignite.internal.distributionzones.utils.CatalogAlterZoneEventListener;
41+
import org.apache.ignite.internal.logger.IgniteLogger;
42+
import org.apache.ignite.internal.logger.Loggers;
43+
import org.apache.ignite.internal.metastorage.MetaStorageManager;
44+
import org.apache.ignite.internal.metastorage.WatchEvent;
45+
import org.apache.ignite.internal.metastorage.WatchListener;
46+
import org.apache.ignite.internal.replicator.ZonePartitionId;
47+
import org.apache.ignite.internal.util.ExceptionUtils;
48+
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
49+
import org.apache.ignite.internal.util.IgniteUtils;
50+
51+
/**
52+
* Zone rebalance manager.
53+
* // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 this class will replace DistributionZoneRebalanceEngine
54+
* // TODO: after switching to zone-based replication
55+
*/
56+
public class DistributionZoneRebalanceEngineV2 {
57+
/** The logger. */
58+
private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneRebalanceEngineV2.class);
59+
60+
/** Prevents double stopping of the component. */
61+
private final AtomicBoolean stopGuard = new AtomicBoolean();
62+
63+
/** External busy lock. */
64+
private final IgniteSpinBusyLock busyLock;
65+
66+
/** Meta Storage manager. */
67+
private final MetaStorageManager metaStorageManager;
68+
69+
/** Distribution zones manager. */
70+
private final DistributionZoneManager distributionZoneManager;
71+
72+
/** Meta storage listener for data nodes changes. */
73+
private final WatchListener dataNodesListener;
74+
75+
/** Catalog service. */
76+
private final CatalogService catalogService;
77+
78+
/**
79+
* Constructor.
80+
*
81+
* @param busyLock External busy lock.
82+
* @param metaStorageManager Meta Storage manager.
83+
* @param distributionZoneManager Distribution zones manager.
84+
* @param catalogService Catalog service.
85+
*/
86+
public DistributionZoneRebalanceEngineV2(
87+
IgniteSpinBusyLock busyLock,
88+
MetaStorageManager metaStorageManager,
89+
DistributionZoneManager distributionZoneManager,
90+
CatalogManager catalogService
91+
) {
92+
this.busyLock = busyLock;
93+
this.metaStorageManager = metaStorageManager;
94+
this.distributionZoneManager = distributionZoneManager;
95+
this.catalogService = catalogService;
96+
97+
this.dataNodesListener = createDistributionZonesDataNodesListener();
98+
}
99+
100+
/**
101+
* Starts the rebalance engine by registering corresponding meta storage and configuration listeners.
102+
*/
103+
public CompletableFuture<Void> start() {
104+
return IgniteUtils.inBusyLockAsync(busyLock, () -> {
105+
catalogService.listen(ZONE_ALTER, new CatalogAlterZoneEventListener(catalogService) {
106+
@Override
107+
protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
108+
return onUpdateReplicas(parameters);
109+
}
110+
});
111+
112+
// TODO: IGNITE-18694 - Recovery for the case when zones watch listener processed event but assignments were not updated.
113+
metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), dataNodesListener);
114+
115+
return nullCompletedFuture();
116+
});
117+
}
118+
119+
/**
120+
* Stops the rebalance engine by unregistering meta storage watches.
121+
*/
122+
public void stop() {
123+
if (!stopGuard.compareAndSet(false, true)) {
124+
return;
125+
}
126+
127+
metaStorageManager.unregisterWatch(dataNodesListener);
128+
}
129+
130+
private WatchListener createDistributionZonesDataNodesListener() {
131+
return new WatchListener() {
132+
@Override
133+
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
134+
return IgniteUtils.inBusyLockAsync(busyLock, () -> {
135+
Set<Node> dataNodes = parseDataNodes(evt.entryEvent().newEntry().value());
136+
137+
if (dataNodes == null) {
138+
// The zone was removed so data nodes was removed too.
139+
return nullCompletedFuture();
140+
}
141+
142+
int zoneId = extractZoneIdDataNodes(evt.entryEvent().newEntry().key());
143+
144+
// It is safe to get the latest version of the catalog as we are in the metastore thread.
145+
int catalogVersion = catalogService.latestCatalogVersion();
146+
147+
CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneId, catalogVersion);
148+
149+
if (zoneDescriptor == null) {
150+
// Zone has been removed.
151+
return nullCompletedFuture();
152+
}
153+
154+
Set<String> filteredDataNodes = filterDataNodes(
155+
dataNodes,
156+
zoneDescriptor,
157+
distributionZoneManager.nodesAttributes()
158+
);
159+
160+
if (filteredDataNodes.isEmpty()) {
161+
return nullCompletedFuture();
162+
}
163+
164+
return triggerPartitionsRebalanceForZone(
165+
evt.entryEvent().newEntry().revision(),
166+
zoneDescriptor,
167+
filteredDataNodes
168+
);
169+
});
170+
}
171+
172+
@Override
173+
public void onError(Throwable e) {
174+
LOG.warn("Unable to process data nodes event", e);
175+
}
176+
};
177+
}
178+
179+
private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters parameters) {
180+
return recalculateAssignmentsAndScheduleRebalance(
181+
parameters.zoneDescriptor(),
182+
parameters.causalityToken(),
183+
parameters.catalogVersion()
184+
);
185+
}
186+
187+
/**
188+
* Recalculate assignments for zone partitions and schedule rebalance (by update rebalance metastore keys).
189+
*
190+
* @param zoneDescriptor Zone descriptor.
191+
* @param causalityToken Causality token.
192+
* @param catalogVersion Catalog version.
193+
* @return The future, which completes when the all metastore updates done.
194+
*/
195+
private CompletableFuture<Void> recalculateAssignmentsAndScheduleRebalance(
196+
CatalogZoneDescriptor zoneDescriptor,
197+
long causalityToken,
198+
int catalogVersion
199+
) {
200+
201+
return distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneDescriptor.id())
202+
.thenCompose(dataNodes -> {
203+
if (dataNodes.isEmpty()) {
204+
return nullCompletedFuture();
205+
}
206+
207+
return triggerPartitionsRebalanceForZone(
208+
causalityToken,
209+
zoneDescriptor,
210+
dataNodes
211+
);
212+
});
213+
}
214+
215+
private CompletableFuture<Void> triggerPartitionsRebalanceForZone(
216+
long revision,
217+
CatalogZoneDescriptor zoneDescriptor,
218+
Set<String> dataNodes
219+
) {
220+
CompletableFuture<?>[] partitionFutures = ZoneRebalanceUtil.triggerZonePartitionsRebalance(
221+
zoneDescriptor,
222+
dataNodes,
223+
revision,
224+
metaStorageManager
225+
);
226+
227+
// This set is used to deduplicate exceptions (if there is an exception from upstream, for instance,
228+
// when reading from MetaStorage, it will be encountered by every partition future) to avoid noise
229+
// in the logs.
230+
Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
231+
232+
for (int partId = 0; partId < partitionFutures.length; partId++) {
233+
int finalPartId = partId;
234+
235+
partitionFutures[partId].exceptionally(e -> {
236+
Throwable cause = ExceptionUtils.unwrapCause(e);
237+
238+
if (unwrappedCauses.add(cause)) {
239+
// The exception is specific to this partition.
240+
LOG.error(
241+
"Exception on updating assignments for [zone={}, partition={}]",
242+
e,
243+
zoneInfo(zoneDescriptor), finalPartId
244+
);
245+
} else {
246+
// The exception is from upstream and not specific for this partition, so don't log the partition index.
247+
LOG.error(
248+
"Exception on updating assignments for [zone={}]",
249+
e,
250+
zoneInfo(zoneDescriptor)
251+
);
252+
}
253+
254+
return null;
255+
});
256+
}
257+
258+
return allOf(partitionFutures);
259+
}
260+
261+
private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
262+
return zoneDescriptor.id() + "/" + zoneDescriptor.name();
263+
}
264+
265+
static CompletableFuture<Set<Assignment>> calculateZoneAssignments(
266+
ZonePartitionId zonePartitionId,
267+
CatalogService catalogService,
268+
DistributionZoneManager distributionZoneManager
269+
) {
270+
int catalogVersion = catalogService.latestCatalogVersion();
271+
272+
CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zonePartitionId.zoneId(), catalogVersion);
273+
274+
int zoneId = zonePartitionId.zoneId();
275+
276+
return distributionZoneManager.dataNodes(
277+
zoneDescriptor.updateToken(),
278+
catalogVersion,
279+
zoneId
280+
).thenApply(dataNodes ->
281+
AffinityUtils.calculateAssignmentForPartition(
282+
dataNodes,
283+
zonePartitionId.partitionId(),
284+
zoneDescriptor.replicas()
285+
)
286+
);
287+
}
288+
}

0 commit comments

Comments
 (0)