@@ -839,7 +839,7 @@ public void ingestionStreamConfigsTest() {
839839 e .getMessage ().contains ("Multiple stream configs are not supported with pauseless consumption enabled" ));
840840 }
841841
842- // Test for multiple stream configs with pauseless consumption disabled - should pass
842+ // Test for multiple stream configs with pauseless consumption disabled - fail if duplicate topic names are present
843843 StreamIngestionConfig streamIngestionConfigWithoutPauseless =
844844 new StreamIngestionConfig (Arrays .asList (streamConfigs , streamConfigs ));
845845 streamIngestionConfigWithoutPauseless .setPauselessConsumptionEnabled (false );
@@ -854,9 +854,31 @@ public void ingestionStreamConfigsTest() {
854854
855855 try {
856856 TableConfigUtils .validate (tableConfigWithoutPauseless , schema );
857- // Should pass - multiple stream configs are allowed when pauseless consumption is disabled
857+ fail ( " Should fail if duplicate topic names are present in multiple stream configs" );
858858 } catch (IllegalStateException e ) {
859- fail ("Should not fail when pauseless consumption is disabled with multiple stream configs: " + e .getMessage ());
859+ assertTrue (e .getMessage ().contains ("Duplicate topic names found in streamConfigs" ));
860+ }
861+
862+ // Test for multiple stream configs with pauseless consumption disabled - pass if no duplicate topic names
863+ Map <String , String > kafkaStreamConfigs = getKafkaStreamConfigs ();
864+ streamConfigs = getStreamConfigs ();
865+ streamIngestionConfigWithoutPauseless =
866+ new StreamIngestionConfig (Arrays .asList (streamConfigs , kafkaStreamConfigs ));
867+ streamIngestionConfigWithoutPauseless .setPauselessConsumptionEnabled (false );
868+
869+ ingestionConfigWithoutPauseless = new IngestionConfig ();
870+ ingestionConfigWithoutPauseless .setStreamIngestionConfig (streamIngestionConfigWithoutPauseless );
871+
872+ tableConfigWithoutPauseless = new TableConfigBuilder (TableType .REALTIME ).setTableName (TABLE_NAME )
873+ .setTimeColumnName ("timeColumn" )
874+ .setIngestionConfig (ingestionConfigWithoutPauseless )
875+ .build ();
876+
877+ try {
878+ TableConfigUtils .validate (tableConfigWithoutPauseless , schema );
879+ } catch (IllegalStateException e ) {
880+ fail ("Should not fail when no duplicate topic and no pauseless ingestion are present in multiple stream configs" ,
881+ e );
860882 }
861883 }
862884
@@ -2945,7 +2967,7 @@ public void testValidateImplicitRealtimeTablePartitionSelectorConfigs() {
29452967 private Map <String , String > getStreamConfigs () {
29462968 Map <String , String > streamConfigs = new HashMap <>();
29472969 streamConfigs .put ("streamType" , "kafka" );
2948- streamConfigs .put ("stream.kafka.topic.name" , "test " );
2970+ streamConfigs .put ("stream.kafka.topic.name" , "test_topic " );
29492971 streamConfigs .put ("stream.kafka.decoder.class.name" ,
29502972 "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder" );
29512973 return streamConfigs ;
@@ -2954,7 +2976,7 @@ private Map<String, String> getStreamConfigs() {
29542976 private Map <String , String > getKafkaStreamConfigs () {
29552977 Map <String , String > streamConfigs = new HashMap <>();
29562978 streamConfigs .put ("streamType" , "kafka" );
2957- streamConfigs .put ("stream.kafka.topic.name" , "test " );
2979+ streamConfigs .put ("stream.kafka.topic.name" , "test_kafka_topic " );
29582980 streamConfigs .put ("stream.kafka.decoder.class.name" ,
29592981 "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder" );
29602982 streamConfigs .put ("stream.kafka.decoder.prop.descriptorFile" , "file://test" );
@@ -2965,7 +2987,7 @@ private Map<String, String> getKafkaStreamConfigs() {
29652987 private Map <String , String > getPulsarStreamConfigs () {
29662988 Map <String , String > streamConfigs = new HashMap <>();
29672989 streamConfigs .put ("streamType" , "pulsar" );
2968- streamConfigs .put ("stream.pulsar.topic.name" , "test " );
2990+ streamConfigs .put ("stream.pulsar.topic.name" , "test_pulsar_topic " );
29692991 streamConfigs .put ("stream.pulsar.decoder.prop.descriptorFile" , "file://test" );
29702992 streamConfigs .put ("stream.pulsar.decoder.prop.protoClassName" , "test" );
29712993 streamConfigs .put ("stream.pulsar.decoder.class.name" ,
0 commit comments