Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions relay/limiter/chunk_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func (l *ChunkRateLimiter) FinishGetChunkOperation(requesterID string) {
}

// RequestGetChunkBandwidth should be called when a GetChunk is about to start downloading chunk data.
func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID string, bytes int) error {
func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID string, bytes uint32) error {
if l == nil {
// If the rate limiter is nil, do not enforce rate limits.
return nil
}

// no lock needed here, as the bandwidth limiters themselves are thread-safe

allowed := l.globalBandwidthLimiter.AllowN(now, bytes)
allowed := l.globalBandwidthLimiter.AllowN(now, int(bytes))
if !allowed {
if l.relayMetrics != nil {
l.relayMetrics.ReportChunkRateLimited("global bandwidth")
Expand All @@ -175,9 +175,9 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s
if !ok {
return fmt.Errorf("internal error, unable to find bandwidth limiter for client ID %s", requesterID)
}
allowed = limiter.AllowN(now, bytes)
allowed = limiter.AllowN(now, int(bytes))
if !allowed {
l.globalBandwidthLimiter.AllowN(now, -bytes)
l.globalBandwidthLimiter.AllowN(now, -int(bytes))
if l.relayMetrics != nil {
l.relayMetrics.ReportChunkRateLimited("client bandwidth")
}
Expand Down
14 changes: 7 additions & 7 deletions relay/limiter/chunk_rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func TestGetChunksBandwidthLimit(t *testing.T) {
// Without advancing time, we should be able to utilize a number of bytes equal to the burstiness limit.
bytesRemaining := config.GetChunkBytesBurstiness
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
bytesToRequest := uint32(1 + rand.Intn(bytesRemaining))
err = limiter.RequestGetChunkBandwidth(now, userID, bytesToRequest)
require.NoError(t, err)
bytesRemaining -= bytesToRequest
bytesRemaining -= int(bytesToRequest)
}

// Requesting one more byte should fail due to the bandwidth limit
Expand All @@ -148,7 +148,7 @@ func TestGetChunksBandwidthLimit(t *testing.T) {
bytesRemaining = int(config.MaxGetChunkBytesPerSecond)
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err = limiter.RequestGetChunkBandwidth(now, userID, bytesToRequest)
err = limiter.RequestGetChunkBandwidth(now, userID, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestBandwidthLimitPerClient(t *testing.T) {
bytesRemaining := config.GetChunkBytesBurstinessClient
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err = limiter.RequestGetChunkBandwidth(now, userID1, bytesToRequest)
err = limiter.RequestGetChunkBandwidth(now, userID1, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}
Expand All @@ -306,7 +306,7 @@ func TestBandwidthLimitPerClient(t *testing.T) {
bytesRemaining = config.GetChunkBytesBurstinessClient
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err = limiter.RequestGetChunkBandwidth(now, userID2, bytesToRequest)
err = limiter.RequestGetChunkBandwidth(now, userID2, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}
Expand All @@ -320,9 +320,9 @@ func TestBandwidthLimitPerClient(t *testing.T) {
bytesRemaining = int(config.MaxGetChunkBytesPerSecondClient)
for bytesRemaining > 0 {
bytesToRequest := 1 + rand.Intn(bytesRemaining)
err = limiter.RequestGetChunkBandwidth(now, userID1, bytesToRequest)
err = limiter.RequestGetChunkBandwidth(now, userID1, uint32(bytesToRequest))
require.NoError(t, err)
err = limiter.RequestGetChunkBandwidth(now, userID2, bytesToRequest)
err = limiter.RequestGetChunkBandwidth(now, userID2, uint32(bytesToRequest))
require.NoError(t, err)
bytesRemaining -= bytesToRequest
}
Expand Down
4 changes: 2 additions & 2 deletions relay/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ func (m *RelayMetrics) ReportChunkKeyCount(count int) {
m.getChunksKeyCount.WithLabelValues().Set(float64(count))
}

func (m *RelayMetrics) ReportGetChunksBandwidthUsage(size int) {
func (m *RelayMetrics) ReportGetChunksBandwidthUsage(size uint32) {
m.getChunksBandwidth.WithLabelValues().Add(float64(size))
}

func (m *RelayMetrics) ReportGetChunksRequestedBandwidthUsage(size int) {
func (m *RelayMetrics) ReportGetChunksRequestedBandwidthUsage(size uint32) {
m.getChunksRequestedBandwidth.WithLabelValues().Add(float64(size))
}

Expand Down
21 changes: 14 additions & 7 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,28 +389,35 @@ func gatherChunkDataToSend(
}

// computeChunkRequestRequiredBandwidth computes the bandwidth required to fulfill a GetChunks request.
func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap metadataMap) (int, error) {
requiredBandwidth := 0
func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap metadataMap) (uint32, error) {
requiredBandwidth := uint32(0)
for _, req := range request.ChunkRequests {
var metadata *blobMetadata
var key v2.BlobKey
var requestedChunks int
var requestedChunks uint32

if req.GetByIndex() != nil {
key = v2.BlobKey(req.GetByIndex().GetBlobKey())
metadata = mMap[key]
requestedChunks = len(req.GetByIndex().ChunkIndices)
requestedChunks = uint32(len(req.GetByIndex().ChunkIndices))
} else {
key = v2.BlobKey(req.GetByRange().GetBlobKey())
metadata = mMap[key]
requestedChunks = int(req.GetByRange().EndIndex - req.GetByRange().StartIndex)

if req.GetByRange().EndIndex < req.GetByRange().StartIndex {
return 0, fmt.Errorf(
"chunk range %d-%d is invalid for key %s, start index must be less than or equal to end index",
req.GetByRange().StartIndex, req.GetByRange().EndIndex, key.Hex())
}

requestedChunks = req.GetByRange().EndIndex - req.GetByRange().StartIndex
}

if metadata == nil {
return 0, fmt.Errorf("metadata not found for key %s", key.Hex())
}

requiredBandwidth += requestedChunks * int(metadata.chunkSizeBytes)
requiredBandwidth += requestedChunks * metadata.chunkSizeBytes
}

return requiredBandwidth, nil
Expand All @@ -420,7 +427,7 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met
// bandwidth to serve a GetChunks() request.
func buildInsufficientGetChunksBandwidthError(
request *pb.GetChunksRequest,
requiredBandwidth int,
requiredBandwidth uint32,
originalError error) error {

chunkCount := 0
Expand Down
Loading