Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions beacon_chain/sync/sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
try:
let res =
if peer.useSyncV2():
await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
await beaconBlocksByRange_v2(peer, req.slot, req.count, 1)
else:
(await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map(
(await beaconBlocksByRange(peer, req.slot, req.count, 1)).map(
proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

Expand Down
4 changes: 4 additions & 0 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ p2pProtocol BeaconSync(version = 1,
# client call that returns `seq[ref SignedBeaconBlock]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
# TODO reqStep is deprecated - future versions can remove support for
# values != 1: https://github.com/ethereum/consensus-specs/pull/2856
trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
if reqCount == 0'u64 or reqStep == 0'u64:
Expand Down Expand Up @@ -404,6 +406,8 @@ p2pProtocol BeaconSync(version = 1,
# client call that returns `seq[ref ForkedSignedBeaconBlock]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
# TODO reqStep is deprecated - future versions can remove support for
# values != 1: https://github.com/ethereum/consensus-specs/pull/2856

trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
Expand Down
31 changes: 12 additions & 19 deletions beacon_chain/sync/sync_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type
index*: uint64
slot*: Slot
count*: uint64
step*: uint64
item*: T

SyncResult*[T] = object
Expand Down Expand Up @@ -105,12 +104,11 @@ proc getShortMap*[T](req: SyncRequest[T],
break
else:
res.add('.')
slider = slider + req.step
slider = slider + 1
res

proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
slot >= req.slot and slot < req.slot + req.count * req.step and
((slot - req.slot) mod req.step == 0)
slot >= req.slot and slot < req.slot + req.count

proc cmp*[T](a, b: SyncRequest[T]): int =
cmp(uint64(a.slot), uint64(b.slot))
Expand All @@ -137,7 +135,7 @@ proc checkResponse*[T](req: SyncRequest[T],
inc(dindex)
else:
return false
slot = slot + req.step
slot = slot + 1
rindex = rindex + 1'u64

if dindex == len(data):
Expand All @@ -153,26 +151,26 @@ proc getFullMap*[T](req: SyncRequest[T],
proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
finish: Slot, t2: typedesc[T]): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64)
SyncRequest[T](kind: kind, slot: start, count: count)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, slot: Slot,
count: uint64, item: T): SyncRequest[T] =
SyncRequest[T](kind: kind, slot: slot, count: count, item: item, step: 1'u64)
SyncRequest[T](kind: kind, slot: slot, count: count, item: item)

proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
finish: Slot, item: T): SyncRequest[T] =
let count = finish - start + 1'u64
SyncRequest[T](kind: kind, slot: start, count: count, step: 1'u64, item: item)
SyncRequest[T](kind: kind, slot: start, count: count, item: item)

proc empty*[T](t: typedesc[SyncRequest], kind: SyncQueueKind,
t2: typedesc[T]): SyncRequest[T] {.inline.} =
SyncRequest[T](kind: kind, step: 0'u64, count: 0'u64)
SyncRequest[T](kind: kind, count: 0'u64)

proc setItem*[T](sr: var SyncRequest[T], item: T) =
sr.item = item

proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
(sr.step == 0'u64) and (sr.count == 0'u64)
(sr.count == 0'u64)

proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
queueKind: SyncQueueKind,
Expand Down Expand Up @@ -263,8 +261,7 @@ proc `<`*[T](a, b: SyncResult[T]): bool =
a.request.slot > b.request.slot

proc `==`*[T](a, b: SyncRequest[T]): bool =
(a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count) and
(a.step == b.step)
(a.kind == b.kind) and (a.slot == b.slot) and (a.count == b.count)

proc lastSlot*[T](req: SyncRequest[T]): Slot =
## Returns last slot for request ``req``.
Expand Down Expand Up @@ -808,12 +805,10 @@ func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) =
# Request is only partially relevant.
let
numSlotsDone = outSlot - lowSlot
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
sr.slot += numStepsDone * sr.step
sr.count -= numStepsDone
sr.slot += numSlotsDone
sr.count -= numSlotsDone
else:
# Entire request is no longer relevant.
sr.step = 0
sr.count = 0
of SyncQueueKind.Backward:
if outSlot >= highSlot:
Expand All @@ -823,11 +818,9 @@ func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) =
# Request is only partially relevant.
let
numSlotsDone = highSlot - outSlot
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
sr.count -= numStepsDone
sr.count -= numSlotsDone
else:
# Entire request is no longer relevant.
sr.step = 0
sr.count = 0

proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
Expand Down
79 changes: 13 additions & 66 deletions tests/test_sync_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ suite "SyncManager test suite":
r11e == r11
r11.item == p1
r11e.item == r11.item
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
r11.slot == Slot(0) and r11.count == 1'u64

template passThroughLimitsTest(kind: SyncQueueKind) =
let
Expand Down Expand Up @@ -187,7 +187,6 @@ suite "SyncManager test suite":
req1.isEmpty() == false
req1.slot == item[3][0]
req1.count == item[3][1]
req1.step == 1'u64
req2.isEmpty() == true

template twoFullRequests(kkind: SyncQueueKind) =
Expand Down Expand Up @@ -248,12 +247,12 @@ suite "SyncManager test suite":
case kkind
of SyncQueueKind.Forward:
check:
r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
r21.slot == Slot(0) and r21.count == 1'u64
r22.slot == Slot(1) and r22.count == 1'u64
of SyncQueueKind.Backward:
check:
r21.slot == Slot(1) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(0) and r22.count == 1'u64 and r22.step == 1'u64
r21.slot == Slot(1) and r21.count == 1'u64
r22.slot == Slot(0) and r22.count == 1'u64

template done(b: BlockEntry) =
b.resfut.complete(Result[void, BlockError].ok())
Expand Down Expand Up @@ -957,12 +956,11 @@ suite "SyncManager test suite":
let chain2 = newSeq[ref ForkedSignedBeaconBlock]()

for counter in countdown(32'u64, 2'u64):
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter,
step: 1'u64)
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter)
let sr = SyncResult[SomeTPeer](request: req, data: chain1)
check sr.hasEndGap() == true

let req = SyncRequest[SomeTPeer](slot: Slot(1), count: 1'u64, step: 1'u64)
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: 1'u64)
let sr1 = SyncResult[SomeTPeer](request: req, data: chain1)
let sr2 = SyncResult[SomeTPeer](request: req, data: chain2)
check:
Expand All @@ -974,12 +972,11 @@ suite "SyncManager test suite":
let chain2 = newSeq[ref ForkedSignedBeaconBlock]()

for counter in countdown(32'u64, 2'u64):
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter,
step: 1'u64)
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter)
let sr = SyncResult[SomeTPeer](request: req, data: chain1)
check sr.getLastNonEmptySlot() == Slot(10)

let req = SyncRequest[SomeTPeer](slot: Slot(100), count: 1'u64, step: 1'u64)
let req = SyncRequest[SomeTPeer](slot: Slot(100), count: 1'u64)
let sr = SyncResult[SomeTPeer](request: req, data: chain2)
check sr.getLastNonEmptySlot() == Slot(100)

Expand All @@ -990,59 +987,22 @@ suite "SyncManager test suite":
while counter < req.count:
if not(req.contains(slot)):
return false
slot = slot + req.step
slot = slot + 1
counter = counter + 1'u64
return true

var req1 = SyncRequest[SomeTPeer](slot: Slot(5), count: 10'u64, step: 1'u64)
var req2 = SyncRequest[SomeTPeer](slot: Slot(1), count: 10'u64, step: 2'u64)
var req3 = SyncRequest[SomeTPeer](slot: Slot(2), count: 10'u64, step: 3'u64)
var req4 = SyncRequest[SomeTPeer](slot: Slot(3), count: 10'u64, step: 4'u64)
var req5 = SyncRequest[SomeTPeer](slot: Slot(4), count: 10'u64, step: 5'u64)
var req1 = SyncRequest[SomeTPeer](slot: Slot(5), count: 10'u64)

check:
req1.checkRange() == true
req2.checkRange() == true
req3.checkRange() == true
req4.checkRange() == true
req5.checkRange() == true

req1.contains(Slot(4)) == false
req1.contains(Slot(15)) == false

req2.contains(Slot(0)) == false
req2.contains(Slot(21)) == false
req2.contains(Slot(20)) == false

req3.contains(Slot(0)) == false
req3.contains(Slot(1)) == false
req3.contains(Slot(32)) == false
req3.contains(Slot(31)) == false
req3.contains(Slot(30)) == false

req4.contains(Slot(0)) == false
req4.contains(Slot(1)) == false
req4.contains(Slot(2)) == false
req4.contains(Slot(43)) == false
req4.contains(Slot(42)) == false
req4.contains(Slot(41)) == false
req4.contains(Slot(40)) == false

req5.contains(Slot(0)) == false
req5.contains(Slot(1)) == false
req5.contains(Slot(2)) == false
req5.contains(Slot(3)) == false
req5.contains(Slot(54)) == false
req5.contains(Slot(53)) == false
req5.contains(Slot(52)) == false
req5.contains(Slot(51)) == false
req5.contains(Slot(50)) == false

test "[SyncQueue] checkResponse() test":
let chain = createChain(Slot(10), Slot(20))
let r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64, step: 1'u64)
let r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64, step: 1'u64)
let r22 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64, step: 2'u64)
let r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
let r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)

check:
checkResponse(r1, @[chain[1]]) == true
Expand All @@ -1063,19 +1023,6 @@ suite "SyncManager test suite":
checkResponse(r21, @[chain[2], chain[3]]) == false
checkResponse(r21, @[chain[3]]) == false

checkResponse(r22, @[chain[1]]) == true
checkResponse(r22, @[]) == true
checkResponse(r22, @[chain[1], chain[3]]) == true
checkResponse(r22, @[chain[3]]) == true
checkResponse(r22, @[chain[1], chain[3], chain[5]]) == false
checkResponse(r22, @[chain[0], chain[1]]) == false
checkResponse(r22, @[chain[1], chain[2]]) == false
checkResponse(r22, @[chain[2], chain[3]]) == false
checkResponse(r22, @[chain[3], chain[4]]) == false
checkResponse(r22, @[chain[4], chain[5]]) == false
checkResponse(r22, @[chain[4]]) == false
checkResponse(r22, @[chain[3], chain[1]]) == false

test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newAsyncQueue[BlockEntry]()
block:
Expand Down