Skip to content

Commit c5d8af4

Browse files
authored
Merge pull request #1808 from dedis/work-be2-onsriahi-Reduce-heartbeatbuild-logic-duplication
Reduce heartbeatbuild logic duplication
2 parents 572c577 + 9396f41 commit c5d8af4

File tree

10 files changed

+278
-228
lines changed

10 files changed

+278
-228
lines changed

be2-scala/src/main/scala/ch/epfl/pop/Server.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import akka.util.Timeout
1212
import ch.epfl.pop.authentication.{GetRequestHandler, PopchaWebSocketResponseHandler}
1313
import ch.epfl.pop.config.RuntimeEnvironment
1414
import ch.epfl.pop.config.RuntimeEnvironment._
15-
import ch.epfl.pop.decentralized.{ConnectionMediator, HeartbeatGenerator, Monitor}
15+
import ch.epfl.pop.decentralized.{ConnectionMediator, Monitor}
1616
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
1717
import ch.epfl.pop.storage.{DbActor, SecurityModuleActor}
1818
import org.iq80.leveldb.Options
@@ -49,8 +49,7 @@ object Server {
4949
val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)))
5050

5151
// Create necessary actors for server-server communications
52-
val heartbeatGenRef: ActorRef = system.actorOf(HeartbeatGenerator.props(dbActorRef))
53-
val monitorRef: ActorRef = system.actorOf(Monitor.props(heartbeatGenRef))
52+
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
5453
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))
5554

5655
// Setup routes

be2-scala/src/main/scala/ch/epfl/pop/decentralized/HeartbeatGenerator.scala

Lines changed: 0 additions & 74 deletions
This file was deleted.

be2-scala/src/main/scala/ch/epfl/pop/decentralized/Monitor.scala

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,34 @@ package ch.epfl.pop.decentralized
33
import akka.NotUsed
44
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
55
import akka.event.LoggingReceive
6+
import akka.pattern.{AskableActorRef, ask}
67
import akka.stream.scaladsl.Sink
78
import ch.epfl.pop.config.RuntimeEnvironment.{readServerPeers, serverPeersListPath}
89
import ch.epfl.pop.decentralized.Monitor.TriggerHeartbeat
910
import ch.epfl.pop.model.network.JsonRpcRequest
10-
import ch.epfl.pop.model.network.method.ParamsWithMap
11+
import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap}
12+
import ch.epfl.pop.model.objects.{Channel, Hash}
13+
import ch.epfl.pop.pubsub.AskPatternConstants
1114
import ch.epfl.pop.pubsub.graph.GraphMessage
15+
import ch.epfl.pop.storage.DbActor
1216

17+
import java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY
1318
import java.nio.file.{Path, WatchService}
14-
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_MODIFY}
19+
import scala.collection.immutable.HashMap
20+
import scala.concurrent.Await
1521
import scala.concurrent.duration.{DurationInt, FiniteDuration}
1622
import scala.jdk.CollectionConverters.CollectionHasAsScala
23+
import scala.util.Success
1724

1825
//This actor is tasked with scheduling heartbeats.
19-
// To that end it sees every messages the system receives.
26+
// To that end it sees every message the system receives.
2027
// When a message is seen it schedule a heartbeat in the next heartbeatRate seconds.
2128
// Periodic heartbeats are sent with a period of messageDelay seconds.
2229
final case class Monitor(
23-
heartbeatGenRef: ActorRef,
30+
dbActorRef: AskableActorRef,
2431
heartbeatRate: FiniteDuration,
2532
messageDelay: FiniteDuration
26-
) extends Actor with ActorLogging with Timers {
33+
) extends Actor with ActorLogging with Timers with AskPatternConstants() {
2734

2835
// These keys are used to keep track of the timers states
2936
private val periodicHbKey = 0
@@ -51,7 +58,15 @@ final case class Monitor(
5158
case Monitor.TriggerHeartbeat =>
5259
log.info("triggering a heartbeat")
5360
timers.cancel(singleHbKey)
54-
heartbeatGenRef ! Monitor.GenerateAndSendHeartbeat(connectionMediatorRef)
61+
62+
val askForHeartbeat = dbActorRef ? DbActor.GenerateHeartbeat()
63+
val heartbeat: HashMap[Channel, Set[Hash]] =
64+
Await.ready(askForHeartbeat, duration).value.get match
65+
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) => map
66+
case _ => HashMap.empty[Channel, Set[Hash]] // Handle anything else
67+
68+
if (heartbeat.nonEmpty)
69+
connectionMediatorRef ! Heartbeat(heartbeat)
5570

5671
case Right(jsonRpcMessage: JsonRpcRequest) =>
5772
jsonRpcMessage.getParams match {
@@ -74,8 +89,8 @@ final case class Monitor(
7489
}
7590

7691
object Monitor {
77-
def props(heartbeatGenRef: ActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
78-
Props(new Monitor(heartbeatGenRef, heartbeatRate, messageDelay))
92+
def props(dbActorRef: AskableActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
93+
Props(new Monitor(dbActorRef, heartbeatRate, messageDelay))
7994

8095
def sink(monitorRef: ActorRef): Sink[GraphMessage, NotUsed] = {
8196
Sink.actorRef(
@@ -90,7 +105,6 @@ object Monitor {
90105
sealed trait Event
91106
final case class AtLeastOneServerConnected() extends Event
92107
final case class NoServerConnected() extends Event
93-
final case class GenerateAndSendHeartbeat(connectionMediatorRef: ActorRef) extends Event
94108
final case class TriggerHeartbeat() extends Event
95109
private final case class DoNothing() extends Event
96110
}

be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ParamsWithMapHandler.scala

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,41 +24,22 @@ object ParamsWithMapHandler extends AskPatternConstants {
2424
/** first step is to retrieve the received heartbeat from the jsonRpcRequest */
2525
val receivedHeartBeat: Map[Channel, Set[Hash]] = jsonRpcMessage.getParams.asInstanceOf[Heartbeat].channelsToMessageIds
2626

27-
/** second step is to retrieve the local set of channels */
28-
var setOfChannels: Set[Channel] = Set()
29-
val ask = dbActorRef ? DbActor.GetAllChannels()
30-
Await.ready(ask, duration).value match {
31-
case Some(Success(DbActor.DbActorGetAllChannelsAck(channels))) =>
32-
setOfChannels = channels
33-
case Some(Failure(ex: DbActorNAckException)) =>
34-
Left(PipelineError(ex.code, s"couldn't retrieve local set of channels", jsonRpcMessage.getId))
27+
/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
28+
val ask = dbActorRef ? DbActor.GenerateHeartbeat()
29+
Await.ready(ask, duration).value.get match
30+
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) =>
31+
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
32+
receivedHeartBeat.keys.foreach(channel => {
33+
val missingIdsSet = receivedHeartBeat(channel).diff(map.getOrElse(channel, Set.empty))
34+
if (missingIdsSet.nonEmpty)
35+
missingIdsMap += (channel -> missingIdsSet)
36+
})
37+
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))
38+
39+
case Failure(ex: DbActorNAckException) =>
40+
Left(PipelineError(ex.code, s"couldn't retrieve localHeartBeat", jsonRpcMessage.getId))
3541
case reply =>
3642
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
37-
}
38-
39-
/** third step is to ask the DB for the content of each channel in terms of message ids. */
40-
val localHeartBeat: mutable.HashMap[Channel, Set[Hash]] = mutable.HashMap()
41-
setOfChannels.foreach(channel => {
42-
val ask = dbActorRef ? DbActor.ReadChannelData(channel)
43-
Await.ready(ask, duration).value match {
44-
case Some(Success(DbActor.DbActorReadChannelDataAck(channelData))) =>
45-
val setOfIds = channelData.messages.toSet
46-
localHeartBeat += (channel -> setOfIds)
47-
case Some(Failure(ex: DbActorNAckException)) =>
48-
Left(PipelineError(ex.code, s"couldn't readChannelData for local heartbeat", jsonRpcMessage.getId))
49-
case reply =>
50-
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
51-
}
52-
})
53-
54-
/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
55-
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
56-
receivedHeartBeat.keys.foreach(channel => {
57-
val missingIdsSet = receivedHeartBeat(channel).diff(localHeartBeat.getOrElse(channel, Set.empty))
58-
if (missingIdsSet.nonEmpty)
59-
missingIdsMap += (channel -> missingIdsSet)
60-
})
61-
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))
6243

6344
case Right(jsonRpcMessage: JsonRpcResponse) =>
6445
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "HeartbeatHandler received a 'JsonRpcResponse'", jsonRpcMessage.id))

be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@ import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
99
import ch.epfl.pop.model.network.method.message.Message
1010
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
1111
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
12+
import ch.epfl.pop.model.objects.*
1213
import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX}
13-
import ch.epfl.pop.model.objects._
1414
import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout
1515
import ch.epfl.pop.pubsub.graph.{ErrorCodes, JsonString}
1616
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
17-
import ch.epfl.pop.storage.DbActor._
17+
import ch.epfl.pop.storage.DbActor.*
1818
import com.google.crypto.tink.subtle.Ed25519Sign
1919

2020
import java.util.concurrent.TimeUnit
21+
import scala.collection.immutable.HashMap
2122
import scala.concurrent.Await
2223
import scala.concurrent.duration.{Duration, FiniteDuration}
2324
import scala.util.{Failure, Success, Try}
@@ -130,7 +131,7 @@ final case class DbActor(
130131

131132
@throws[DbActorNAckException]
132133
private def readElectionData(laoId: Hash, electionId: Hash): ElectionData = {
133-
Try(storage.read(storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")) match {
134+
Try(storage.read(storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")) match {
134135
case Success(Some(json)) => ElectionData.buildFromJson(json)
135136
case Success(None) => throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"ElectionData for election $electionId not in the database")
136137
case Failure(ex) => throw ex
@@ -243,7 +244,7 @@ final case class DbActor(
243244

244245
@throws[DbActorNAckException]
245246
private def createElectionData(laoId: Hash, electionId: Hash, keyPair: KeyPair): Unit = {
246-
val channel = Channel(s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")
247+
val channel = Channel(s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")
247248
if (!checkChannelExistence(channel)) {
248249
val pair = (storage.DATA_KEY + channel.toString) -> ElectionData(electionId, keyPair).toJsonString
249250
storage.write(pair)
@@ -299,7 +300,7 @@ final case class DbActor(
299300
@throws[DbActorNAckException]
300301
private def generateLaoDataKey(channel: Channel): String = {
301302
channel.decodeChannelLaoId match {
302-
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data${LAO_DATA_LOCATION}"
303+
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data$LAO_DATA_LOCATION"
303304
case None =>
304305
log.error(s"Actor $self (db) encountered a problem while decoding LAO channel from '$channel'")
305306
throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"Could not extract the LAO id for channel $channel")
@@ -308,7 +309,7 @@ final case class DbActor(
308309

309310
// generates the key of the RollCallData to store in the database
310311
private def generateRollCallDataKey(laoId: Hash): String = {
311-
storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/rollcall"
312+
storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/rollcall"
312313
}
313314

314315
@throws[DbActorNAckException]
@@ -369,6 +370,20 @@ final case class DbActor(
369370
(publicKey, privateKey)
370371
}
371372

373+
@throws[DbActorNAckException]
374+
private def generateHeartbeat(): HashMap[Channel, Set[Hash]] = {
375+
val setOfChannels = getAllChannels
376+
if (setOfChannels.isEmpty) return HashMap()
377+
val heartbeatMap: HashMap[Channel, Set[Hash]] = setOfChannels.foldLeft(HashMap.empty[Channel, Set[Hash]]) {
378+
(acc, channel) =>
379+
readChannelData(channel).messages.toSet match {
380+
case setOfIds if setOfIds.nonEmpty => acc + (channel -> setOfIds)
381+
case _ => acc
382+
}
383+
}
384+
heartbeatMap
385+
}
386+
372387
override def receive: Receive = LoggingReceive {
373388
case Write(channel, message) =>
374389
log.info(s"Actor $self (db) received a WRITE request on channel '$channel'")
@@ -543,6 +558,13 @@ final case class DbActor(
543558
case failure => sender() ! failure.recover(Status.Failure(_))
544559
}
545560

561+
case GenerateHeartbeat() =>
562+
log.info(s"Actor $self (db) received a GenerateHeartbeat request")
563+
Try(generateHeartbeat()) match {
564+
case Success(heartbeat) => sender() ! DbActorGenerateHeartbeatAck(heartbeat)
565+
case failure => sender() ! failure.recover(Status.Failure(_))
566+
}
567+
546568
case m =>
547569
log.info(s"Actor $self (db) received an unknown message")
548570
sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize"))
@@ -758,6 +780,9 @@ object DbActor {
758780
*/
759781
final case class ReadServerPrivateKey() extends Event
760782

783+
/** Request to generate a local heartbeat */
784+
final case class GenerateHeartbeat() extends Event
785+
761786
// DbActor DbActorMessage correspond to messages the actor may emit
762787
sealed trait DbActorMessage
763788

@@ -832,6 +857,13 @@ object DbActor {
832857
*/
833858
final case class DbActorReadServerPrivateKeyAck(privateKey: PrivateKey) extends DbActorMessage
834859

860+
/** Response for a [[GenerateHeartbeat]] db request Receiving [[DbActorGenerateHeartbeatAck]] works as an acknowledgement that the request was successful
861+
*
862+
* @param heartbeatMap
863+
* requested heartbeat as a map from the channels to message ids
864+
*/
865+
final case class DbActorGenerateHeartbeatAck(heartbeatMap: HashMap[Channel, Set[Hash]]) extends DbActorMessage
866+
835867
/** Response for a general db actor ACK
836868
*/
837869
final case class DbActorAck() extends DbActorMessage
Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package ch.epfl.pop.decentralized
22

33
import akka.actor.Actor
4-
import ch.epfl.pop.model.objects.{Channel, DbActorNAckException}
5-
import ch.epfl.pop.pubsub.graph.ErrorCodes.SERVER_ERROR
6-
import ch.epfl.pop.storage.DbActor
4+
import ch.epfl.pop.model.objects.DbActorNAckException
75

86
class FailingToyDbActor extends Actor {
97
override def receive: Receive = {
10-
case _ => {
8+
case _ =>
119
sender() ! DbActorNAckException(0, "")
12-
}
1310
}
1411

1512
}

0 commit comments

Comments
 (0)