1818import org .elasticsearch .cluster .block .ClusterBlockException ;
1919import org .elasticsearch .cluster .block .ClusterBlockLevel ;
2020import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
21+ import org .elasticsearch .cluster .node .DiscoveryNode ;
2122import org .elasticsearch .cluster .routing .GroupShardsIterator ;
2223import org .elasticsearch .cluster .routing .ShardIterator ;
2324import org .elasticsearch .cluster .routing .ShardRouting ;
2425import org .elasticsearch .cluster .service .ClusterService ;
2526import org .elasticsearch .common .inject .Inject ;
2627import org .elasticsearch .common .io .stream .StreamInput ;
28+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2729import org .elasticsearch .index .engine .Engine ;
2830import org .elasticsearch .index .shard .IndexShard ;
2931import org .elasticsearch .index .shard .ShardId ;
3638import java .io .IOException ;
3739import java .util .ArrayList ;
3840import java .util .HashMap ;
41+ import java .util .Iterator ;
42+ import java .util .LinkedList ;
3943import java .util .List ;
4044import java .util .Map ;
45+ import java .util .Queue ;
46+ import java .util .concurrent .atomic .AtomicInteger ;
4147import java .util .concurrent .atomic .AtomicReferenceArray ;
4248
4349public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastAction <
@@ -46,6 +52,7 @@ public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastActi
4652 AnalyzeDiskUsageShardRequest ,
4753 AnalyzeDiskUsageShardResponse > {
4854 private final IndicesService indicesService ;
55+ private final ThreadPool threadPool ;
4956
5057 @ Inject
5158 public TransportAnalyzeIndexDiskUsageAction (
@@ -66,11 +73,92 @@ public TransportAnalyzeIndexDiskUsageAction(
6673 ThreadPool .Names .ANALYZE
6774 );
6875 this .indicesService = indexServices ;
76+ this .threadPool = transportService .getThreadPool ();
6977 }
7078
7179 @ Override
7280 protected void doExecute (Task task , AnalyzeIndexDiskUsageRequest request , ActionListener <AnalyzeIndexDiskUsageResponse > listener ) {
73- super .doExecute (task , request , listener );
81+ new LimitingRequestPerNodeBroadcastAction (task , request , listener , 5 ).start ();
82+ }
83+
84+ private record ShardRequest (
85+ DiscoveryNode node ,
86+ AnalyzeDiskUsageShardRequest shardRequest ,
87+ ActionListener <AnalyzeDiskUsageShardResponse > handler
88+ ) {
89+
90+ }
91+
92+ final class LimitingRequestPerNodeBroadcastAction extends AsyncBroadcastAction {
93+ private final Queue <ShardRequest > queue = new LinkedList <>();
94+ private final Map <DiscoveryNode , AtomicInteger > sendingCounters = ConcurrentCollections .newConcurrentMap ();
95+ private final int maxConcurrentRequestsPerNode ;
96+
97+ LimitingRequestPerNodeBroadcastAction (
98+ Task task ,
99+ AnalyzeIndexDiskUsageRequest request ,
100+ ActionListener <AnalyzeIndexDiskUsageResponse > listener ,
101+ int maxConcurrentRequestsPerNode
102+ ) {
103+ super (task , request , listener );
104+ this .maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode ;
105+ }
106+
107+ private void trySendRequests () {
108+ assert Thread .holdsLock (this ) == false ;
109+ final List <ShardRequest > readyRequests = new ArrayList <>();
110+ synchronized (this ) {
111+ final Iterator <ShardRequest > it = queue .iterator ();
112+ while (it .hasNext ()) {
113+ final ShardRequest r = it .next ();
114+ final AtomicInteger sending = sendingCounters .computeIfAbsent (r .node , k -> new AtomicInteger ());
115+ assert 0 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
116+ if (sending .get () < maxConcurrentRequestsPerNode ) {
117+ sending .incrementAndGet ();
118+ readyRequests .add (r );
119+ it .remove ();
120+ }
121+ }
122+ }
123+ if (readyRequests .isEmpty ()) {
124+ return ;
125+ }
126+ final Thread sendingThread = Thread .currentThread ();
127+ for (ShardRequest r : readyRequests ) {
128+ super .sendShardRequest (
129+ r .node ,
130+ r .shardRequest ,
131+ ActionListener .runAfter (r .handler , () -> onRequestResponded (sendingThread , r .node ))
132+ );
133+ }
134+ }
135+
136+ private void onRequestResponded (Thread sendingThread , DiscoveryNode node ) {
137+ final AtomicInteger sending = sendingCounters .get (node );
138+ assert sending != null && 1 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
139+ sending .decrementAndGet ();
140+ // fork to avoid StackOverflow
141+ if (sendingThread == Thread .currentThread ()) {
142+ threadPool .generic ().execute (this ::trySendRequests );
143+ } else {
144+ trySendRequests ();
145+ }
146+ }
147+
148+ @ Override
149+ protected synchronized void sendShardRequest (
150+ DiscoveryNode node ,
151+ AnalyzeDiskUsageShardRequest shardRequest ,
152+ ActionListener <AnalyzeDiskUsageShardResponse > listener
153+ ) {
154+ queue .add (new ShardRequest (node , shardRequest , listener ));
155+ }
156+
157+ @ Override
158+ public void start () {
159+ super .start ();
160+ trySendRequests ();
161+ }
74162 }
75163
76164 @ Override
0 commit comments