Integration with MSK Connect & Debezium question #118

@snigdhasjg

Description

We are trying to set up MSK Connect with the Glue Schema Registry.

Steps that we followed:

  1. Created an MSK Connect custom plugin with the Debezium MySQL connector and the Schema Registry Kafka Connect converter as dependencies
  2. Created a VPC endpoint for Glue, and in the inbound rules opened HTTPS port 443 for the full VPC CIDR block. Verified that it is accessible via the CLI with --endpoint-url https://glue.us-east-1.amazonaws.com appended
  3. Gave glue:* permission to the MSK Connect service execution role
  4. Configured a source connector, using the integration test config file as our reference worker.properties:
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<mysql-database-url>
database.port=3306
database.user=<username>
database.password=<password>
database.server.id=123456
database.server.name=<unique-server-name>
database.include.list=<single-database-name>
table.include.list=<single-table-name>
database.history.kafka.bootstrap.servers=<bootstrap-urls>
database.history.kafka.topic=<history-topic-name>
include.schema.changes=true
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

# Added Glue config
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
# tried both true & false below (note: an inline "#" comment in a .properties
# file is parsed as part of the value, so keep comments on their own line)
key.converter.schemas.enable=false
value.converter.schemas.enable=true
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
key.converter.schemaName=KeySchemaAvro
value.converter.schemaName=ValueSchemaAvro
key.converter.registry.name=<registry-that-we-created>
value.converter.registry.name=<registry-that-we-created>
key.converter.endpoint=https://glue.us-east-1.amazonaws.com
value.converter.endpoint=https://glue.us-east-1.amazonaws.com
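As a first check (not part of the original setup, just a debugging sketch): since the converter's HTTPS calls to the configured Glue endpoint are what eventually time out, it can help to confirm raw TCP reachability of that endpoint on port 443 from a host in the same subnet the connector runs in. The helper below uses only the standard library; the hostname is the one from the config above.

```python
# Debugging sketch: check whether the Glue endpoint is reachable on 443
# from the connector's subnet. A failure here points at VPC routing /
# security groups / DNS rather than IAM permissions.
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example (run from a host in the connector's VPC/subnet):
# print(can_connect("glue.us-east-1.amazonaws.com", 443))
```

If this returns False from the connector's subnet but True from a host where the CLI check worked, the connector's subnets or security groups are likely not covered by the VPC endpoint configuration.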

Now the error that we are getting:

[Worker-0d863e8b10266fd6f] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while fetching or registering schema definition = 
{
    "type": "record",
    "name": "SchemaChangeKey",
    "namespace": "io.debezium.connector.mysql",
    "fields": [
        {
            "name": "databaseName",
            "type": "string"
        }
    ],
    "connect.name": "io.debezium.connector.mysql.SchemaChangeKey"
}
, schema name = KeySchemaAvro

And at the end -

java.net.SocketTimeoutException: connect timed out

We can see database.history.kafka.topic being created, as it uses JsonConverter rather than the Glue converter.

Refer to this for the full error.
