Skip to content

Commit 0d65a0f

Browse files
authored
Optimize catchpointdump utility (#3561)
## Summary

* Do not load everything into memory; write directly into a file instead. Loading everything into memory causes an extremely high number of reallocations and wastes CPU and RAM.
* Add cmd options for downloading only (no loading), and for downloading from a single relay.

# Perf data:

| param | current | new |
|-----------|--------|---------|
| RAM | 5 GB | 200 MB |
| Total time | 823 s | 441 s |

Downloading time went down from minutes to tens of seconds.
1 parent 6230dc5 commit 0d65a0f

File tree

2 files changed

+82
-101
lines changed

2 files changed

+82
-101
lines changed

cmd/catchpointdump/file.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ package main
1919
import (
2020
"archive/tar"
2121
"bufio"
22-
"bytes"
2322
"context"
2423
"database/sql"
2524
"encoding/json"
2625
"fmt"
2726
"io"
28-
"io/ioutil"
2927
"os"
3028
"strings"
3129
"time"
@@ -59,9 +57,14 @@ var fileCmd = &cobra.Command{
5957
cmd.HelpFunc()(cmd, args)
6058
return
6159
}
62-
tarFileBytes, err := ioutil.ReadFile(tarFile)
63-
if err != nil || len(tarFileBytes) == 0 {
64-
reportErrorf("Unable to read '%s' : %v", tarFile, err)
60+
stats, err := os.Stat(tarFile)
61+
if err != nil {
62+
reportErrorf("Unable to stat '%s' : %v", tarFile, err)
63+
}
64+
tarSize := stats.Size()
65+
66+
if tarSize == 0 {
67+
reportErrorf("Empty file '%s' : %v", tarFile, err)
6568
}
6669
genesisInitState := ledgercore.InitState{}
6770
cfg := config.GetDefaultLocal()
@@ -84,7 +87,14 @@ var fileCmd = &cobra.Command{
8487
reportErrorf("Unable to initialize catchup database : %v", err)
8588
}
8689
var fileHeader ledger.CatchpointFileHeader
87-
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, tarFileBytes)
90+
91+
reader, err := os.Open(tarFile)
92+
if err != nil {
93+
reportErrorf("Unable to read '%s' : %v", tarFile, err)
94+
}
95+
defer reader.Close()
96+
97+
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
8898
if err != nil {
8999
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
90100
}
@@ -115,15 +125,14 @@ func printLoadCatchpointProgressLine(progress int, barLength int, dld int64) {
115125
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
116126
}
117127

118-
func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, fileBytes []byte) (fileHeader ledger.CatchpointFileHeader, err error) {
128+
func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.CatchpointCatchupAccessor, tarFile io.Reader, tarSize int64) (fileHeader ledger.CatchpointFileHeader, err error) {
119129
fmt.Printf("\n")
120130
printLoadCatchpointProgressLine(0, 50, 0)
121131
lastProgressUpdate := time.Now()
122132
progress := uint64(0)
123133
defer printLoadCatchpointProgressLine(0, 0, 0)
124134

125-
reader := bytes.NewReader(fileBytes)
126-
tarReader := tar.NewReader(reader)
135+
tarReader := tar.NewReader(tarFile)
127136
var downloadProgress ledger.CatchpointCatchupAccessorProgress
128137
for {
129138
header, err := tarReader.Next()
@@ -158,9 +167,9 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
158167
// we already know it's valid, since we validated that above.
159168
protocol.Decode(balancesBlockBytes, &fileHeader)
160169
}
161-
if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && len(fileBytes) > 0 {
170+
if time.Since(lastProgressUpdate) > 50*time.Millisecond && tarSize > 0 {
162171
lastProgressUpdate = time.Now()
163-
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(len(fileBytes))), 50, int64(progress))
172+
printLoadCatchpointProgressLine(int(float64(progress)*50.0/float64(tarSize)), 50, int64(progress))
164173
}
165174
}
166175
}

cmd/catchpointdump/net.go

Lines changed: 62 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import (
4040
var networkName string
4141
var round int
4242
var relayAddress string
43+
var singleCatchpoint bool
44+
var downloadOnly bool
4345

4446
const (
4547
escapeCursorUp = string("\033[A") // Cursor Up
@@ -52,12 +54,14 @@ func init() {
5254
netCmd.Flags().StringVarP(&networkName, "net", "n", "", "Specify the network name ( i.e. mainnet.algorand.network )")
5355
netCmd.Flags().IntVarP(&round, "round", "r", 0, "Specify the round number ( i.e. 7700000 )")
5456
netCmd.Flags().StringVarP(&relayAddress, "relay", "p", "", "Relay address to use ( i.e. r-ru.algorand-mainnet.network:4160 )")
57+
netCmd.Flags().BoolVarP(&singleCatchpoint, "single", "s", true, "Download/process only from a single relay")
58+
netCmd.Flags().BoolVarP(&downloadOnly, "download", "l", false, "Download only, do not process")
5559
}
5660

5761
var netCmd = &cobra.Command{
5862
Use: "net",
59-
Short: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round",
60-
Long: "Download and decode (possibly all) catchpoint files from all or specified the relay(s) on the network for a particular round",
63+
Short: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round",
64+
Long: "Download and decode (possibly all) catchpoint files from possibly all or specified the relay(s) on the network for a particular round",
6165
Args: validateNoPosArgsFn,
6266
Run: func(cmd *cobra.Command, args []string) {
6367
if networkName == "" || round == 0 {
@@ -77,21 +81,20 @@ var netCmd = &cobra.Command{
7781
}
7882

7983
for _, addr := range addrs {
80-
catchpointFileBytes, err := downloadCatchpoint(addr)
81-
if err != nil || catchpointFileBytes == nil {
82-
reportInfof("failed to download catchpoint from '%s' : %v", addr, err)
83-
continue
84-
}
85-
err = saveCatchpointTarFile(addr, catchpointFileBytes)
84+
tarName, err := downloadCatchpoint(addr, round)
8685
if err != nil {
87-
reportInfof("failed to save catchpoint file for '%s' : %v", addr, err)
86+
reportInfof("failed to download catchpoint from '%s' : %v", addr, err)
8887
continue
8988
}
90-
err = makeFileDump(addr, catchpointFileBytes)
89+
err = makeFileDump(addr, tarName)
9190
if err != nil {
9291
reportInfof("failed to make a dump from tar file for '%s' : %v", addr, err)
9392
continue
9493
}
94+
if singleCatchpoint {
95+
// a catchpoint processes successfully, exit if needed
96+
break
97+
}
9598
}
9699
},
97100
}
@@ -144,13 +147,13 @@ func printDownloadProgressLine(progress int, barLength int, url string, dld int6
144147
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
145148
}
146149

147-
func downloadCatchpoint(addr string) ([]byte, error) {
150+
func downloadCatchpoint(addr string, round int) (tarName string, err error) {
148151
genesisID := strings.Split(networkName, ".")[0] + "-v1.0"
149152
url := "http://" + addr + "/v1/" + genesisID + "/ledger/" + strconv.FormatUint(uint64(round), 36)
150153
fmt.Printf("downloading from %s\n", url)
151154
request, err := http.NewRequest(http.MethodGet, url, nil)
152155
if err != nil {
153-
return nil, err
156+
return
154157
}
155158

156159
timeoutContext, timeoutContextCancel := context.WithTimeout(context.Background(), config.GetDefaultLocal().MaxCatchpointDownloadDuration)
@@ -159,116 +162,72 @@ func downloadCatchpoint(addr string) ([]byte, error) {
159162
network.SetUserAgentHeader(request.Header)
160163
response, err := http.DefaultClient.Do(request)
161164
if err != nil {
162-
return nil, err
165+
return
163166
}
164167
defer response.Body.Close()
165168

166169
// check to see that we had no errors.
167170
switch response.StatusCode {
168171
case http.StatusOK:
169172
case http.StatusNotFound: // server could not find a block with that round numbers.
170-
return nil, fmt.Errorf("no catchpoint file for round %d", round)
173+
err = fmt.Errorf("no catchpoint file for round %d", round)
174+
return
171175
default:
172-
return nil, fmt.Errorf("error response status code %d", response.StatusCode)
173-
}
174-
wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second)
175-
outBytes := make([]byte, 0, 4096)
176-
tempBytes := make([]byte, 4096)
177-
lastProgressUpdate := time.Now()
178-
progress := -25
179-
printDownloadProgressLine(progress, 50, url, 0)
180-
defer printDownloadProgressLine(0, 0, url, 0)
181-
for {
182-
n, err := wdReader.Read(tempBytes)
183-
if err != nil {
184-
if err == io.EOF {
185-
outBytes = append(outBytes, tempBytes[:n]...)
186-
return outBytes, nil
187-
}
188-
return nil, err
189-
}
190-
if cap(outBytes) < len(outBytes)+n {
191-
// need to increase buffer.
192-
newBuffer := make([]byte, cap(outBytes)+n, cap(outBytes)+1024*1024)
193-
copy(newBuffer, outBytes)
194-
copy(newBuffer[len(outBytes):], tempBytes[:n])
195-
outBytes = newBuffer
196-
} else {
197-
outBytes = append(outBytes, tempBytes[:n]...)
198-
}
199-
err = wdReader.Reset()
200-
if err != nil {
201-
if err == io.EOF {
202-
return outBytes, nil
203-
}
204-
return nil, err
205-
}
206-
if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond {
207-
lastProgressUpdate = time.Now()
208-
printDownloadProgressLine(progress, 50, url, int64(len(outBytes)))
209-
progress++
210-
}
211-
}
212-
}
213-
214-
func printSaveProgressLine(progress int, barLength int, filename string, dld int64) {
215-
if barLength == 0 {
216-
fmt.Printf(escapeCursorUp+escapeDeleteLine+"[ Done ] Saved %s\n", filename)
176+
err = fmt.Errorf("error response status code %d", response.StatusCode)
217177
return
218178
}
219179

220-
outString := "[" + strings.Repeat(escapeSquare, progress) + strings.Repeat(escapeDot, barLength-progress) + "] Saving " + filename + " ..."
221-
222-
fmt.Printf(escapeCursorUp+escapeDeleteLine+outString+" %s\n", formatSize(dld))
223-
}
224-
225-
func saveCatchpointTarFile(addr string, catchpointFileBytes []byte) (err error) {
226-
// make a directory:
227180
dirName := "./" + strings.Split(networkName, ".")[0] + "/" + strings.Split(addr, ".")[0]
228181
os.RemoveAll(dirName)
229182
err = os.MkdirAll(dirName, 0777)
230183
if err != nil && !os.IsExist(err) {
231184
return
232185
}
233-
destFileName := dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar"
234-
file, err2 := os.Create(destFileName) // will create a file with 0666 permission.
186+
tarName = dirName + "/" + strconv.FormatUint(uint64(round), 10) + ".tar"
187+
file, err2 := os.Create(tarName) // will create a file with 0666 permission.
235188
if err2 != nil {
236-
return err2
189+
return tarName, err2
237190
}
238191
defer func() {
239192
err = file.Close()
240193
}()
241194
writeChunkSize := 64 * 1024
195+
196+
wdReader := util.MakeWatchdogStreamReader(response.Body, 4096, 4096, 2*time.Second)
197+
var totalBytes int
198+
tempBytes := make([]byte, writeChunkSize)
242199
lastProgressUpdate := time.Now()
243-
fmt.Printf("\n")
244-
printSaveProgressLine(0, 50, destFileName, 0)
245-
progress := uint64(0)
246-
defer printSaveProgressLine(0, 0, destFileName, 0)
247-
total := len(catchpointFileBytes)
200+
progress := -25
201+
printDownloadProgressLine(progress, 50, url, 0)
202+
defer printDownloadProgressLine(0, 0, url, 0)
203+
var n int
248204
for {
249-
writeSize := writeChunkSize
250-
if len(catchpointFileBytes) < writeSize {
251-
writeSize = len(catchpointFileBytes)
205+
n, err = wdReader.Read(tempBytes)
206+
if err != nil && err != io.EOF {
207+
return
252208
}
253-
if writeSize <= 0 {
254-
break
209+
totalBytes += n
210+
writtenBytes, err2 := file.Write(tempBytes[:n])
211+
if err2 != nil || n != writtenBytes {
212+
return tarName, err2
255213
}
256-
n, err2 := file.Write(catchpointFileBytes[:writeSize])
257-
if err2 != nil || n != writeSize {
258-
return err
214+
215+
err = wdReader.Reset()
216+
if err != nil {
217+
if err == io.EOF {
218+
return tarName, nil
219+
}
220+
return
259221
}
260-
catchpointFileBytes = catchpointFileBytes[n:]
261-
if time.Now().Sub(lastProgressUpdate) > 50*time.Millisecond && total > 0 {
222+
if time.Since(lastProgressUpdate) > 50*time.Millisecond {
262223
lastProgressUpdate = time.Now()
263-
printSaveProgressLine(int(float64(progress)*50.0/float64(total)), 50, destFileName, int64(progress))
264-
224+
printDownloadProgressLine(progress, 50, url, int64(totalBytes))
225+
progress++
265226
}
266-
progress += uint64(n)
267227
}
268-
return
269228
}
270229

271-
func makeFileDump(addr string, catchpointFileBytes []byte) error {
230+
func makeFileDump(addr string, tarFile string) error {
272231
genesisInitState := ledgercore.InitState{}
273232
deleteLedgerFiles := func() {
274233
os.Remove("./ledger.block.sqlite")
@@ -294,8 +253,21 @@ func makeFileDump(addr string, catchpointFileBytes []byte) error {
294253
if err != nil {
295254
reportErrorf("Unable to initialize catchup database : %v", err)
296255
}
256+
257+
stats, err := os.Stat(tarFile)
258+
if err != nil {
259+
return err
260+
}
261+
tarSize := stats.Size()
262+
263+
reader, err := os.Open(tarFile)
264+
if err != nil {
265+
return err
266+
}
267+
defer reader.Close()
268+
297269
var fileHeader ledger.CatchpointFileHeader
298-
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, catchpointFileBytes)
270+
fileHeader, err = loadCatchpointIntoDatabase(context.Background(), catchupAccessor, reader, tarSize)
299271
if err != nil {
300272
reportErrorf("Unable to load catchpoint file into in-memory database : %v", err)
301273
}

0 commit comments

Comments
 (0)