Description
Describe the bug
According to the SmallRye Kafka documentation, the graceful-shutdown property can only be set for incoming channels: the incoming configuration has 'graceful-shutdown', but the outgoing configuration has no such property.
While writing a RelocateConfigSourceInterceptor for topic properties, I ran into a problem: an incorrect property can be generated (for @Outgoing("out"), mp.messaging.incoming.out.connector is generated), which leads to an error at the channel validation step.
SmallRyeReactiveMessagingKafkaProcessor contains a section of code that only runs in dev/test modes. There, both incoming and outgoing channels are collected into a single list. The code then looks up their connectors and sets the graceful-shutdown property. See here
The methods called there always receive the boolean value true, which further down in the code corresponds to incoming channels only.
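To illustrate why passing true for an outgoing channel yields the wrong key, here is a minimal sketch. The channelPropertyKey method below is a hypothetical stand-in for the processor's getChannelPropertyKey, written under the assumption that the key has the form mp.messaging.<direction>.<channel>.<property>; the real helper may differ in details such as quoting of channel names.

```java
public class ChannelKeyDemo {

    // Hypothetical stand-in for getChannelPropertyKey; assumes the
    // mp.messaging.<direction>.<channel>.<property> layout.
    static String channelPropertyKey(String channelName, String property, boolean incoming) {
        String direction = incoming ? "incoming" : "outgoing";
        return "mp.messaging." + direction + "." + channelName + "." + property;
    }

    public static void main(String[] args) {
        // Channel declared as @Outgoing("out") but looked up with incoming = true:
        System.out.println(channelPropertyKey("out", "connector", true));
        // prints mp.messaging.incoming.out.connector

        // Same channel looked up with the matching direction:
        System.out.println(channelPropertyKey("out", "connector", false));
        // prints mp.messaging.outgoing.out.connector
    }
}
```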
The question is: is this really correct? Should only incoming channels be used here, with the outgoing ones removed? Or should this part be split in two so that the correct value is passed for the boolean incoming parameter?
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
// List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING); //remove
// List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); //remove
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
// annotations.addAll(outgoings); //remove
// annotations.addAll(channels); //remove
for (AnnotationInstance annotation : annotations) {
    String channelName = annotation.value().asString();
    if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
        continue;
    }
    String key = getChannelPropertyKey(channelName, "graceful-shutdown", true);
    discoveryState.ifNotYetConfigured(key, () -> {
        defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
    });
}
Or, split into two loops:
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
for (AnnotationInstance annotation : incomings) {
    String channelName = annotation.value().asString();
    if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
        continue;
    }
    String key = getChannelPropertyKey(channelName, "graceful-shutdown", true);
    discoveryState.ifNotYetConfigured(key, () -> {
        defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
    });
}

List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(outgoings);
annotations.addAll(channels);
for (AnnotationInstance annotation : annotations) {
    String channelName = annotation.value().asString();
    if (!discoveryState.isKafkaConnector(channelsManagedByConnectors, false, channelName)) {
        continue;
    }
    String key = getChannelPropertyKey(channelName, "graceful-shutdown", false);
    discoveryState.ifNotYetConfigured(key, () -> {
        defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
    });
}
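If the two-loop variant is the right direction, the duplicated loop body could be factored into one helper that takes the direction as a parameter. The following is only a self-contained sketch of that idea, not the processor's actual code: propertyKey, addDefaults, and the set of connector keys are hypothetical stand-ins for getChannelPropertyKey, the loop body, and isKafkaConnector. Whether outgoing channels should receive a graceful-shutdown default at all remains the open question.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GracefulShutdownDefaults {

    // Hypothetical key builder; assumes mp.messaging.<direction>.<channel>.<property>.
    static String propertyKey(String channel, String property, boolean incoming) {
        return "mp.messaging." + (incoming ? "incoming" : "outgoing") + "." + channel + "." + property;
    }

    // Registers graceful-shutdown=false for every channel of the given direction
    // that is managed by a Kafka connector and has no value yet.
    static void addDefaults(List<String> channels, boolean incoming,
            Set<String> kafkaConnectorKeys, Map<String, String> defaults) {
        for (String channel : channels) {
            // Stand-in for isKafkaConnector: the connector key must match the direction.
            if (!kafkaConnectorKeys.contains(propertyKey(channel, "connector", incoming))) {
                continue;
            }
            // Stand-in for ifNotYetConfigured + produce.
            defaults.putIfAbsent(propertyKey(channel, "graceful-shutdown", incoming), "false");
        }
    }

    static Map<String, String> demo() {
        Set<String> kafka = Set.of(
                "mp.messaging.incoming.in.connector",
                "mp.messaging.outgoing.out.connector");
        Map<String, String> defaults = new LinkedHashMap<>();
        addDefaults(List.of("in"), true, kafka, defaults);   // incoming channels
        addDefaults(List.of("out"), false, kafka, defaults); // outgoing channels
        return defaults;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the direction passed explicitly, the channel named out is checked and keyed only under mp.messaging.outgoing, so no mp.messaging.incoming.out.* key is produced.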
Thanks
Expected behavior
No response
Actual behavior
No response
How to Reproduce?
No response
Output of uname -a or ver
No response
Output of java -version
No response
Quarkus version or git rev
3.24
Build tool (i.e. output of mvnw --version or gradlew --version)
No response
Additional information
No response