Skip to content

Commit 8eb09bf

Browse files
authored
Explicitly close the FlowReceiver (#31982)
1 parent 1e8c091 commit 8eb09bf

File tree

4 files changed

+24
-6
lines changed

4 files changed

+24
-6
lines changed

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class BasicAuthJcsmpSessionService extends SessionService {
4646
private final String password;
4747
private final String vpnName;
4848
@Nullable private JCSMPSession jcsmpSession;
49+
@Nullable private MessageReceiver messageReceiver;
4950
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();
5051

5152
/**
@@ -73,21 +74,25 @@ public void connect() {
7374

7475
@Override
7576
public void close() {
76-
if (isClosed()) {
77-
return;
78-
}
7977
retryCallableManager.retryCallable(
8078
() -> {
81-
checkStateNotNull(jcsmpSession).closeSession();
79+
if (messageReceiver != null) {
80+
messageReceiver.close();
81+
}
82+
if (!isClosed()) {
83+
checkStateNotNull(jcsmpSession).closeSession();
84+
}
8285
return 0;
8386
},
8487
ImmutableSet.of(IOException.class));
8588
}
8689

8790
@Override
8891
public MessageReceiver createReceiver() {
89-
return retryCallableManager.retryCallable(
90-
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
92+
this.messageReceiver =
93+
retryCallableManager.retryCallable(
94+
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
95+
return this.messageReceiver;
9196
}
9297

9398
@Override

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public interface MessageReceiver {
4949
*/
5050
BytesXMLMessage receive() throws IOException;
5151

52+
/** Closes the message receiver. */
53+
void close();
54+
5255
/**
5356
* Test clients may return {@literal true} to signal that all expected messages have been pulled
5457
* and the test may complete. Real clients should always return {@literal false}.

sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,11 @@ public BytesXMLMessage receive() throws IOException {
6969
throw new IOException(e);
7070
}
7171
}
72+
73+
@Override
74+
public void close() {
75+
if (!isClosed()) {
76+
this.flowReceiver.close();
77+
}
78+
}
7279
}

sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ public BytesXMLMessage receive() throws IOException {
9292
return getRecordFn.apply(counter.getAndIncrement());
9393
}
9494

95+
@Override
96+
public void close() {}
97+
9598
@Override
9699
public boolean isEOF() {
97100
return counter.get() >= minMessagesReceived;

0 commit comments

Comments
 (0)