Skip to content

Commit 016ed90

Browse files
j2rong4cnCopilot
andauthored
feat(stream): fast buffer freeing for large cache (#1053)
Signed-off-by: j2rong4cn <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent d76407b commit 016ed90

File tree

8 files changed

+204
-75
lines changed

8 files changed

+204
-75
lines changed

internal/bootstrap/config.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func InitConfig() {
7777
log.Fatalf("update config struct error: %+v", err)
7878
}
7979
}
80+
if !conf.Conf.Force {
81+
confFromEnv()
82+
}
83+
8084
if conf.Conf.MaxConcurrency > 0 {
8185
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
8286
}
@@ -92,25 +96,31 @@ func InitConfig() {
9296
conf.MaxBufferLimit = conf.Conf.MaxBufferLimit * utils.MB
9397
}
9498
log.Infof("max buffer limit: %dMB", conf.MaxBufferLimit/utils.MB)
95-
if !conf.Conf.Force {
96-
confFromEnv()
99+
if conf.Conf.MmapThreshold > 0 {
100+
conf.MmapThreshold = conf.Conf.MmapThreshold * utils.MB
101+
} else {
102+
conf.MmapThreshold = 0
97103
}
104+
log.Infof("mmap threshold: %dMB", conf.Conf.MmapThreshold)
105+
98106
if len(conf.Conf.Log.Filter.Filters) == 0 {
99107
conf.Conf.Log.Filter.Enable = false
100108
}
101109
// convert abs path
102110
convertAbsPath := func(path *string) {
103-
if !filepath.IsAbs(*path) {
111+
if *path != "" && !filepath.IsAbs(*path) {
104112
*path = filepath.Join(pwd, *path)
105113
}
106114
}
115+
convertAbsPath(&conf.Conf.Database.DBFile)
116+
convertAbsPath(&conf.Conf.Scheme.CertFile)
117+
convertAbsPath(&conf.Conf.Scheme.KeyFile)
118+
convertAbsPath(&conf.Conf.Scheme.UnixFile)
119+
convertAbsPath(&conf.Conf.Log.Name)
107120
convertAbsPath(&conf.Conf.TempDir)
108121
convertAbsPath(&conf.Conf.BleveDir)
109-
convertAbsPath(&conf.Conf.Log.Name)
110-
convertAbsPath(&conf.Conf.Database.DBFile)
111-
if conf.Conf.DistDir != "" {
112-
convertAbsPath(&conf.Conf.DistDir)
113-
}
122+
convertAbsPath(&conf.Conf.DistDir)
123+
114124
err := os.MkdirAll(conf.Conf.TempDir, 0o777)
115125
if err != nil {
116126
log.Fatalf("create temp dir error: %+v", err)

internal/conf/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ type Config struct {
120120
Log LogConfig `json:"log" envPrefix:"LOG_"`
121121
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
122122
MaxBufferLimit int `json:"max_buffer_limitMB" env:"MAX_BUFFER_LIMIT_MB"`
123+
MmapThreshold int `json:"mmap_thresholdMB" env:"MMAP_THRESHOLD_MB"`
123124
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
124125
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
125126
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
@@ -176,6 +177,7 @@ func DefaultConfig(dataDir string) *Config {
176177
},
177178
},
178179
MaxBufferLimit: -1,
180+
MmapThreshold: 4,
179181
MaxConnections: 0,
180182
MaxConcurrency: 64,
181183
TlsInsecureSkipVerify: true,

internal/conf/var.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ var PrivacyReg []*regexp.Regexp
2525
var (
2626
// StoragesLoaded loaded success if empty
2727
StoragesLoaded = false
28+
// 单个Buffer最大限制
2829
MaxBufferLimit = 16 * 1024 * 1024
30+
// 超过该阈值的Buffer将使用 mmap 分配,可主动释放内存
31+
MmapThreshold = 4 * 1024 * 1024
2932
)
3033
var (
3134
RawIndexHtml string

internal/net/request.go

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package net
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
@@ -15,6 +14,7 @@ import (
1514
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1615
"github.com/OpenListTeam/OpenList/v4/internal/model"
1716
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
17+
"github.com/rclone/rclone/lib/mmap"
1818

1919
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
2020
"github.com/aws/aws-sdk-go/aws/awsutil"
@@ -255,7 +255,10 @@ func (d *downloader) sendChunkTask(newConcurrency bool) error {
255255
finalSize += firstSize - minSize
256256
}
257257
}
258-
buf.Reset(int(finalSize))
258+
err := buf.Reset(int(finalSize))
259+
if err != nil {
260+
return err
261+
}
259262
ch := chunk{
260263
start: d.pos,
261264
size: finalSize,
@@ -645,11 +648,13 @@ func (mr MultiReadCloser) Close() error {
645648
}
646649

647650
type Buf struct {
648-
buffer *bytes.Buffer
649-
size int //expected size
650-
ctx context.Context
651-
off int
652-
rw sync.Mutex
651+
size int //expected size
652+
ctx context.Context
653+
offR int
654+
offW int
655+
rw sync.Mutex
656+
buf []byte
657+
mmap bool
653658

654659
readSignal chan struct{}
655660
readPending bool
@@ -658,89 +663,122 @@ type Buf struct {
658663
// NewBuf is a buffer that can have 1 read & 1 write at the same time.
659664
// when read is faster write, immediately feed data to read after written
660665
func NewBuf(ctx context.Context, maxSize int) *Buf {
661-
return &Buf{
662-
ctx: ctx,
663-
buffer: bytes.NewBuffer(make([]byte, 0, maxSize)),
664-
size: maxSize,
665-
666+
br := &Buf{
667+
ctx: ctx,
668+
size: maxSize,
666669
readSignal: make(chan struct{}, 1),
667670
}
671+
if conf.MmapThreshold > 0 && maxSize >= conf.MmapThreshold {
672+
m, err := mmap.Alloc(maxSize)
673+
if err == nil {
674+
br.buf = m
675+
br.mmap = true
676+
return br
677+
}
678+
}
679+
br.buf = make([]byte, maxSize)
680+
return br
668681
}
669-
func (br *Buf) Reset(size int) {
682+
683+
func (br *Buf) Reset(size int) error {
670684
br.rw.Lock()
671685
defer br.rw.Unlock()
672-
if br.buffer == nil {
673-
return
686+
if br.buf == nil {
687+
return io.ErrClosedPipe
688+
}
689+
if size > cap(br.buf) {
690+
return fmt.Errorf("reset size %d exceeds max size %d", size, cap(br.buf))
674691
}
675-
br.buffer.Reset()
676692
br.size = size
677-
br.off = 0
693+
br.offR = 0
694+
br.offW = 0
695+
return nil
678696
}
679697

680-
func (br *Buf) Read(p []byte) (n int, err error) {
698+
func (br *Buf) Read(p []byte) (int, error) {
681699
if err := br.ctx.Err(); err != nil {
682700
return 0, err
683701
}
684702
if len(p) == 0 {
685703
return 0, nil
686704
}
687-
if br.off >= br.size {
705+
if br.offR >= br.size {
688706
return 0, io.EOF
689707
}
690708
for {
691709
br.rw.Lock()
692-
if br.buffer != nil {
693-
n, err = br.buffer.Read(p)
694-
} else {
695-
err = io.ErrClosedPipe
710+
if br.buf == nil {
711+
br.rw.Unlock()
712+
return 0, io.ErrClosedPipe
696713
}
697-
if err != nil && err != io.EOF {
714+
715+
if br.offW < br.offR {
698716
br.rw.Unlock()
699-
return
717+
return 0, io.ErrUnexpectedEOF
700718
}
701-
if n > 0 {
702-
br.off += n
719+
if br.offW == br.offR {
720+
br.readPending = true
703721
br.rw.Unlock()
704-
return n, nil
722+
select {
723+
case <-br.ctx.Done():
724+
return 0, br.ctx.Err()
725+
case _, ok := <-br.readSignal:
726+
if !ok {
727+
return 0, io.ErrClosedPipe
728+
}
729+
continue
730+
}
705731
}
706-
br.readPending = true
732+
733+
n := copy(p, br.buf[br.offR:br.offW])
734+
br.offR += n
707735
br.rw.Unlock()
708-
// n==0, err==io.EOF
709-
select {
710-
case <-br.ctx.Done():
711-
return 0, br.ctx.Err()
712-
case _, ok := <-br.readSignal:
713-
if !ok {
714-
return 0, io.ErrClosedPipe
715-
}
716-
continue
736+
if n < len(p) && br.offR >= br.size {
737+
return n, io.EOF
717738
}
739+
return n, nil
718740
}
719741
}
720742

721-
func (br *Buf) Write(p []byte) (n int, err error) {
743+
func (br *Buf) Write(p []byte) (int, error) {
722744
if err := br.ctx.Err(); err != nil {
723745
return 0, err
724746
}
747+
if len(p) == 0 {
748+
return 0, nil
749+
}
725750
br.rw.Lock()
726751
defer br.rw.Unlock()
727-
if br.buffer == nil {
752+
if br.buf == nil {
728753
return 0, io.ErrClosedPipe
729754
}
730-
n, err = br.buffer.Write(p)
755+
if br.offW >= br.size {
756+
return 0, io.ErrShortWrite
757+
}
758+
n := copy(br.buf[br.offW:], p[:min(br.size-br.offW, len(p))])
759+
br.offW += n
731760
if br.readPending {
732761
br.readPending = false
733762
select {
734763
case br.readSignal <- struct{}{}:
735764
default:
736765
}
737766
}
738-
return
767+
if n < len(p) {
768+
return n, io.ErrShortWrite
769+
}
770+
return n, nil
739771
}
740772

741-
func (br *Buf) Close() {
773+
func (br *Buf) Close() error {
742774
br.rw.Lock()
743775
defer br.rw.Unlock()
744-
br.buffer = nil
776+
var err error
777+
if br.mmap {
778+
err = mmap.Free(br.buf)
779+
br.mmap = false
780+
}
781+
br.buf = nil
745782
close(br.readSignal)
783+
return err
746784
}

internal/stream/stream.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
1616
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1717
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
18+
"github.com/rclone/rclone/lib/mmap"
1819
"go4.org/readerutil"
1920
)
2021

@@ -60,8 +61,12 @@ func (f *FileStream) IsForceStreamUpload() bool {
6061
}
6162

6263
func (f *FileStream) Close() error {
63-
var err1, err2 error
64+
if f.peekBuff != nil {
65+
f.peekBuff.Reset()
66+
f.peekBuff = nil
67+
}
6468

69+
var err1, err2 error
6570
err1 = f.Closers.Close()
6671
if errors.Is(err1, os.ErrClosed) {
6772
err1 = nil
@@ -74,10 +79,6 @@ func (f *FileStream) Close() error {
7479
f.tmpFile = nil
7580
}
7681
}
77-
if f.peekBuff != nil {
78-
f.peekBuff.Reset()
79-
f.peekBuff = nil
80-
}
8182

8283
return errors.Join(err1, err2)
8384
}
@@ -194,7 +195,19 @@ func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
194195
f.oriReader = f.Reader
195196
}
196197
bufSize := maxCacheSize - int64(f.peekBuff.Len())
197-
buf := make([]byte, bufSize)
198+
var buf []byte
199+
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
200+
m, err := mmap.Alloc(int(bufSize))
201+
if err == nil {
202+
f.Add(utils.CloseFunc(func() error {
203+
return mmap.Free(m)
204+
}))
205+
buf = m
206+
}
207+
}
208+
if buf == nil {
209+
buf = make([]byte, bufSize)
210+
}
198211
n, err := io.ReadFull(f.oriReader, buf)
199212
if bufSize != int64(n) {
200213
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)

internal/stream/stream_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"io"
88
"testing"
99

10+
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1011
"github.com/OpenListTeam/OpenList/v4/internal/model"
1112
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1213
)
1314

1415
func TestFileStream_RangeRead(t *testing.T) {
16+
conf.MaxBufferLimit = 16 * 1024 * 1024
1517
type args struct {
1618
httpRange http_range.Range
1719
}
@@ -71,7 +73,7 @@ func TestFileStream_RangeRead(t *testing.T) {
7173
}
7274
})
7375
}
74-
t.Run("after check", func(t *testing.T) {
76+
t.Run("after", func(t *testing.T) {
7577
if f.GetFile() == nil {
7678
t.Error("not cached")
7779
}

0 commit comments

Comments
 (0)