Skip to content

Replace "protobuf-dynamic" lib with built-in protobuf methods #18401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions extensions-core/protobuf-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,50 @@
package org.apache.druid.data.input.protobuf;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/**
* Abstract base class for protobuf bytes decoders that use Google's protobuf library directly
* to parse binary descriptor sets and decode protobuf messages.
*
* <p>This class provides common functionality for loading protobuf descriptors from various sources
* and decoding binary protobuf messages into {@link DynamicMessage} objects. Concrete implementations
* define how to obtain the protobuf descriptor set (e.g., from files or inline base64 strings).
*/
public abstract class DescriptorBasedProtobufBytesDecoder implements ProtobufBytesDecoder
{
private Descriptors.Descriptor descriptor;

/**
* An optional Protobuf message type in the descriptor. Both short name and fully qualified name are accepted.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from our docs.

* If not specified, the first message type found in the descriptor will be used.
*/
@Nullable
private final String protoMessageType;

/**
 * Stores the requested message type; no descriptor is loaded here.
 *
 * @param protoMessageType name of the message type to decode; may be null
 */
public DescriptorBasedProtobufBytesDecoder(
final String protoMessageType
@Nullable final String protoMessageType
)
{
this.protoMessageType = protoMessageType;
}

@JsonProperty
@Nullable
public String getProtoMessageType()
{
return protoMessageType;
Expand All @@ -56,48 +75,197 @@ public Descriptors.Descriptor getDescriptor()
}

/**
 * Builds the message descriptor and stores it in the {@code descriptor} field, but only when that
 * field is still null; a call made after the field is set does nothing.
 */
@VisibleForTesting
void initDescriptor()
void initializeDescriptor()
{
// Only build once: skip the work when a descriptor is already present.
if (this.descriptor == null) {
final DynamicSchema dynamicSchema = generateDynamicSchema();
this.descriptor = generateDescriptor(dynamicSchema);
final var descriptorSet = loadFileDescriptorSet();
this.descriptor = buildMessageDescriptor(descriptorSet);
}
}

protected abstract DynamicSchema generateDynamicSchema();

/**
 * Uses the generated descriptor to parse a message from the byte stream.
 *
 * @param bytes buffer whose contents are copied and decoded as a protobuf message
 * @return the decoded message
 * @throws IAE            if the descriptor has not been initialized yet
 * @throws ParseException if decoding fails for any reason; the original exception is kept as the cause
 */
@Override
public DynamicMessage parse(ByteBuffer bytes)
{
if (descriptor == null) {
throw new IAE("Descriptor needs to be initialized before parsing");
}

try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes));
return message;
return DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(bytes));
}
// Any failure while decoding is reported uniformly as a ParseException.
catch (Exception e) {
throw new ParseException(null, e, "Fail to decode protobuf message!");
throw new ParseException(
null,
e,
"Failed to decode protobuf message with the provided descriptor [%s]",
descriptor.getFullName()
);
}
}

private Descriptors.Descriptor generateDescriptor(DynamicSchema dynamicSchema)
/**
* Load the FileDescriptorSet containing protobuf schema definitions from the concrete implementation's source.
*/
protected abstract DescriptorProtos.FileDescriptorSet loadFileDescriptorSet();

/**
 * Build a message descriptor from a FileDescriptorSet.
 *
 * <p>Each file in the set is turned into a file descriptor through {@code buildFileDescriptor}.
 * When {@code protoMessageType} is null, the first message of the first file that declares any
 * message is returned. Otherwise the files are searched in order with {@code findMessageByName}.
 *
 * <p>NOTE(review): this span shows removed and added lines of a diff side by side; the description
 * above follows the {@code descriptorSet}-based lines.
 *
 * @param descriptorSet descriptor set whose files are built and searched
 * @return descriptor of the selected message type
 * @throws ParseException if no message type exists, if the requested type is not found, or if
 *                        descriptor validation fails
 */
protected Descriptors.Descriptor buildMessageDescriptor(
final DescriptorProtos.FileDescriptorSet descriptorSet
)
{
Set<String> messageTypes = dynamicSchema.getMessageTypes();
if (messageTypes.size() == 0) {
throw new ParseException(null, "No message types found in the descriptor.");
}
try {
// Build descriptors with dependency resolution
final var builtDescriptors = new HashMap<String, Descriptors.FileDescriptor>();
final var userDescriptors = new ArrayList<Descriptors.FileDescriptor>();

for (final var fileProto : descriptorSet.getFileList()) {
final var fileDescriptor = buildFileDescriptor(fileProto, descriptorSet, builtDescriptors);
userDescriptors.add(fileDescriptor);
}

// Find the target message type - only from user descriptors, not known deps
if (protoMessageType == null) {
// Return first message type found from user descriptors
for (final var fileDescriptor : userDescriptors) {
if (!fileDescriptor.getMessageTypes().isEmpty()) {
return fileDescriptor.getMessageTypes().get(0);
}
}
throw new ParseException(null, "No message types found in the descriptor set");
}

// Find specific message type by name (including nested types)
for (final var fileDescriptor : userDescriptors) {
final var desc = findMessageByName(fileDescriptor, protoMessageType);
if (desc != null) {
return desc;
}
}

// Collect available types for better error message
// (a TreeSet, so the names are de-duplicated and listed in sorted order)
final var availableTypes = new TreeSet<String>();
for (final var fileDescriptor : userDescriptors) {
collectAvailableTypes(fileDescriptor, availableTypes);
}

String messageType = protoMessageType == null ? (String) messageTypes.toArray()[0] : protoMessageType;
Descriptors.Descriptor desc = dynamicSchema.getMessageDescriptor(messageType);
if (desc == null) {
throw new ParseException(
null,
StringUtils.format(
"Protobuf message type %s not found in the specified descriptor. Available messages types are %s",
protoMessageType,
messageTypes
)
"Protobuf message type [%s] not found in the descriptor set. Available types: %s",
protoMessageType,
availableTypes
);
}
return desc;
catch (Descriptors.DescriptorValidationException e) {
throw new ParseException(null, e, "Invalid protobuf descriptor");
}
}

/**
 * Looks up a message descriptor by name within a single file descriptor.
 *
 * <p>All top-level messages of the file are checked first, by short name or fully qualified name.
 * Nested messages are searched only when no top-level message matches. A short name shared by a
 * top-level message and a nested message therefore always resolves to the top-level one, whatever
 * the declaration order in the file.
 *
 * @param fileDescriptor  file descriptor whose message types are searched
 * @param messageTypeName short, parent-qualified or fully qualified message name to look for
 * @return the matching descriptor, or {@code null} if the file declares no such message
 */
@Nullable
private Descriptors.Descriptor findMessageByName(
    final Descriptors.FileDescriptor fileDescriptor,
    final String messageTypeName
)
{
  final var topLevelTypes = fileDescriptor.getMessageTypes();

  // Pass 1: top-level messages only, so an earlier message's nested type can never
  // shadow a later top-level message that carries the requested name.
  for (final var messageType : topLevelTypes) {
    if (messageType.getName().equals(messageTypeName) || messageType.getFullName().equals(messageTypeName)) {
      return messageType;
    }
  }

  // Pass 2: nested types (e.g., "ParentMessage.NestedMessage"), in declaration order.
  for (final var messageType : topLevelTypes) {
    final var nested = findNestedMessage(messageType, messageTypeName);
    if (nested != null) {
      return nested;
    }
  }

  return null;
}

/**
 * Searches the nested types of {@code parent}, depth first, for a message with the given name.
 *
 * <p>A nested type matches when the requested name equals its short name, its fully qualified
 * name, or its short name prefixed with the immediate parent's short name and a dot.
 *
 * @param parent          message whose nested types are searched
 * @param messageTypeName name to look for
 * @return the first matching nested descriptor, or {@code null} if none matches
 */
@Nullable
private Descriptors.Descriptor findNestedMessage(final Descriptors.Descriptor parent, final String messageTypeName)
{
  for (final Descriptors.Descriptor child : parent.getNestedTypes()) {
    final String childName = child.getName();
    final boolean isMatch = childName.equals(messageTypeName)
                            || child.getFullName().equals(messageTypeName)
                            || (parent.getName() + "." + childName).equals(messageTypeName);

    // A direct hit wins; otherwise descend into this child before moving on to its siblings.
    final Descriptors.Descriptor found = isMatch ? child : findNestedMessage(child, messageTypeName);
    if (found != null) {
      return found;
    }
  }

  return null;
}

/**
 * Builds the {@link Descriptors.FileDescriptor} for one file of a descriptor set, first building
 * every dependency that is present in the same set.
 *
 * <p>Results are memoized in {@code builtDescriptors}, keyed by file name. A dependency name with
 * no matching file in the set is skipped.
 *
 * @param fileProto        the file to build
 * @param descriptorSet    the set in which dependencies are looked up by name
 * @param builtDescriptors cache of descriptors built so far; updated by this call
 * @return the descriptor for {@code fileProto}
 * @throws Descriptors.DescriptorValidationException if protobuf rejects the file definition
 */
private Descriptors.FileDescriptor buildFileDescriptor(
    final DescriptorProtos.FileDescriptorProto fileProto,
    final DescriptorProtos.FileDescriptorSet descriptorSet,
    final Map<String, Descriptors.FileDescriptor> builtDescriptors
) throws Descriptors.DescriptorValidationException
{
  final String fileName = fileProto.getName();

  // Reuse the cached descriptor when this file has been built before.
  if (builtDescriptors.containsKey(fileName)) {
    return builtDescriptors.get(fileName);
  }

  final ArrayList<Descriptors.FileDescriptor> resolvedDependencies = new ArrayList<>();
  for (final String wantedName : fileProto.getDependencyList()) {
    // Locate the first file in the set that carries the dependency's name.
    DescriptorProtos.FileDescriptorProto match = null;
    for (final DescriptorProtos.FileDescriptorProto candidate : descriptorSet.getFileList()) {
      if (candidate.getName().equals(wantedName)) {
        match = candidate;
        break;
      }
    }

    if (match != null) {
      resolvedDependencies.add(buildFileDescriptor(match, descriptorSet, builtDescriptors));
    }
  }

  final Descriptors.FileDescriptor[] dependencyArray =
      resolvedDependencies.toArray(new Descriptors.FileDescriptor[0]);
  final Descriptors.FileDescriptor built = Descriptors.FileDescriptor.buildFrom(fileProto, dependencyArray);

  builtDescriptors.put(fileName, built);
  return built;
}

/**
 * Adds the names of every message declared in {@code fileDescriptor} to {@code availableTypes}:
 * the short and fully qualified name of each top-level message, plus whatever
 * {@code collectNestedTypes} contributes for its nested messages.
 *
 * @param fileDescriptor file whose message names are collected
 * @param availableTypes set that receives the names
 */
private void collectAvailableTypes(
    final Descriptors.FileDescriptor fileDescriptor,
    final TreeSet<String> availableTypes
)
{
  fileDescriptor.getMessageTypes().forEach(topLevel -> {
    // Both spellings that the lookup accepts for a top-level message.
    availableTypes.add(topLevel.getName());
    availableTypes.add(topLevel.getFullName());

    // Then everything declared inside it.
    collectNestedTypes(topLevel, availableTypes);
  });
}

/**
 * Recursively adds the names of all messages nested inside {@code parentDescriptor} to
 * {@code availableTypes}. Each nested message contributes its short name, its fully qualified
 * name, and its short name prefixed with the immediate parent's short name and a dot.
 *
 * @param parentDescriptor message whose nested types are walked
 * @param availableTypes   set that receives the names
 */
private void collectNestedTypes(final Descriptors.Descriptor parentDescriptor, final TreeSet<String> availableTypes)
{
  for (final Descriptors.Descriptor child : parentDescriptor.getNestedTypes()) {
    final String shortName = child.getName();
    final String parentQualifiedName = parentDescriptor.getName() + "." + shortName;

    availableTypes.add(shortName);
    availableTypes.add(child.getFullName());
    availableTypes.add(parentQualifiedName);

    // Descend so that types nested at any depth are listed too.
    collectNestedTypes(child, availableTypes);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DescriptorProtos;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Objects;

public class FileBasedProtobufBytesDecoder extends DescriptorBasedProtobufBytesDecoder
Expand All @@ -43,9 +40,11 @@ public FileBasedProtobufBytesDecoder(
)
{
super(protoMessageType);

Preconditions.checkNotNull(descriptorFilePath);
this.descriptorFilePath = descriptorFilePath;
initDescriptor();

initializeDescriptor();
}

@JsonProperty("descriptor")
Expand All @@ -55,39 +54,22 @@ public String getDescriptorFilePath()
}

/**
 * Loads the descriptor identified by {@code descriptorFilePath}, looking it up as a class path
 * resource through this class's class loader.
 *
 * <p>NOTE(review): this span shows removed and added lines of a diff side by side. The
 * {@code URL}-based fallback and the {@code DynamicSchema} parsing appear to belong to the removed
 * version, and the try-with-resources / {@code FileDescriptorSet} lines to the added one. Confirm
 * against the merged file.
 *
 * @return the parsed descriptor set
 * @throws ParseException if the resource is not found, if the parsed set contains no files, or if
 *                        reading fails with an {@link IOException}
 */
@Override
protected DynamicSchema generateDynamicSchema()
protected DescriptorProtos.FileDescriptorSet loadFileDescriptorSet()
{
InputStream fin;

fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath);
if (fin == null) {
URL url;
try {
url = new URL(descriptorFilePath);
}
catch (MalformedURLException e) {
throw new ParseException(
descriptorFilePath,
e,
"Descriptor not found in class path or malformed URL:" + descriptorFilePath
);
try (InputStream fin = this.getClass().getClassLoader().getResourceAsStream(descriptorFilePath)) {
if (fin == null) {
throw new ParseException(descriptorFilePath, "Descriptor not found in class path [%s]", descriptorFilePath);
}
try {
fin = url.openConnection().getInputStream();
}
catch (IOException e) {
throw new ParseException(url.toString(), e, "Cannot read descriptor file: " + url);

final var descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(fin);
// An empty set cannot yield any message type, so it is rejected right away.
if (descriptorSet.getFileCount() == 0) {
throw new ParseException(null, "No file descriptors found in the descriptor set");
}
}

try {
return DynamicSchema.parseFrom(fin);
}
catch (Descriptors.DescriptorValidationException e) {
throw new ParseException(null, e, "Invalid descriptor file: " + descriptorFilePath);
return descriptorSet;
}
catch (IOException e) {
throw new ParseException(null, e, "Cannot read descriptor file: " + descriptorFilePath);
throw new ParseException(descriptorFilePath, e, "Failed to initialize descriptor");
}
}

Expand Down
Loading
Loading