Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ object Server {

// Create necessary actors for server-server communications
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef, connectionMediatorRef))
val gossipManagerRef: ActorRef = system.actorOf(GossipManager.props(dbActorRef, monitorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry))

// Setup routes
def publishSubscribeRoute: RequestContext => Future[RouteResult] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ final case class ConnectionMediator(
mediatorRef: ActorRef,
dbActorRef: AskableActorRef,
securityModuleActorRef: AskableActorRef,
gossipManagerRef: ActorRef,
messageRegistry: MessageRegistry
) extends Actor with ActorLogging with AskPatternConstants {
implicit val system: ActorSystem = ActorSystem()

private var serverMap: HashMap[ActorRef, GreetServer] = HashMap()
private var gossipManagerRef: AskableActorRef = _

// Ping Monitor to inform it of our ActorRef
monitorRef ! ConnectionMediator.Ping()
gossipManagerRef ! ConnectionMediator.Ping()

override def receive: Receive = {

Expand Down Expand Up @@ -87,31 +88,28 @@ final case class ConnectionMediator(
if (serverMap.isEmpty)
sender() ! ConnectionMediator.NoPeer()
else
val serverRefs = serverMap.filter((_, greetServer) => !excludes.contains(greetServer.publicKey))
val serverRefs = serverMap.keys.filter(!excludes.contains(_)).toList
if (serverRefs.isEmpty)
sender() ! ConnectionMediator.NoPeer()
else
val randomKey = serverRefs.keys.toList(Random.nextInt(serverRefs.size))
sender() ! ConnectionMediator.GetRandomPeerAck(randomKey, serverRefs(randomKey))

case GossipManager.Ping() =>
gossipManagerRef = sender()
val randomKey = serverRefs(Random.nextInt(serverRefs.size))
sender() ! ConnectionMediator.GetRandomPeerAck(randomKey, serverMap(randomKey))

}
}

object ConnectionMediator {

def props(monitorRef: ActorRef, mediatorRef: ActorRef, dbActorRef: AskableActorRef, securityModuleActorRef: AskableActorRef, messageRegistry: MessageRegistry): Props =
Props(new ConnectionMediator(monitorRef, mediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))
def props(monitorRef: ActorRef, mediatorRef: ActorRef, dbActorRef: AskableActorRef, securityModuleActorRef: AskableActorRef, gossipManagerRef: ActorRef, messageRegistry: MessageRegistry): Props =
Props(new ConnectionMediator(monitorRef, mediatorRef, dbActorRef, securityModuleActorRef, gossipManagerRef, messageRegistry))

sealed trait Event
final case class ConnectTo(urlList: List[String]) extends Event
final case class NewServerConnected(serverRef: ActorRef, greetServer: GreetServer) extends Event
final case class ServerLeft(serverRef: ActorRef) extends Event
final case class Ping() extends Event
final case class ReadPeersClientAddress() extends Event
final case class GetRandomPeer(excludes: List[PublicKey] = List.empty) extends Event
final case class GetRandomPeer(excludes: Set[ActorRef] = Set.empty) extends Event

sealed trait ConnectionMediatorMessage
final case class ReadPeersClientAddressAck(list: List[String]) extends ConnectionMediatorMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,34 @@ package ch.epfl.pop.decentralized

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.AskableActorRef
import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
import ch.epfl.pop.model.objects.{Channel, PublicKey, RumorData}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError}
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData}
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorGetRumorStateAck, DbActorReadRumorData, GetRumorState}

import scala.concurrent.Await
import scala.util.Random

final case class GossipManager(
dbActorRef: AskableActorRef,
monitorRef: ActorRef,
connectionMediator: AskableActorRef,
stopProbability: Double = 0.5
) extends Actor with AskPatternConstants with ActorLogging {
/** This class is responsible of managing the gossiping of rumors across the network
* @param dbActorRef
* reference to the database actor
* @param stopProbability
* probability with which we stop the gossipping in case of error response
*/
final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Double = 0.5) extends Actor with AskPatternConstants with ActorLogging {

private type ServerInfos = (ActorRef, GreetServer)
private var activeGossipProtocol: Map[JsonRpcRequest, List[ServerInfos]] = Map.empty
private var activeGossipProtocol: Map[JsonRpcRequest, Set[ActorRef]] = Map.empty
private var jsonId = 0
private var rumorId = 0
private var publicKey: Option[PublicKey] = None
private var connectionMediatorRef: AskableActorRef = _

publicKey = {
val readPk = dbActorRef ? DbActor.ReadServerPublicKey()
Expand All @@ -40,48 +40,20 @@ final case class GossipManager(
None
}

rumorId =
publicKey match
case Some(pk: PublicKey) =>
val readRumorData = dbActorRef ? DbActor.ReadRumorData(pk)
Await.result(readRumorData, duration) match
case DbActorReadRumorData(foundRumorIds: RumorData) => foundRumorIds.lastRumorId() + 1
case failure => 0
case None => 0

monitorRef ! GossipManager.Ping()
connectionMediator ? GossipManager.Ping()

private def getPeersForRumor(jsonRpcRequest: JsonRpcRequest): List[ServerInfos] = {
val activeGossip = activeGossipProtocol.get(jsonRpcRequest)
activeGossip match
case Some(peersInfosList) => peersInfosList
case None => List.empty
}

private def prepareRumor(rumor: Rumor): JsonRpcRequest = {
val request = JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
MethodType.rumor,
rumor,
Some(jsonId)
)
jsonId += 1
request
}

/** Does a step of gossipping protocol for given rpc. Tries to find a random peer that hasn't already received this msg If such a peer is found, sends message and updates table accordingly. If no peer is found, ends the protocol.
* @param rumorRpc
* Rpc that must be spreac
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Rpc that must be spreac
* Rpc that must be spread

*/
private def updateGossip(rumorRpc: JsonRpcRequest): Unit = {
// checks the peers to which we already forwarded the message
val activeGossip: List[ServerInfos] = getPeersForRumor(rumorRpc)
// get senderpk to avoid sending rumor back
val senderPk: PublicKey = rumorRpc.getParams.asInstanceOf[Rumor].senderPk
val activeGossip: Set[ActorRef] = peersAlreadyReceived(rumorRpc)
// selects a random peer from remaining peers
val randomPeer = connectionMediator ? ConnectionMediator.GetRandomPeer(activeGossip.map(_._2.publicKey).appended(senderPk))
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer(activeGossip)
Await.result(randomPeer, duration) match {
// updates the list based on response
// if some peers are available we send
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val alreadySent: List[ServerInfos] = activeGossip :+ (serverRef -> greetServer)
val alreadySent: Set[ActorRef] = activeGossip + serverRef
activeGossipProtocol += (rumorRpc -> alreadySent)
log.info(s"rumorSent > dest : ${greetServer.clientAddress}, rumor : $rumorRpc")
serverRef ! ClientAnswer(
Expand All @@ -95,15 +67,22 @@ final case class GossipManager(
}
}

private def handleRumor(request: JsonRpcRequest): Unit = {
/** When receiving a rumor that must be relayed, empacks a rumor in a new jsonRPC and tries to do a step of gossipping protocol
* @param request
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding the second parameter in the doc ;)

private def handleRumor(request: JsonRpcRequest, serverActorRef: ActorRef): Unit = {
val rcvRumor = request.getParams.asInstanceOf[Rumor]
val newRumorRequest = prepareRumor(rcvRumor)
activeGossipProtocol += (newRumorRequest -> Set(serverActorRef))
updateGossip(newRumorRequest)
}

/** Processes a response. If a response matches a active gossip protocol, uses the reponse to decide how to continue gossipping If response is Positive (Result(0)), tries to do another step of gossipping If response is Negative (Error(-3)), considers stop gossiping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Processes a response. If a response matches a active gossip protocol, uses the reponse to decide how to continue gossipping If response is Positive (Result(0)), tries to do another step of gossipping If response is Negative (Error(-3)), considers stop gossiping
/** Processes a response. If a response matches an active gossip protocol, uses the response to decide how to continue gosipping If response is Positive (Result(0)), tries to do another step of gosipping If response is Negative (Error(-3)), considers stop gossiping

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually double checked online and "gossipping" is correct

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, shouldn't have blindly trusted intellij !

* @param response
* Received response
*/
private def processResponse(response: JsonRpcResponse): Unit = {
val activeGossipPeers = activeGossipProtocol.filter((k, _) => k.id == response.id)

// response is expected because only one entry exists
if (activeGossipPeers.size == 1) {
activeGossipPeers.foreach { (rumorRpc, _) =>
Expand All @@ -122,11 +101,14 @@ final case class GossipManager(
}
}

/** When receives a new publish, empacks messages in a new rumor and starts gossipping it in the network
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** When receives a new publish, empacks messages in a new rumor and starts gossipping it in the network
/** When receives a new publish, packs messages in a new rumor and starts gossiping it in the network

* @param messages
* Messages to be gossiped by channel
*/
private def startGossip(messages: Map[Channel, List[Message]]): Unit = {
if (publicKey.isDefined)
val rumor: Rumor = Rumor(publicKey.get, rumorId, messages)
val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages)
val jsonRpcRequest = prepareRumor(rumor)
rumorId += 1
val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeRumor, duration) match
case DbActorAck() => updateGossip(jsonRpcRequest)
Expand All @@ -135,61 +117,111 @@ final case class GossipManager(
log.info(s"Actor (gossip) $self will not be able to start rumors because it has no publicKey")
}

private def getRumorId(publicKey: PublicKey): Int = {
val readRumorState = dbActorRef ? GetRumorState()
Await.result(readRumorState, duration) match
case DbActorGetRumorStateAck(rumorState) =>
rumorState.state.get(publicKey) match
case Some(rumorIdInDb) => rumorIdInDb
case None => -1
}

private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = {
val activeGossip = activeGossipProtocol.get(jsonRpcRequest)
activeGossip match
case Some(peersInfosList) => peersInfosList
case None => Set.empty
}

private def prepareRumor(rumor: Rumor): JsonRpcRequest = {
val request = JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
MethodType.rumor,
rumor,
Some(jsonId)
)
jsonId += 1
request
}

override def receive: Receive = {
case GossipManager.HandleRumor(jsonRpcRequest: JsonRpcRequest) =>
handleRumor(jsonRpcRequest)
case GossipManager.HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef) =>
handleRumor(jsonRpcRequest, clientActorRef)

case GossipManager.ManageGossipResponse(jsonRpcResponse) =>
processResponse(jsonRpcResponse)

case GossipManager.StartGossip(messages) =>
startGossip(messages)

case ConnectionMediator.Ping() =>
log.info(s"Actor $self received a ping from Connection Mediator")
connectionMediatorRef = sender()

case _ =>
log.info(s"Actor $self received an unexpected message")
}

}

object GossipManager extends AskPatternConstants {
def props(dbActorRef: AskableActorRef, monitorRef: ActorRef, connectionMediatorRef: AskableActorRef): Props =
Props(new GossipManager(dbActorRef, monitorRef, connectionMediatorRef))

def gossipHandler(gossipManager: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
def props(dbActorRef: AskableActorRef, monitorRef: ActorRef): Props =
Props(new GossipManager(dbActorRef))

/** When receiving a rumor, gossip manager handles the rumor by relaying
*
* @param gossipManager
* reference to the gossip manager of the server
* @param clientActorRef
* reference to the client who sent the message.
* @return
*/
def gossipHandler(gossipManager: AskableActorRef, clientActorRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.method match
case MethodType.rumor =>
gossipManager ? HandleRumor(jsonRpcRequest)
gossipManager ? HandleRumor(jsonRpcRequest, clientActorRef)
Right(jsonRpcRequest)
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GossipManager received a non expected jsonRpcRequest", jsonRpcRequest.id))
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "GossipManager received an unexpected message:" + graphMessage, None))
// if an error comes from previous step, we let it go through
case graphMessage @ _ => graphMessage
}

def monitorResponse(gossipManager: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
/** Monitors responses to check if one is related to a rumor we sent
* @param gossipManager
* reference to the gossip manager of the server
* @param clientActorRef
* reference to the client who sent the message.
* @return
*/
def monitorResponse(gossipManager: AskableActorRef, clientActorRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcResponse: JsonRpcResponse) =>
gossipManager ? ManageGossipResponse(jsonRpcResponse)
Right(jsonRpcResponse)
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"GossipManager received an unexpected message:$graphMessage while monitoring responses", None))
}

def startGossip(gossipManager: AskableActorRef, clientRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
/** When a server receives a publish, it starts gossiping
* @param gossipManager
* reference to the gossip manager of the server
* @param clientActorRef
* reference to the client who sent the message. If set to Actor.noSender, should no start gossiping
* @return
*/
def startGossip(gossipManager: AskableActorRef, clientActorRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.getParamsMessage match
case Some(message) =>
// Start gossiping only if message comes from a real actor (and not from processing pipeline)
if (clientRef != Actor.noSender)
if (clientActorRef != Actor.noSender)
gossipManager ? StartGossip(Map(jsonRpcRequest.getParamsChannel -> List(message)))
case None => /* Do nothing */
Right(jsonRpcRequest)
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"GossipManager received an unexpected message:$graphMessage while starting gossiping", None))
}

sealed trait Event
final case class HandleRumor(jsonRpcRequest: JsonRpcRequest)
final case class HandleRumor(jsonRpcRequest: JsonRpcRequest, clientActorRef: ActorRef)
final case class ManageGossipResponse(jsonRpcResponse: JsonRpcResponse)
final case class StartGossip(messages: Map[Channel, List[Message]])

sealed trait GossipManagerMessage
final case class Ping() extends GossipManagerMessage

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ final case class Monitor(
// Monitor is self-contained,
// To that end it doesn't know the ref of the connectionMediator
private var connectionMediatorRef = ActorRef.noSender
private var gossipManagerRef = ActorRef.noSender

private var fileMonitor: FileMonitor = _

Expand Down Expand Up @@ -90,10 +89,6 @@ final case class Monitor(
fileMonitor = new FileMonitor(this.self)
new Thread(fileMonitor).start()

case GossipManager.Ping() =>
log.info("Received GossipManager Ping")
gossipManagerRef = sender()

case msg: ConnectionMediator.ConnectTo =>
connectionMediatorRef ! msg
/* ConnectTo message is sent by FileMonitor thread, which is not an actor, we thus forward it with a real actor to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object PublishSubscribe {

val requestPartition = builder.add(validateRequests(clientActorRef, messageRegistry))

val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager))
val gossipMonitorPartition = builder.add(GossipManager.monitorResponse(gossipManager, clientActorRef))
val getMsgByIdResponsePartition = builder.add(ProcessMessagesHandler.getMsgByIdResponseHandler(messageRegistry))

// ResponseHandler messages do not go in the merger
Expand Down Expand Up @@ -156,7 +156,7 @@ object PublishSubscribe {
val getMessagesByIdPartition = builder.add(ParamsWithMapHandler.getMessagesByIdHandler(dbActorRef))
val greetServerPartition = builder.add(ParamsHandler.greetServerHandler(clientActorRef))
val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef, messageRegistry))
val gossipManagerPartition = builder.add(GossipManager.gossipHandler(gossipManager))
val gossipManagerPartition = builder.add(GossipManager.gossipHandler(gossipManager, clientActorRef))
val gossipStartPartition = builder.add(GossipManager.startGossip(gossipManager, clientActorRef))
val rumorStatePartition = builder.add(ParamsHandler.rumorStateHandler(dbActorRef))

Expand All @@ -166,7 +166,7 @@ object PublishSubscribe {
input ~> jsonRpcContentValidator ~> methodPartitioner

methodPartitioner.out(portPipelineError) ~> merger
methodPartitioner.out(portParamsWithMessage) ~> gossipStartPartition ~> hasMessagePartition ~> merger
methodPartitioner.out(portParamsWithMessage) ~> hasMessagePartition ~> gossipStartPartition ~> merger
methodPartitioner.out(portSubscribe) ~> subscribePartition ~> merger
methodPartitioner.out(portUnsubscribe) ~> unsubscribePartition ~> merger
methodPartitioner.out(portCatchup) ~> catchupPartition ~> merger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package ch.epfl.pop.pubsub.graph
import akka.NotUsed
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Flow
import ch.epfl.pop.model.network.method.{Broadcast, Catchup, GetMessagesById}
import ch.epfl.pop.model.network._
import ch.epfl.pop.model.network.method.{Broadcast, Catchup, GetMessagesById, Rumor}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's necessary

import ch.epfl.pop.model.network.*
import ch.epfl.pop.model.objects.DbActorNAckException
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
Expand Down
Loading