Skip to content

Commit 9105dd6

Browse files
committed
proxystore: stabilize order for duplicates in response losertree
Currently, the order of responses in the losertree is random when their labelsets collide. This can happen if we drop the replica label in an endpoint. In the case of sidecars, the order of responses affects deduplication: the primary iterator is used until we find a gap large enough to fail over to the replica iterator, where primary and replica are determined by the order in which they are returned from the proxy losertree. This can lead to slight differences when a query is repeated, since different sidecars may have scraped at different times. Using the store labelset as a tiebreaker is an attempt at stabilizing this. Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 519fda5 commit 9105dd6

File tree

1 file changed

+51
-18
lines changed

1 file changed

+51
-18
lines changed

pkg/store/proxy_merge.go

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"math"
1111
"sort"
12+
"strings"
1213
"sync"
1314
"time"
1415

@@ -166,12 +167,21 @@ func (d *responseDeduplicator) At() *storepb.SeriesResponse {
166167
return d.bufferedResp[d.buffRespI]
167168
}
168169

170+
type seriesResponseWithStoreLabelset struct {
171+
labelset string
172+
response *storepb.SeriesResponse
173+
}
174+
175+
type proxyResponseLoserTree struct {
176+
tree *losertree.Tree[seriesResponseWithStoreLabelset, respSet]
177+
}
178+
169179
// NewProxyResponseLoserTree returns heap that k-way merge series together.
170-
// It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order.
171-
func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.SeriesResponse, respSet] {
172-
var maxVal *storepb.SeriesResponse = storepb.NewSeriesResponse(nil)
180+
// It's agnostic to duplicates and overlaps, it forwards all duplicated series ordered by the labelset of their endpoint.
181+
func NewProxyResponseLoserTree(seriesSets ...respSet) *proxyResponseLoserTree {
182+
var maxVal seriesResponseWithStoreLabelset = seriesResponseWithStoreLabelset{}
173183

174-
less := func(a, b *storepb.SeriesResponse) bool {
184+
less := func(a, b seriesResponseWithStoreLabelset) bool {
175185
if a == maxVal && b != maxVal {
176186
return false
177187
}
@@ -181,29 +191,52 @@ func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.S
181191
if a == maxVal && b == maxVal {
182192
return true
183193
}
184-
if a.GetSeries() != nil && b.GetSeries() != nil {
185-
iLbls := labelpb.ZLabelsToPromLabels(a.GetSeries().Labels)
186-
jLbls := labelpb.ZLabelsToPromLabels(b.GetSeries().Labels)
194+
ar, br := a.response, b.response
187195

188-
return labels.Compare(iLbls, jLbls) < 0
189-
} else if a.GetSeries() == nil && b.GetSeries() != nil {
196+
if ar.GetSeries() != nil && br.GetSeries() != nil {
197+
aLbls := labelpb.ZLabelsToPromLabels(ar.GetSeries().Labels)
198+
bLbls := labelpb.ZLabelsToPromLabels(br.GetSeries().Labels)
199+
200+
if cmp := labels.Compare(aLbls, bLbls); cmp == 0 {
201+
// use store labelset as tiebreaker for a stable order of responses and predictable deduplication
202+
return strings.Compare(a.labelset, b.labelset) < 0
203+
} else {
204+
return cmp < 0
205+
}
206+
} else if ar.GetSeries() == nil && br.GetSeries() != nil {
190207
return true
191-
} else if a.GetSeries() != nil && b.GetSeries() == nil {
208+
} else if ar.GetSeries() != nil && br.GetSeries() == nil {
192209
return false
193210
}
194211

195-
if a.GetWarning() != "" && b.GetWarning() != "" {
196-
return len(a.GetWarning()) < len(b.GetWarning())
212+
if ar.GetWarning() != "" && br.GetWarning() != "" {
213+
return len(ar.GetWarning()) < len(br.GetWarning())
197214
}
198-
199215
return false
200216
}
201217

202-
return losertree.New[*storepb.SeriesResponse, respSet](seriesSets, maxVal, func(s respSet) *storepb.SeriesResponse {
203-
return s.At()
204-
}, less, func(s respSet) {
205-
s.Close()
206-
})
218+
return &proxyResponseLoserTree{
219+
tree: losertree.New(
220+
seriesSets,
221+
maxVal,
222+
func(s respSet) seriesResponseWithStoreLabelset {
223+
return seriesResponseWithStoreLabelset{response: s.At(), labelset: s.Labelset()}
224+
},
225+
less,
226+
func(s respSet) { s.Close() },
227+
)}
228+
}
229+
230+
func (lt *proxyResponseLoserTree) Next() bool {
231+
return lt.tree.Next()
232+
}
233+
234+
func (lt *proxyResponseLoserTree) At() *storepb.SeriesResponse {
235+
return lt.tree.At().response
236+
}
237+
238+
func (lt *proxyResponseLoserTree) Close() {
239+
lt.tree.Close()
207240
}
208241

209242
func (l *lazyRespSet) StoreID() string {

0 commit comments

Comments
 (0)