Skip to content

Commit 2eb5423

Browse files
committed
[fix][admin] Fix KeyValue schema compatibility check caused OOM (#21645)
1 parent 78a3964 commit 2eb5423

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,13 @@ public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> testCompati
146146
.thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
147147
.thenCompose(strategy -> {
148148
String schemaId = getSchemaId();
149+
final SchemaType schemaType = SchemaType.valueOf(payload.getType());
150+
byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
151+
if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
152+
data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
153+
}
149154
return pulsar().getSchemaRegistryService().isCompatible(schemaId,
150-
SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
155+
SchemaData.builder().data(data)
151156
.isDeleted(false)
152157
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
153158
.user(defaultIfEmpty(clientAppId(), ""))

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service.schema;
2020

21+
import static org.testng.Assert.assertThrows;
2122
import static org.testng.Assert.fail;
2223
import static org.testng.AssertJUnit.assertEquals;
2324
import static org.testng.AssertJUnit.assertFalse;
@@ -40,16 +41,23 @@
4041
import java.util.UUID;
4142
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.ExecutionException;
44+
import org.apache.pulsar.broker.BrokerTestUtil;
4345
import org.apache.pulsar.broker.PulsarServerException;
4446
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4547
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
4648
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
4749
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
50+
import org.apache.pulsar.client.admin.PulsarAdminException;
51+
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
4852
import org.apache.pulsar.common.naming.TopicName;
4953
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
54+
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
5055
import org.apache.pulsar.common.protocol.schema.SchemaData;
5156
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
57+
import org.apache.pulsar.common.schema.KeyValueEncodingType;
5258
import org.apache.pulsar.common.schema.LongSchemaVersion;
59+
import org.apache.pulsar.common.schema.SchemaInfo;
60+
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
5361
import org.apache.pulsar.common.schema.SchemaType;
5462
import org.testng.Assert;
5563
import org.testng.annotations.AfterMethod;
@@ -385,4 +393,32 @@ private static SchemaData getSchemaData(String schemaJson) {
385393
private SchemaVersion version(long version) {
386394
return new LongSchemaVersion(version);
387395
}
396+
397+
@Test
398+
public void testKeyValueSchema() throws Exception {
399+
final String topicName = "persistent://public/default/testKeyValueSchema";
400+
admin.topics().createNonPartitionedTopic(BrokerTestUtil.newUniqueName(topicName));
401+
402+
final SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
403+
"keyValue",
404+
SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0])
405+
.build(),
406+
SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
407+
.build(), KeyValueEncodingType.SEPARATED);
408+
assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.schemas().testCompatibility(topicName, schemaInfo));
409+
admin.schemas().createSchema(topicName, schemaInfo);
410+
411+
final IsCompatibilityResponse isCompatibilityResponse = admin.schemas().testCompatibility(topicName, schemaInfo);
412+
Assert.assertTrue(isCompatibilityResponse.isCompatibility());
413+
414+
final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
415+
Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);
416+
417+
final Long version1 = admin.schemas().getVersionBySchema(topicName, schemaInfo);
418+
Assert.assertEquals(version1, 0);
419+
420+
final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
421+
Assert.assertEquals(version2, 0);
422+
423+
}
388424
}

0 commit comments

Comments
 (0)