1414import java .time .Duration ;
1515import java .util .ArrayList ;
1616import java .util .EnumSet ;
17+ import java .util .Iterator ;
1718import java .util .List ;
1819import java .util .Map ;
1920import java .util .concurrent .CompletableFuture ;
@@ -4997,9 +4998,19 @@ public <T> List<WatchResponse> chunkedPush(
49974998 ) {
49984999 List <WatchResponse > responses = new ArrayList <>();
49995000 List <T > records = new ArrayList <>();
5001+ int offset = 0 ;
5002+ int waitBatchSize = batchSize / 10 ;
5003+ if (waitBatchSize < 1 ) {
5004+ waitBatchSize = batchSize ;
5005+ }
5006+
5007+ Iterator <T > it = objects .iterator ();
5008+ T current = it .next ();
5009+
5010+ while (true ) {
5011+ records .add (current );
50005012
5001- for (T item : objects ) {
5002- if (records .size () == batchSize ) {
5013+ if (records .size () == batchSize || !it .hasNext ()) {
50035014 WatchResponse watch = this .push (
50045015 indexName ,
50055016 new PushTaskPayload ().setAction (action ).setRecords (this .objectsToPushTaskRecords (records )),
@@ -5011,41 +5022,38 @@ public <T> List<WatchResponse> chunkedPush(
50115022 records .clear ();
50125023 }
50135024
5014- records .add (item );
5015- }
5025+ if (waitForTasks && responses .size () > 0 && (responses .size () % waitBatchSize == 0 || !it .hasNext ())) {
5026+ responses
5027+ .subList (offset , Math .min (offset + waitBatchSize , responses .size ()))
5028+ .forEach (response -> {
5029+ TaskUtils .retryUntil (
5030+ () -> {
5031+ try {
5032+ return this .getEvent (response .getRunID (), response .getEventID ());
5033+ } catch (AlgoliaApiException e ) {
5034+ if (e .getStatusCode () == 404 ) {
5035+ return null ;
5036+ }
5037+
5038+ throw e ;
5039+ }
5040+ },
5041+ (Event resp ) -> {
5042+ return resp != null ;
5043+ },
5044+ 50 ,
5045+ null
5046+ );
5047+ });
5048+
5049+ offset += waitBatchSize ;
5050+ }
50165051
5017- if (records .size () > 0 ) {
5018- WatchResponse watch = this .push (
5019- indexName ,
5020- new PushTaskPayload ().setAction (action ).setRecords (this .objectsToPushTaskRecords (records )),
5021- waitForTasks ,
5022- referenceIndexName ,
5023- requestOptions
5024- );
5025- responses .add (watch );
5026- }
5052+ if (!it .hasNext ()) {
5053+ break ;
5054+ }
50275055
5028- if (waitForTasks ) {
5029- responses .forEach (response -> {
5030- TaskUtils .retryUntil (
5031- () -> {
5032- try {
5033- return this .getEvent (response .getRunID (), response .getEventID ());
5034- } catch (AlgoliaApiException e ) {
5035- if (e .getStatusCode () == 404 ) {
5036- return null ;
5037- }
5038-
5039- throw e ;
5040- }
5041- },
5042- (Event resp ) -> {
5043- return resp != null ;
5044- },
5045- 50 ,
5046- null
5047- );
5048- });
5056+ current = it .next ();
50495057 }
50505058
50515059 return responses ;
0 commit comments