Skip to content

Commit ad1438e

Browse files
committed
Setup PubSub framework code.
1 parent c1ef090 commit ad1438e

17 files changed

+877
-6
lines changed

cron/config/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package config
16+
17+
const (
18+
ResultDataBucketURL string = "gs://ossf-scorecard-data"
19+
RequestTopicURL = "gcppubsub://projects/openssf/topics/scorecard-batch"
20+
InputReposFile = "../projects.txt"
21+
ShardNumFilename = ".shard_num"
22+
ShardSize int = 250
23+
)

cron/cron

20.1 MB
Binary file not shown.

cron/data/blob.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package data
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"time"
22+
23+
"github.com/ossf/scorecard/cron/config"
24+
"gocloud.dev/blob"
25+
)
26+
27+
const (
28+
filePrefixFormat = "2006.01.02/150405/"
29+
)
30+
31+
var (
32+
errorWrite = errors.New("blobWriter.Write")
33+
errorWriteClose = errors.New("blobWriter.Close")
34+
errorOpenBucket = errors.New("blob.OpenBucket")
35+
)
36+
37+
func WriteToBlobStore(ctx context.Context, bucketURL, filename string, data []byte) error {
38+
bucket, err := blob.OpenBucket(ctx, bucketURL)
39+
if err != nil {
40+
return fmt.Errorf("%w: %v", errorOpenBucket, err)
41+
}
42+
defer bucket.Close()
43+
44+
blobWriter, err := bucket.NewWriter(ctx, filename, nil)
45+
if err != nil {
46+
return fmt.Errorf("%w", err)
47+
}
48+
if _, err = blobWriter.Write(data); err != nil {
49+
return fmt.Errorf("%w: %v", errorWrite, err)
50+
}
51+
if err := blobWriter.Close(); err != nil {
52+
return fmt.Errorf("%w: %v", errorWriteClose, err)
53+
}
54+
return nil
55+
}
56+
57+
func GetBlobFilename(filename string, datetime time.Time) string {
58+
return datetime.Format(filePrefixFormat) + filename
59+
}
60+
61+
func GetShardNumFilename(datetime time.Time) string {
62+
return GetBlobFilename(config.ShardNumFilename, datetime)
63+
}

cron/data/blob_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package data
16+
17+
import (
18+
"testing"
19+
"time"
20+
)
21+
22+
const (
23+
inputTimeFormat string = "2006-01-02T15:04:05"
24+
)
25+
26+
func TestGetBlobFilename(t *testing.T) {
27+
t.Parallel()
28+
testcases := []struct {
29+
name string
30+
inputTime string
31+
inputFilename string
32+
expectedFilename string
33+
}{
34+
{
35+
name: "Basic",
36+
inputTime: "2021-04-23T15:06:43",
37+
inputFilename: "file-000",
38+
expectedFilename: "2021.04.23/150643/file-000",
39+
},
40+
}
41+
for _, testcase := range testcases {
42+
testcase := testcase
43+
t.Run(testcase.name, func(t *testing.T) {
44+
t.Parallel()
45+
datetime, err := time.Parse(inputTimeFormat, testcase.inputTime)
46+
if err != nil {
47+
t.Errorf("Failed to parse testcase.inputTime: %v", err)
48+
}
49+
gotFilename := GetBlobFilename(testcase.inputFilename, datetime)
50+
if gotFilename != testcase.expectedFilename {
51+
t.Errorf("Test failed. Expected: %s, Got: %s", testcase.expectedFilename, gotFilename)
52+
}
53+
})
54+
}
55+
}

cron/data/publisher.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package data
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"log"
22+
"sync"
23+
"sync/atomic"
24+
25+
"gocloud.dev/pubsub"
26+
_ "gocloud.dev/pubsub/gcppubsub" // Needed to link in GCP drivers.
27+
"google.golang.org/protobuf/encoding/protojson"
28+
)
29+
30+
var (
31+
errorOpenTopic = errors.New("pubsub.OpenTopic")
32+
errorProtoJSONMarshal = errors.New("protojson.Marshal")
33+
errorPublish = errors.New("total errors when publishing")
34+
)
35+
36+
type Publisher interface {
37+
Publish(request *ScorecardBatchRequest) error
38+
Close() error
39+
}
40+
41+
func CreatePublisher(ctx context.Context, topicURL string) (Publisher, error) {
42+
ret := publisherImpl{}
43+
topic, err := pubsub.OpenTopic(ctx, topicURL)
44+
if err != nil {
45+
return &ret, fmt.Errorf("%w: %v", errorOpenTopic, err)
46+
}
47+
return &publisherImpl{
48+
ctx: ctx,
49+
topic: topic,
50+
}, nil
51+
}
52+
53+
type topicInterface interface {
54+
Send(ctx context.Context, msg *pubsub.Message) error
55+
}
56+
57+
type publisherImpl struct {
58+
ctx context.Context
59+
topic topicInterface
60+
wg sync.WaitGroup
61+
totalErrors uint64
62+
}
63+
64+
func (publisher *publisherImpl) Publish(request *ScorecardBatchRequest) error {
65+
msg, err := protojson.Marshal(request)
66+
if err != nil {
67+
return fmt.Errorf("%w: %v", errorProtoJSONMarshal, err)
68+
}
69+
70+
publisher.wg.Add(1)
71+
go func() {
72+
defer publisher.wg.Done()
73+
err := publisher.topic.Send(publisher.ctx, &pubsub.Message{
74+
Body: msg,
75+
})
76+
if err != nil {
77+
log.Printf("Error when publishing message %s: %v", msg, err)
78+
atomic.AddUint64(&publisher.totalErrors, 1)
79+
return
80+
}
81+
log.Print("Successfully published message")
82+
}()
83+
return nil
84+
}
85+
86+
func (publisher *publisherImpl) Close() error {
87+
publisher.wg.Wait()
88+
if publisher.totalErrors > 0 {
89+
return fmt.Errorf("%w: %d", errorPublish, publisher.totalErrors)
90+
}
91+
return nil
92+
}

cron/data/publisher_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package data
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
22+
"gocloud.dev/pubsub"
23+
)
24+
25+
type mockSucceedTopic struct{}
26+
27+
func (topic *mockSucceedTopic) Send(ctx context.Context, msg *pubsub.Message) error {
28+
return nil
29+
}
30+
31+
type mockFailTopic struct{}
32+
33+
func (topic *mockFailTopic) Send(ctx context.Context, msg *pubsub.Message) error {
34+
// nolint: goerr113
35+
return fmt.Errorf("mockFailTopic failed to send")
36+
}
37+
38+
func TestPublish(t *testing.T) {
39+
t.Parallel()
40+
// nolint: govet
41+
testcases := []struct {
42+
numErrors uint64
43+
name string
44+
errorMsg string
45+
hasError bool
46+
topic topicInterface
47+
}{
48+
{
49+
name: "SendFails",
50+
topic: &mockFailTopic{},
51+
hasError: true,
52+
numErrors: 1,
53+
errorMsg: "",
54+
},
55+
{
56+
name: "SendSucceeds",
57+
topic: &mockSucceedTopic{},
58+
hasError: false,
59+
},
60+
}
61+
for _, testcase := range testcases {
62+
testcase := testcase
63+
t.Run(testcase.name, func(t *testing.T) {
64+
t.Parallel()
65+
ctx := context.Background()
66+
publisher := publisherImpl{
67+
ctx: ctx,
68+
topic: testcase.topic,
69+
}
70+
request := ScorecardBatchRequest{
71+
Repos: []string{"repo1"},
72+
}
73+
if err := publisher.Publish(&request); err != nil {
74+
t.Errorf("Failed to parse message: %v", err)
75+
}
76+
err := publisher.Close()
77+
if (err == nil) == testcase.hasError {
78+
t.Errorf("Test failed. Expected: %t got: %v", testcase.hasError, err)
79+
}
80+
if testcase.hasError && testcase.numErrors != publisher.totalErrors {
81+
t.Errorf("Test failed. Expected numErrors: %d, got: %d", testcase.numErrors, publisher.totalErrors)
82+
}
83+
})
84+
}
85+
}

cron/data/reader.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2021 Security Scorecard Authors
2+
//
3+
// Licensed under the Apache License, Vershandlern 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permisshandlerns and
13+
// limitathandlerns under the License.
14+
15+
package data
16+
17+
import (
18+
"encoding/csv"
19+
"errors"
20+
"fmt"
21+
"io"
22+
23+
"github.com/jszwec/csvutil"
24+
"github.com/ossf/scorecard/repos"
25+
)
26+
27+
var errorCSVDecoder = errors.New("could not create CSV decoder")
28+
29+
type Reader interface {
30+
HasNext() bool
31+
Next() (repos.RepoURL, error)
32+
}
33+
34+
func MakeReader(reader io.Reader) (Reader, error) {
35+
dec, err := csvutil.NewDecoder(csv.NewReader(reader))
36+
if err != nil {
37+
return &csvReader{}, fmt.Errorf("%w: %v", errorCSVDecoder, err)
38+
}
39+
return &csvReader{decoder: dec}, nil
40+
}
41+
42+
type inputRepo struct {
43+
Repo string `csv:"repo"`
44+
Metadata string `csv:"metadata"`
45+
}
46+
47+
type csvReader struct {
48+
decoder *csvutil.Decoder
49+
err error
50+
next inputRepo
51+
}
52+
53+
func (reader *csvReader) HasNext() bool {
54+
reader.err = reader.decoder.Decode(&reader.next)
55+
return reader.err != io.EOF
56+
}
57+
58+
func (reader *csvReader) Next() (repos.RepoURL, error) {
59+
if reader.err != nil {
60+
return repos.RepoURL{}, reader.err
61+
}
62+
ret := repos.RepoURL{}
63+
var err error
64+
err = ret.Set(reader.next.Repo)
65+
if err == nil {
66+
err = ret.ValidGitHubURL()
67+
}
68+
return ret, errors.Unwrap(err)
69+
}

0 commit comments

Comments
 (0)