feat: scanning broker logs to discover information on clients #35
Conversation
Pull Request Overview
This PR introduces a new feature for scanning Kafka broker logs stored in S3 to discover information about clients. The implementation adds the ability to parse log files, extract client activity metadata, and generate CSV reports of client connections.
Key changes:
- Added S3 service integration for downloading and decompressing broker log files
- Implemented a Kafka API trace line parser to extract client metadata from log entries
- Created a broker logs scanner that processes multiple log files and generates CSV output
Reviewed Changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/services/s3/s3_service.go | S3 service for downloading and decompressing log files |
| internal/generators/scan/broker_logs/kafka_trace_line_parser.go | Parser for extracting client metadata from Kafka API trace logs |
| internal/generators/scan/broker_logs/kafka_trace_line_parser_test.go | Comprehensive test coverage for the log parser |
| internal/generators/scan/broker_logs/broker_logs_scanner.go | Main scanner that orchestrates log processing and CSV generation |
| internal/cli/scan/broker_logs/scan_broker_logs.go | CLI command implementation for broker logs scanning |
| internal/client/s3.go | S3 client factory function |
| internal/cli/scan/scan.go | Added broker logs command to scan CLI |
| go.mod | Added S3 SDK dependency |
Not to be that guy, but we should add an entry in the README.
Pull Request Overview
This PR adds a new `broker-logs` subcommand to the `kcp scan` command that analyzes Kafka broker logs from S3 to discover client activity. The feature scans compressed log files to identify unique clients based on their composite key (clientID + topic + role + auth + principal) and generates a CSV report sorted by timestamp.
Key changes:
- New broker logs scanning functionality with S3 integration
- Kafka trace log parsing for PRODUCE and FETCH operations
- Support for multiple authentication types (IAM, SASL_SCRAM, TLS, UNAUTHENTICATED)
Reviewed Changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| internal/services/s3/s3_service.go | S3 service for downloading and decompressing log files |
| internal/generators/scan/broker_logs/kafka_trace_line_parser.go | Parser for Kafka API trace log lines with regex patterns |
| internal/generators/scan/broker_logs/kafka_trace_line_parser_test.go | Comprehensive test suite for log parsing functionality |
| internal/generators/scan/broker_logs/broker_logs_scanner.go | Main scanner that coordinates log processing and CSV generation |
| internal/cli/scan/broker_logs/scan_broker_logs.go | CLI command implementation |
| internal/client/s3.go | S3 client factory |
| internal/cli/scan/scan.go | Registration of new broker-logs subcommand |
| go.mod | Addition of S3 SDK dependency |
| README.md | Documentation for the new broker-logs command |
| internal/generators/init/assets/README.md | Template documentation for the broker-logs command |
```go
requestMetadataByCompositeKey := bs.handleLogFiles(ctx, bucket, logFiles)

if err := bs.generateCSV(requestMetadataByCompositeKey); err != nil {
	slog.Error("failed to write CSV file", "error", err)
```
The error message on line 82 'failed to write CSV file' is generic and doesn't provide enough context about what specifically failed during CSV generation. Consider making it more specific, such as 'failed to generate CSV output file'.
slog.Error("failed to write CSV file", "error", err) | |
slog.Error("failed to generate CSV output file", "error", err) |
```go
}

func (bs *BrokerLogsScanner) generateCSV(requestMetadataByCompositeKey map[string]*RequestMetadata) error {
	fileName := "broker_logs_scan_results.csv"
```
The CSV filename is hardcoded as 'broker_logs_scan_results.csv' on line 163. This could cause issues if multiple scans are run in the same directory. Consider making the filename configurable or including a timestamp to make it unique.
fileName := "broker_logs_scan_results.csv" | |
fileName := fmt.Sprintf("broker_logs_scan_results_%s.csv", time.Now().Format("20060102_150405")) |
```go
}

// DownloadAndDecompressLogFile downloads and decompresses a log file from S3
func (s *S3Service) DownloadAndDecompressLogFile(ctx context.Context, bucket, key string) ([]byte, error) {
```
The function loads entire log files into memory as byte arrays. For large log files, this could cause memory issues. Consider using streaming or chunked processing for better memory efficiency, especially when processing multiple large files.
Suggested change:
```diff
-func (s *S3Service) DownloadAndDecompressLogFile(ctx context.Context, bucket, key string) ([]byte, error) {
+// DownloadAndDecompressLogFile downloads and decompresses a log file from S3.
+// It returns an io.ReadCloser for streaming the decompressed content.
+func (s *S3Service) DownloadAndDecompressLogFile(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
```
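For reference, a minimal sketch of what the suggested streaming variant could look like, assuming the service wraps an aws-sdk-go-v2 S3 client (the `client` field name, error wrapping, and gzip compression are assumptions, not the PR's actual code):

```go
package s3service

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3Service is assumed to wrap an aws-sdk-go-v2 S3 client.
type S3Service struct {
	client *s3.Client
}

// gzipReadCloser closes both the gzip reader and the underlying S3 body.
type gzipReadCloser struct {
	*gzip.Reader
	body io.ReadCloser
}

func (g gzipReadCloser) Close() error {
	gzErr := g.Reader.Close()
	if err := g.body.Close(); err != nil {
		return err
	}
	return gzErr
}

// DownloadAndDecompressLogFile streams a gzip-compressed log file from S3
// instead of buffering the whole object in memory.
func (s *S3Service) DownloadAndDecompressLogFile(ctx context.Context, bucket, key string) (io.ReadCloser, error) {
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("get object %s/%s: %w", bucket, key, err)
	}

	gz, err := gzip.NewReader(out.Body)
	if err != nil {
		out.Body.Close()
		return nil, fmt.Errorf("open gzip reader for %s/%s: %w", bucket, key, err)
	}

	return gzipReadCloser{Reader: gz, body: out.Body}, nil
}
```

Callers could then wrap the returned reader in a `bufio.Scanner` and parse line by line, keeping memory usage roughly constant per file.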
```go
	return nil, ErrorUnableToParseKafkaApiLine
}

timestamp, err := time.Parse("2006-01-02 15:04:05,000", timestampMatches[1])
```
The timestamp parsing uses a hardcoded UTC timezone assumption. The parsed time should explicitly be set to UTC to ensure consistency: `time.Parse` returns a time in UTC when no timezone is specified in the input, but this should be made explicit.
Suggested change:
```diff
-timestamp, err := time.Parse("2006-01-02 15:04:05,000", timestampMatches[1])
+timestamp, err := time.ParseInLocation("2006-01-02 15:04:05,000", timestampMatches[1], time.UTC)
```
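As a standalone illustration (not code from the PR) of the behaviour described above, `time.ParseInLocation` pins the result to UTC explicitly; note that Go accepts a comma as the fractional-seconds separator in layouts since Go 1.17:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Example timestamp in the broker log layout; the value is made up.
	raw := "2025-01-02 10:30:45,123"

	ts, err := time.ParseInLocation("2006-01-02 15:04:05,000", raw, time.UTC)
	if err != nil {
		panic(err)
	}

	fmt.Println(ts)            // 2025-01-02 10:30:45.123 +0000 UTC
	fmt.Println(ts.Location()) // UTC
}
```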
LGTM.
Though I think we should eventually look into parsing the logs in parallel.

I had this originally, but it was becoming too complicated. In reality, what I have now is a much stripped-back version of what I originally had, so it may not be too hard to bring it back.
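To make that follow-up concrete, here is a rough sketch of how the per-file work could be fanned out to a bounded set of goroutines. The `RequestMetadata` fields, the `parseFileFunc` signature, and the keep-earliest-timestamp merge policy are assumptions for illustration and would need to be adapted to the scanner's actual API:

```go
package brokerlogs

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
)

// RequestMetadata mirrors the per-client record described in the PR;
// the real type's fields may differ.
type RequestMetadata struct {
	Timestamp time.Time
	ClientID  string
	Topic     string
	Role      string
	Auth      string
	Principal string
}

// parseFileFunc is the per-file step (download, decompress, parse);
// in the PR this would be a method on BrokerLogsScanner.
type parseFileFunc func(ctx context.Context, bucket, key string) (map[string]*RequestMetadata, error)

// scanLogFilesParallel fans the log files out to a bounded number of workers
// and merges the per-file results, keeping the earliest timestamp per key.
func scanLogFilesParallel(ctx context.Context, bucket string, logFiles []string, parse parseFileFunc) map[string]*RequestMetadata {
	var (
		mu     sync.Mutex
		merged = make(map[string]*RequestMetadata)
	)

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(8) // cap concurrent S3 downloads and parses

	for _, key := range logFiles {
		key := key // per-iteration copy for Go versions before 1.22
		g.Go(func() error {
			partial, err := parse(ctx, bucket, key)
			if err != nil {
				slog.Error("failed to process log file", "key", key, "error", err)
				return nil // keep scanning the remaining files
			}
			mu.Lock()
			defer mu.Unlock()
			for k, md := range partial {
				if existing, ok := merged[k]; !ok || md.Timestamp.Before(existing.Timestamp) {
					merged[k] = md
				}
			}
			return nil
		})
	}

	_ = g.Wait()
	return merged
}
```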
Given:
- S3 broker logging enabled
- `kafka.server.KafkaApis=TRACE` enabled on each broker
- the appropriate policy on the user

running the scan against an S3 folder for a particular hour range will produce a scan of all the unique clients it can find in the logs. Uniqueness is based on the composite key of clientID + topic + role + auth + principal, and the results are sorted by timestamp. The auth value is, for example, IAM, SASL_SCRAM, TLS, or UNAUTHENTICATED.
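As a loose sketch of the uniqueness and ordering rules described above (field, type, and function names here are illustrative, not the PR's actual identifiers):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// clientRecord captures the per-client fields the scan reports on.
type clientRecord struct {
	Timestamp time.Time
	ClientID  string
	Topic     string
	Role      string // PRODUCE or FETCH
	Auth      string // IAM, SASL_SCRAM, TLS, or UNAUTHENTICATED
	Principal string
}

// compositeKey deduplicates clients on clientID + topic + role + auth + principal.
func compositeKey(r clientRecord) string {
	return fmt.Sprintf("%s|%s|%s|%s|%s", r.ClientID, r.Topic, r.Role, r.Auth, r.Principal)
}

func main() {
	records := []clientRecord{
		{Timestamp: time.Now(), ClientID: "app-1", Topic: "orders", Role: "PRODUCE", Auth: "IAM", Principal: "example-principal"},
		{Timestamp: time.Now().Add(-time.Hour), ClientID: "app-1", Topic: "orders", Role: "FETCH", Auth: "IAM", Principal: "example-principal"},
	}

	// Deduplicate on the composite key.
	unique := map[string]clientRecord{}
	for _, r := range records {
		if _, ok := unique[compositeKey(r)]; !ok {
			unique[compositeKey(r)] = r
		}
	}

	// Collect and sort the unique records by timestamp, as the CSV report does.
	out := make([]clientRecord, 0, len(unique))
	for _, r := range unique {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Timestamp.Before(out[j].Timestamp) })

	fmt.Println(len(out), "unique clients")
}
```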