@@ -6,7 +6,6 @@ import akka.actor._
66import akka .io .Tcp ._
77import akka .io .{IO , Tcp }
88import akka .util .ByteString
9- import io .iohk .ethereum .network .PeerActor .PeerP2pVersion
109import io .iohk .ethereum .network .p2p .{Message , MessageDecoder , MessageSerializable }
1110import io .iohk .ethereum .network .rlpx .RLPxConnectionHandler .RLPxConfiguration
1211import io .iohk .ethereum .utils .ByteUtils
@@ -24,15 +23,15 @@ import scala.util.{Failure, Success, Try}
2423 * 1. when created it waits for initial command (either handle incoming connection or connect usin g uri)
2524 * 2. when new connection is requested the actor waits for the result (waitingForConnectionResult)
2625 * 3. once underlying connection is established it either waits for handshake init message or for response message
27- * (depending on who initiated the connection)
26+ * (depending on who initiated the connection)
2827 * 4. once handshake is done (and secure connection established) actor can send/receive messages (`handshaked` state)
2928 */
3029class RLPxConnectionHandler (
31- messageDecoder : MessageDecoder ,
32- protocolVersion : Message .Version ,
33- authHandshaker : AuthHandshaker ,
34- messageCodecFactory : (Secrets , MessageDecoder , Message .Version ) => MessageCodec ,
35- rlpxConfiguration : RLPxConfiguration )
30+ messageDecoder : MessageDecoder ,
31+ protocolVersion : Message .Version ,
32+ authHandshaker : AuthHandshaker ,
33+ messageCodecFactory : (Secrets , MessageDecoder , Message .Version ) => MessageCodec ,
34+ rlpxConfiguration : RLPxConfiguration )
3635 extends Actor with ActorLogging {
3736
3837 import AuthHandshaker .{InitiatePacketLength , ResponsePacketLength }
@@ -126,10 +125,10 @@ class RLPxConnectionHandler(
126125 /**
127126 * Decode V4 packet
128127 *
129- * @param data, includes both the V4 packet with bytes from next messages
128+ * @param data , includes both the V4 packet with bytes from next messages
130129 * @return data of the packet and the remaining data
131130 */
132- private def decodeV4Packet (data : ByteString ): (ByteString , ByteString ) = {
131+ private def decodeV4Packet (data : ByteString ): (ByteString , ByteString ) = {
133132 val encryptedPayloadSize = ByteUtils .bigEndianToShort(data.take(2 ).toArray)
134133 val (packetData, remainingData) = data.splitAt(encryptedPayloadSize + 2 )
135134 packetData -> remainingData
@@ -148,7 +147,7 @@ class RLPxConnectionHandler(
148147 log.debug(s " Auth handshake succeeded for peer $peerId" )
149148 context.parent ! ConnectionEstablished (remotePubKey)
150149 val messageCodec = messageCodecFactory(secrets, messageDecoder, protocolVersion)
151- val messagesSoFar = messageCodec.readMessages(remainingData , None )
150+ val messagesSoFar = messageCodec.readMessages(remainingData)
152151 messagesSoFar foreach processMessage
153152 context become handshaked(messageCodec)
154153
@@ -170,59 +169,54 @@ class RLPxConnectionHandler(
170169 * Handles sending and receiving messages from the Akka TCP connection, while also handling acknowledgement of
171170 * messages sent. Messages are only sent when all Ack from previous messages were received.
172171 *
173- * @param messageCodec, for encoding the messages sent
174- * @param messagesNotSent, messages not yet sent
175- * @param cancellableAckTimeout, timeout for the message sent for which we are awaiting an acknowledgement (if there is one)
176- * @param seqNumber, sequence number for the next message to be sent
172+ * @param messageCodec , for encoding the messages sent
173+ * @param messagesNotSent , messages not yet sent
174+ * @param cancellableAckTimeout , timeout for the message sent for which we are awaiting an acknowledgement (if there is one)
175+ * @param seqNumber , sequence number for the next message to be sent
177176 */
178177 def handshaked (messageCodec : MessageCodec ,
179178 messagesNotSent : Queue [MessageSerializable ] = Queue .empty,
180179 cancellableAckTimeout : Option [CancellableAckTimeout ] = None ,
181- seqNumber : Int = 0 ,
182- p2pVersion : Option [Long ] = None ): Receive =
180+ seqNumber : Int = 0 ): Receive =
183181 handleWriteFailed orElse handleConnectionClosed orElse {
184182 case sm : SendMessage =>
185- if (cancellableAckTimeout.isEmpty)
186- sendMessage(messageCodec, sm.serializable, seqNumber, messagesNotSent, p2pVersion )
183+ if (cancellableAckTimeout.isEmpty)
184+ sendMessage(messageCodec, sm.serializable, seqNumber, messagesNotSent)
187185 else
188- context become handshaked(messageCodec, messagesNotSent :+ sm.serializable, cancellableAckTimeout, seqNumber, p2pVersion )
186+ context become handshaked(messageCodec, messagesNotSent :+ sm.serializable, cancellableAckTimeout, seqNumber)
189187
190188 case Received (data) =>
191- val messages = messageCodec.readMessages(data, p2pVersion )
189+ val messages = messageCodec.readMessages(data)
192190 messages foreach processMessage
193191
194192 case Ack if cancellableAckTimeout.nonEmpty =>
195193 // Cancel pending message timeout
196194 cancellableAckTimeout.foreach(_.cancellable.cancel())
197195
198196 // Send next message if there is one
199- if (messagesNotSent.nonEmpty)
200- sendMessage(messageCodec, messagesNotSent.head, seqNumber, messagesNotSent.tail, p2pVersion )
197+ if (messagesNotSent.nonEmpty)
198+ sendMessage(messageCodec, messagesNotSent.head, seqNumber, messagesNotSent.tail)
201199 else
202- context become handshaked(messageCodec, Queue .empty, None , seqNumber, p2pVersion )
200+ context become handshaked(messageCodec, Queue .empty, None , seqNumber)
203201
204202 case AckTimeout (ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
205203 cancellableAckTimeout.foreach(_.cancellable.cancel())
206204 log.debug(s " [Stopping Connection] Write to $peerId failed " )
207205 context stop self
208-
209- case PeerP2pVersion (p2pVer) =>
210- // We have peer p2p version based on hello message, if version is >= 5 next messages will be compressed.
211- context.become(handshaked(messageCodec, messagesNotSent, cancellableAckTimeout, seqNumber, Some (p2pVer)))
212206 }
213207
214208 /**
215209 * Sends an encoded message through the TCP connection, an Ack will be received when the message was
216210 * successfully queued for delivery. A cancellable timeout is created for the Ack message.
217211 *
218- * @param messageCodec, for encoding the messages sent
219- * @param messageToSend, message to be sent
220- * @param seqNumber, sequence number for the message to be sent
221- * @param remainingMsgsToSend, messages not yet sent
212+ * @param messageCodec , for encoding the messages sent
213+ * @param messageToSend , message to be sent
214+ * @param seqNumber , sequence number for the message to be sent
215+ * @param remainingMsgsToSend , messages not yet sent
222216 */
223217 private def sendMessage (messageCodec : MessageCodec , messageToSend : MessageSerializable ,
224- seqNumber : Int , remainingMsgsToSend : Queue [MessageSerializable ], p2pVersion : Option [ Long ] ): Unit = {
225- val out = messageCodec.encodeMessage(messageToSend, p2pVersion )
218+ seqNumber : Int , remainingMsgsToSend : Queue [MessageSerializable ]): Unit = {
219+ val out = messageCodec.encodeMessage(messageToSend)
226220 connection ! Write (out, Ack )
227221 log.debug(s " Sent message: $messageToSend from $peerId" )
228222
@@ -231,15 +225,14 @@ class RLPxConnectionHandler(
231225 messageCodec = messageCodec,
232226 messagesNotSent = remainingMsgsToSend,
233227 cancellableAckTimeout = Some (CancellableAckTimeout (seqNumber, timeout)),
234- seqNumber = increaseSeqNumber(seqNumber),
235- p2pVersion = p2pVersion
228+ seqNumber = increaseSeqNumber(seqNumber)
236229 )
237230 }
238231
239232 /**
240233 * Given a sequence number for the AckTimeouts, the next seq number is returned
241234 *
242- * @param seqNumber, the current sequence number
235+ * @param seqNumber , the current sequence number
243236 * @return the sequence number for the next message sent
244237 */
245238 private def increaseSeqNumber (seqNumber : Int ): Int = seqNumber match {
@@ -258,13 +251,14 @@ class RLPxConnectionHandler(
258251 if (msg.isPeerClosed) {
259252 log.debug(s " [Stopping Connection] Connection with $peerId closed by peer " )
260253 }
261- if (msg.isErrorClosed){
254+ if (msg.isErrorClosed) {
262255 log.debug(s " [Stopping Connection] Connection with $peerId closed because of error ${msg.getErrorCause}" )
263256 }
264257
265258 context stop self
266259 }
267260 }
261+
268262}
269263
270264object RLPxConnectionHandler {
0 commit comments