1818import org .elasticsearch .cluster .block .ClusterBlockException ;
1919import org .elasticsearch .cluster .block .ClusterBlockLevel ;
2020import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
21+ import org .elasticsearch .cluster .node .DiscoveryNode ;
2122import org .elasticsearch .cluster .routing .GroupShardsIterator ;
2223import org .elasticsearch .cluster .routing .ShardIterator ;
2324import org .elasticsearch .cluster .routing .ShardRouting ;
2425import org .elasticsearch .cluster .service .ClusterService ;
2526import org .elasticsearch .common .inject .Inject ;
2627import org .elasticsearch .common .io .stream .StreamInput ;
28+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2729import org .elasticsearch .index .engine .Engine ;
2830import org .elasticsearch .index .shard .IndexShard ;
2931import org .elasticsearch .index .shard .ShardId ;
3638import java .io .IOException ;
3739import java .util .ArrayList ;
3840import java .util .HashMap ;
41+ import java .util .Iterator ;
42+ import java .util .LinkedList ;
3943import java .util .List ;
4044import java .util .Map ;
45+ import java .util .Queue ;
46+ import java .util .concurrent .atomic .AtomicInteger ;
4147import java .util .concurrent .atomic .AtomicReferenceArray ;
4248
4349public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastAction <
@@ -46,6 +52,7 @@ public class TransportAnalyzeIndexDiskUsageAction extends TransportBroadcastActi
4652 AnalyzeDiskUsageShardRequest ,
4753 AnalyzeDiskUsageShardResponse > {
4854 private final IndicesService indicesService ;
55+ private final ThreadPool threadPool ;
4956
5057 @ Inject
5158 public TransportAnalyzeIndexDiskUsageAction (
@@ -66,11 +73,96 @@ public TransportAnalyzeIndexDiskUsageAction(
6673 ThreadPool .Names .ANALYZE
6774 );
6875 this .indicesService = indexServices ;
76+ this .threadPool = transportService .getThreadPool ();
6977 }
7078
7179 @ Override
7280 protected void doExecute (Task task , AnalyzeIndexDiskUsageRequest request , ActionListener <AnalyzeIndexDiskUsageResponse > listener ) {
73- super .doExecute (task , request , listener );
81+ new LimitingRequestPerNodeBroadcastAction (task , request , listener , 5 ).start ();
82+ }
83+
84+ private static class ShardRequest {
85+ private final DiscoveryNode node ;
86+ private final AnalyzeDiskUsageShardRequest shardRequest ;
87+ private final ActionListener <AnalyzeDiskUsageShardResponse > handler ;
88+
89+ ShardRequest (DiscoveryNode node , AnalyzeDiskUsageShardRequest shardRequest , ActionListener <AnalyzeDiskUsageShardResponse > handler ) {
90+ this .node = node ;
91+ this .shardRequest = shardRequest ;
92+ this .handler = handler ;
93+ }
94+ }
95+
96+ final class LimitingRequestPerNodeBroadcastAction extends AsyncBroadcastAction {
97+ private final Queue <ShardRequest > queue = new LinkedList <>();
98+ private final Map <DiscoveryNode , AtomicInteger > sendingCounters = ConcurrentCollections .newConcurrentMap ();
99+ private final int maxConcurrentRequestsPerNode ;
100+
101+ LimitingRequestPerNodeBroadcastAction (
102+ Task task ,
103+ AnalyzeIndexDiskUsageRequest request ,
104+ ActionListener <AnalyzeIndexDiskUsageResponse > listener ,
105+ int maxConcurrentRequestsPerNode
106+ ) {
107+ super (task , request , listener );
108+ this .maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode ;
109+ }
110+
111+ private void trySendRequests () {
112+ assert Thread .holdsLock (this ) == false ;
113+ final List <ShardRequest > readyRequests = new ArrayList <>();
114+ synchronized (this ) {
115+ final Iterator <ShardRequest > it = queue .iterator ();
116+ while (it .hasNext ()) {
117+ final ShardRequest r = it .next ();
118+ final AtomicInteger sending = sendingCounters .computeIfAbsent (r .node , k -> new AtomicInteger ());
119+ assert 0 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
120+ if (sending .get () < maxConcurrentRequestsPerNode ) {
121+ sending .incrementAndGet ();
122+ readyRequests .add (r );
123+ it .remove ();
124+ }
125+ }
126+ }
127+ if (readyRequests .isEmpty ()) {
128+ return ;
129+ }
130+ final Thread sendingThread = Thread .currentThread ();
131+ for (ShardRequest r : readyRequests ) {
132+ super .sendShardRequest (
133+ r .node ,
134+ r .shardRequest ,
135+ ActionListener .runAfter (r .handler , () -> onRequestResponded (sendingThread , r .node ))
136+ );
137+ }
138+ }
139+
140+ private void onRequestResponded (Thread sendingThread , DiscoveryNode node ) {
141+ final AtomicInteger sending = sendingCounters .get (node );
142+ assert sending != null && 1 <= sending .get () && sending .get () <= maxConcurrentRequestsPerNode : sending ;
143+ sending .decrementAndGet ();
144+ // fork to avoid StackOverflow
145+ if (sendingThread == Thread .currentThread ()) {
146+ threadPool .generic ().execute (this ::trySendRequests );
147+ } else {
148+ trySendRequests ();
149+ }
150+ }
151+
152+ @ Override
153+ protected synchronized void sendShardRequest (
154+ DiscoveryNode node ,
155+ AnalyzeDiskUsageShardRequest shardRequest ,
156+ ActionListener <AnalyzeDiskUsageShardResponse > listener
157+ ) {
158+ queue .add (new ShardRequest (node , shardRequest , listener ));
159+ }
160+
161+ @ Override
162+ public void start () {
163+ super .start ();
164+ trySendRequests ();
165+ }
74166 }
75167
76168 @ Override
0 commit comments