Skip to content

Commit 638f375

Browse files
Restores PackableMethod serialization mode (#14323)
* Restore the serialization mode of PackableMethod * Add custom function when Setting Method Descriptor * Format code --------- Co-authored-by: earthchen <[email protected]>
1 parent c174d5a commit 638f375

File tree

11 files changed

+89
-323
lines changed

11 files changed

+89
-323
lines changed

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ public void cancel(Throwable throwable) {
7979
closed();
8080
}
8181
}
82-
this.cancellationContext.cancel(throwable);
82+
if (cancellationContext != null) {
83+
cancellationContext.cancel(throwable);
84+
}
8385
long errorCode = 0;
8486
if (throwable instanceof ErrorCodeHolder) {
8587
errorCode = ((ErrorCodeHolder) throwable).getErrorCode();

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
*/
1717
package org.apache.dubbo.remoting.http12.message;
1818

19-
import org.apache.dubbo.common.io.StreamUtils;
2019
import org.apache.dubbo.remoting.http12.CompositeInputStream;
2120
import org.apache.dubbo.remoting.http12.exception.DecodeException;
2221

2322
import java.io.ByteArrayInputStream;
24-
import java.io.ByteArrayOutputStream;
2523
import java.io.IOException;
2624
import java.io.InputStream;
2725

@@ -47,8 +45,6 @@ public class LengthFieldStreamingDecoder implements StreamingDecoder {
4745

4846
private int requiredLength;
4947

50-
private InputStream dataHeader = StreamUtils.EMPTY;
51-
5248
public LengthFieldStreamingDecoder() {
5349
this(4);
5450
}
@@ -147,16 +143,12 @@ private void deliver() {
147143
}
148144

149145
private void processHeader() throws IOException {
150-
ByteArrayOutputStream bos = new ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength);
151146
byte[] offsetData = new byte[lengthFieldOffset];
152147
int ignore = accumulate.read(offsetData);
153-
bos.write(offsetData);
154148
processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset);
155149
byte[] lengthBytes = new byte[lengthFieldLength];
156150
ignore = accumulate.read(lengthBytes);
157-
bos.write(lengthBytes);
158151
requiredLength = bytesToInt(lengthBytes);
159-
this.dataHeader = new ByteArrayInputStream(bos.toByteArray());
160152

161153
// Continue reading the frame body.
162154
state = DecodeState.PAYLOAD;
@@ -184,8 +176,8 @@ private void processBody() throws IOException {
184176
requiredLength = lengthFieldOffset + lengthFieldLength;
185177
}
186178

187-
protected void invokeListener(InputStream inputStream) {
188-
this.listener.onFragmentMessage(dataHeader, inputStream);
179+
public void invokeListener(InputStream inputStream) {
180+
this.listener.onFragmentMessage(inputStream);
189181
}
190182

191183
protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException {

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,6 @@ interface FragmentListener {
3939
*/
4040
void onFragmentMessage(InputStream rawMessage);
4141

42-
/**
43-
* @param rawMessage raw message
44-
*/
45-
default void onFragmentMessage(InputStream dataHeader, InputStream rawMessage) {
46-
onFragmentMessage(rawMessage);
47-
}
48-
4942
default void onClose() {}
5043
}
5144

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.common.constants.CommonConstants;
21+
import org.apache.dubbo.common.io.StreamUtils;
2122
import org.apache.dubbo.common.utils.CollectionUtils;
2223
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
2324
import org.apache.dubbo.rpc.Invoker;
@@ -28,6 +29,8 @@
2829
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
2930
import org.apache.dubbo.rpc.stub.StubSuppliers;
3031

32+
import java.io.IOException;
33+
import java.io.InputStream;
3134
import java.util.Arrays;
3235
import java.util.List;
3336

@@ -124,9 +127,10 @@ public static MethodDescriptor findReflectionMethodDescriptor(
124127
}
125128

126129
public static MethodDescriptor findTripleMethodDescriptor(
127-
ServiceDescriptor serviceDescriptor, String methodName, byte[] data) {
130+
ServiceDescriptor serviceDescriptor, String methodName, InputStream rawMessage) throws IOException {
128131
MethodDescriptor methodDescriptor = findReflectionMethodDescriptor(serviceDescriptor, methodName);
129132
if (methodDescriptor == null) {
133+
byte[] data = StreamUtils.readBytes(rawMessage);
130134
List<MethodDescriptor> methodDescriptors = serviceDescriptor.getMethods(methodName);
131135
TripleRequestWrapper request = TripleRequestWrapper.parseFrom(data);
132136
String[] paramTypes = request.getArgTypes().toArray(new String[0]);
@@ -141,6 +145,7 @@ public static MethodDescriptor findTripleMethodDescriptor(
141145
if (methodDescriptor == null) {
142146
throw new UnimplementedException("method:" + methodName);
143147
}
148+
rawMessage.reset();
144149
}
145150
return methodDescriptor;
146151
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ protected RpcInvocation buildRpcInvocation(RpcInvocationBuildContext context) {
204204
methodDescriptor = DescriptorUtils.findMethodDescriptor(
205205
context.getServiceDescriptor(), context.getMethodName(), context.isHasStub());
206206
context.setMethodDescriptor(methodDescriptor);
207+
onSettingMethodDescriptor(methodDescriptor);
207208
}
208209
MethodMetadata methodMetadata = context.getMethodMetadata();
209210
if (methodMetadata == null) {
@@ -280,4 +281,6 @@ protected final HttpMessageListener getHttpMessageListener() {
280281
protected void setHttpMessageListener(HttpMessageListener httpMessageListener) {
281282
this.httpMessageListener = httpMessageListener;
282283
}
284+
285+
protected void onSettingMethodDescriptor(MethodDescriptor methodDescriptor) {}
283286
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java

Lines changed: 56 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,62 @@
1616
*/
1717
package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
1818

19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.common.config.ConfigurationUtils;
21+
import org.apache.dubbo.common.io.StreamUtils;
22+
import org.apache.dubbo.common.utils.ArrayUtils;
1923
import org.apache.dubbo.remoting.http12.exception.DecodeException;
2024
import org.apache.dubbo.remoting.http12.exception.EncodeException;
25+
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
2126
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
2227
import org.apache.dubbo.remoting.http12.message.MediaType;
28+
import org.apache.dubbo.rpc.model.FrameworkModel;
29+
import org.apache.dubbo.rpc.model.MethodDescriptor;
30+
import org.apache.dubbo.rpc.model.PackableMethod;
31+
import org.apache.dubbo.rpc.model.PackableMethodFactory;
2332

2433
import java.io.IOException;
2534
import java.io.InputStream;
2635
import java.io.OutputStream;
2736
import java.nio.charset.Charset;
37+
import java.util.Map;
38+
import java.util.concurrent.ConcurrentHashMap;
2839

29-
import com.google.protobuf.Message;
30-
31-
import static org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
40+
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
41+
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
3242

3343
public class GrpcCompositeCodec implements HttpMessageCodec {
3444

35-
private final ProtobufHttpMessageCodec protobufHttpMessageCodec;
45+
private static final String PACKABLE_METHOD_CACHE = "PACKABLE_METHOD_CACHE";
3646

37-
private final WrapperHttpMessageCodec wrapperHttpMessageCodec;
47+
private final URL url;
3848

39-
public GrpcCompositeCodec(
40-
ProtobufHttpMessageCodec protobufHttpMessageCodec, WrapperHttpMessageCodec wrapperHttpMessageCodec) {
41-
this.protobufHttpMessageCodec = protobufHttpMessageCodec;
42-
this.wrapperHttpMessageCodec = wrapperHttpMessageCodec;
43-
}
49+
private final FrameworkModel frameworkModel;
50+
51+
private final String mediaType;
4452

45-
public void setEncodeTypes(Class<?>[] encodeTypes) {
46-
this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes);
53+
private PackableMethod packableMethod;
54+
55+
public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String mediaType) {
56+
this.url = url;
57+
this.frameworkModel = frameworkModel;
58+
this.mediaType = mediaType;
4759
}
4860

49-
public void setDecodeTypes(Class<?>[] decodeTypes) {
50-
this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes);
61+
public void loadPackableMethod(MethodDescriptor methodDescriptor) {
62+
if (methodDescriptor instanceof PackableMethod) {
63+
packableMethod = (PackableMethod) methodDescriptor;
64+
return;
65+
}
66+
Map<MethodDescriptor, PackableMethod> cacheMap = (Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
67+
.getServiceMetadata()
68+
.getAttributeMap()
69+
.computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new ConcurrentHashMap<>());
70+
packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> frameworkModel
71+
.getExtensionLoader(PackableMethodFactory.class)
72+
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
73+
.getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
74+
.create(methodDescriptor, url, mediaType));
5175
}
5276

5377
@Override
@@ -58,34 +82,38 @@ public void encode(OutputStream outputStream, Object data, Charset charset) thro
5882
try {
5983
int compressed = 0;
6084
outputStream.write(compressed);
61-
if (isProtobuf(data)) {
62-
ProtobufWriter.write(protobufHttpMessageCodec, outputStream, data);
63-
return;
64-
}
65-
// wrapper
66-
wrapperHttpMessageCodec.encode(outputStream, data);
67-
} catch (IOException e) {
85+
byte[] bytes = packableMethod.packResponse(data);
86+
writeLength(outputStream, bytes.length);
87+
outputStream.write(bytes);
88+
} catch (HttpStatusException e) {
89+
throw e;
90+
} catch (Exception e) {
6891
throw new EncodeException(e);
6992
}
7093
}
7194

7295
@Override
7396
public Object decode(InputStream inputStream, Class<?> targetType, Charset charset) throws DecodeException {
74-
if (isProtoClass(targetType)) {
75-
return protobufHttpMessageCodec.decode(inputStream, targetType, charset);
97+
try {
98+
byte[] data = StreamUtils.readBytes(inputStream);
99+
return packableMethod.parseRequest(data);
100+
} catch (HttpStatusException e) {
101+
throw e;
102+
} catch (Exception e) {
103+
throw new DecodeException(e);
76104
}
77-
return wrapperHttpMessageCodec.decode(inputStream, targetType, charset);
78105
}
79106

80107
@Override
81108
public Object[] decode(InputStream inputStream, Class<?>[] targetTypes, Charset charset) throws DecodeException {
82-
if (targetTypes.length > 1) {
83-
return wrapperHttpMessageCodec.decode(inputStream, targetTypes, charset);
109+
Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ? null : targetTypes[0], charset);
110+
if (message instanceof Object[]) {
111+
return (Object[]) message;
84112
}
85-
return HttpMessageCodec.super.decode(inputStream, targetTypes, charset);
113+
return new Object[] {message};
86114
}
87115

88-
private static void writeLength(OutputStream outputStream, int length) {
116+
private void writeLength(OutputStream outputStream, int length) {
89117
try {
90118
outputStream.write(((length >> 24) & 0xFF));
91119
outputStream.write(((length >> 16) & 0xFF));
@@ -100,39 +128,4 @@ private static void writeLength(OutputStream outputStream, int length) {
100128
public MediaType mediaType() {
101129
return MediaType.APPLICATION_GRPC;
102130
}
103-
104-
private static boolean isProtobuf(Object data) {
105-
if (data == null) {
106-
return false;
107-
}
108-
return isProtoClass(data.getClass());
109-
}
110-
111-
private static boolean isProtoClass(Class<?> clazz) {
112-
while (clazz != Object.class && clazz != null) {
113-
Class<?>[] interfaces = clazz.getInterfaces();
114-
if (interfaces.length > 0) {
115-
for (Class<?> clazzInterface : interfaces) {
116-
if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
117-
return true;
118-
}
119-
}
120-
}
121-
clazz = clazz.getSuperclass();
122-
}
123-
return false;
124-
}
125-
126-
/**
127-
* lazy init protobuf class
128-
*/
129-
private static class ProtobufWriter {
130-
131-
private static void write(HttpMessageCodec codec, OutputStream outputStream, Object data) {
132-
int serializedSize = ((Message) data).getSerializedSize();
133-
// write length
134-
writeLength(outputStream, serializedSize);
135-
codec.encode(outputStream, data);
136-
}
137-
}
138131
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodecFactory.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,14 @@
2222
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoderFactory;
2323
import org.apache.dubbo.remoting.http12.message.HttpMessageEncoderFactory;
2424
import org.apache.dubbo.remoting.http12.message.MediaType;
25-
import org.apache.dubbo.remoting.utils.UrlUtils;
2625
import org.apache.dubbo.rpc.model.FrameworkModel;
2726

2827
@Activate
2928
public class GrpcCompositeCodecFactory implements HttpMessageEncoderFactory, HttpMessageDecoderFactory {
3029

3130
@Override
3231
public HttpMessageCodec createCodec(URL url, FrameworkModel frameworkModel, String mediaType) {
33-
String serializeName = UrlUtils.serializationOrDefault(url);
34-
WrapperHttpMessageCodec wrapperHttpMessageCodec = new WrapperHttpMessageCodec(url, frameworkModel);
35-
wrapperHttpMessageCodec.setSerializeType(serializeName);
36-
ProtobufHttpMessageCodec protobufHttpMessageCodec = new ProtobufHttpMessageCodec();
37-
return new GrpcCompositeCodec(protobufHttpMessageCodec, wrapperHttpMessageCodec);
32+
return new GrpcCompositeCodec(url, frameworkModel, mediaType);
3833
}
3934

4035
@Override

0 commit comments

Comments
 (0)