Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be1-go/internal/handler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
FederationActionInit = "init"
FederationActionExpect = "expect"
FederationActionResult = "result"
FederationActionTokensExchange = "tokens_exchange"

ChirpObject = "chirp"
ChirpActionAdd = "add"
Expand Down
182 changes: 124 additions & 58 deletions be1-go/internal/handler/channel/federation/hfederation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"popstellar/internal/handler/jsonrpc/mjsonrpc"
"popstellar/internal/handler/message/mmessage"
"popstellar/internal/handler/method/publish/mpublish"
"popstellar/internal/handler/method/subscribe/msubscribe"
"popstellar/internal/handler/query/mquery"
"popstellar/internal/network/socket"
"popstellar/internal/validation"
Expand All @@ -44,7 +43,18 @@ type Subscribers interface {
SendToAll(buf []byte, channel string) error
}

type Sockets interface {
Upsert(socket socket.Socket)
}

type Config interface {
GetServerInfo() (string, string, string, error)
}

type Repository interface {
// HasMessage returns true if the message already exists.
HasMessage(messageID string) (bool, error)

// GetOrganizerPubKey returns the organizer public key of a LAO.
GetOrganizerPubKey(laoID string) (kyber.Point, error)

Expand All @@ -70,24 +80,45 @@ type Repository interface {
}

type Handler struct {
hub Hub
subs Subscribers
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
hub Hub
subs Subscribers
sockets Sockets
conf Config
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
}

func New(hub Hub, subs Subscribers, db Repository, schema *validation.SchemaValidator, log zerolog.Logger) *Handler {
func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
db Repository, schema *validation.SchemaValidator,
log zerolog.Logger) *Handler {
return &Handler{
hub: hub,
subs: subs,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
hub: hub,
subs: subs,
sockets: sockets,
conf: conf,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
}
}

func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
func (h *Handler) Handle(channelPath string, msg mmessage.Message,
socket socket.Socket) error {
err := msg.VerifyMessage()
if err != nil {
return err
}

alreadyExist, err := h.db.HasMessage(msg.MessageID)
if err != nil {
return err
}

if alreadyExist {
return nil
}

jsonData, err := base64.URLEncoding.DecodeString(msg.Data)
if err != nil {
return errors.NewInvalidMessageFieldError("failed to decode message data: %v", err)
Expand Down Expand Up @@ -115,9 +146,11 @@ func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
case channel.FederationActionExpect:
err = h.handleExpect(msg, channelPath)
case channel.FederationActionChallenge:
err = h.handleChallenge(msg, channelPath)
err = h.handleChallenge(msg, channelPath, socket)
case channel.FederationActionResult:
err = h.handleResult(msg, channelPath)
case channel.FederationActionTokensExchange:
err = h.handleTokensExchange(msg, channelPath)
default:
err = errors.NewInvalidMessageFieldError("failed to Handle %s#%s, invalid object#action", object, action)
}
Expand Down Expand Up @@ -192,6 +225,11 @@ func (h *Handler) handleExpect(msg mmessage.Message, channelPath string) error {
return err
}

err = federationExpect.ChallengeMsg.VerifyMessage()
if err != nil {
return err
}

var challenge mfederation.FederationChallenge
err = federationExpect.ChallengeMsg.UnmarshalData(&challenge)
if err != nil {
Expand All @@ -213,9 +251,6 @@ func (h *Handler) handleExpect(msg mmessage.Message, channelPath string) error {
return err
}

remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
_ = h.subs.AddChannel(remoteChannel)

return h.db.StoreMessageAndData(channelPath, msg)
}

Expand Down Expand Up @@ -257,45 +292,29 @@ func (h *Handler) handleInit(msg mmessage.Message, channelPath string) error {
return err
}

remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

//Force the remote server to be subscribed to /root/<remote_lao>/federation
remoteChannel := fmt.Sprintf(channelPattern, federationInit.LaoId)
_ = h.subs.AddChannel(remoteChannel)
err = h.subs.Subscribe(remoteChannel, remote)
if err != nil {
return err
if h.isOnSameServer(federationInit.ServerAddress) {
// In the edge case where the two LAOs are on the same server,
// there is no need to create a websocket connection to the other
// server and message from one "server" to the "other" could be
// directly handled.
// In that case the ack result of the federation_init will be sent
// only after any federation_result sent when handling the challenge.
_ = h.handleChallenge(federationInit.ChallengeMsg, remoteChannel, nil)
return nil
}

subscribeMsg := msubscribe.Subscribe{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "subscribe",
},
Params: msubscribe.SubscribeParams{Channel: channelPath},
}

subscribeBytes, err := json.Marshal(subscribeMsg)
if err != nil {
return errors.NewJsonMarshalError(err.Error())
}

// Subscribe to /root/<local_lao>/federation on the remote server
err = h.subs.SendToAll(subscribeBytes, remoteChannel)
remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

// send the challenge to a channelPath where the remote server is subscribed to
return h.publishTo(federationInit.ChallengeMsg, remoteChannel)
// send the challenge to the remote channelPath on the remote socket
return h.publishTo(federationInit.ChallengeMsg, remoteChannel, remote)
}

func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) error {
func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
var federationChallenge mfederation.FederationChallenge
err := msg.UnmarshalData(&federationChallenge)
if err != nil {
Expand Down Expand Up @@ -340,13 +359,26 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) erro
return err
}

// publish the FederationResult to the other server
remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
err = h.publishTo(resultMsg, remoteChannel)
if err != nil {
return err
if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil {
// In the edge case where the two LAOs are on the same server, the
// result message would already be stored and handleResult will not be
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)
} else {
// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)
}

h.log.Info().Msgf("A federation was successfully")

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
}
Expand Down Expand Up @@ -405,6 +437,26 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {
return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error {
var tokensExchange mfederation.FederationTokensExchange
err := msg.UnmarshalData(&tokensExchange)
if err != nil {
return err
}

err = h.verifyLocalOrganizer(msg, channelPath)
if err != nil {
return err
}

err = h.db.StoreMessageAndData(channelPath, msg)
if err != nil {
return err
}

return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) getOrganizerPk(federationChannel string) (string, error) {
laoChannel := strings.TrimSuffix(federationChannel, "/federation")

Expand Down Expand Up @@ -448,6 +500,17 @@ func (h *Handler) verifyLocalOrganizer(msg mmessage.Message, channelPath string)
return nil
}

func (h *Handler) isOnSameServer(address string) bool {
_, clientServerAddress, serverServerAddress, _ := h.conf.GetServerInfo()

isSameAddress := address == clientServerAddress || address == serverServerAddress

h.log.Info().Msgf("isOnSameServer=%v, remote=%s, client=%s, server=%s",
isSameAddress, address, clientServerAddress, serverServerAddress)

return isSameAddress
}

func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
ws, _, err := websocket.DefaultDialer.Dial(serverAddress, nil)
if err != nil {
Expand All @@ -459,14 +522,15 @@ func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
wg := h.hub.GetWaitGroup()
stopChan := h.hub.GetStopChan()

client := socket.NewClientSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
server := socket.NewServerSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
h.sockets.Upsert(server)

wg.Add(2)

go client.WritePump()
go client.ReadPump()
go server.WritePump()
go server.ReadPump()

return client, nil
return server, nil
}

func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, error) {
Expand Down Expand Up @@ -505,13 +569,14 @@ func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, err
return msg, nil
}

func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
func (h *Handler) publishTo(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
publishMsg := mpublish.Publish{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "publish",
Method: mquery.MethodPublish,
},
Params: mpublish.PublishParams{
Channel: channelPath,
Expand All @@ -524,5 +589,6 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
return errors.NewJsonMarshalError(err.Error())
}

return h.subs.SendToAll(publishBytes, channelPath)
socket.Send(publishBytes)
return nil
}
Loading