
Support for Spring Cloud SchemaRegistryClient #264

@snigdhasjg

Feature request

Spring Cloud Stream uses org.springframework.cloud.stream.schema.registry.client.SchemaRegistryClient for its Avro conversion.

We can use GlueClient to call Glue directly and fetch a schema.
AWSSchemaRegistryClient also wraps the schema-management requests, so it is easy to use without worrying about the underlying errors and logic.

Spring's interface, on the other hand, needs a few more things that are hard to get from either GlueClient or AWSSchemaRegistryClient alone. We get what we need only by combining both and making extra calls.

What we have achieved so far:

@Component
public class GlueSchemaRegistryClient implements SchemaRegistryClient {
    private static final Logger LOG = LoggerFactory.getLogger(GlueSchemaRegistryClient.class);

    private final GlueClient glueClient;
    private final AWSSchemaRegistryClient awsSchemaRegistryClient;
    private final String registryName;

    public GlueSchemaRegistryClient(GlueClient glueClient, AWSSchemaRegistryClient awsSchemaRegistryClient, GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration) {
        this.glueClient = glueClient;
        this.awsSchemaRegistryClient = awsSchemaRegistryClient;
        this.registryName = glueSchemaRegistryConfiguration.getRegistryName();
    }

    @Override
    public SchemaRegistrationResponse register(String subject, String format, String schema) {
        UUID schemaId = awsSchemaRegistryClient.createSchema(subject, format.toUpperCase(), schema, Collections.emptyMap());
        GetSchemaVersionResponse schemaVersionResponse = awsSchemaRegistryClient.getSchemaVersionResponse(schemaId.toString());

        SchemaRegistrationResponse response = new SchemaRegistrationResponse();
        response.setId(7); // TODO: Setting some random id, need to check if needed or not
        response.setSchemaReference(new SchemaReference(subject, schemaVersionResponse.versionNumber().intValue(), format));
        return response;
    }

    @Override
    public String fetch(SchemaReference schemaReference) {
        GetSchemaVersionResponse getSchemaVersionResponse = glueClient.getSchemaVersion(GetSchemaVersionRequest.builder()
                .schemaId(SchemaId.builder()
                        .registryName(registryName)
                        .schemaName(schemaReference.getSubject())
                        .build())
                .schemaVersionNumber(SchemaVersionNumber.builder()
                        .versionNumber((long) schemaReference.getVersion())
                        .build())
                .build());

        return getSchemaVersionResponse.schemaDefinition();
    }

    @Override
    public String fetch(int id) {
        LOG.error("Shouldn't get called for non-cached client");
        throw new UnsupportedOperationException("Not supported by glue, also not needed as non-cached class");
    }

}

This could be done more efficiently if AWSSchemaRegistryClient exposed two additional methods:

1. GetSchemaVersionResponse createSchema(String schemaName, DataFormat format, String schemaDefinition);
2. GetSchemaVersionResponse getSchemaVersion(String schemaName, Long versionNumber);
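To illustrate the request, here is a minimal sketch of how register and fetch might collapse to one call each if the two methods above existed. Everything in it is hypothetical: ProposedRegistryApi, VersionInfo (a stand-in for the two GetSchemaVersionResponse fields used), InMemoryRegistry and SimplifiedAdapter are names made up for this example, the format is passed as a String rather than DataFormat, and the in-memory fake does not reproduce real Glue behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the two GetSchemaVersionResponse fields the adapter reads. */
final class VersionInfo {
    final long versionNumber;
    final String schemaDefinition;

    VersionInfo(long versionNumber, String schemaDefinition) {
        this.versionNumber = versionNumber;
        this.schemaDefinition = schemaDefinition;
    }
}

/** Hypothetical shape of the two requested methods (format simplified to a String). */
interface ProposedRegistryApi {
    VersionInfo createSchema(String schemaName, String format, String schemaDefinition);

    VersionInfo getSchemaVersion(String schemaName, Long versionNumber);
}

/** In-memory fake so the sketch runs without AWS; each createSchema call appends a version. */
final class InMemoryRegistry implements ProposedRegistryApi {
    private final Map<String, List<String>> definitionsByName = new HashMap<>();

    @Override
    public VersionInfo createSchema(String schemaName, String format, String schemaDefinition) {
        List<String> definitions = definitionsByName.computeIfAbsent(schemaName, k -> new ArrayList<>());
        definitions.add(schemaDefinition);
        // Versions are 1-based: the first stored definition is version 1.
        return new VersionInfo(definitions.size(), schemaDefinition);
    }

    @Override
    public VersionInfo getSchemaVersion(String schemaName, Long versionNumber) {
        List<String> definitions = definitionsByName.get(schemaName);
        if (definitions == null || versionNumber < 1 || versionNumber > definitions.size()) {
            throw new IllegalArgumentException("Unknown schema version: " + schemaName + "/" + versionNumber);
        }
        return new VersionInfo(versionNumber, definitions.get((int) (versionNumber - 1)));
    }
}

/** Sketch of the Spring-facing logic: one registry call per operation, no follow-up lookup. */
final class SimplifiedAdapter {
    private final ProposedRegistryApi api;

    SimplifiedAdapter(ProposedRegistryApi api) {
        this.api = api;
    }

    /** Registers a schema and returns the version number Spring's SchemaReference needs. */
    int register(String subject, String format, String schema) {
        VersionInfo created = api.createSchema(subject, format.toUpperCase(Locale.ROOT), schema);
        return Math.toIntExact(created.versionNumber);
    }

    /** Fetches a definition by subject and version, as fetch(SchemaReference) does. */
    String fetch(String subject, int version) {
        return api.getSchemaVersion(subject, (long) version).schemaDefinition;
    }
}
```

With this shape, register no longer needs the second getSchemaVersionResponse lookup by UUID, and fetch no longer needs to build a GetSchemaVersionRequest against GlueClient by hand.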

cc: @harshadnawathe @DivyaUppu
