Skip to content

Commit b5aa4ea

Browse files
authored
fix(tool): modify list schema to use SchemaRegistryClient to be accessible for CP (#37)
* feat(schema): Enhance ListSchemasHandler with improved error handling and logging - Integrated SchemaRegistryClient for fetching schema subjects and metadata. - Updated argument validation to include a boolean for deleted schemas. - Enhanced logging to provide detailed debug information during schema retrieval. - Improved error handling for fetching schema metadata and versions, ensuring clearer error messages in responses. * refactor(schema): Remove baseUrl from ListSchemasHandler and update required environment variables - Eliminated the baseUrl argument from the ListSchemasHandler, simplifying the argument structure. - Updated the getRequiredEnvVars method to include SCHEMA_REGISTRY_ENDPOINT as a necessary environment variable, enhancing configuration management.
1 parent 6db99f9 commit b5aa4ea

File tree

1 file changed

+125
-31
lines changed

1 file changed

+125
-31
lines changed
Lines changed: 125 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SchemaRegistryClient } from "@confluentinc/schemaregistry";
12
import { ClientManager } from "@src/confluent/client-manager.js";
23
import { CallToolResult } from "@src/confluent/schema.js";
34
import {
@@ -6,17 +7,10 @@ import {
67
} from "@src/confluent/tools/base-tools.js";
78
import { ToolName } from "@src/confluent/tools/tool-name.js";
89
import { EnvVar } from "@src/env-schema.js";
9-
import env from "@src/env.js";
10-
import { wrapAsPathBasedClient } from "openapi-fetch";
10+
import { logger } from "@src/logger.js";
1111
import { z } from "zod";
1212

1313
const listSchemasArguments = z.object({
14-
baseUrl: z
15-
.string()
16-
.describe("The base URL of the Schema Registry REST API.")
17-
.url()
18-
.default(() => env.SCHEMA_REGISTRY_ENDPOINT ?? "")
19-
.optional(),
2014
latestOnly: z
2115
.boolean()
2216
.describe("If true, only return the latest version of each schema.")
@@ -26,44 +20,140 @@ const listSchemasArguments = z.object({
2620
.string()
2721
.describe("The prefix of the subject to list schemas for.")
2822
.optional(),
29-
deleted: z.string().describe("List deleted schemas.").optional(),
23+
deleted: z
24+
.boolean()
25+
.describe("List deleted schemas. (Only used if latestOnly is false)")
26+
.default(false)
27+
.optional(),
3028
});
3129

3230
export class ListSchemasHandler extends BaseToolHandler {
3331
async handle(
3432
clientManager: ClientManager,
3533
toolArguments: Record<string, unknown>,
3634
): Promise<CallToolResult> {
37-
const { baseUrl, latestOnly, subjectPrefix, deleted } =
35+
const { latestOnly, subjectPrefix, deleted } =
3836
listSchemasArguments.parse(toolArguments);
3937

40-
if (baseUrl !== undefined && baseUrl !== "") {
41-
clientManager.setConfluentCloudSchemaRegistryEndpoint(baseUrl);
42-
}
43-
44-
const pathBasedClient = wrapAsPathBasedClient(
45-
clientManager.getConfluentCloudSchemaRegistryRestClient(),
38+
logger.debug(
39+
{
40+
latestOnly,
41+
subjectPrefix,
42+
deleted,
43+
},
44+
"ListSchemasHandler.handle called with arguments",
4645
);
4746

48-
// First get all schemas
49-
const { data: response, error: error } = await pathBasedClient[
50-
"/schemas"
51-
].GET({
52-
query: {
53-
...(latestOnly ? { latestOnly: true } : {}),
54-
...(subjectPrefix ? { subjectPrefix: subjectPrefix } : {}),
55-
...(deleted ? { deleted: true } : {}),
56-
},
57-
});
47+
const registry: SchemaRegistryClient =
48+
clientManager.getSchemaRegistryClient();
49+
50+
try {
51+
let subjects: string[] = await registry.getAllSubjects();
52+
logger.debug(
53+
{ subjectsCount: subjects.length },
54+
"Fetched all subjects from registry",
55+
);
56+
if (subjectPrefix) {
57+
subjects = subjects.filter((s) => s.startsWith(subjectPrefix));
58+
logger.debug(
59+
{ filteredSubjectsCount: subjects.length, subjectPrefix },
60+
"Filtered subjects by prefix",
61+
);
62+
}
5863

59-
if (error) {
64+
const result: Record<string, unknown> = {};
65+
for (const subject of subjects) {
66+
if (latestOnly) {
67+
try {
68+
const latest = await registry.getLatestSchemaMetadata(subject);
69+
logger.debug({ subject, latest }, "Fetched latest schema metadata");
70+
result[subject] = {
71+
version: latest.version,
72+
id: latest.id,
73+
schemaType: latest.schemaType,
74+
schema: latest.schema,
75+
};
76+
} catch (err) {
77+
logger.warn(
78+
{
79+
subject,
80+
error: err instanceof Error ? err.message : String(err),
81+
},
82+
"Failed to fetch latest schema metadata",
83+
);
84+
result[subject] = {
85+
error: err instanceof Error ? err.message : String(err),
86+
};
87+
}
88+
} else {
89+
try {
90+
const versions: number[] = await registry.getAllVersions(subject);
91+
logger.debug({ subject, versions }, "Fetched all schema versions");
92+
result[subject] = [];
93+
const versionPromises = versions.map(async (version) => {
94+
try {
95+
const schema = await registry.getSchemaMetadata(
96+
subject,
97+
version,
98+
deleted,
99+
);
100+
logger.debug(
101+
{ subject, version, schema },
102+
"Fetched schema metadata for version",
103+
);
104+
(result[subject] as unknown[]).push({
105+
version: schema.version,
106+
id: schema.id,
107+
schemaType: schema.schemaType,
108+
schema: schema.schema,
109+
});
110+
} catch (err) {
111+
logger.warn(
112+
{
113+
subject,
114+
version,
115+
error: err instanceof Error ? err.message : String(err),
116+
},
117+
"Failed to fetch schema metadata for version",
118+
);
119+
(result[subject] as unknown[]).push({
120+
version,
121+
error: err instanceof Error ? err.message : String(err),
122+
});
123+
}
124+
});
125+
await Promise.all(versionPromises);
126+
} catch (err) {
127+
logger.warn(
128+
{
129+
subject,
130+
error: err instanceof Error ? err.message : String(err),
131+
},
132+
"Failed to fetch all versions for subject",
133+
);
134+
result[subject] = {
135+
error: err instanceof Error ? err.message : String(err),
136+
};
137+
}
138+
}
139+
}
140+
logger.info(
141+
{ subjects: Object.keys(result).length },
142+
"Returning schema listing result",
143+
);
144+
return this.createResponse(JSON.stringify(result));
145+
} catch (error) {
146+
logger.error(
147+
{
148+
error: error instanceof Error ? error.message : JSON.stringify(error),
149+
},
150+
"Failed to list schemas",
151+
);
60152
return this.createResponse(
61-
`Failed to list schemas: ${JSON.stringify(error)}`,
153+
`Failed to list schemas: ${error instanceof Error ? error.message : JSON.stringify(error)}`,
62154
true,
63155
);
64156
}
65-
66-
return this.createResponse(`${JSON.stringify(response)}`);
67157
}
68158

69159
getToolConfig(): ToolConfig {
@@ -75,6 +165,10 @@ export class ListSchemasHandler extends BaseToolHandler {
75165
}
76166

77167
getRequiredEnvVars(): EnvVar[] {
78-
return ["SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET"];
168+
return [
169+
"SCHEMA_REGISTRY_ENDPOINT",
170+
"SCHEMA_REGISTRY_API_KEY",
171+
"SCHEMA_REGISTRY_API_SECRET",
172+
];
79173
}
80174
}

0 commit comments

Comments
 (0)