Skip to content

KAFKA-19537: Improve Exit Code Handling in StreamsGroupCommand #20293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.OffsetsUtils;
Expand All @@ -68,24 +69,24 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import joptsimple.OptionException;


public class StreamsGroupCommand {

static final String MISSING_COLUMN_VALUE = "-";

public static void main(String[] args) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
StreamsGroupCommandOptions opts = null;
try {
opts.checkArgs();
opts = new StreamsGroupCommandOptions(args);
Objects.requireNonNull(opts).checkArgs();
// should have exactly one action
long numberOfActions = Stream.of(
opts.listOpt,
Expand All @@ -98,12 +99,19 @@ public static void main(String[] args) {
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, or --delete-offsets.");

run(opts);
} catch (OptionException e) {
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
Exit.exit(0);
} catch (IllegalArgumentException e) {
if (opts != null)
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
else
CommandLineUtils.printErrorAndExit(e.getMessage());
} catch (Throwable e) {
printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
Exit.exit(1);
}
}

public static void run(StreamsGroupCommandOptions opts) {
public static void run(StreamsGroupCommandOptions opts) throws IllegalArgumentException, ExecutionException, InterruptedException {
try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt)) {
streamsGroupService.listGroups();
Expand All @@ -123,10 +131,6 @@ public static void run(StreamsGroupCommandOptions opts) {
} else {
throw new IllegalArgumentException("Unknown action!");
}
} catch (IllegalArgumentException e) {
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
} catch (Throwable e) {
printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -133,6 +135,7 @@ public void testDeleteOffsetsNonExistingGroup() {
Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
}
validateDeleteOffsetsExitCode(args, 0);
}

@Test
Expand Down Expand Up @@ -312,6 +315,7 @@ public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics() {
} catch (Exception e) {
throw new RuntimeException(e);
}
validateDeleteOffsetsExitCode(args, 0);
}

private void assertError(Map.Entry<Errors, Map<TopicPartition, Throwable>> res,
Expand Down Expand Up @@ -437,4 +441,26 @@ private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[]
opts, cluster.createAdminClient());

}

/**
* Executes the StreamsGroupCommand with the given arguments and validates the exit code.
* <p>
* This helper method is used to test scenarios where the command is expected to exit
* with a specific status code (e.g., 0 for success, 1 for an error). It captures the
* exit code by using a mock {@link Exit.Procedure} and asserts that it matches the
* expected value.
*
* @param args The command-line arguments to pass to the StreamsGroupCommand.
* @param expectedExitCode The expected exit code from the command execution.
*/
private static void validateDeleteOffsetsExitCode(String[] args, int expectedExitCode) {
ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
Exit.setExitProcedure(exitProcedure);
try {
StreamsGroupCommand.main(args);
Assertions.assertEquals(expectedExitCode, exitProcedure.statusCode());
} finally {
Exit.resetExitProcedure();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -130,6 +131,7 @@ public static void closeCluster() {
public void testDeleteWithUnrecognizedOption() {
final String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
assertThrows(OptionException.class, () -> getStreamsGroupService(args));
validateDeleteExitCode(args, 1);
}

@Test
Expand Down Expand Up @@ -210,6 +212,7 @@ public void testDeleteSingleGroupWithoutDeletingInternalTopics() throws Exceptio
result.get(appId),
"The expected error was not detected while deleting streams group");
}
validateDeleteExitCode(args, 0);
}

@Test
Expand All @@ -233,6 +236,7 @@ public void testDeleteSingleGroupWithDeletingInternalTopics() throws Exception {
TestUtils.waitForCondition(() -> getInternalTopics(appId).isEmpty(),
"The internal topics of the streams group " + appId + " were not deleted as expected.");
}
validateDeleteExitCode(args, 0);
}

@Test
Expand All @@ -243,6 +247,8 @@ public void testDeleteMultipleGroupsWithoutDeletingInternalTopics() throws Excep

String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};

validateDeleteExitCode(args, 0);

StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
KafkaStreams streams1 = startKSApp(appId1, service);
KafkaStreams streams2 = startKSApp(appId2, service);
Expand Down Expand Up @@ -341,6 +347,8 @@ public void testDeleteAllGroupsWithDeletingInternalTopics() throws Exception {

String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--delete", "--all-groups", "--delete-all-internal-topics"};

validateDeleteExitCode(args, 0);

StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
KafkaStreams streams1 = startKSApp(appId1, service);
KafkaStreams streams2 = startKSApp(appId2, service);
Expand Down Expand Up @@ -566,4 +574,26 @@ private static StreamsBuilder builder(String inputTopic, String outputTopic) {
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
return builder;
}

/**
* Executes the StreamsGroupCommand with the given arguments and validates the exit code.
* <p>
* This helper method is used to test scenarios where the command is expected to exit
* with a specific status code (e.g., 0 for success, 1 for an error). It captures the
* exit code by using a mock {@link Exit.Procedure} and asserts that it matches the
* expected value.
*
* @param args The command-line arguments to pass to the StreamsGroupCommand.
* @param expectedExitCode The expected exit code from the command execution.
*/
private static void validateDeleteExitCode(String[] args, int expectedExitCode) {
ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
Exit.setExitProcedure(exitProcedure);
try {
StreamsGroupCommand.main(args);
Assertions.assertEquals(expectedExitCode, exitProcedure.statusCode());
} finally {
Exit.resetExitProcedure();
}
}
}
Loading