Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be1-go/internal/database/sqlite/sqlite_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ const (
FROM message
JOIN channelMessage ON message.messageID = channelMessage.messageID
WHERE channelMessage.channelPath = ?
ORDER BY message.storedTime DESC`
ORDER BY message.storedTime ASC`

selectChannelPath = `SELECT channelPath FROM channel WHERE channelPath = ?`

Expand Down
2 changes: 1 addition & 1 deletion be1-go/internal/database/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func Test_SQLite_GetAllMessagesFromChannel(t *testing.T) {
require.NoError(t, err)
}

expected := []mmessage.Message{testMessages[3].msg, testMessages[0].msg}
expected := []mmessage.Message{testMessages[0].msg, testMessages[3].msg}
messages, err := lite.GetAllMessagesFromChannel("channel1")
require.NoError(t, err)
require.Equal(t, expected, messages)
Expand Down
1 change: 1 addition & 0 deletions be1-go/internal/handler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
FederationActionInit = "init"
FederationActionExpect = "expect"
FederationActionResult = "result"
FederationActionTokensExchange = "tokens_exchange"

ChirpObject = "chirp"
ChirpActionAdd = "add"
Expand Down
204 changes: 146 additions & 58 deletions be1-go/internal/handler/channel/federation/hfederation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"popstellar/internal/handler/jsonrpc/mjsonrpc"
"popstellar/internal/handler/message/mmessage"
"popstellar/internal/handler/method/publish/mpublish"
"popstellar/internal/handler/method/subscribe/msubscribe"
"popstellar/internal/handler/query/mquery"
"popstellar/internal/network/socket"
"popstellar/internal/validation"
Expand All @@ -44,7 +43,18 @@ type Subscribers interface {
SendToAll(buf []byte, channel string) error
}

type Sockets interface {
Upsert(socket socket.Socket)
}

type Config interface {
GetServerInfo() (string, string, string, error)
}

type Repository interface {
// HasMessage returns true if the message already exists.
HasMessage(messageID string) (bool, error)

// GetOrganizerPubKey returns the organizer public key of a LAO.
GetOrganizerPubKey(laoID string) (kyber.Point, error)

Expand All @@ -70,24 +80,45 @@ type Repository interface {
}

type Handler struct {
hub Hub
subs Subscribers
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
hub Hub
subs Subscribers
sockets Sockets
conf Config
db Repository
schema *validation.SchemaValidator
log zerolog.Logger
}

func New(hub Hub, subs Subscribers, db Repository, schema *validation.SchemaValidator, log zerolog.Logger) *Handler {
func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
db Repository, schema *validation.SchemaValidator,
log zerolog.Logger) *Handler {
return &Handler{
hub: hub,
subs: subs,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
hub: hub,
subs: subs,
sockets: sockets,
conf: conf,
db: db,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
}
}

func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
func (h *Handler) Handle(channelPath string, msg mmessage.Message,
socket socket.Socket) error {
err := msg.VerifyMessage()
if err != nil {
return err
}

alreadyExist, err := h.db.HasMessage(msg.MessageID)
if err != nil {
return err
}

if alreadyExist {
return nil
}

jsonData, err := base64.URLEncoding.DecodeString(msg.Data)
if err != nil {
return errors.NewInvalidMessageFieldError("failed to decode message data: %v", err)
Expand All @@ -111,13 +142,15 @@ func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
case channel.FederationActionChallengeRequest:
err = h.handleRequestChallenge(msg, channelPath)
case channel.FederationActionInit:
err = h.handleInit(msg, channelPath)
err = h.handleInit(msg, channelPath, socket)
case channel.FederationActionExpect:
err = h.handleExpect(msg, channelPath)
case channel.FederationActionChallenge:
err = h.handleChallenge(msg, channelPath)
err = h.handleChallenge(msg, channelPath, socket)
case channel.FederationActionResult:
err = h.handleResult(msg, channelPath)
case channel.FederationActionTokensExchange:
err = h.handleTokensExchange(msg, channelPath)
default:
err = errors.NewInvalidMessageFieldError("failed to Handle %s#%s, invalid object#action", object, action)
}
Expand Down Expand Up @@ -213,16 +246,14 @@ func (h *Handler) handleExpect(msg mmessage.Message, channelPath string) error {
return err
}

remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
_ = h.subs.AddChannel(remoteChannel)

return h.db.StoreMessageAndData(channelPath, msg)
}

// handleInit checks that the message is from the local organizer and that
// it contains a valid challenge, then stores the msg,
// connect to the server and send the embedded challenge
func (h *Handler) handleInit(msg mmessage.Message, channelPath string) error {
func (h *Handler) handleInit(msg mmessage.Message, channelPath string,
s socket.Socket) error {
var federationInit mfederation.FederationInit
err := msg.UnmarshalData(&federationInit)
if err != nil {
Expand Down Expand Up @@ -257,45 +288,55 @@ func (h *Handler) handleInit(msg mmessage.Message, channelPath string) error {
return err
}

remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

//Force the remote server to be subscribed to /root/<remote_lao>/federation
remoteChannel := fmt.Sprintf(channelPattern, federationInit.LaoId)
_ = h.subs.AddChannel(remoteChannel)
err = h.subs.Subscribe(remoteChannel, remote)
if err != nil {
return err
}

subscribeMsg := msubscribe.Subscribe{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
if h.isOnSameServer(federationInit.ServerAddress) {
// In the edge case where the two LAOs are on the same server,
// there is no need to create a websocket connection to the other
// server and message from one "server" to the "other" could be
// directly added to the message channel.
publishMsg := mpublish.Publish{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: mquery.MethodPublish,
},
Method: "subscribe",
},
Params: msubscribe.SubscribeParams{Channel: channelPath},
}
Params: mpublish.PublishParams{
Channel: remoteChannel,
Message: federationInit.ChallengeMsg,
},
}

subscribeBytes, err := json.Marshal(subscribeMsg)
if err != nil {
return errors.NewJsonMarshalError(err.Error())
publishBytes, err := json.Marshal(&publishMsg)
if err != nil {
return errors.NewJsonMarshalError(err.Error())
}

incomingMsg := socket.IncomingMessage{
Socket: s,
Message: publishBytes,
}

// when adding the message to the message queue, we check if it is full
select {
case h.hub.GetMessageChan() <- incomingMsg:
return nil
default:
return errors.NewInternalServerError("Messages queue full")
}
}

// Subscribe to /root/<local_lao>/federation on the remote server
err = h.subs.SendToAll(subscribeBytes, remoteChannel)
remote, err := h.connectTo(federationInit.ServerAddress)
if err != nil {
return err
}

// send the challenge to a channelPath where the remote server is subscribed to
return h.publishTo(federationInit.ChallengeMsg, remoteChannel)
// send the challenge to the remote channelPath on the remote socket
return h.publishTo(federationInit.ChallengeMsg, remoteChannel, remote)
}

func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) error {
func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
var federationChallenge mfederation.FederationChallenge
err := msg.UnmarshalData(&federationChallenge)
if err != nil {
Expand Down Expand Up @@ -340,13 +381,26 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string) erro
return err
}

// publish the FederationResult to the other server
remoteChannel := fmt.Sprintf(channelPattern, federationExpect.LaoId)
err = h.publishTo(resultMsg, remoteChannel)
if err != nil {
return err
if h.isOnSameServer(federationExpect.ServerAddress) || socket == nil {
// In the edge case where the two LAOs are on the same server, the
// result message would already be stored and handleResult will not be
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)
} else {
// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)
}

h.log.Info().Msgf("A federation was successfully")

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
}
Expand Down Expand Up @@ -405,6 +459,26 @@ func (h *Handler) handleResult(msg mmessage.Message, channelPath string) error {
return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) handleTokensExchange(msg mmessage.Message, channelPath string) error {
var tokensExchange mfederation.FederationTokensExchange
err := msg.UnmarshalData(&tokensExchange)
if err != nil {
return err
}

err = h.verifyLocalOrganizer(msg, channelPath)
if err != nil {
return err
}

err = h.db.StoreMessageAndData(channelPath, msg)
if err != nil {
return err
}

return h.subs.BroadcastToAllClients(msg, channelPath)
}

func (h *Handler) getOrganizerPk(federationChannel string) (string, error) {
laoChannel := strings.TrimSuffix(federationChannel, "/federation")

Expand Down Expand Up @@ -448,6 +522,17 @@ func (h *Handler) verifyLocalOrganizer(msg mmessage.Message, channelPath string)
return nil
}

func (h *Handler) isOnSameServer(address string) bool {
_, clientServerAddress, serverServerAddress, _ := h.conf.GetServerInfo()

isSameAddress := address == clientServerAddress || address == serverServerAddress

h.log.Info().Msgf("isOnSameServer=%v, remote=%s, client=%s, server=%s",
isSameAddress, address, clientServerAddress, serverServerAddress)

return isSameAddress
}

func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
ws, _, err := websocket.DefaultDialer.Dial(serverAddress, nil)
if err != nil {
Expand All @@ -459,14 +544,15 @@ func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
wg := h.hub.GetWaitGroup()
stopChan := h.hub.GetStopChan()

client := socket.NewClientSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
server := socket.NewServerSocket(messageChan, closedSockets, ws, wg, stopChan, h.log)
h.sockets.Upsert(server)

wg.Add(2)

go client.WritePump()
go client.ReadPump()
go server.WritePump()
go server.ReadPump()

return client, nil
return server, nil
}

func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, error) {
Expand Down Expand Up @@ -505,13 +591,14 @@ func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, err
return msg, nil
}

func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
func (h *Handler) publishTo(msg mmessage.Message, channelPath string,
socket socket.Socket) error {
publishMsg := mpublish.Publish{
Base: mquery.Base{
JSONRPCBase: mjsonrpc.JSONRPCBase{
JSONRPC: "2.0",
},
Method: "publish",
Method: mquery.MethodPublish,
},
Params: mpublish.PublishParams{
Channel: channelPath,
Expand All @@ -524,5 +611,6 @@ func (h *Handler) publishTo(msg mmessage.Message, channelPath string) error {
return errors.NewJsonMarshalError(err.Error())
}

return h.subs.SendToAll(publishBytes, channelPath)
socket.Send(publishBytes)
return nil
}
Loading