|
| 1 | +from typing import Dict, Callable, List |
| 2 | +from . import Role |
| 3 | +from ..node import Node |
| 4 | +from konsensus.entities.data_types import Proposal |
| 5 | +from konsensus.entities.messages_types import Propose, Invoked, Welcome |
| 6 | +from konsensus.constants import LEADER_TIMEOUT |
| 7 | + |
| 8 | + |
| 9 | +class Replica(Role): |
| 10 | + """ |
| 11 | + Replica has the following roles to play |
| 12 | + - Making new proposals; |
| 13 | + - Invoking the local state machine when proposals are decided; |
| 14 | + - Tracking the current leader; and |
| 15 | + - Adding newly started nodes to the cluster. |
| 16 | + """ |
| 17 | + |
| 18 | + def __init__(self, node: Node, execute_fn: Callable, state, slot, decisions: Dict[int, Proposal], peers: List) -> None: |
| 19 | + super().__init__(node) |
| 20 | + self.execute_fn = execute_fn |
| 21 | + self.state = state |
| 22 | + self.slot = slot |
| 23 | + self.decisions = decisions |
| 24 | + self.peers = peers |
| 25 | + self.proposals: Dict[int, Proposal] = {} |
| 26 | + self.next_slot: int = slot |
| 27 | + self.latest_leader = None |
| 28 | + self.latest_leader_timeout = None |
| 29 | + |
| 30 | + # making proposals |
| 31 | + def do_invoke(self, sender, caller, client_id, input_value): |
| 32 | + proposal = Proposal(caller=caller, client_id=client_id, input=input_value) |
| 33 | + slot = next((s for s, p in self.proposals.items() if p == proposal), None) |
| 34 | + # propose or re-propose if this proposal already has a slot |
| 35 | + self.propose(proposal, slot) |
| 36 | + |
| 37 | + def propose(self, proposal: Proposal, slot=None): |
| 38 | + """Send (or resend if slot is specified) a proposal to the leader""" |
| 39 | + if not slot: |
| 40 | + slot, self.next_slot = self.next_slot, self.next_slot + 1 |
| 41 | + self.proposals[slot] = proposal |
| 42 | + # find a leader we think is working - either the latest we know of, or |
| 43 | + # ourselves(which may trigger a scout to make use the leader) |
| 44 | + leader = self.latest_leader or self.node.address |
| 45 | + self.logger.info(f"proposing {proposal} at slot {slot} to leader {leader}") |
| 46 | + self.node.send([leader], Propose(slot=slot, proposal=proposal)) |
| 47 | + |
| 48 | + # handling deciding proposals |
| 49 | + def do_decision(self, sender, slot, proposal: Proposal): |
| 50 | + assert not self.decisions.get(self.slot, None), "next slot to commit is already decided" |
| 51 | + |
| 52 | + if slot in self.decisions: |
| 53 | + assert self.decisions[slot] == Proposal, f"slot {slot} already decided with {self.decisions[slot]}" |
| 54 | + return |
| 55 | + |
| 56 | + self.decisions[slot] = proposal |
| 57 | + self.next_slot = max(self.next_slot, slot+1) |
| 58 | + |
| 59 | + # re-propose our proposal in a new slot if it lost its slot and was not a no-op |
| 60 | + our_proposal = self.proposals.get(slot) |
| 61 | + if our_proposal is not None and our_proposal != proposal and our_proposal.caller: |
| 62 | + self.propose(our_proposal) |
| 63 | + |
| 64 | + # execute any pending decided proposals |
| 65 | + while True: |
| 66 | + commit_proposal = self.decisions.get(self.slot) |
| 67 | + if not commit_proposal: |
| 68 | + break # not yet decided |
| 69 | + commit_slot, self.slot = self.slot, self.slot + 1 |
| 70 | + |
| 71 | + self.commit(commit_slot, commit_proposal) |
| 72 | + |
| 73 | + def commit(self, slot: int, proposal: Proposal): |
| 74 | + """Actually commit a proposal that is decided and in sequence""" |
| 75 | + decided_proposals = [p for s, p in self.decisions.items() if s < slot] |
| 76 | + if proposal in decided_proposals: |
| 77 | + self.logger.info(f"not committing duplicate proposal {proposal}, slot {slot}") |
| 78 | + return # duplicate |
| 79 | + |
| 80 | + self.logger.info(f"committing {proposal} at slot {slot}") |
| 81 | + if proposal.caller is not None: |
| 82 | + # perform a client operation |
| 83 | + self.state, output = self.execute_fn(self.state, proposal.input) |
| 84 | + self.node.send([proposal.caller], Invoked(client_id=proposal.client_id, output=output)) |
| 85 | + |
| 86 | + |
| 87 | + # tracking the leader |
| 88 | + |
| 89 | + def do_adopted(self, sender, ballot_num, accepted_proposals): |
| 90 | + self.latest_leader = self.node.address |
| 91 | + self.leader_alive() |
| 92 | + |
| 93 | + def do_accepting(self, sender, leader): |
| 94 | + self.latest_leader = leader |
| 95 | + self.leader_alive() |
| 96 | + |
| 97 | + def do_active(self, sender): |
| 98 | + if sender != self.latest_leader: |
| 99 | + return |
| 100 | + self.leader_alive() |
| 101 | + |
| 102 | + def leader_alive(self): |
| 103 | + if self.latest_leader_timeout: |
| 104 | + self.latest_leader_timeout.cancel() |
| 105 | + |
| 106 | + def reset_leader(): |
| 107 | + idx = self.peers.index(self.latest_leader) |
| 108 | + self.latest_leader = self.peers[(idx + 1) % len(self.peers)] |
| 109 | + self.logger.debug(f"leader timed out; trying the next one, {self.latest_leader}") |
| 110 | + |
| 111 | + self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader) |
| 112 | + |
| 113 | + # adding new cluster members |
| 114 | + |
| 115 | + def do_join(self, sender): |
| 116 | + if sender in self.peers: |
| 117 | + self.node.send([sender], Welcome(state=self.state, slot=self.slot, decisions=self.decisions)) |
0 commit comments