Skip to content

Commit 0e7ba30

Browse files
authored
feat: enhance buffer management by separating TCP and UDP buffer pools in Common struct
1 parent 40aaf7f commit 0e7ba30

File tree

1 file changed

+66
-35
lines changed

1 file changed

+66
-35
lines changed

internal/common.go

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ type Common struct {
4949
rateLimiter *conn.RateLimiter // 全局限速器
5050
readTimeout time.Duration // 读取超时
5151
bufReader *bufio.Reader // 缓冲读取器
52-
bufferPool *sync.Pool // 缓冲区池
52+
tcpBufferPool *sync.Pool // TCP缓冲区池
53+
udpBufferPool *sync.Pool // UDP缓冲区池
5354
signalChan chan string // 信号通道
5455
checkPoint time.Time // 检查点时间
5556
slotLimit int32 // 槽位限制
@@ -91,19 +92,29 @@ const (
9192
defaultProxyProtocol = "0" // 默认代理协议
9293
)
9394

94-
// getCommonBuffer 获取共用缓冲区
95-
func (c *Common) getCommonBuffer(size int) []byte {
96-
buf := c.bufferPool.Get().(*[]byte)
97-
if cap(*buf) < size {
98-
return make([]byte, size)
95+
// getTCPBuffer 获取TCP缓冲区
96+
func (c *Common) getTCPBuffer() []byte {
97+
buf := c.tcpBufferPool.Get().(*[]byte)
98+
return (*buf)[:tcpDataBufSize]
99+
}
100+
101+
// putTCPBuffer 归还TCP缓冲区
102+
func (c *Common) putTCPBuffer(buf []byte) {
103+
if buf != nil && cap(buf) >= tcpDataBufSize {
104+
c.tcpBufferPool.Put(&buf)
99105
}
100-
return (*buf)[:size]
101106
}
102107

103-
// putCommonBuffer 归还共用缓冲区
104-
func (c *Common) putCommonBuffer(buf []byte) {
105-
if buf != nil {
106-
c.bufferPool.Put(&buf)
108+
// getUDPBuffer 获取UDP缓冲区
109+
func (c *Common) getUDPBuffer() []byte {
110+
buf := c.udpBufferPool.Get().(*[]byte)
111+
return (*buf)[:udpDataBufSize]
112+
}
113+
114+
// putUDPBuffer 归还UDP缓冲区
115+
func (c *Common) putUDPBuffer(buf []byte) {
116+
if buf != nil && cap(buf) >= udpDataBufSize {
117+
c.udpBufferPool.Put(&buf)
107118
}
108119
}
109120

@@ -722,9 +733,16 @@ func (c *Common) commonTCPLoop() {
722733

723734
c.logger.Debug("TCP launch signal: cid %v -> %v", id, c.tunnelTCPConn.RemoteAddr())
724735

736+
buffer1 := c.getTCPBuffer()
737+
buffer2 := c.getTCPBuffer()
738+
defer func() {
739+
c.putTCPBuffer(buffer1)
740+
c.putTCPBuffer(buffer2)
741+
}()
742+
725743
// 交换数据
726744
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
727-
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, c.getCommonBuffer(tcpDataBufSize)))
745+
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, buffer1, buffer2))
728746
}(targetConn)
729747
}
730748
}
@@ -736,17 +754,17 @@ func (c *Common) commonUDPLoop() {
736754
return
737755
}
738756

739-
buffer := c.getCommonBuffer(udpDataBufSize)
757+
buffer := c.getUDPBuffer()
740758

741759
// 读取来自目标的UDP数据
742760
x, clientAddr, err := c.targetUDPConn.ReadFromUDP(buffer)
743761
if err != nil {
744762
if c.ctx.Err() != nil || err == net.ErrClosed {
745-
c.putCommonBuffer(buffer)
763+
c.putUDPBuffer(buffer)
746764
return
747765
}
748766
c.logger.Error("commonUDPLoop: readFromUDP failed: %v", err)
749-
c.putCommonBuffer(buffer)
767+
c.putUDPBuffer(buffer)
750768

751769
select {
752770
case <-c.ctx.Done():
@@ -771,7 +789,7 @@ func (c *Common) commonUDPLoop() {
771789
// 尝试获取UDP连接槽位
772790
if !c.tryAcquireSlot(true) {
773791
c.logger.Error("commonUDPLoop: UDP slot limit reached: %v/%v", c.udpSlot, c.slotLimit)
774-
c.putCommonBuffer(buffer)
792+
c.putUDPBuffer(buffer)
775793
continue
776794
}
777795

@@ -798,8 +816,8 @@ func (c *Common) commonUDPLoop() {
798816
c.releaseSlot(true)
799817
}()
800818

801-
buffer := c.getCommonBuffer(udpDataBufSize)
802-
defer c.putCommonBuffer(buffer)
819+
buffer := c.getUDPBuffer()
820+
defer c.putUDPBuffer(buffer)
803821
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: c.readTimeout}
804822

805823
for {
@@ -855,13 +873,13 @@ func (c *Common) commonUDPLoop() {
855873
c.logger.Error("commonUDPLoop: write to tunnel failed: %v", err)
856874
c.targetUDPSession.Delete(sessionKey)
857875
remoteConn.Close()
858-
c.putCommonBuffer(buffer)
876+
c.putUDPBuffer(buffer)
859877
continue
860878
}
861879

862880
// 传输完成
863881
c.logger.Debug("Transfer complete: %v <-> %v", remoteConn.LocalAddr(), c.targetUDPConn.LocalAddr())
864-
c.putCommonBuffer(buffer)
882+
c.putUDPBuffer(buffer)
865883
}
866884
}
867885

@@ -992,9 +1010,16 @@ func (c *Common) commonTCPOnce(signalURL *url.URL) {
9921010
return
9931011
}
9941012

1013+
buffer1 := c.getTCPBuffer()
1014+
buffer2 := c.getTCPBuffer()
1015+
defer func() {
1016+
c.putTCPBuffer(buffer1)
1017+
c.putTCPBuffer(buffer2)
1018+
}()
1019+
9951020
// 交换数据
9961021
c.logger.Debug("Starting exchange: %v <-> %v", remoteConn.LocalAddr(), targetConn.LocalAddr())
997-
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, c.getCommonBuffer(tcpDataBufSize)))
1022+
c.logger.Debug("Exchange complete: %v", conn.DataExchange(remoteConn, targetConn, c.readTimeout, buffer1, buffer2))
9981023
}
9991024

10001025
// commonUDPOnce 共用处理单个UDP请求
@@ -1051,8 +1076,8 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
10511076
go func() {
10521077
defer func() { done <- struct{}{} }()
10531078

1054-
buffer := c.getCommonBuffer(udpDataBufSize)
1055-
defer c.putCommonBuffer(buffer)
1079+
buffer := c.getUDPBuffer()
1080+
defer c.putUDPBuffer(buffer)
10561081
reader := &conn.TimeoutReader{Conn: remoteConn, Timeout: c.readTimeout}
10571082
for {
10581083
if c.ctx.Err() != nil {
@@ -1085,8 +1110,8 @@ func (c *Common) commonUDPOnce(signalURL *url.URL) {
10851110
go func() {
10861111
defer func() { done <- struct{}{} }()
10871112

1088-
buffer := c.getCommonBuffer(udpDataBufSize)
1089-
defer c.putCommonBuffer(buffer)
1113+
buffer := c.getUDPBuffer()
1114+
defer c.putUDPBuffer(buffer)
10901115
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: c.readTimeout}
10911116
for {
10921117
if c.ctx.Err() != nil {
@@ -1242,10 +1267,16 @@ func (c *Common) singleTCPLoop() error {
12421267
c.logger.Error("singleTCPLoop: sendProxyV1Header failed: %v", err)
12431268
return
12441269
}
1270+
buffer1 := c.getTCPBuffer()
1271+
buffer2 := c.getTCPBuffer()
1272+
defer func() {
1273+
c.putTCPBuffer(buffer1)
1274+
c.putTCPBuffer(buffer2)
1275+
}()
12451276

12461277
// 交换数据
12471278
c.logger.Debug("Starting exchange: %v <-> %v", tunnelConn.LocalAddr(), targetConn.LocalAddr())
1248-
c.logger.Debug("Exchange complete: %v", conn.DataExchange(tunnelConn, targetConn, c.readTimeout, c.getCommonBuffer(tcpDataBufSize)))
1279+
c.logger.Debug("Exchange complete: %v", conn.DataExchange(tunnelConn, targetConn, c.readTimeout, buffer1, buffer2))
12491280
}(tunnelConn)
12501281
}
12511282
}
@@ -1257,18 +1288,18 @@ func (c *Common) singleUDPLoop() error {
12571288
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
12581289
}
12591290

1260-
buffer := c.getCommonBuffer(udpDataBufSize)
1291+
buffer := c.getUDPBuffer()
12611292

12621293
// 读取来自隧道的UDP数据
12631294
x, clientAddr, err := c.tunnelUDPConn.ReadFromUDP(buffer)
12641295
if err != nil {
12651296
if c.ctx.Err() != nil || err == net.ErrClosed {
1266-
c.putCommonBuffer(buffer)
1297+
c.putUDPBuffer(buffer)
12671298
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
12681299
}
12691300
c.logger.Error("singleUDPLoop: ReadFromUDP failed: %v", err)
12701301

1271-
c.putCommonBuffer(buffer)
1302+
c.putUDPBuffer(buffer)
12721303
select {
12731304
case <-c.ctx.Done():
12741305
return fmt.Errorf("singleUDPLoop: context error: %w", c.ctx.Err())
@@ -1291,7 +1322,7 @@ func (c *Common) singleUDPLoop() error {
12911322
// 尝试获取UDP连接槽位
12921323
if !c.tryAcquireSlot(true) {
12931324
c.logger.Error("singleUDPLoop: UDP slot limit reached: %v/%v", c.udpSlot, c.slotLimit)
1294-
c.putCommonBuffer(buffer)
1325+
c.putUDPBuffer(buffer)
12951326
continue
12961327
}
12971328

@@ -1300,7 +1331,7 @@ func (c *Common) singleUDPLoop() error {
13001331
if err != nil {
13011332
c.logger.Error("singleUDPLoop: dialTimeout failed: %v", err)
13021333
c.releaseSlot(true)
1303-
c.putCommonBuffer(buffer)
1334+
c.putUDPBuffer(buffer)
13041335
continue
13051336
}
13061337
targetConn = newSession
@@ -1315,8 +1346,8 @@ func (c *Common) singleUDPLoop() error {
13151346
c.releaseSlot(true)
13161347
}()
13171348

1318-
buffer := c.getCommonBuffer(udpDataBufSize)
1319-
defer c.putCommonBuffer(buffer)
1349+
buffer := c.getUDPBuffer()
1350+
defer c.putUDPBuffer(buffer)
13201351
reader := &conn.TimeoutReader{Conn: targetConn, Timeout: c.readTimeout}
13211352

13221353
for {
@@ -1364,12 +1395,12 @@ func (c *Common) singleUDPLoop() error {
13641395
if targetConn != nil {
13651396
targetConn.Close()
13661397
}
1367-
c.putCommonBuffer(buffer)
1398+
c.putUDPBuffer(buffer)
13681399
return fmt.Errorf("singleUDPLoop: write to target failed: %w", err)
13691400
}
13701401

13711402
// 传输完成
13721403
c.logger.Debug("Transfer complete: %v <-> %v", targetConn.LocalAddr(), c.tunnelUDPConn.LocalAddr())
1373-
c.putCommonBuffer(buffer)
1404+
c.putUDPBuffer(buffer)
13741405
}
13751406
}

0 commit comments

Comments
 (0)