Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ The `kcp scan` command includes the following sub-commands:

- `cluster`
- `region`
- `broker-logs`

The sub-commands require the following minimum AWS IAM permissions:

Expand Down Expand Up @@ -257,6 +258,27 @@ The sub-commands require the following minimum AWS IAM permissions:
}
```

`broker-logs`:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<BROKER_LOGS_BUCKET>",
"arn:aws:s3:::<BROKER_LOGS_BUCKET>/*"
]
}
]
}
```

---

#### `kcp scan region`
Expand Down Expand Up @@ -367,6 +389,46 @@ The command generates two files - `cluster_scan_<cluster-name>.md` and `cluster_

---

#### `kcp scan broker-logs`

This command scans a hour window folder in s3 to identify as many clients as possible in the cluster.

**Prerequisites**
- Enable trace logging for `kafka.server.KafkaApis=TRACE` for each broker
- Enable s3 broker log delivery for the cluster

**Example Usage**

```shell
kcp scan broker-logs \
--region us-east-1 \
--s3-uri s3://my-cluster-logs-bucket/AWSLogs/635910096382/KafkaBrokerLogs/us-east-1/kcp-pub-cluster-90a919bc-5967-4805-8a47-09dad9019d9b-5/2025-08-13-14/
```

**Output:**
The command generates a csv file - `broker_logs_scan_results.csv` containing:

- All the unique clients it could identify based on a combination of values:
- i.e. clientID + topic + role + auth + principal

example output
```csv
Client ID,Role,Topic,Auth,Principal,Timestamp
consumer1,Consumer,test-topic-1,SASL_SCRAM,User:kafka-user-2,2025-08-18 10:15:16
default-producer-id,Producer,test-topic-1,SASL_SCRAM,User:kafka-user-2,2025-08-18 10:15:18
consumer2,Consumer,test-topic-1,UNAUTHENTICATED,User:ANONYMOUS,2025-08-18 10:18:22
default-producer-id,Producer,test-topic-1,UNAUTHENTICATED,User:ANONYMOUS,2025-08-18 10:18:24
```

Alternatively, the following environment variables need to be set:

```shell
export REGION=<aws-region>
export S3_URI=<folder-in-s3>
```

---

### `kcp report`

The `kcp report` command includes the following sub-commands:
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/costexplorer v1.54.0
github.com/aws/aws-sdk-go-v2/service/iam v1.46.0
github.com/aws/aws-sdk-go-v2/service/kafka v1.42.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1
github.com/charmbracelet/glamour v0.10.0
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.7
Expand All @@ -21,13 +22,17 @@ require (

require (
github.com/alecthomas/chroma/v2 v2.14.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.18.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.37.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsK
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go-v2 v1.38.0 h1:UCRQ5mlqcFk9HJDIqENSLR3wiG1VTWlyUfLDEvY7RxU=
github.com/aws/aws-sdk-go-v2 v1.38.0/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0/go.mod h1:/mXlTIVG9jbxkqDnr5UQNQxW1HRYxeGklkM9vAFeabg=
github.com/aws/aws-sdk-go-v2/config v1.31.0 h1:9yH0xiY5fUnVNLRWO0AtayqwU1ndriZdN78LlhruJR4=
github.com/aws/aws-sdk-go-v2/config v1.31.0/go.mod h1:VeV3K72nXnhbe4EuxxhzsDc/ByrCSlZwUnWH52Nde/I=
github.com/aws/aws-sdk-go-v2/credentials v1.18.4 h1:IPd0Algf1b+Qy9BcDp0sCUcIWdCQPSzDoMK3a8pcbUM=
Expand All @@ -22,6 +24,8 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3 h1:joyyUFhiTQQmVK6ImzN
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.3/go.mod h1:+vNIyZQP3b3B1tSLI0lxvrU9cfM7gpdRXMFfm67ZcPc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1 h1:4HbnOGE9491a9zYJ9VpPh1ApgEq6ZlD4Kuv1PJenFpc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.1/go.mod h1:Z6QnHC6TmpJWUxAy8FI4JzA7rTwl6EIANkyK9OR5z5w=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.48.0 h1:NmaelAJldom/+eWDlbYdurKrPL+TSvwKeHH/TnEYih8=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.48.0/go.mod h1:s+juX6Mf6RF+y14IK9Ed02U/q86Tqc3PKHIDtuzBMa4=
github.com/aws/aws-sdk-go-v2/service/costexplorer v1.54.0 h1:5e/C1PaQywGtklpMotdHKon/8MfsDyzJ9WFh0ge8G38=
Expand All @@ -30,10 +34,16 @@ github.com/aws/aws-sdk-go-v2/service/iam v1.46.0 h1:bJgrqPT2vy+OrJpSeVfZ4e4zaD/E
github.com/aws/aws-sdk-go-v2/service/iam v1.46.0/go.mod h1:WsQuuejKHNC3UWs+n4usF+nNy1DFGYgWRugqFf+gGD4=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1 h1:ps3nrmBWdWwakZBydGX1CxeYFK80HsQ79JLMwm7Y4/c=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.1/go.mod h1:bAdfrfxENre68Hh2swNaGEVuFYE74o0SaSCAlaG9E74=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3 h1:ieRzyHXypu5ByllM7Sp4hC5f/1Fy5wqxqY0yB85hC7s=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.3/go.mod h1:O5ROz8jHiOAKAwx179v+7sHMhfobFVi6nZt8DEyiYoM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1 h1:MdVYlN5pcQu1t1OYx4Ajo3fKl1IEhzgdPQbYFCRjYS8=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.1/go.mod h1:iikmNLrvHm2p4a3/4BPeix2S9P+nW8yM1IZW73x8bFA=
github.com/aws/aws-sdk-go-v2/service/kafka v1.42.0 h1:/EB9p30Te0eClIIptu3g3meO38O87YVVdE/UVbyTvWA=
github.com/aws/aws-sdk-go-v2/service/kafka v1.42.0/go.mod h1:uvEnl4e5ie3zB+2qlQCqcv0CgOyI8ajOegLEKERjqfs=
github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1 h1:Hsqo8+dFxSdDvv9B2PgIx1AJAnDpqgS0znVI+R+MoGY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.85.1/go.mod h1:8Q0TAPXD68Z8YqlcIGHs/UNIDHsxErV9H4dl4vJEpgw=
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0 h1:Mc/MKBf2m4VynyJkABoVEN+QzkfLqGj0aiJuEe7cMeM=
github.com/aws/aws-sdk-go-v2/service/sso v1.28.0/go.mod h1:iS5OmxEcN4QIPXARGhavH7S8kETNL11kym6jhoS7IUQ=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.0 h1:6csaS/aJmqZQbKhi1EyEMM7yBW653Wy/B9hnBofW+sw=
Expand Down
102 changes: 102 additions & 0 deletions internal/cli/scan/broker_logs/scan_broker_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package broker_logs

import (
"fmt"

"github.com/confluentinc/kcp/internal/client"
"github.com/confluentinc/kcp/internal/generators/scan/broker_logs"
"github.com/confluentinc/kcp/internal/services/s3"
"github.com/confluentinc/kcp/internal/utils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

var (
s3Uri string
region string
)

func NewScanBrokerLogsCmd() *cobra.Command {
brokerLogsCmd := &cobra.Command{
Use: "broker-logs",
Short: "Scan the broker logs for client activity",
Long: "Scan the broker logs to help identify clients that are using the cluster based on activity in the logs",
SilenceErrors: true,
Args: cobra.NoArgs,
PreRunE: preRunScanBrokerLogs,
RunE: runScanBrokerLogs,
}

groups := map[*pflag.FlagSet]string{}

requiredFlags := pflag.NewFlagSet("required", pflag.ExitOnError)
requiredFlags.SortFlags = false
requiredFlags.StringVar(&region, "region", "", "The AWS region")
requiredFlags.StringVar(&s3Uri, "s3-uri", "", "The S3 URI to the broker logs folder (e.g., s3://my-bucket/kafka-logs/2025-08-04-06/)")

brokerLogsCmd.Flags().AddFlagSet(requiredFlags)

groups[requiredFlags] = "Required Flags"

brokerLogsCmd.SetUsageFunc(func(c *cobra.Command) error {
fmt.Printf("%s\n\n", c.Short)

flagOrder := []*pflag.FlagSet{requiredFlags}
groupNames := []string{"Required Flags"}

for i, fs := range flagOrder {
usage := fs.FlagUsages()
if usage != "" {
fmt.Printf("%s:\n%s\n", groupNames[i], usage)
}
}

fmt.Println("All flags can be provided via environment variables (uppercase, with underscores).")

return nil
})

return brokerLogsCmd
}

func preRunScanBrokerLogs(cmd *cobra.Command, args []string) error {
if err := utils.BindEnvToFlags(cmd); err != nil {
return err
}

return nil
}

func runScanBrokerLogs(cmd *cobra.Command, args []string) error {
opts, err := parseScanBrokerLogsOpts()
if err != nil {
return fmt.Errorf("failed to parse scan broker logs opts: %v", err)
}

s3Client, err := client.NewS3Client(opts.Region)
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
}

s3Service := s3.NewS3Service(s3Client)

brokerLogsScanner, err := broker_logs.NewBrokerLogsScanner(s3Service, *opts)
if err != nil {
return fmt.Errorf("failed to create broker logs scanner: %v", err)
}

if err := brokerLogsScanner.Run(); err != nil {
return err
}

return nil
}

func parseScanBrokerLogsOpts() (*broker_logs.BrokerLogsScannerOpts, error) {
opts := broker_logs.BrokerLogsScannerOpts{
S3Uri: s3Uri,
Region: region,
}

return &opts, nil
}
2 changes: 2 additions & 0 deletions internal/cli/scan/scan.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scan

import (
"github.com/confluentinc/kcp/internal/cli/scan/broker_logs"
"github.com/confluentinc/kcp/internal/cli/scan/cluster"
"github.com/confluentinc/kcp/internal/cli/scan/region"
"github.com/spf13/cobra"
Expand All @@ -16,6 +17,7 @@ func NewScanCmd() *cobra.Command {
scanCmd.AddCommand(
cluster.NewScanClusterCmd(),
region.NewScanRegionCmd(),
broker_logs.NewScanBrokerLogsCmd(),
)

return scanCmd
Expand Down
21 changes: 21 additions & 0 deletions internal/client/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import (
"context"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

func NewS3Client(region string) (*s3.Client, error) {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, err
}

if region != "" {
cfg.Region = region
}

return s3.NewFromConfig(cfg), nil
}
62 changes: 62 additions & 0 deletions internal/generators/init/assets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The `kcp scan` command includes the following sub-commands:

- `cluster`
- `region`
- `broker-logs`

The sub-commands require the following minimum AWS IAM permissions:

Expand Down Expand Up @@ -156,6 +157,27 @@ The sub-commands require the following minimum AWS IAM permissions:
}
```

`broker-logs`:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<BROKER_LOGS_BUCKET>",
"arn:aws:s3:::<BROKER_LOGS_BUCKET>/*"
]
}
]
}
```

---

#### `kcp scan region`
Expand Down Expand Up @@ -266,6 +288,46 @@ The command generates two files - `cluster_scan_<cluster-name>.md` and `cluster_

---

#### `kcp scan broker-logs`

This command scans a hour window folder in s3 to identify as many clients as possible in the cluster.

**Prerequisites**
- Enable trace logging for `kafka.server.KafkaApis=TRACE` for each broker
- Enable s3 broker log delivery for the cluster

**Example Usage**

```shell
kcp scan broker-logs \
--region us-east-1 \
--s3-uri s3://my-cluster-logs-bucket/AWSLogs/635910096382/KafkaBrokerLogs/us-east-1/kcp-pub-cluster-90a919bc-5967-4805-8a47-09dad9019d9b-5/2025-08-13-14/
```

**Output:**
The command generates a csv file - `broker_logs_scan_results.csv` containing:

- All the unique clients it could identify based on a combination of values:
- i.e. clientID + topic + role + auth + principal

example output
```csv
Client ID,Role,Topic,Auth,Principal,Timestamp
consumer1,Consumer,test-topic-1,SASL_SCRAM,User:kafka-user-2,2025-08-18 10:15:16
default-producer-id,Producer,test-topic-1,SASL_SCRAM,User:kafka-user-2,2025-08-18 10:15:18
consumer2,Consumer,test-topic-1,UNAUTHENTICATED,User:ANONYMOUS,2025-08-18 10:18:22
default-producer-id,Producer,test-topic-1,UNAUTHENTICATED,User:ANONYMOUS,2025-08-18 10:18:24
```

Alternatively, the following environment variables need to be set:

```shell
export REGION=<aws-region>
export S3_URI=<folder-in-s3>
```

---

### `kcp report`

The `kcp report` command includes the following sub-commands:
Expand Down
Loading