Skip to content

fix(scan): kcp fails to connect to older Kafka clusters. #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions internal/cli/report/cluster/metrics/report_cluster_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,38 @@ func runReportClusterMetrics(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create msk client: %v", err)
}

mskService := msk.NewMSKService(mskClient)

// Get cluster information to extract Kafka version
cluster, err := mskService.DescribeCluster(cmd.Context(), &opts.ClusterArn)
if err != nil {
return fmt.Errorf("failed to describe cluster: %v", err)
}

// Extract and convert Kafka version
var kafkaVersion string
if cluster.ClusterType == kafkatypes.ClusterTypeProvisioned {
kafkaVersion = utils.ConvertKafkaVersion(cluster.Provisioned.CurrentBrokerSoftwareInfo.KafkaVersion)
} else {
// TODO: For serverless clusters, how should we handle this? Currently defaulting to 4.0.0
kafkaVersion = "4.0.0"
}

kafkaAdminFactory := func(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker) (client.KafkaAdmin, error) {
switch opts.AuthType {
case types.AuthTypeIAM:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithIAMAuth())
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithIAMAuth())
case types.AuthTypeSASLSCRAM:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithSASLSCRAMAuth(opts.SASLScramUsername, opts.SASLScramPassword))
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithSASLSCRAMAuth(opts.SASLScramUsername, opts.SASLScramPassword))
case types.AuthTypeUnauthenticated:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithUnauthenticatedAuth())
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithUnauthenticatedAuth())
case types.AuthTypeTLS:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithTLSAuth(opts.TLSCACert, opts.TLSClientCert, opts.TLSClientKey))
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithTLSAuth(opts.TLSCACert, opts.TLSClientCert, opts.TLSClientKey))
default:
return nil, fmt.Errorf("❌ Auth type: %v not yet supported", opts.AuthType)
}
}

mskService := msk.NewMSKService(mskClient)

cloudWatchClient, err := client.NewCloudWatchClient(opts.Region)
if err != nil {
return fmt.Errorf("failed to create cloudwatch client: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions internal/cli/scan/cluster/scan_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ func runScanCluster(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create msk client: %v", err)
}

kafkaAdminFactory := func(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker) (client.KafkaAdmin, error) {
kafkaAdminFactory := func(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker, kafkaVersion string) (client.KafkaAdmin, error) {
switch opts.AuthType {
case types.AuthTypeIAM:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithIAMAuth())
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithIAMAuth())
case types.AuthTypeSASLSCRAM:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithSASLSCRAMAuth(opts.SASLScramUsername, opts.SASLScramPassword))
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithSASLSCRAMAuth(opts.SASLScramUsername, opts.SASLScramPassword))
case types.AuthTypeUnauthenticated:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithUnauthenticatedAuth())
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithUnauthenticatedAuth())
case types.AuthTypeTLS:
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, client.WithTLSAuth(opts.TLSCACert, opts.TLSClientCert, opts.TLSClientKey))
return client.NewKafkaAdmin(brokerAddresses, clientBrokerEncryptionInTransit, opts.Region, kafkaVersion, client.WithTLSAuth(opts.TLSCACert, opts.TLSClientCert, opts.TLSClientKey))
default:
return nil, fmt.Errorf("❌ Auth type: %v not yet supported", opts.AuthType)
}
Expand Down
13 changes: 9 additions & 4 deletions internal/client/kafka_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func configureTLSAuth(config *sarama.Config, caCertFile string, clientCertFile s
return nil
}

func configureCommonSettings(config *sarama.Config, clientID string) {
config.Version = sarama.V4_0_0_0
func configureCommonSettings(config *sarama.Config, clientID string, kafkaVersion sarama.KafkaVersion) {
config.Version = kafkaVersion
config.ClientID = clientID

// Network-level timeout configurations
Expand Down Expand Up @@ -252,7 +252,7 @@ func (k *KafkaAdminClient) Close() error {
}

// NewKafkaAdmin creates a new Kafka admin client for the given broker addresses and region
func NewKafkaAdmin(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker, region string, opts ...AdminOption) (KafkaAdmin, error) {
func NewKafkaAdmin(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker, region string, kafkaVersion string, opts ...AdminOption) (KafkaAdmin, error) {
// Default configuration
config := AdminConfig{
authType: types.AuthTypeIAM, // Default to IAM auth
Expand All @@ -263,8 +263,13 @@ func NewKafkaAdmin(brokerAddresses []string, clientBrokerEncryptionInTransit kaf
opt(&config)
}

saramaKafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
return nil, fmt.Errorf("❌ Failed to parse Kafka version: %v", err)
}

saramaConfig := sarama.NewConfig()
configureCommonSettings(saramaConfig, "kcp-cli")
configureCommonSettings(saramaConfig, "kcp-cli", saramaKafkaVersion)

switch config.authType {
case types.AuthTypeIAM:
Expand Down
66 changes: 62 additions & 4 deletions internal/client/kafka_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/IBM/sarama"
kafkatypes "github.com/aws/aws-sdk-go-v2/service/kafka/types"
"github.com/confluentinc/kcp/internal/types"
"github.com/confluentinc/kcp/internal/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -296,7 +297,7 @@ func TestConfigureCommonSettings(t *testing.T) {
config := sarama.NewConfig()
clientID := "test-client"

configureCommonSettings(config, clientID)
configureCommonSettings(config, clientID, sarama.V4_0_0_0)

// Verify common settings
assert.Equal(t, sarama.V4_0_0_0, config.Version)
Expand Down Expand Up @@ -556,7 +557,7 @@ func TestNewKafkaAdmin(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
admin, err := NewKafkaAdmin(tt.brokerAddresses, tt.clientBrokerEncryptionInTransit, tt.region, tt.opts...)
admin, err := NewKafkaAdmin(tt.brokerAddresses, tt.clientBrokerEncryptionInTransit, tt.region, "4.0.0", tt.opts...)

if tt.expectError {
require.Error(t, err)
Expand All @@ -574,7 +575,7 @@ func TestNewKafkaAdmin(t *testing.T) {
func TestNewKafkaAdmin_DefaultConfiguration(t *testing.T) {
t.Skip("skipping integration test that requires real Kafka brokers")
// Test that NewKafkaAdmin uses IAM auth by default
admin, err := NewKafkaAdmin([]string{"broker1:9098"}, kafkatypes.ClientBrokerTls, "us-west-2")
admin, err := NewKafkaAdmin([]string{"broker1:9098"}, kafkatypes.ClientBrokerTls, "us-west-2", "4.0.0")

// This will likely fail due to network/credentials, but we can verify the error message
if err != nil {
Expand All @@ -595,7 +596,7 @@ func TestNewKafkaAdmin_MultipleOptions(t *testing.T) {
WithSASLSCRAMAuth("user", "pass"), // This should override the IAM auth
}

admin, err := NewKafkaAdmin([]string{"broker1:9096"}, kafkatypes.ClientBrokerTls, "us-west-2", opts...)
admin, err := NewKafkaAdmin([]string{"broker1:9096"}, kafkatypes.ClientBrokerTls, "us-west-2", "4.0.0", opts...)

// This will likely fail due to network/credentials, but we can verify the error message
if err != nil {
Expand Down Expand Up @@ -628,3 +629,60 @@ func TestClusterKafkaMetadata_Structure(t *testing.T) {
assert.Equal(t, int32(1), metadata.ControllerID)
assert.Equal(t, "test-cluster", metadata.ClusterID)
}

// TestSaramaKafkaVersionParsing verifies that MSK-reported Kafka version
// strings — which may carry suffixes such as ".kraft" or ".tiered", a
// wildcard "x" patch component, or a fourth version segment — are normalized
// by utils.ConvertKafkaVersion into a form that sarama.ParseKafkaVersion
// accepts, and that parsing yields the expected sarama version constant.
func TestSaramaKafkaVersionParsing(t *testing.T) {
	cases := []struct {
		name string
		raw  string
		want sarama.KafkaVersion
	}{
		{
			name: "4.0.x.kraft should convert to sarama.V4_0_0_0",
			raw:  "4.0.x.kraft",
			want: sarama.V4_0_0_0,
		},
		{
			name: "3.9.x should convert to sarama.V3_9_0_0",
			raw:  "3.9.x",
			want: sarama.V3_9_0_0,
		},
		{
			name: "3.9.x.kraft should convert to sarama.V3_9_0_0",
			raw:  "3.9.x.kraft",
			want: sarama.V3_9_0_0,
		},
		{
			name: "3.7.x.kraft should convert to sarama.V3_7_0_0",
			raw:  "3.7.x.kraft",
			want: sarama.V3_7_0_0,
		},
		{
			name: "3.6.0.1 should convert to sarama.V3_6_0_0",
			raw:  "3.6.0.1",
			want: sarama.V3_6_0_0,
		},
		{
			name: "3.6.0 should remain sarama.V3_6_0_0",
			raw:  "3.6.0",
			want: sarama.V3_6_0_0,
		},
		{
			name: "2.8.2.tiered should convert to sarama.V2_8_2_0",
			raw:  "2.8.2.tiered",
			want: sarama.V2_8_2_0,
		},
		{
			name: "2.6.0 should remain sarama.V2_6_0_0",
			raw:  "2.6.0",
			want: sarama.V2_6_0_0,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Normalize the MSK version string first, then parse it with sarama.
			parsed, err := sarama.ParseKafkaVersion(utils.ConvertKafkaVersion(&tc.raw))
			assert.NoError(t, err)
			assert.Equal(t, tc.want, parsed)
		})
	}
}
32 changes: 26 additions & 6 deletions internal/generators/scan/cluster/cluster_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
kafkatypes "github.com/aws/aws-sdk-go-v2/service/kafka/types"
"github.com/confluentinc/kcp/internal/client"
"github.com/confluentinc/kcp/internal/types"
"github.com/confluentinc/kcp/internal/utils"
)

// KafkaAdminFactory is a function type that creates a KafkaAdmin client
type KafkaAdminFactory func(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker) (client.KafkaAdmin, error)
type KafkaAdminFactory func(brokerAddresses []string, clientBrokerEncryptionInTransit kafkatypes.ClientBroker, kafkaVersion string) (client.KafkaAdmin, error)

type ClusterScannerOpts struct {
Region string
Expand Down Expand Up @@ -321,8 +322,9 @@ func (cs *ClusterScanner) scanKafkaResources(clusterInfo *types.ClusterInformati
}

clientBrokerEncryptionInTransit := types.GetClientBrokerEncryptionInTransit(clusterInfo.Cluster)
kafkaVersion := cs.getKafkaVersion(clusterInfo)

admin, err := cs.kafkaAdminFactory(brokerAddresses, clientBrokerEncryptionInTransit)
admin, err := cs.kafkaAdminFactory(brokerAddresses, clientBrokerEncryptionInTransit, kafkaVersion)
if err != nil {
return fmt.Errorf("❌ Failed to setup admin client: %v", err)
}
Expand All @@ -343,11 +345,16 @@ func (cs *ClusterScanner) scanKafkaResources(clusterInfo *types.ClusterInformati
}
clusterInfo.Topics = topics

acls, err := cs.scanKafkaAcls(admin)
if err != nil {
return err
// Serverless clusters do not support Kafka Admin API and instead returns an EOF error - this should be handled gracefully
if clusterInfo.Cluster.ClusterType == kafkatypes.ClusterTypeProvisioned {
acls, err := cs.scanKafkaAcls(admin)
if err != nil {
return err
}
clusterInfo.Acls = acls
} else {
slog.Warn("⚠️ Serverless clusters do not support querying Kafka ACLs, skipping ACLs scan")
}
clusterInfo.Acls = acls

return nil
}
Expand Down Expand Up @@ -379,3 +386,16 @@ func (cs *ClusterScanner) scanKafkaAcls(admin client.KafkaAdmin) ([]types.Acls,

return flattenedAcls, nil
}

// getKafkaVersion resolves the Kafka version string to use when building the
// admin client for the given cluster. Provisioned clusters expose their broker
// software version, which is normalized via utils.ConvertKafkaVersion.
// Serverless clusters (and any unrecognized cluster type) do not report a
// version, so a fallback of "4.0.0" is used and a warning is logged.
func (cs *ClusterScanner) getKafkaVersion(clusterInfo *types.ClusterInformation) string {
	const fallbackVersion = "4.0.0"

	clusterType := clusterInfo.Cluster.ClusterType
	if clusterType == kafkatypes.ClusterTypeProvisioned {
		return utils.ConvertKafkaVersion(clusterInfo.Cluster.Provisioned.CurrentBrokerSoftwareInfo.KafkaVersion)
	}
	if clusterType == kafkatypes.ClusterTypeServerless {
		slog.Warn("⚠️ Serverless clusters do not return a Kafka version, defaulting to 4.0.0")
		return fallbackVersion
	}
	slog.Warn(fmt.Sprintf("⚠️ Unknown cluster type: %v, defaulting to 4.0.0", clusterType))
	return fallbackVersion
}
Loading