@@ -19,6 +19,8 @@ import (
19
19
"github.com/aws/aws-sdk-go-v2/credentials"
20
20
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
21
21
"github.com/aws/aws-sdk-go-v2/service/s3"
22
+ "github.com/vbauerster/mpb/v8"
23
+ "github.com/vbauerster/mpb/v8/decor"
22
24
23
25
"github.com/replicate/cog/pkg/global"
24
26
"github.com/replicate/cog/pkg/requirements"
@@ -55,6 +57,9 @@ func userAgent() string {
55
57
56
58
func FastPush (ctx context.Context , image string , projectDir string , command Command ) error {
57
59
g , _ := errgroup .WithContext (ctx )
60
+ p := mpb .New (
61
+ mpb .WithRefreshRate (180 * time .Millisecond ),
62
+ )
58
63
59
64
token , err := command .LoadLoginToken (global .ReplicateRegistryHost )
60
65
if err != nil {
@@ -69,7 +74,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
69
74
// Upload weights
70
75
for _ , weight := range weights {
71
76
g .Go (func () error {
72
- return uploadFile (ctx , weightsObjectType , weight .Digest , weight .Path , token )
77
+ return uploadFile (ctx , weightsObjectType , weight .Digest , weight .Path , token , p , "weights - " + filepath . Base ( weight . Path ) )
73
78
})
74
79
}
75
80
@@ -84,7 +89,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
84
89
return err
85
90
}
86
91
g .Go (func () error {
87
- return uploadFile (ctx , filesObjectType , hash , aptTarFile , token )
92
+ return uploadFile (ctx , filesObjectType , hash , aptTarFile , token , p , "apt" )
88
93
})
89
94
}
90
95
@@ -104,7 +109,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
104
109
return err
105
110
}
106
111
g .Go (func () error {
107
- return uploadFile (ctx , filesObjectType , hash , pythonTar , token )
112
+ return uploadFile (ctx , filesObjectType , hash , pythonTar , token , p , "python-packages" )
108
113
})
109
114
} else {
110
115
requirementsTarFile := filepath .Join (tmpDir , requirementsTarFile )
@@ -127,7 +132,7 @@ func FastPush(ctx context.Context, image string, projectDir string, command Comm
127
132
return err
128
133
}
129
134
g .Go (func () error {
130
- return uploadFile (ctx , filesObjectType , hash , srcTar , token )
135
+ return uploadFile (ctx , filesObjectType , hash , srcTar , token , p , "src" )
131
136
})
132
137
133
138
// Wait for uploads
@@ -187,9 +192,45 @@ func checkVerificationStatus(req *http.Request, client *http.Client) (bool, erro
187
192
return false , nil
188
193
}
189
194
190
- func uploadFile (ctx context.Context , objectType string , digest string , path string , token string ) error {
195
+ func uploadFile (ctx context.Context , objectType string , digest string , path string , token string , p * mpb. Progress , desc string ) error {
191
196
console .Debug ("uploading file: " + path )
192
197
198
+ // Open the file for uploading
199
+ file , err := os .Open (path )
200
+ if err != nil {
201
+ return err
202
+ }
203
+ defer file .Close ()
204
+
205
+ // Find the file size
206
+ fileInfo , err := file .Stat ()
207
+ if err != nil {
208
+ return err
209
+ }
210
+
211
+ // Start the progress bar
212
+ trimDesc := desc
213
+ if len (trimDesc ) > 20 {
214
+ trimDesc = trimDesc [:20 ]
215
+ }
216
+ if len (trimDesc ) < 20 {
217
+ trimDesc += strings .Repeat (" " , 20 - len (trimDesc ))
218
+ }
219
+ bar := p .New (fileInfo .Size (),
220
+ mpb .BarStyle ().Rbound ("|" ),
221
+ mpb .PrependDecorators (
222
+ decor .Name (trimDesc + " " ),
223
+ decor .Counters (decor .SizeB1024 (0 ), "% .2f / % .2f" ),
224
+ ),
225
+ mpb .AppendDecorators (
226
+ decor .EwmaETA (decor .ET_STYLE_GO , 30 ),
227
+ decor .Name (" ] " ),
228
+ decor .EwmaSpeed (decor .SizeB1024 (0 ), "% .2f" , 30 ),
229
+ ),
230
+ )
231
+ defer bar .Abort (false )
232
+
233
+ // Declare that we want to upload a file.
193
234
uploadUrl := startUploadURL (objectType , digest )
194
235
client := & http.Client {}
195
236
req , err := http .NewRequestWithContext (ctx , http .MethodPost , uploadUrl .String (), nil )
@@ -219,13 +260,6 @@ func uploadFile(ctx context.Context, objectType string, digest string, path stri
219
260
return err
220
261
}
221
262
222
- // Open the file for uploading
223
- file , err := os .Open (path )
224
- if err != nil {
225
- return err
226
- }
227
- defer file .Close ()
228
-
229
263
// Upload the file using an S3 client
230
264
console .Debug ("multi-part uploading file: " + path )
231
265
cfg := aws .NewConfig ()
@@ -244,10 +278,12 @@ func uploadFile(ctx context.Context, objectType string, digest string, path stri
244
278
u .PartSize = 64 * 1024 * 1024 // 64MB per part
245
279
})
246
280
281
+ proxyReader := bar .ProxyReader (file )
282
+ defer proxyReader .Close ()
247
283
uploadParams := & s3.PutObjectInput {
248
284
Bucket : aws .String (data .Bucket ),
249
285
Key : aws .String (data .Key ),
250
- Body : file ,
286
+ Body : proxyReader ,
251
287
}
252
288
_ , err = uploader .Upload (ctx , uploadParams )
253
289
if err != nil {
0 commit comments