Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions driver/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,22 @@
<method>java.lang.String startNodeElementId()</method>
</difference>

<difference>
<className>org/neo4j/driver/Result</className>
<differenceType>7012</differenceType>
<method>boolean isOpen()</method>
</difference>

<difference>
<className>org/neo4j/driver/async/ResultCursor</className>
<differenceType>7012</differenceType>
<method>java.util.concurrent.CompletionStage isOpenAsync()</method>
</difference>

<difference>
<className>org/neo4j/driver/reactive/RxResult</className>
<differenceType>7012</differenceType>
<method>org.reactivestreams.Publisher isOpen()</method>
</difference>

</differences>
17 changes: 15 additions & 2 deletions driver/src/main/java/org/neo4j/driver/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Stream;

import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.Resource;

Expand Down Expand Up @@ -140,12 +141,24 @@ public interface Result extends Iterator<Record>

/**
* Return the result summary.
*
* <p>
* If the records in the result is not fully consumed, then calling this method will exhausts the result.
*
* <p>
* If you want to access unconsumed records after summary, you shall use {@link Result#list()} to buffer all records into memory before summary.
*
* @return a summary for the whole query result.
*/
ResultSummary consume();

/**
* Determine if result is open.
* <p>
* Result is considered to be open if it has not been consumed ({@link #consume()}) and its creator object (e.g. session or transaction) has not been closed
* (including committed or rolled back).
* <p>
* Attempts to access data on closed result will produce {@link ResultConsumedException}.
*
* @return {@code true} if result is open and {@code false} otherwise.
*/
boolean isOpen();
}
13 changes: 13 additions & 0 deletions driver/src/main/java/org/neo4j/driver/async/ResultCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.driver.Records;
import org.neo4j.driver.Result;
import org.neo4j.driver.exceptions.NoSuchRecordException;
import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.summary.ResultSummary;

/**
Expand Down Expand Up @@ -154,4 +155,16 @@ public interface ResultCursor
* completed exceptionally if query execution or provided function fails.
*/
<T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction );

/**
* Determine if result is open.
* <p>
* Result is considered to be open if it has not been consumed ({@link #consumeAsync()}) and its creator object (e.g. session or transaction) has not been
* closed (including committed or rolled back).
* <p>
* Attempts to access data on closed result will produce {@link ResultConsumedException}.
*
* @return a {@link CompletionStage} completed with {@code true} if result is open and {@code false} otherwise.
*/
CompletionStage<Boolean> isOpenAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public List<Record> list()
}

@Override
public <T> List<T> list( Function<Record, T> mapFunction )
public <T> List<T> list( Function<Record,T> mapFunction )
{
return blockingGet( cursor.listAsync( mapFunction ) );
}
Expand All @@ -111,6 +111,12 @@ public ResultSummary consume()
return blockingGet( cursor.consumeAsync() );
}

@Override
public boolean isOpen()
{
return blockingGet( cursor.isOpenAsync() );
}

@Override
public void remove()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
return pullAllHandler.listAsync( mapFunction );
}

@Override
public CompletionStage<Boolean> isOpenAsync()
{
throw new UnsupportedOperationException();
}

@Override
public CompletionStage<Throwable> discardAllFailureAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
return assertNotDisposed().thenCompose( ignored -> delegate.listAsync( mapFunction ) );
}

@Override
public CompletionStage<Boolean> isOpenAsync()
{
return CompletableFuture.completedFuture( !isDisposed() );
}

@Override
public CompletionStage<Throwable> discardAllFailureAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ public Publisher<ResultSummary> consume()
} ) );
}

@Override
public Publisher<Boolean> isOpen()
{
return Mono.fromCompletionStage( getCursorFuture() )
.map( cursor -> !cursor.isDone() );
}

// For testing purpose
Supplier<CompletionStage<RxResultCursor>> cursorFutureSupplier()
{
Expand Down
14 changes: 13 additions & 1 deletion driver/src/main/java/org/neo4j/driver/reactive/RxResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.neo4j.driver.reactive;

import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.List;

import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.summary.ResultSummary;
Expand Down Expand Up @@ -108,4 +108,16 @@ public interface RxResult
* @return a cold publisher of result summary which only arrives after all records.
*/
Publisher<ResultSummary> consume();

/**
* Determine if result is open.
* <p>
* Result is considered to be open if it has not been consumed ({@link #consume()}) and its creator object (e.g. session or transaction) has not been closed
* (including committed or rolled back).
* <p>
* Attempts to access data on closed result will produce {@link ResultConsumedException}.
*
* @return a publisher emitting {@code true} if result is open and {@code false} otherwise.
*/
Publisher<Boolean> isOpen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.neo4j.driver.internal;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -50,10 +52,13 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.Records.column;
Expand Down Expand Up @@ -351,7 +356,24 @@ void shouldNotPeekIntoTheFutureWhenResultIsEmpty()
assertThrows( NoSuchRecordException.class, result::peek );
}

private Result createResult(int numberOfRecords )
@ParameterizedTest
@ValueSource( booleans = {true, false} )
void shouldDelegateIsOpen( boolean expectedState )
{
// GIVEN
AsyncResultCursor cursor = mock( AsyncResultCursor.class );
given( cursor.isOpenAsync() ).willReturn( CompletableFuture.completedFuture( expectedState ) );
Result result = new InternalResult( null, cursor );

// WHEN
boolean actualState = result.isOpen();

// THEN
assertEquals( expectedState, actualState );
then( cursor ).should().isOpenAsync();
}

private Result createResult( int numberOfRecords )
{
RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>(), BoltProtocolV3.METADATA_EXTRACTOR, mock( Connection.class ), null );
runHandler.onSuccess( singletonMap( "fields", value( Arrays.asList( "k1", "k2" ) ) ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,22 @@ void shouldPropagateFailureInConsumeAsync()
assertEquals( error, e );
}

private static AsyncResultCursorImpl newCursor(PullAllResponseHandler pullAllHandler )
@Test
void shouldThrowOnIsOpenAsync()
{
// GIVEN
AsyncResultCursorImpl cursor = new AsyncResultCursorImpl( null, null, null );

// WHEN & THEN
assertThrows( UnsupportedOperationException.class, cursor::isOpenAsync );
}

private static AsyncResultCursorImpl newCursor( PullAllResponseHandler pullAllHandler )
{
return new AsyncResultCursorImpl( null, newRunResponseHandler(), pullAllHandler );
}

private static AsyncResultCursorImpl newCursor(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
private static AsyncResultCursorImpl newCursor( RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
{
return new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,38 @@ void shouldFailOnMapSuccessfulRunCompletionAsyncFailure()
then( delegate ).should().mapSuccessfulRunCompletionAsync();
assertSame( error, actual );
}

@Test
void shouldBeOpenOnCreation()
{
assertTrue( await( cursor.isOpenAsync() ) );
}

@Test
void shouldCloseOnConsume()
{
// Given
boolean initialState = await( cursor.isOpenAsync() );

// When
await( cursor.consumeAsync() );

// Then
assertTrue( initialState );
assertFalse( await( cursor.isOpenAsync() ) );
}

@Test
void shouldCloseOnDiscardAll()
{
// Given
boolean initialState = await( cursor.isOpenAsync() );

// When
await( cursor.discardAllFailureAsync() );

// Then
assertTrue( initialState );
assertFalse( await( cursor.isOpenAsync() ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.neo4j.driver.internal.reactive;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -46,7 +48,10 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.Values.values;
Expand Down Expand Up @@ -193,6 +198,23 @@ void shouldErrorIfFailedToStream()
.assertNext( summary -> assertThat( summary, instanceOf( ResultSummary.class ) ) ).verifyComplete();
}

@ParameterizedTest
@ValueSource( booleans = {true, false} )
void shouldDelegateIsOpen( boolean expectedState )
{
// Given
RxResultCursor cursor = mock( RxResultCursor.class );
given( cursor.isDone() ).willReturn( !expectedState );
RxResult result = new InternalRxResult( () -> CompletableFuture.completedFuture( cursor ) );

// When
Boolean actualState = Mono.from( result.isOpen() ).block();

// Then
assertEquals( expectedState, actualState );
then( cursor ).should().isDone();
}

private InternalRxResult newRxResult( PullResponseHandler pullHandler )
{
RunResponseHandler runHandler = mock( RunResponseHandler.class );
Expand Down