Skip to content

Commit e99eade

Browse files
leixmleixianming
andauthored
[#133] feat(netty): Add Netty Utils (#727)
### What changes were proposed in this pull request? Add netty utils. ### Why are the changes needed? Add netty utils for netty replace grpc. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Co-authored-by: leixianming <[email protected]>
1 parent 1021031 commit e99eade

File tree

14 files changed

+789
-1
lines changed

14 files changed

+789
-1
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.util;
19+
20+
import java.lang.reflect.Field;
21+
import java.util.concurrent.ThreadFactory;
22+
23+
import io.netty.buffer.PooledByteBufAllocator;
24+
import io.netty.channel.Channel;
25+
import io.netty.channel.EventLoopGroup;
26+
import io.netty.channel.ServerChannel;
27+
import io.netty.channel.epoll.EpollEventLoopGroup;
28+
import io.netty.channel.epoll.EpollServerSocketChannel;
29+
import io.netty.channel.epoll.EpollSocketChannel;
30+
import io.netty.channel.nio.NioEventLoopGroup;
31+
import io.netty.channel.socket.nio.NioServerSocketChannel;
32+
import io.netty.channel.socket.nio.NioSocketChannel;
33+
import io.netty.util.concurrent.DefaultThreadFactory;
34+
import io.netty.util.internal.PlatformDependent;
35+
import io.netty.util.internal.SystemPropertyUtil;
36+
37+
/** copy from spark, In order to override the createPooledByteBufAllocator method,
38+
* the property DEFAULT_TINY_CACHE_SIZE does not exist in netty>4.1.47. */
39+
public class NettyUtils {
40+
41+
private static final int DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
42+
43+
/** Creates a new ThreadFactory which prefixes each thread with the given name. */
44+
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
45+
return new DefaultThreadFactory(threadPoolPrefix, true);
46+
}
47+
48+
/** Creates a Netty EventLoopGroup based on the IOMode. */
49+
public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
50+
ThreadFactory threadFactory = createThreadFactory(threadPrefix);
51+
52+
switch (mode) {
53+
case NIO:
54+
return new NioEventLoopGroup(numThreads, threadFactory);
55+
case EPOLL:
56+
return new EpollEventLoopGroup(numThreads, threadFactory);
57+
default:
58+
throw new IllegalArgumentException("Unknown io mode: " + mode);
59+
}
60+
}
61+
62+
/** Returns the correct (client) SocketChannel class based on IOMode. */
63+
public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
64+
switch (mode) {
65+
case NIO:
66+
return NioSocketChannel.class;
67+
case EPOLL:
68+
return EpollSocketChannel.class;
69+
default:
70+
throw new IllegalArgumentException("Unknown io mode: " + mode);
71+
}
72+
}
73+
74+
/** Returns the correct ServerSocketChannel class based on IOMode. */
75+
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
76+
switch (mode) {
77+
case NIO:
78+
return NioServerSocketChannel.class;
79+
case EPOLL:
80+
return EpollServerSocketChannel.class;
81+
default:
82+
throw new IllegalArgumentException("Unknown io mode: " + mode);
83+
}
84+
}
85+
86+
/**
87+
* Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame.
88+
* This is used before all decoders.
89+
*/
90+
public static TransportFrameDecoder createFrameDecoder() {
91+
return new TransportFrameDecoder();
92+
}
93+
94+
/** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
95+
public static String getRemoteAddress(Channel channel) {
96+
if (channel != null && channel.remoteAddress() != null) {
97+
return channel.remoteAddress().toString();
98+
}
99+
return "<unknown remote>";
100+
}
101+
102+
/**
103+
* Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
104+
* are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
105+
* but released by the executor thread rather than the event loop thread. Those thread-local
106+
* caches actually delay the recycling of buffers, leading to larger memory usage.
107+
*/
108+
public static PooledByteBufAllocator createPooledByteBufAllocator(
109+
boolean allowDirectBufs,
110+
boolean allowCache,
111+
int numCores) {
112+
if (numCores == 0) {
113+
numCores = Runtime.getRuntime().availableProcessors();
114+
}
115+
return new PooledByteBufAllocator(
116+
allowDirectBufs && PlatformDependent.directBufferPreferred(),
117+
Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
118+
Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
119+
getPrivateStaticField("DEFAULT_PAGE_SIZE"),
120+
getPrivateStaticField("DEFAULT_MAX_ORDER"),
121+
allowCache ? DEFAULT_TINY_CACHE_SIZE : 0,
122+
allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
123+
allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
124+
);
125+
}
126+
127+
/** Used to get defaults from Netty's private static fields. */
128+
private static int getPrivateStaticField(String name) {
129+
try {
130+
Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
131+
f.setAccessible(true);
132+
return f.getInt(null);
133+
} catch (Exception e) {
134+
throw new RuntimeException(e);
135+
}
136+
}
137+
}

common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@
8080
<groupId>org.apache.commons</groupId>
8181
<artifactId>commons-lang3</artifactId>
8282
</dependency>
83+
<dependency>
84+
<groupId>io.netty</groupId>
85+
<artifactId>netty-all</artifactId>
86+
<version>${netty.version}</version>
87+
</dependency>
8388
<dependency>
8489
<groupId>org.apache.hadoop</groupId>
8590
<artifactId>hadoop-common</artifactId>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.netty;
19+
20+
public enum IOMode {
21+
NIO,
22+
EPOLL
23+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.netty.protocol;
19+
20+
import io.netty.buffer.ByteBuf;
21+
22+
public interface Encodable {
23+
24+
int encodedLength();
25+
26+
void encode(ByteBuf buf);
27+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.netty.protocol;
19+
20+
import io.netty.buffer.ByteBuf;
21+
22+
public abstract class Message implements Encodable {
23+
24+
public abstract Type type();
25+
26+
public enum Type implements Encodable {
27+
UNKNOWN_TYPE(-1),
28+
RPC_RESPONSE(0);
29+
30+
private final byte id;
31+
32+
Type(int id) {
33+
assert id < 128 : "Cannot have more than 128 message types";
34+
this.id = (byte) id;
35+
}
36+
37+
public byte id() {
38+
return id;
39+
}
40+
41+
@Override
42+
public int encodedLength() {
43+
return 1;
44+
}
45+
46+
@Override
47+
public void encode(ByteBuf buf) {
48+
buf.writeByte(id);
49+
}
50+
51+
public static Type decode(ByteBuf buf) {
52+
byte id = buf.readByte();
53+
switch (id) {
54+
case 0:
55+
return RPC_RESPONSE;
56+
case -1:
57+
throw new IllegalArgumentException("User type messages cannot be decoded.");
58+
default:
59+
throw new IllegalArgumentException("Unknown message type: " + id);
60+
}
61+
}
62+
}
63+
64+
public static Message decode(Type msgType, ByteBuf in) {
65+
switch (msgType) {
66+
case RPC_RESPONSE:
67+
return RpcResponse.decode(in);
68+
default:
69+
throw new IllegalArgumentException("Unexpected message type: " + msgType);
70+
}
71+
}
72+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.netty.protocol;
19+
20+
import io.netty.buffer.ByteBuf;
21+
22+
import org.apache.uniffle.common.rpc.StatusCode;
23+
import org.apache.uniffle.common.util.ByteBufUtils;
24+
25+
public class RpcResponse extends Message {
26+
private long requestId;
27+
private StatusCode statusCode;
28+
private String retMessage;
29+
30+
public RpcResponse(long requestId, StatusCode statusCode) {
31+
this(requestId, statusCode, null);
32+
}
33+
34+
public RpcResponse(long requestId, StatusCode statusCode, String retMessage) {
35+
this.requestId = requestId;
36+
this.statusCode = statusCode;
37+
this.retMessage = retMessage;
38+
}
39+
40+
public StatusCode getStatusCode() {
41+
return statusCode;
42+
}
43+
44+
public String getRetMessage() {
45+
return retMessage;
46+
}
47+
48+
@Override
49+
public String toString() {
50+
return "RpcResponse{"
51+
+ "requestId=" + requestId
52+
+ ", statusCode=" + statusCode
53+
+ ", retMessage='" + retMessage
54+
+ '\'' + '}';
55+
}
56+
57+
@Override
58+
public int encodedLength() {
59+
return Long.BYTES + Integer.BYTES + ByteBufUtils.encodedLength(retMessage);
60+
}
61+
62+
@Override
63+
public void encode(ByteBuf buf) {
64+
buf.writeLong(requestId);
65+
buf.writeInt(statusCode.ordinal());
66+
ByteBufUtils.writeLengthAndString(buf, retMessage);
67+
}
68+
69+
70+
public static RpcResponse decode(ByteBuf buf) {
71+
long requestId = buf.readLong();
72+
StatusCode statusCode = StatusCode.fromCode(buf.readInt());
73+
String retMessage = ByteBufUtils.readLengthAndString(buf);
74+
return new RpcResponse(requestId, statusCode, retMessage);
75+
}
76+
77+
public long getRequestId() {
78+
return requestId;
79+
}
80+
81+
@Override
82+
public Type type() {
83+
return Type.RPC_RESPONSE;
84+
}
85+
}

0 commit comments

Comments
 (0)