Skip to content

Commit 6bc3d34

Browse files
committed
feat(server): implement Kafka MCP server with CLI and tools integration
Signed-off-by: Tommy Nguyen <[email protected]>
1 parent 2bd0e25 commit 6bc3d34

File tree

9 files changed

+1141
-55
lines changed

9 files changed

+1141
-55
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
.PHONY: build test clean run-dev release-snapshot run-docker run docker-compose-up docker-compose-down lint
22

33
# Variables
4-
BINARY_NAME=mcp-trino
4+
BINARY_NAME=kafka-mcp-server
55
VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
66
BUILD_DIR=bin
77

@@ -52,4 +52,4 @@ lint:
5252
@golangci-lint run --timeout=5m
5353

5454
# Default target
55-
all: clean build
55+
all: clean build

cmd/server/main.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
10+
"github.com/tuannvm/kafka-mcp-server/config"
11+
"github.com/tuannvm/kafka-mcp-server/kafka"
12+
"github.com/tuannvm/kafka-mcp-server/mcp"
13+
"github.com/tuannvm/kafka-mcp-server/server"
14+
)
15+
16+
// Version is set during build via -X ldflags
17+
var Version = "dev"
18+
19+
func main() {
20+
// Setup signal handling for graceful shutdown
21+
ctx, cancel := context.WithCancel(context.Background())
22+
defer cancel()
23+
24+
// Handle SIGINT and SIGTERM
25+
sigCh := make(chan os.Signal, 1)
26+
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
27+
go func() {
28+
sig := <-sigCh
29+
slog.Info("Received signal, shutting down", "signal", sig)
30+
cancel()
31+
}()
32+
33+
// Load configuration
34+
cfg := config.LoadConfig() // Changed from cfg, err := config.LoadConfig()
35+
36+
// Initialize Kafka client
37+
kafkaClient, err := kafka.NewClient(cfg)
38+
if err != nil {
39+
slog.Error("Failed to create Kafka client", "error", err)
40+
os.Exit(1)
41+
}
42+
defer kafkaClient.Close()
43+
44+
// Create MCP server
45+
s := server.NewMCPServer("kafka-mcp-server", Version)
46+
47+
// Register MCP resources and tools
48+
mcp.RegisterResources(s, kafkaClient)
49+
mcp.RegisterTools(s, kafkaClient, cfg)
50+
51+
// Start server
52+
slog.Info("Starting Kafka MCP server", "version", Version, "transport", cfg.MCPTransport)
53+
if err := server.Start(ctx, s, cfg); err != nil {
54+
slog.Error("Server error", "error", err)
55+
os.Exit(1)
56+
}
57+
58+
slog.Info("Server shutdown complete")
59+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ go 1.24.2
55
require (
66
github.com/mark3labs/mcp-go v0.21.1
77
github.com/stretchr/testify v1.10.0
8-
github.com/testcontainers/testcontainers-go v0.36.0
98
github.com/testcontainers/testcontainers-go/modules/kafka v0.36.0
109
github.com/twmb/franz-go v1.18.1
1110
github.com/twmb/franz-go/pkg/kmsg v1.9.0
@@ -49,6 +48,7 @@ require (
4948
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
5049
github.com/shirou/gopsutil/v4 v4.25.1 // indirect
5150
github.com/sirupsen/logrus v1.9.3 // indirect
51+
github.com/testcontainers/testcontainers-go v0.36.0 // indirect
5252
github.com/tklauser/go-sysconf v0.3.12 // indirect
5353
github.com/tklauser/numcpus v0.6.1 // indirect
5454
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect

0 commit comments

Comments
 (0)