feat(kafka-client): Allow for kafka client configuration to be passed through via CLI #41

Merged
merged 2 commits on May 15, 2025
Changes from 1 commit
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -69,6 +69,7 @@
"fastify": "^5.3.2",
"openapi-fetch": "^0.13.7",
"pino": "^9.6.0",
"properties-file": "^3.5.12",
"zod": "^3.24.4"
},
"files": [
71 changes: 52 additions & 19 deletions src/cli.ts
@@ -5,6 +5,7 @@ import { TransportType } from "@src/mcp/transports/types.js";
import * as dotenv from "dotenv";
import fs from "fs";
import path from "path";
import { getProperties, KeyValuePairObject } from "properties-file";
import pkg from "../package.json" with { type: "json" };

// Define the interface for our CLI options
@@ -15,6 +16,7 @@ export interface CLIOptions {
blockTools?: string[];
listTools?: boolean;
disableConfluentCloudTools?: boolean;
kafkaConfig: KeyValuePairObject;
}

/**
@@ -78,8 +80,7 @@ function parseToolList(value: string): string[] {
function readFileLines(filePath: string): string[] {
const absPath = path.resolve(filePath);
if (!fs.existsSync(absPath)) {
logger.error(`Tool list file not found: ${absPath}`);
process.exit(1);
throw new Error(`Tool list file not found: ${absPath}`);
}
const lines = fs
.readFileSync(absPath, "utf-8")
@@ -101,6 +102,10 @@ export function parseCliArgs(): CLIOptions {
)
.version(getPackageVersion())
.option("-e, --env-file <path>", "Load environment variables from file")
.option(
"-k, --kafka-config-file <file>",
"Path to a properties file for configuring kafka clients",
)
.addOption(
new Option(
"-t, --transport <types>",
@@ -134,16 +139,14 @@
"--disable-confluent-cloud-tools",
"Disable all tools that require Confluent Cloud REST APIs (cloud-only tools).",
)
.action((options) => {
if (options.envFile) {
loadEnvironmentVariables(options.envFile);
}
})
.allowExcessArguments(false)
.exitOverride();

try {
const opts = program.parse().opts();
if (opts.envFile) {
loadEnvironmentVariables(opts.envFile);
}
// Precedence: CLI > file > undefined
let allowTools: string[] | undefined = undefined;
let blockTools: string[] | undefined = undefined;
@@ -157,6 +160,10 @@
} else if (opts.blockToolsFile) {
blockTools = readFileLines(opts.blockToolsFile);
}
let kafkaConfig: KeyValuePairObject = {};
if (opts.kafkaConfigFile) {
kafkaConfig = parsePropertiesFile(opts.kafkaConfigFile);
}
return {
envFile: opts.envFile,
transports: Array.isArray(opts.transport)
@@ -166,15 +173,14 @@
blockTools,
listTools: !!opts.listTools,
disableConfluentCloudTools: !!opts.disableConfluentCloudTools,
kafkaConfig: kafkaConfig,
};
} catch (error: unknown) {
// Commander uses error.name === 'CommanderError' for its errors
if (
error instanceof CommanderError &&
(error.code === "commander.helpDisplayed" ||
error.code === "commander.version")
) {
// Help or version was displayed, exit silently
process.exit(0);
}
if (error instanceof Error) {
@@ -187,9 +193,9 @@
"Error parsing CLI options",
);
} else {
logger.error({ error }, "Error parsing CLI options");
logger.error({ error: String(error) }, "Error parsing CLI options");
}
process.exit(0);
process.exit(1);
}
}

@@ -202,19 +208,14 @@ export function loadEnvironmentVariables(envFile: string): void {

// Check if file exists
if (!fs.existsSync(envPath)) {
logger.error(`Environment file not found: ${envPath}`);
process.exit(1);
throw new Error(`Environment file not found: ${envPath}`);
}

// Load environment variables from file
const result = dotenv.config({ path: envPath });

if (result.error) {
logger.error(
{ error: result.error },
"Error loading environment variables",
);
process.exit(1);
throw new Error(`Error loading environment variables: ${result.error}`);
}

logger.info(`Loaded environment variables from ${envPath}`);
@@ -272,5 +273,37 @@ export function getFilteredToolNames(cliOptions: CLIOptions): ToolName[] {
(t) => !validBlock.includes(t),
);
}
return filteredToolNames.sort();
// Deduplicate and sort
const deduped = Array.from(new Set(filteredToolNames)).sort();
if (
(!cliOptions.allowTools || cliOptions.allowTools.length === 0) &&
(!cliOptions.blockTools || cliOptions.blockTools.length === 0)
) {
logger.info(
"No allow/block tool lists provided; all tools are enabled by default.",
);
}
return deduped;
}

/**
* Loads configuration from a properties file
* @param filePath - Path to the properties file
* @returns configuration object
*/
export function parsePropertiesFile(filePath: string): KeyValuePairObject {
const absPath = path.resolve(filePath);
if (!fs.existsSync(absPath)) {
throw new Error(`Properties file not found: ${absPath}`);
}
try {
const properties: KeyValuePairObject = getProperties(
fs.readFileSync(absPath, "utf-8"),
);
return properties;
} catch (err) {
throw new Error(
`Failed to parse properties file: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
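
For reference, a minimal sketch (not part of the diff) of what the new `parsePropertiesFile` helper produces, assuming a typical librdkafka-style properties file passed via `--kafka-config-file`; the property names and values below are illustrative only:

```ts
import { getProperties, KeyValuePairObject } from "properties-file";

// Hypothetical contents of a file passed via --kafka-config-file.
const fileContent = [
  "bootstrap.servers=localhost:9092",
  "client.id=my-local-client",
  "enable.auto.commit=false",
].join("\n");

// getProperties parses the raw text into a flat key/value object;
// every value comes back as a string (e.g. "false", not false).
const kafkaConfig: KeyValuePairObject = getProperties(fileContent);
// => { "bootstrap.servers": "localhost:9092",
//      "client.id": "my-local-client",
//      "enable.auto.commit": "false" }
```

Because values are parsed as strings, downstream truthiness checks (such as the `||` fallbacks in `getConsumer`) treat non-empty strings like `"false"` as truthy and pass them through unchanged.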
38 changes: 21 additions & 17 deletions src/confluent/client-manager.ts
@@ -2,7 +2,7 @@
* @fileoverview Provides client management functionality for Kafka and Confluent Cloud services.
*/

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { GlobalConfig, KafkaJS } from "@confluentinc/kafka-javascript";
import { SchemaRegistryClient } from "@confluentinc/schemaregistry";
import {
ConfluentAuth,
@@ -67,7 +67,7 @@ export interface ClientManager
}

export interface ClientManagerConfig {
kafka: KafkaJS.CommonConstructorConfig;
kafka: GlobalConfig;
endpoints: ConfluentEndpoints;
auth: {
cloud: ConfluentAuth;
@@ -88,6 +88,7 @@ export class DefaultClientManager
private confluentCloudFlinkBaseUrl: string | undefined;
private confluentCloudSchemaRegistryBaseUrl: string | undefined;
private confluentCloudKafkaRestBaseUrl: string | undefined;
private readonly kafkaConfig: GlobalConfig;
private readonly kafkaClient: Lazy<KafkaJS.Kafka>;
private readonly adminClient: AsyncLazy<KafkaJS.Admin>;
private readonly producer: AsyncLazy<KafkaJS.Producer>;
@@ -115,7 +116,8 @@ export class DefaultClientManager
this.confluentCloudSchemaRegistryBaseUrl = config.endpoints.schemaRegistry;
this.confluentCloudKafkaRestBaseUrl = config.endpoints.kafka;

this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(config.kafka));
this.kafkaConfig = config.kafka;
this.kafkaClient = new Lazy(() => new KafkaJS.Kafka(this.kafkaConfig));
this.adminClient = new AsyncLazy(
async () => {
logger.info("Connecting Kafka Admin");
@@ -128,10 +130,7 @@
this.producer = new AsyncLazy(
async () => {
logger.info("Connecting Kafka Producer");
const producer = this.kafkaClient.get().producer({
"compression.type": "gzip",
"linger.ms": 5,
});
const producer = this.kafkaClient.get().producer();
await producer.connect();
return producer;
},
@@ -213,18 +212,23 @@

/** @inheritdoc */
async getConsumer(sessionId?: string): Promise<KafkaJS.Consumer> {
const baseGroupId = "mcp-confluent"; // should be configurable?
// Build the config inline, merging with defaults
const baseGroupId =
(this.kafkaConfig["group.id"] as string) || "mcp-confluent";
const groupId = sessionId ? `${baseGroupId}-${sessionId}` : baseGroupId;
logger.info(`Creating new Kafka Consumer with groupId: ${groupId}`);
return this.kafkaClient.get().consumer({
kafkaJS: {
fromBeginning: true,
groupId,
allowAutoTopicCreation: false,
autoCommit: false,
},
});
const consumerConfig = {
// Spread all user-provided config
...this.kafkaConfig,
// Override with our logic
"group.id": groupId,
"auto.offset.reset": this.kafkaConfig["auto.offset.reset"] || "earliest",
"allow.auto.create.topics":
this.kafkaConfig["allow.auto.create.topics"] || false,
"enable.auto.commit": this.kafkaConfig["enable.auto.commit"] || false,
};
return this.kafkaClient.get().consumer(consumerConfig);
}

/**
* a function that sets a new confluent cloud rest endpoint.
* Closes the current client first.
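
To make the merge order in `getConsumer` concrete, here is a small sketch (assumed values, not part of the diff) of how a user-supplied `group.id` and auto-commit setting resolve; user-provided keys are spread first, and the keys listed after the spread win:

```ts
// Hypothetical config parsed from --kafka-config-file (all values are strings).
const kafkaConfig: Record<string, string> = {
  "group.id": "analytics",
  "enable.auto.commit": "true",
};
const sessionId = "abc123"; // hypothetical MCP session id

const baseGroupId = kafkaConfig["group.id"] || "mcp-confluent";
const consumerConfig = {
  ...kafkaConfig, // user-provided keys first
  // keys after the spread override the user's values:
  "group.id": sessionId ? `${baseGroupId}-${sessionId}` : baseGroupId,
  "auto.offset.reset": kafkaConfig["auto.offset.reset"] || "earliest",
  "allow.auto.create.topics": kafkaConfig["allow.auto.create.topics"] || false,
  "enable.auto.commit": kafkaConfig["enable.auto.commit"] || false,
};
// => group.id: "analytics-abc123"  (the user's group id gains a per-session suffix)
//    enable.auto.commit: "true"    (user value survives; a non-empty string is truthy)
//    auto.offset.reset: "earliest" and allow.auto.create.topics: false (defaults)
```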
34 changes: 20 additions & 14 deletions src/index.ts
@@ -1,6 +1,6 @@
#!/usr/bin/env node

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { GlobalConfig } from "@confluentinc/kafka-javascript";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import {
getFilteredToolNames,
@@ -13,7 +13,7 @@ import { ToolFactory } from "@src/confluent/tools/tool-factory.js";
import { ToolName } from "@src/confluent/tools/tool-name.js";
import { EnvVar } from "@src/env-schema.js";
import { initEnv } from "@src/env.js";
import { kafkaLogger, logger } from "@src/logger.js";
import { logger } from "@src/logger.js";
import { TransportManager } from "@src/mcp/transports/index.js";

// Parse command line arguments and load environment variables if --env-file is specified
@@ -24,18 +24,24 @@ async function main() {
// Initialize environment after CLI args are processed
const env = await initEnv();

const kafkaClientConfig: KafkaJS.CommonConstructorConfig = {
kafkaJS: {
brokers: env.BOOTSTRAP_SERVERS?.split(",") ?? [],
clientId: "mcp-client",
ssl: true,
sasl: {
mechanism: "plain",
username: env.KAFKA_API_KEY!,
password: env.KAFKA_API_SECRET!,
},
logger: kafkaLogger,
},
// Merge environment variables with kafka config from CLI
// some additional configurations could be set in the client manager
  // like separating groupIds by sessionId
const kafkaClientConfig: GlobalConfig = {
// Base configuration from environment variables
"bootstrap.servers": env.BOOTSTRAP_SERVERS!,
"client.id": "mcp-client",
"security.protocol": "plaintext",
...(env.KAFKA_API_KEY && env.KAFKA_API_SECRET
? {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "PLAIN",
"sasl.username": env.KAFKA_API_KEY!,
"sasl.password": env.KAFKA_API_SECRET!,
}
: {}),
// Merge any additional properties from the kafka config file
...cliOptions.kafkaConfig,
Copilot AI May 15, 2025


Consider clarifying the precedence of configuration merging between environment variables and CLI-provided Kafka configuration; ensure that CLI settings are intended to override environment variables as designed.

};

const clientManager = new DefaultClientManager({
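
Regarding the review comment above on merge precedence: with object spreads, keys spread later override keys spread earlier, so values from the `--kafka-config-file` win over the env-derived defaults in `kafkaClientConfig`. A minimal sketch with hypothetical values:

```ts
// Env-derived base, as built in main() above.
const fromEnv = {
  "bootstrap.servers": "env-broker:9092",
  "client.id": "mcp-client",
  "security.protocol": "sasl_ssl",
};
// Hypothetical keys parsed from --kafka-config-file.
const fromKafkaConfigFile = {
  "security.protocol": "ssl",
  "socket.keepalive.enable": "true",
};

// Spread order determines precedence: the last spread wins on key conflicts.
const merged = { ...fromEnv, ...fromKafkaConfigFile };
// => {
//   "bootstrap.servers": "env-broker:9092",
//   "client.id": "mcp-client",
//   "security.protocol": "ssl",            // file value overrides the env default
//   "socket.keepalive.enable": "true",
// }
```

If environment variables were instead meant to take priority over the file, the spread order would need to be reversed.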