@@ -31,7 +31,21 @@ import (
31
31
"google.golang.org/api/iterator"
32
32
)
33
33
34
- // Obj contains information about the GCS object.
34
+ var GetBucketManager = getBucketManager
35
+
36
+ // bucketHandler defines the available interactaions with a GCS bucket.
37
+ type bucketHandler interface {
38
+ // ListObjects list the objects that match the given query.
39
+ ListObjects (ctx context.Context , q * storage.Query ) ([]string , error )
40
+ // DownloadObject download the object with the given uri in the localPath.
41
+ DownloadObject (ctx context.Context , localPath , uri string ) error
42
+ // UploadObject creates a files with the given content with the objName.
43
+ UploadObject (ctx context.Context , objName string , content * os.File ) error
44
+ // Close closes the bucket handler connection.
45
+ Close ()
46
+ }
47
+
48
+ // uriInfo contains information about the GCS object.
35
49
type uriInfo struct {
36
50
// Bucket is the name of the GCS bucket.
37
51
Bucket string
@@ -48,17 +62,16 @@ type Native struct{}
48
62
49
63
// Downloads the content that match the given src uri and subfolders.
50
64
func (n * Native ) DownloadRecursive (ctx context.Context , src , dst string ) error {
51
- sc , err := storage . NewClient ( ctx )
65
+ uriInfo , err := n . parseGCSURI ( src )
52
66
if err != nil {
53
- return fmt . Errorf ( "error creating GCS Client: %w" , err )
67
+ return err
54
68
}
55
- defer sc .Close ()
56
69
57
- uriInfo , err := n . parseGCSURI ( src )
70
+ bucket , err := GetBucketManager ( ctx , uriInfo . Bucket )
58
71
if err != nil {
59
72
return err
60
73
}
61
- bucket := sc . Bucket ( uriInfo . Bucket )
74
+ defer bucket . Close ( )
62
75
63
76
files , err := n .filesToDownload (ctx , bucket , uriInfo )
64
77
if err != nil {
@@ -67,7 +80,15 @@ func (n *Native) DownloadRecursive(ctx context.Context, src, dst string) error {
67
80
68
81
for uri , localPath := range files {
69
82
fullPath := filepath .Join (dst , localPath )
70
- if err := n .downloadFile (ctx , bucket , fullPath , uri ); err != nil {
83
+
84
+ dir := filepath .Dir (fullPath )
85
+ if _ , err := os .Stat (dir ); os .IsNotExist (err ) {
86
+ if err := os .MkdirAll (dir , os .ModePerm ); err != nil {
87
+ return fmt .Errorf ("failed to create directory: %v" , err )
88
+ }
89
+ }
90
+
91
+ if err := bucket .DownloadObject (ctx , fullPath , uri ); err != nil {
71
92
return err
72
93
}
73
94
}
@@ -77,45 +98,36 @@ func (n *Native) DownloadRecursive(ctx context.Context, src, dst string) error {
77
98
78
99
// Uploads a single file to the given dst.
79
100
func (n * Native ) UploadFile (ctx context.Context , src , dst string ) error {
80
- sc , err := storage .NewClient (ctx )
81
- if err != nil {
82
- return fmt .Errorf ("error creating GCS Client: %w" , err )
83
- }
84
- defer sc .Close ()
85
-
86
101
f , err := os .Open (src )
87
102
if err != nil {
88
103
return fmt .Errorf ("error opening file: %w" , err )
89
104
}
90
105
defer f .Close ()
91
106
92
- uinfo , err := n .parseGCSURI (dst )
107
+ urinfo , err := n .parseGCSURI (dst )
93
108
if err != nil {
94
109
return err
95
110
}
96
- bucket := sc .Bucket (uinfo .Bucket )
97
111
98
- isDirectory , err := n . isGCSDirectory (ctx , bucket , uinfo )
112
+ bucket , err := GetBucketManager (ctx , urinfo . Bucket )
99
113
if err != nil {
100
114
return err
101
115
}
102
116
103
- dstObj := uinfo .ObjPath
117
+ isDirectory , err := n .isGCSDirectory (ctx , bucket , urinfo )
118
+ if err != nil {
119
+ return err
120
+ }
121
+
122
+ dstObj := urinfo .ObjPath
104
123
if isDirectory {
105
124
dstObj , err = url .JoinPath (dstObj , filepath .Base (src ))
106
125
if err != nil {
107
126
return err
108
127
}
109
128
}
110
129
111
- wc := bucket .Object (dstObj ).NewWriter (ctx )
112
- if _ , err = io .Copy (wc , f ); err != nil {
113
- return fmt .Errorf ("error copying file to GCS: %w" , err )
114
- }
115
- if err := wc .Close (); err != nil {
116
- return fmt .Errorf ("error closing GCS writer: %w" , err )
117
- }
118
- return nil
130
+ return bucket .UploadObject (ctx , dstObj , f )
119
131
}
120
132
121
133
func (n * Native ) parseGCSURI (uri string ) (uriInfo , error ) {
@@ -136,10 +148,10 @@ func (n *Native) parseGCSURI(uri string) (uriInfo, error) {
136
148
return gcsobj , nil
137
149
}
138
150
139
- func (n * Native ) filesToDownload (ctx context.Context , bucket * storage. BucketHandle , uinfo uriInfo ) (map [string ]string , error ) {
151
+ func (n * Native ) filesToDownload (ctx context.Context , bucket bucketHandler , urinfo uriInfo ) (map [string ]string , error ) {
140
152
uriToLocalPath := map [string ]string {}
141
153
142
- exactMatches , err := n . listObjects (ctx , bucket , & storage.Query {MatchGlob : uinfo .ObjPath })
154
+ exactMatches , err := bucket . ListObjects (ctx , & storage.Query {MatchGlob : urinfo .ObjPath })
143
155
if err != nil {
144
156
return nil , err
145
157
}
@@ -148,58 +160,37 @@ func (n *Native) filesToDownload(ctx context.Context, bucket *storage.BucketHand
148
160
uriToLocalPath [match ] = filepath .Base (match )
149
161
}
150
162
151
- recursiveMatches , err := n .recursiveListing (ctx , bucket , uinfo )
163
+ recursiveMatches , err := n .recursiveListing (ctx , bucket , urinfo )
152
164
if err != nil {
153
165
return nil , err
154
166
}
155
167
156
- for _ , match := range recursiveMatches {
157
- uriToLocalPath [match ] = match
168
+ for uri , match := range recursiveMatches {
169
+ uriToLocalPath [uri ] = match
158
170
}
159
171
160
172
return uriToLocalPath , nil
161
173
}
162
174
163
- func (n * Native ) listObjects (ctx context.Context , bucket * storage.BucketHandle , q * storage.Query ) ([]string , error ) {
164
- matches := []string {}
165
- it := bucket .Objects (ctx , q )
166
-
167
- for {
168
- attrs , err := it .Next ()
169
- if err == iterator .Done {
170
- break
171
- }
172
-
173
- if err != nil {
174
- return nil , fmt .Errorf ("failed to iterate objects: %v" , err )
175
- }
176
-
177
- if attrs .Name != "" {
178
- matches = append (matches , attrs .Name )
179
- }
180
- }
181
- return matches , nil
182
- }
183
-
184
- func (n * Native ) recursiveListing (ctx context.Context , bucket * storage.BucketHandle , uinfo uriInfo ) (map [string ]string , error ) {
175
+ func (n * Native ) recursiveListing (ctx context.Context , bucket bucketHandler , urinfo uriInfo ) (map [string ]string , error ) {
185
176
uriToLocalPath := map [string ]string {}
186
- recursiveURI := n .uriForRecursiveSearch (uinfo .ObjPath )
187
- recursiveMatches , err := n . listObjects (ctx , bucket , & storage.Query {MatchGlob : recursiveURI })
177
+ recursiveURI := n .uriForRecursiveSearch (urinfo .ObjPath )
178
+ recursiveMatches , err := bucket . ListObjects (ctx , & storage.Query {MatchGlob : recursiveURI })
188
179
if err != nil {
189
180
return nil , err
190
181
}
191
182
192
- prefixRemovalURI := n .uriForPrefixRemoval (uinfo .Full ())
183
+ prefixRemovalURI := n .uriForPrefixRemoval (urinfo .Full ())
193
184
prefixRemovalRegex , err := n .wildcardToRegex (prefixRemovalURI )
194
185
if err != nil {
195
186
return nil , err
196
187
}
197
188
198
- shouldRecreateFolders := ! strings .Contains (uinfo .ObjPath , "**" )
189
+ shouldRecreateFolders := ! strings .Contains (urinfo .ObjPath , "**" )
199
190
for _ , match := range recursiveMatches {
200
191
destPath := filepath .Base (match )
201
192
if shouldRecreateFolders {
202
- matchWithBucket := uinfo .Bucket + "/" + match
193
+ matchWithBucket := urinfo .Bucket + "/" + match
203
194
destPath = string (prefixRemovalRegex .ReplaceAll ([]byte (matchWithBucket ), []byte ("" )))
204
195
}
205
196
uriToLocalPath [match ] = destPath
@@ -254,15 +245,69 @@ func (n *Native) wildcardToRegex(wildcard string) (*regexp.Regexp, error) {
254
245
return regexp .Compile (regexStr )
255
246
}
256
247
257
- func (n * Native ) downloadFile (ctx context.Context , bucket * storage.BucketHandle , localPath , uri string ) error {
258
- dir := filepath .Dir (localPath )
259
- if _ , err := os .Stat (dir ); os .IsNotExist (err ) {
260
- if err := os .MkdirAll (dir , os .ModePerm ); err != nil {
261
- return fmt .Errorf ("failed to create directory: %v" , err )
248
+ func (n * Native ) isGCSDirectory (ctx context.Context , bucket bucketHandler , urinfo uriInfo ) (bool , error ) {
249
+ if urinfo .ObjPath == "" {
250
+ return true , nil
251
+ }
252
+
253
+ if strings .HasSuffix (urinfo .ObjPath , "/" ) {
254
+ return true , nil
255
+ }
256
+
257
+ q := & storage.Query {Prefix : urinfo .ObjPath + "/" }
258
+ matches , err := bucket .ListObjects (ctx , q )
259
+ if err != nil {
260
+ return false , err
261
+ }
262
+
263
+ if len (matches ) > 0 {
264
+ return true , nil
265
+ }
266
+
267
+ return false , nil
268
+ }
269
+
270
+ func getBucketManager (ctx context.Context , bucketName string ) (bucketHandler , error ) {
271
+ sc , err := storage .NewClient (ctx )
272
+ if err != nil {
273
+ return nil , fmt .Errorf ("error creating GCS Client: %w" , err )
274
+ }
275
+
276
+ return nativeBucketHandler {
277
+ storageClient : sc ,
278
+ bucket : sc .Bucket (bucketName ),
279
+ }, nil
280
+ }
281
+
282
+ // nativeBucketHandler implements a handler using the Cloud client libraries.
283
+ type nativeBucketHandler struct {
284
+ storageClient * storage.Client
285
+ bucket * storage.BucketHandle
286
+ }
287
+
288
+ func (nb nativeBucketHandler ) ListObjects (ctx context.Context , q * storage.Query ) ([]string , error ) {
289
+ matches := []string {}
290
+ it := nb .bucket .Objects (ctx , q )
291
+
292
+ for {
293
+ attrs , err := it .Next ()
294
+ if err == iterator .Done {
295
+ break
296
+ }
297
+
298
+ if err != nil {
299
+ return nil , fmt .Errorf ("failed to iterate objects: %v" , err )
300
+ }
301
+
302
+ if attrs .Name != "" {
303
+ matches = append (matches , attrs .Name )
262
304
}
263
305
}
306
+ return matches , nil
307
+ }
264
308
265
- reader , err := bucket .Object (uri ).NewReader (ctx )
309
+ func (nb nativeBucketHandler ) DownloadObject (ctx context.Context , localPath , uri string ) error {
310
+ reader , err := nb .bucket .Object (uri ).NewReader (ctx )
266
311
if err != nil {
267
312
return fmt .Errorf ("failed to read object: %v" , err )
268
313
}
@@ -281,24 +326,17 @@ func (n *Native) downloadFile(ctx context.Context, bucket *storage.BucketHandle,
281
326
return nil
282
327
}
283
328
284
- func (n * Native ) isGCSDirectory (ctx context.Context , bucket * storage.BucketHandle , uinfo uriInfo ) (bool , error ) {
285
- if uinfo .ObjPath == "" {
286
- return true , nil
287
- }
288
-
289
- if strings .HasSuffix (uinfo .ObjPath , "/" ) {
290
- return true , nil
291
- }
292
-
293
- q := & storage.Query {Prefix : uinfo .ObjPath + "/" }
294
- matches , err := n .listObjects (ctx , bucket , q )
295
- if err != nil {
296
- return false , err
329
+ func (nb nativeBucketHandler ) UploadObject (ctx context.Context , objName string , content * os.File ) error {
330
+ wc := nb .bucket .Object (objName ).NewWriter (ctx )
331
+ if _ , err := io .Copy (wc , content ); err != nil {
332
+ return fmt .Errorf ("error copying file to GCS: %w" , err )
297
333
}
298
-
299
- if len (matches ) > 0 {
300
- return true , nil
334
+ if err := wc .Close (); err != nil {
335
+ return fmt .Errorf ("error closing GCS writer: %w" , err )
301
336
}
337
+ return nil
338
+ }
302
339
303
- return false , nil
340
+ func (nb nativeBucketHandler ) Close () {
341
+ nb .storageClient .Close ()
304
342
}
0 commit comments