
softleader/pagination-stream

pagination-stream

Pagination stream API

<dependency>
  <groupId>tw.com.softleader.data</groupId>
  <artifactId>pagination-stream</artifactId>
  <version>last-release-version</version>
</dependency>

You can find the latest version on the Release Page.

Java Module:

requires pagination.stream;
requires spring.data.commons;

Usage

First, define the method that fetches a single page. It must meet the following criteria:

  1. It takes any combination of 0 to 10 arguments, followed by a final Pageable parameter.
  2. It returns a Page<T>, for example:
Page<MyData> data(int a, long b, String c, Pageable pageable) {
  ...
}  

Then, create a Stream<List<T>> object using PageSupport#pagedStream. Each list in the stream represents the content of a page. For example:

PageSupport
  .pagedStream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .forEach(page -> { // page will be a List<MyData>
    // do something with every page
  });

If you’re not concerned about page boundaries and simply want to process all data elements, use PageSupport#stream to create a Stream<T>:

PageSupport
  .stream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .forEach(data -> { // data will be MyData
    // do something with every single element
  });

During execution, the method uses the pagination information in each result to request the next page by calling Pageable#next(), continuing until the last page (Pn) has been fetched:

 +-----+-----+-----+-----+-----+
 |  P1 |  P2 |  P3 |  .. |  Pn |  → no more pages => End
 +-----+-----+-----+-----+-----+
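To make the loop concrete, here is a JDK-only sketch of the same idea. It is not the library's implementation: SimplePage, PagingSketch and the fake data source are hypothetical stand-ins for Spring Data's Page/Pageable and for PageSupport. It shows how each page's metadata decides whether another page is requested, and how the element stream is simply the page stream flattened.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

// JDK-only sketch; SimplePage and PagingSketch are hypothetical stand-ins,
// not the pagination-stream API.
public class PagingSketch {

  // Hypothetical page: its content plus the metadata needed to decide whether to continue.
  record SimplePage<T>(List<T> content, int number, int totalPages) {
    boolean hasNext() {
      return number + 1 < totalPages;
    }
  }

  // Fetches page 0, then requests page number + 1 for as long as the previous page reports hasNext().
  static <A, T> Stream<List<T>> pagedStream(BiFunction<A, Integer, SimplePage<T>> fetch, A arg) {
    return Stream.iterate(
            fetch.apply(arg, 0),
            page -> page != null,
            page -> page.hasNext() ? fetch.apply(arg, page.number() + 1) : null)
        .map(SimplePage::content);
  }

  // The element-level stream is the paged stream flattened.
  static <A, T> Stream<T> stream(BiFunction<A, Integer, SimplePage<T>> fetch, A arg) {
    return pagedStream(fetch, arg).flatMap(List::stream);
  }

  public static void main(String[] args) {
    List<Integer> source = List.of(1, 2, 3, 4, 5);
    int size = 2;
    // Fake data source: slices `source` into pages of `size` elements; `label` is an unused extra arg.
    BiFunction<String, Integer, SimplePage<Integer>> fetch = (label, page) -> {
      int from = Math.min(page * size, source.size());
      int to = Math.min(from + size, source.size());
      int totalPages = (source.size() + size - 1) / size;
      return new SimplePage<>(source.subList(from, to), page, totalPages);
    };
    System.out.println(pagedStream(fetch, "demo").toList()); // [[1, 2], [3, 4], [5]]
    System.out.println(stream(fetch, "demo").toList()); // [1, 2, 3, 4, 5]
  }
}
```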

Fixed Pageable

The fixedStream and fixedPagedStream methods are intended for scenarios where the same fixed Pageable is used as the paging condition on every fetch. Each iteration fetches whatever matching data is currently available from the source and processes it, repeating until no matching data is left.

 +-----+-----+-----+-----+
 |  P1 |  P1 |  P1 |  .. | (no more data) => End
 +-----+-----+-----+-----+

Example:

PageSupport
  .fixedStream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .forEach(data -> { // data will be of type MyData
    // Each iteration uses the originally provided Pageable to query data
    // It is expected that the processing logic will change the state of the data source
    // So the number of matching records will gradually decrease
    // The stream ends when no data is returned or when the attempt limit is reached
  });

To prevent infinite loops, an attempt policy is applied. By default, the maximum number of attempts is the total number of pages reported by the first page × 3. If this limit is exceeded, an AttemptExhaustedException is thrown; it is recommended to catch it for follow-up handling.
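The fixed-Pageable loop and its default attempt limit can be illustrated with a JDK-only sketch. This is not the library's code: SimplePage, drain and the local AttemptExhaustedException are hypothetical stand-ins, and exactly when the real implementation counts an attempt may differ. The fake source always serves the same first page, and processing a record removes it from that page's results, so the loop ends once the source is empty; a processor that never changes the source would instead hit the limit and throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// JDK-only sketch of the fixed-Pageable loop; all types here are hypothetical stand-ins.
public class FixedLoopSketch {

  // Local stand-in for the library's AttemptExhaustedException.
  static class AttemptExhaustedException extends RuntimeException {
    AttemptExhaustedException(long attempts) {
      super("data still present after " + attempts + " attempts");
    }
  }

  record SimplePage<T>(List<T> content, int totalPages) {}

  // Re-fetches the same page until it comes back empty. Each fetch counts as one attempt;
  // the limit is the first page's totalPages * 3, mirroring the default described above.
  static <T> int drain(Supplier<SimplePage<T>> fetchSamePage, Consumer<T> process) {
    SimplePage<T> page = fetchSamePage.get();
    long maxAttempts = (long) page.totalPages() * 3;
    long attempts = 1;
    int processed = 0;
    while (!page.content().isEmpty()) {
      page.content().forEach(process);
      processed += page.content().size();
      if (attempts >= maxAttempts) {
        throw new AttemptExhaustedException(attempts);
      }
      page = fetchSamePage.get();
      attempts++;
    }
    return processed;
  }

  public static void main(String[] args) {
    List<Integer> pending = new ArrayList<>(List.of(1, 2, 3, 4, 5));
    int size = 2;
    // Fake source: always returns the first `size` pending records (a fixed page 0).
    Supplier<SimplePage<Integer>> fetch = () -> new SimplePage<>(
        List.copyOf(pending.subList(0, Math.min(size, pending.size()))),
        (pending.size() + size - 1) / size);
    // Processing a record changes the source state: it is no longer pending.
    System.out.println(drain(fetch, id -> pending.remove(id))); // 5
  }
}
```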

AttemptPolicyFactory provides several common built-in strategies, such as:

// It is recommended to use import static to improve code readability
import static tw.com.softleader.data.stream.AttemptPolicyFactory.*;

PageSupport
  .fixedStream(fetch::data, 1, 2L, "3", Pageable.ofSize(10), maxAttempts(100)) // Directly specify the maximum number of attempts
  ...

You can implement your own AttemptPolicy to define custom logic, for example:

import static tw.com.softleader.data.stream.AttemptPolicyFactory.*;

class MyAttemptPolicy implements AttemptPolicy {

  @Override
  public boolean canProceed(long currentAttempt) {
    return ...; // Custom logic
  }
}

PageSupport.fixedStream(
    fetch::data,
    1,
    2L,
    "3",
    Pageable.ofSize(10),
    ofPolicy(new MyAttemptPolicy()))
  ...
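As an illustration of a custom policy, the sketch below combines an attempt cap with a time budget. AttemptPolicy is redeclared locally with only the canProceed(long) method shown above so the block compiles on its own; DeadlineAttemptPolicy is a hypothetical name, and the assumption that currentAttempt starts at 1 is not confirmed by this README.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

public class DeadlinePolicySketch {

  // Local stand-in mirroring only the canProceed method shown in the AttemptPolicy example above.
  interface AttemptPolicy {
    boolean canProceed(long currentAttempt);
  }

  // Hypothetical policy: proceed while within maxAttempts and before a wall-clock deadline.
  static final class DeadlineAttemptPolicy implements AttemptPolicy {
    private final long maxAttempts;
    private final LongSupplier nanoClock;
    private final long deadlineNanos;

    DeadlineAttemptPolicy(long maxAttempts, Duration budget, LongSupplier nanoClock) {
      this.maxAttempts = maxAttempts;
      this.nanoClock = nanoClock;
      this.deadlineNanos = nanoClock.getAsLong() + budget.toNanos();
    }

    DeadlineAttemptPolicy(long maxAttempts, Duration budget) {
      this(maxAttempts, budget, System::nanoTime);
    }

    @Override
    public boolean canProceed(long currentAttempt) {
      // Assumes attempts are counted from 1. nanoTime values are compared by
      // subtraction, as its documentation recommends, to stay correct across overflow.
      return currentAttempt <= maxAttempts && nanoClock.getAsLong() - deadlineNanos < 0;
    }
  }

  public static void main(String[] args) {
    AttemptPolicy policy = new DeadlineAttemptPolicy(100, Duration.ofSeconds(30));
    System.out.println(policy.canProceed(1)); // true
    System.out.println(policy.canProceed(101)); // false: over maxAttempts
  }
}
```

With the library on the classpath, such a class would be passed the same way as MyAttemptPolicy, through ofPolicy(...). The time budget starts when the policy object is constructed, so create it right before building the stream.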

Builder Pattern

PageSupport also provides a builder pattern. You can start building with PageSupport#of:

PageSupport
  .of(fetch::data)
  .args(1, 2L, "3", Pageable.ofSize(10))
  .stream()
  . ...

Using the builder pattern allows you to define the page stream configuration in advance and create multiple stream objects. For example:

var fetcher = PageSupport.of(fetch::data);

fetcher.args(1, 2L, "3", Pageable.ofSize(10))
  .stream()
  . ...
  
fetcher.args(10, 11L, "12", Pageable.ofSize(10))
  .pagedStream()
  . ...
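The benefit of the builder is that the fetch function is bound once and the arguments are bound separately for each stream. A JDK-only sketch of that two-step shape follows; Fetcher, Bound and PageFetch are hypothetical names, only a single argument is supported, and the real PageSupport builder is not claimed to work this way internally.

```java
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// JDK-only sketch of "bind the fetcher once, bind args per stream".
// Fetcher, Bound and PageFetch are hypothetical; the real builder takes up to 10 args plus a Pageable.
public class BuilderSketch {

  record SimplePage<T>(List<T> content, int number, int totalPages) {
    boolean hasNext() {
      return number + 1 < totalPages;
    }
  }

  interface PageFetch<A, T> {
    SimplePage<T> fetch(A arg, int pageNumber);
  }

  // Step 1: holds only the fetch function, so it can be reused.
  record Fetcher<A, T>(PageFetch<A, T> fetch) {
    Bound<A, T> args(A arg) {
      return new Bound<>(fetch, arg);
    }
  }

  // Step 2: fetch function plus concrete arguments; each call creates a fresh stream.
  record Bound<A, T>(PageFetch<A, T> fetch, A arg) {
    Stream<List<T>> pagedStream() {
      return Stream.iterate(
              fetch.fetch(arg, 0),
              page -> page != null,
              page -> page.hasNext() ? fetch.fetch(arg, page.number() + 1) : null)
          .map(SimplePage::content);
    }

    Stream<T> stream() {
      return pagedStream().flatMap(List::stream);
    }
  }

  static <A, T> Fetcher<A, T> of(PageFetch<A, T> fetch) {
    return new Fetcher<>(fetch);
  }

  public static void main(String[] args) {
    // Fake source: argument n yields the numbers 1..n in pages of 2.
    PageFetch<Integer, Integer> source = (n, page) -> {
      int from = Math.min(page * 2, n);
      int to = Math.min(from + 2, n);
      List<Integer> content = IntStream.rangeClosed(from + 1, to).boxed().toList();
      return new SimplePage<>(content, page, (n + 1) / 2);
    };
    var fetcher = of(source);
    System.out.println(fetcher.args(5).stream().toList()); // [1, 2, 3, 4, 5]
    System.out.println(fetcher.args(3).pagedStream().toList()); // [[1, 2], [3]]
  }
}
```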

Parallel

PageSupport also supports parallel streams, which can improve performance in scenarios where page order does not matter. For example:

PageSupport
  .stream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .parallel()
  ...

In parallel scenarios, the first page (P1) is fetched sequentially and serves as the basis for splitting. If P1 reports four pages in total (P1, P2, P3, P4), the remaining three pages are split into separate Spliterators (S1, S2, S3), each of which is an independent subtask.

              +-----+
              |  P1 | (Base: fetched first)
              +-----+
                |
  +-------------+-------------+
  |             |             |
+-----+     +-----+       +-----+
|  P2 |     |  P3 |       |  P4 |
+-----+     +-----+       +-----+
  |           |             |
  v           v             v
+-----+     +-----+       +-----+
|  S1 |     |  S2 |       |  S3 |
+-----+     +-----+       +-----+

In summary, the key points of parallel processing are:

  1. The first page is fetched sequentially to serve as the basis for splitting spliterators (subtasks).
  2. The minimum unit of processing for a subtask is each page.
  3. A subtask does not necessarily run on its own thread; scheduling is left to Java's parallel stream machinery (by default, the common ForkJoinPool).
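The three points above can be mimicked with plain JDK streams. This sketch is not the library's Spliterator implementation: SimplePage and parallelStream are hypothetical, and the remaining page numbers are simply turned into a parallel IntStream so that each page fetch becomes one unit of work that the stream framework may split across worker threads.

```java
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// JDK-only sketch of "fetch P1 sequentially, then fan out the remaining pages".
// SimplePage and parallelStream are hypothetical; the library uses its own Spliterators.
public class ParallelPagesSketch {

  record SimplePage<T>(List<T> content, int totalPages) {}

  static <T> Stream<T> parallelStream(IntFunction<SimplePage<T>> fetchPage) {
    // 1. The first page is fetched up front; its totalPages is the basis for splitting.
    SimplePage<T> first = fetchPage.apply(0);
    // 2. Each remaining page number is one unit of work. The parallel IntStream can be
    //    split, and the pieces run on the common ForkJoinPool, not necessarily one thread per page.
    Stream<T> rest = IntStream.range(1, first.totalPages())
        .parallel()
        .mapToObj(fetchPage)
        .flatMap(page -> page.content().stream());
    // Stream.concat yields a parallel stream if either input is parallel.
    return Stream.concat(first.content().stream(), rest);
  }

  public static void main(String[] args) {
    int total = 10;
    int size = 3;
    // Fake source: the numbers 1..total in pages of `size`; page numbers start at 0.
    IntFunction<SimplePage<Integer>> fetch = page -> new SimplePage<>(
        IntStream.rangeClosed(page * size + 1, Math.min((page + 1) * size, total)).boxed().toList(),
        (total + size - 1) / size);
    System.out.println(parallelStream(fetch).mapToInt(Integer::intValue).sum()); // 55
  }
}
```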

APIs starting with fixed* do not support parallel processing.

Performance Impact

Please note that using parallel streams does not always improve performance. Overheads such as the cost of thread creation, scheduling, and coordination can sometimes make a parallel stream slower than a sequential one. Evaluate the specifics of your scenario before using parallel streams.

Before using, read more on the topic:

Example

Page from Repository

Suppose you have a complex computation whose data comes from a database. The code example is as follows:

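interface PersonRepository extends JpaRepository<Person, Long>, JpaSpecificationExecutor<Person> {

  // Provided by JpaSpecificationExecutor; redeclared here only for clarity
  Page<Person> findAll(Specification<Person> spec, Pageable pageable);
}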

class DifficultCalculationService {
  
  PersonRepository repository;
  
  long calculate(Specification<Person> spec, Pageable pageable) {
    return PageSupport.stream(repository::findAll, spec, pageable)
      .mapToLong(person -> {
        ...
      })
      .sum();
  }
}

Page from Remote API

Suppose the data instead comes from a remote API called through OpenFeign. The code example is as follows:

@FeignClient(name = "person-api", url = "http://somewhere/out/there")
interface PersonClient {
 
  @GetMapping("/people")
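  // @SpringQueryMap sends the criteria's fields as query parameters of the GET request
  Page<Person> findAll(@SpringQueryMap PersonCriteria criteria, Pageable pageable);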
}

class DifficultCalculationService {
  
  PersonClient client;
  
  long calculate(PersonCriteria criteria, Pageable pageable) {
    return PageSupport.stream(client::findAll, criteria, pageable)
      .mapToLong(person -> {
        ...
      })
      .sum();
  }
}

Test

When testing logic that uses pagination-stream, we can mock the fetcher using tools like Mockito to focus on our business logic. For example:

@ExtendWith(MockitoExtension.class)
class DifficultCalculationServiceTest {

  @Mock
  PersonRepository repository;

  @InjectMocks
  DifficultCalculationService service;

  @Test
  void test() {
    var spec = ...;
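    // Page size 2: four people split into two full pages, so p1 reports a next page.
    // (With size 10, PageImpl would treat p1 as the last page and p2 would never be requested.)
    var pageable = Pageable.ofSize(2);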

    var p1 = new PageImpl<>(List.of(new Person(1), new Person(2)), pageable, 4);
    var p2 = new PageImpl<>(List.of(new Person(3), new Person(4)), pageable.next(), 4);

    when(repository.findAll(spec, pageable)).thenReturn(p1);
    when(repository.findAll(spec, pageable.next())).thenReturn(p2);

    var actual = service.calculate(spec, pageable);

    assertThat(actual).isEqualTo(1+2+3+4);
  }
}

Caution

When the number of records cannot be bounded, keep the processing in streaming form as far as possible to avoid harming your application, for example with an OutOfMemoryError!

If you find yourself calling .collect(Collectors.toList()) right after creating the page stream, treat it as a warning sign and reconsider whether there is a better way to handle the data!

// Good
PageSupport.stream(fetch::data, pageable)
  .map(/* process each data in each page */)
  . .... // next step

// Bad
PageSupport.stream(fetch::data, pageable)
  .collect(...) // BOOM!
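Why collecting is the risky part can be seen in a JDK-only sketch of a lazily paged stream: the next page is requested only when the pipeline pulls more elements, so a streaming terminal operation such as sum() works through the data page by page, while collect(...) has to keep every element reachable at once. stream, countingSource and SimplePage are hypothetical, and whether PageSupport's streams are equally lazy is an assumption of this sketch, not something stated above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// JDK-only sketch; SimplePage, stream and countingSource are hypothetical, not the library API.
public class LazyPagingSketch {

  record SimplePage<T>(List<T> content, int number, int totalPages) {
    boolean hasNext() {
      return number + 1 < totalPages;
    }
  }

  // Page 0 is fetched when the stream is created; every later page only when
  // the pipeline asks for more elements than the pages fetched so far contain.
  static <T> Stream<T> stream(IntFunction<SimplePage<T>> fetchPage) {
    return Stream.iterate(
            fetchPage.apply(0),
            page -> page != null,
            page -> page.hasNext() ? fetchPage.apply(page.number() + 1) : null)
        .flatMap(page -> page.content().stream());
  }

  // Fake source of 1..total in pages of `size` that counts how many pages were fetched.
  static IntFunction<SimplePage<Integer>> countingSource(int total, int size, AtomicInteger fetches) {
    int totalPages = (total + size - 1) / size;
    return page -> {
      fetches.incrementAndGet();
      int from = page * size;
      int to = Math.min(from + size, total);
      return new SimplePage<>(IntStream.rangeClosed(from + 1, to).boxed().toList(), page, totalPages);
    };
  }

  public static void main(String[] args) {
    AtomicInteger fetches = new AtomicInteger();
    // Good: a streaming terminal operation; each page can be discarded once consumed.
    long sum = stream(countingSource(100, 2, fetches)).mapToLong(Integer::longValue).sum();
    System.out.println(sum + " from " + fetches.get() + " page fetches"); // 5050 from 50 page fetches

    fetches.set(0);
    // Stopping early means only the pages actually needed are requested.
    List<Integer> firstThree = stream(countingSource(100, 2, fetches)).limit(3).toList();
    System.out.println(firstThree + " after " + fetches.get() + " page fetches");
  }
}
```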

About

Stream support for Spring pagination
