Skip to content

Commit 8687b0d

Browse files
Fair queue for traffic (#34)
* Fair(er) queue for traffic * Fix copyright notice * Fix function name * Additional tweaks
1 parent cfde1f8 commit 8687b0d

File tree

4 files changed

+115
-3
lines changed

4 files changed

+115
-3
lines changed

router/packetconn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (r *Router) newLocalPeer() *peer {
3434
zone: "local",
3535
peertype: 0,
3636
public: r.public,
37-
traffic: newFIFOQueue(trafficBuffer),
37+
traffic: newFairFIFOQueue(trafficBuffer),
3838
}
3939
return peer
4040
}

router/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type peer struct {
6161
keepalives bool // Not mutated after peer setup.
6262
started atomic.Bool // Thread-safe toggle for marking a peer as down.
6363
proto *fifoQueue // Thread-safe queue for outbound protocol messages.
64-
traffic *fifoQueue // Thread-safe queue for outbound traffic messages.
64+
traffic *fairFIFOQueue // Thread-safe queue for outbound traffic messages.
6565
}
6666

6767
func (p *peer) String() string { // to make sim less ugly

router/queuefairfifo.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2022 The Matrix.org Foundation C.I.C.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package router
16+
17+
import (
18+
"sync"
19+
20+
"github.com/matrix-org/pinecone/types"
21+
)
22+
23+
const fairFIFOQueueCount = 32
24+
const fairFIFOQueueSize = trafficBuffer
25+
26+
type fairFIFOQueue struct {
27+
queues map[uint8]chan *types.Frame // queue ID -> frame, map for randomness
28+
count int // how many queued items in total?
29+
mutex sync.Mutex
30+
}
31+
32+
func newFairFIFOQueue(_ int) *fairFIFOQueue {
33+
q := &fairFIFOQueue{}
34+
q.reset()
35+
return q
36+
}
37+
38+
func (q *fairFIFOQueue) queuecount() int { // nolint:unused
39+
q.mutex.Lock()
40+
defer q.mutex.Unlock()
41+
return q.count
42+
}
43+
44+
func (q *fairFIFOQueue) queuesize() int { // nolint:unused
45+
return fairFIFOQueueCount * fairFIFOQueueSize
46+
}
47+
48+
func (q *fairFIFOQueue) hash(frame *types.Frame) uint8 {
49+
var h uint64
50+
for _, v := range frame.SourceKey {
51+
h += uint64(v)
52+
}
53+
return uint8(h) % fairFIFOQueueCount
54+
}
55+
56+
func (q *fairFIFOQueue) push(frame *types.Frame) bool {
57+
q.mutex.Lock()
58+
defer q.mutex.Unlock()
59+
var h uint8
60+
if q.count > 0 {
61+
h = q.hash(frame) + 1
62+
}
63+
select {
64+
case q.queues[h] <- frame:
65+
q.count++
66+
return true
67+
default:
68+
return false
69+
}
70+
}
71+
72+
func (q *fairFIFOQueue) reset() {
73+
q.mutex.Lock()
74+
defer q.mutex.Unlock()
75+
q.queues = make(map[uint8]chan *types.Frame, fairFIFOQueueCount+1)
76+
for i := uint8(0); i <= fairFIFOQueueCount; i++ {
77+
q.queues[i] = make(chan *types.Frame, fairFIFOQueueSize)
78+
}
79+
}
80+
81+
func (q *fairFIFOQueue) pop() <-chan *types.Frame {
82+
q.mutex.Lock()
83+
defer q.mutex.Unlock()
84+
switch {
85+
case q.count == 0:
86+
// Nothing has been queued yet — whatever gets queued up
87+
// next will always be in queue 0, so it makes sense to
88+
// return the channel for that queue for now.
89+
fallthrough
90+
case len(q.queues[0]) > 0:
91+
// There's something in queue 0 waiting to be sent.
92+
return q.queues[0]
93+
default:
94+
// Select a random queue that has something waiting.
95+
for i, queue := range q.queues {
96+
if i == 0 {
97+
continue
98+
}
99+
if len(queue) > 0 {
100+
return queue
101+
}
102+
}
103+
}
104+
// We shouldn't ever arrive here.
105+
panic("invalid queue state")
106+
}
107+
108+
func (q *fairFIFOQueue) ack() {
109+
q.mutex.Lock()
110+
defer q.mutex.Unlock()
111+
q.count--
112+
}

router/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *state) _addPeer(conn net.Conn, public types.PublicKey, uri ConnectionUR
126126
context: ctx,
127127
cancel: cancel,
128128
proto: newFIFOQueue(fifoNoMax),
129-
traffic: newFIFOQueue(trafficBuffer),
129+
traffic: newFairFIFOQueue(trafficBuffer),
130130
}
131131
s._peers[i] = new
132132
s.r.log.Println("Connected to peer", new.public.String(), "on port", new.port)

0 commit comments

Comments
 (0)