Skip to content

Commit 59b4035

Browse files
thomash-acinqpm47t-bast
authored
Relay onion messages (#2061)
* Relay onion messages Allow sending and relaying onion messages Co-authored-by: Pierre-Marie Padiou <[email protected]> Co-authored-by: t-bast <[email protected]>
1 parent fb96e5e commit 59b4035

File tree

19 files changed

+493
-73
lines changed

19 files changed

+493
-73
lines changed

docs/release-notes/eclair-vnext.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ Node operators should watch this file very regularly.
1818
An event is also sent to the event stream for every such notification.
1919
This lets plugins notify the node operator via external systems (push notifications, email, etc).
2020

21+
### Initial support for onion messages
22+
23+
Eclair now supports the feature `option_onion_messages`. If this feature is enabled, eclair will relay onion messages.
24+
It can also send onion messages with the `sendonionmessage` API.
25+
Messages sent to Eclair will be ignored.
26+
2127
### API changes
2228

2329
#### Timestamps
@@ -59,6 +65,14 @@ Examples:
5965
}
6066
}
6167
```
68+
69+
#### Sending onion messages
70+
71+
You can now send onion messages with `sendonionmessage`.
72+
It expects `--route` a list of `nodeId` to send the message through, the last one being the recipient, and `--content` the content of the message as an encoded TLV stream in hexadecimal.
73+
It also accepts `--pathId` as an encoded TLV stream in hexadecimal.
74+
Sending to a blinded route (as a reply to a previous message) is not supported.
75+
6276

6377
This release contains many other API updates:
6478

eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ import fr.acinq.eclair.channel._
3535
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
3636
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment}
3737
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
38-
import fr.acinq.eclair.io.{NodeURI, Peer, PeerConnection}
38+
import fr.acinq.eclair.io.{MessageRelay, NodeURI, Peer, PeerConnection}
39+
import fr.acinq.eclair.message.OnionMessages
3940
import fr.acinq.eclair.payment._
4041
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
4142
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, RelayFees, UsableBalance}
@@ -46,13 +47,13 @@ import fr.acinq.eclair.router.{NetworkStats, Router}
4647
import fr.acinq.eclair.wire.protocol._
4748
import grizzled.slf4j.Logging
4849
import scodec.bits.ByteVector
50+
import scodec.{Attempt, DecodeResult, codecs}
4951

5052
import java.nio.charset.StandardCharsets
5153
import java.util.UUID
5254
import scala.collection.immutable.SortedMap
5355
import scala.concurrent.duration._
5456
import scala.concurrent.{ExecutionContext, Future, Promise}
55-
import scala.reflect.ClassTag
5657

5758
case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, color: String, features: Features, chainHash: ByteVector32, network: String, blockHeight: Int, publicAddresses: Seq[NodeAddress], onionAddress: Option[NodeAddress], instanceId: String)
5859

@@ -62,6 +63,8 @@ case class SignedMessage(nodeId: PublicKey, message: String, signature: ByteVect
6263

6364
case class VerifiedMessage(valid: Boolean, publicKey: PublicKey)
6465

66+
case class SendOnionMessageResponse(sent: Boolean, failureMessage: Option[String])
67+
6568
object SignedMessage {
6669
def signedBytes(message: ByteVector): ByteVector32 =
6770
Crypto.hash256(ByteVector("Lightning Signed Message:".getBytes(StandardCharsets.UTF_8)) ++ message)
@@ -150,6 +153,8 @@ trait Eclair {
150153
def signMessage(message: ByteVector): SignedMessage
151154

152155
def verifyMessage(message: ByteVector, recoverableSignature: ByteVector): VerifiedMessage
156+
157+
def sendOnionMessage(route: Seq[PublicKey], userCustomContent: ByteVector, pathId: Option[ByteVector])(implicit timeout: Timeout): Future[SendOnionMessageResponse]
153158
}
154159

155160
class EclairImpl(appKit: Kit) extends Eclair with Logging {
@@ -160,8 +165,8 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
160165
private val externalIdMaxLength = 66
161166

162167
override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = target match {
163-
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
164-
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
168+
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
169+
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
165170
}
166171

167172
override def disconnect(nodeId: PublicKey)(implicit timeout: Timeout): Future[String] = {
@@ -502,4 +507,26 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
502507
}
503508
}
504509
}
510+
511+
override def sendOnionMessage(route: Seq[PublicKey], userCustomContent: ByteVector, pathId: Option[ByteVector])(implicit timeout: Timeout): Future[SendOnionMessageResponse] = {
512+
val sessionKey = randomKey()
513+
val blindingSecret = randomKey()
514+
codecs.list(TlvCodecs.genericTlv).decode(userCustomContent.bits) match {
515+
case Attempt.Successful(DecodeResult(userCustomTlvs, _)) =>
516+
val (nextNodeId, message) =
517+
OnionMessages.buildMessage(
518+
sessionKey,
519+
blindingSecret,
520+
route.dropRight(1).map(OnionMessages.IntermediateNode(_)),
521+
Left(OnionMessages.Recipient(route.last, pathId)),
522+
Nil,
523+
userCustomTlvs)
524+
(appKit.switchboard ? OnionMessages.SendMessage(nextNodeId, message)).mapTo[MessageRelay.Status].map {
525+
case MessageRelay.Success => SendOnionMessageResponse(sent = true, None)
526+
case MessageRelay.Failure(f) => SendOnionMessageResponse(sent = false, Some(f.toString))
527+
}
528+
case Attempt.Failure(cause) => Future.successful(SendOnionMessageResponse(sent = false, Some(s"the `content` field is invalid, it must contain encoded tlvs: ${cause.message}")))
529+
}
530+
531+
}
505532
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2021 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.io
18+
19+
import akka.actor.typed.Behavior
20+
import akka.actor.typed.scaladsl.Behaviors
21+
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
22+
import akka.actor.{ActorRef, typed}
23+
import fr.acinq.bitcoin.Crypto.PublicKey
24+
import fr.acinq.eclair.wire.protocol.OnionMessage
25+
26+
object MessageRelay {
27+
// @formatter:off
28+
sealed trait Command
29+
case class RelayMessage(switchboard: ActorRef, nextNodeId: PublicKey, msg: OnionMessage, replyTo: typed.ActorRef[Status]) extends Command
30+
case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command
31+
32+
sealed trait Status
33+
case object Success extends Status
34+
case class Failure(failure: PeerConnection.ConnectionResult.Failure) extends Status
35+
// @formatter:on
36+
37+
def apply(): Behavior[Command] = {
38+
Behaviors.receivePartial {
39+
case (context, RelayMessage(switchboard, nextNodeId, msg, replyTo)) =>
40+
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic)
41+
waitForConnection(msg, replyTo)
42+
}
43+
}
44+
45+
def waitForConnection(msg: OnionMessage, replyTo: typed.ActorRef[Status]): Behavior[Command] = {
46+
Behaviors.receiveMessagePartial {
47+
case WrappedConnectionResult(r: PeerConnection.ConnectionResult.HasConnection) =>
48+
r.peerConnection ! msg
49+
replyTo ! Success
50+
Behaviors.stopped
51+
case WrappedConnectionResult(f: PeerConnection.ConnectionResult.Failure) =>
52+
replyTo ! Failure(f)
53+
Behaviors.stopped
54+
}
55+
}
56+
}

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ import fr.acinq.eclair.blockchain.{OnChainAddressGenerator, OnChainChannelFunder
3333
import fr.acinq.eclair.channel._
3434
import fr.acinq.eclair.io.Monitoring.Metrics
3535
import fr.acinq.eclair.io.PeerConnection.KillReason
36+
import fr.acinq.eclair.message.OnionMessages
3637
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
3738
import fr.acinq.eclair.wire.protocol
38-
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, RoutingMessage, UnknownMessage, Warning}
39+
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
3940
import scodec.bits.ByteVector
4041

4142
import java.net.InetSocketAddress
@@ -51,7 +52,7 @@ import scala.concurrent.ExecutionContext
5152
*
5253
* Created by PM on 26/08/2016.
5354
*/
54-
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
55+
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory, switchboard: ActorRef) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
5556

5657
import Peer._
5758

@@ -98,8 +99,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
9899

99100
when(CONNECTED) {
100101
dropStaleMessages {
101-
case Event(_: Peer.Connect, _) =>
102-
sender() ! PeerConnection.ConnectionResult.AlreadyConnected
102+
case Event(c: Peer.Connect, d: ConnectedData) =>
103+
c.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(d.peerConnection)
103104
stay()
104105

105106
case Event(Peer.OutgoingMessage(msg, peerConnection), d: ConnectedData) if peerConnection == d.peerConnection => // this is an outgoing message, but we need to make sure that this is for the current active connection
@@ -243,6 +244,20 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
243244
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
244245
gotoConnected(connectionReady, d.channels)
245246

247+
case Event(msg: OnionMessage, _: ConnectedData) =>
248+
if (nodeParams.features.hasFeature(Features.OnionMessages)) {
249+
OnionMessages.process(nodeParams.privateKey, msg) match {
250+
case OnionMessages.DropMessage(reason) =>
251+
log.debug(s"dropping message from ${remoteNodeId.value.toHex}: ${reason.toString}")
252+
case send: OnionMessages.SendMessage =>
253+
switchboard ! send
254+
case received: OnionMessages.ReceiveMessage =>
255+
log.info(s"received message from ${remoteNodeId.value.toHex}: $received")
256+
context.system.eventStream.publish(received)
257+
}
258+
}
259+
stay()
260+
246261
case Event(unknownMsg: UnknownMessage, d: ConnectedData) if nodeParams.pluginMessageTags.contains(unknownMsg.tag) =>
247262
context.system.eventStream.publish(UnknownMessageReceived(self, remoteNodeId, unknownMsg, d.connectionInfo))
248263
stay()
@@ -392,7 +407,7 @@ object Peer {
392407
context.actorOf(Channel.props(nodeParams, wallet, remoteNodeId, watcher, relayer, txPublisherFactory, origin_opt))
393408
}
394409

395-
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory))
410+
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory, switchboard: ActorRef): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard))
396411

397412
// @formatter:off
398413

@@ -420,11 +435,11 @@ object Peer {
420435
case object CONNECTED extends State
421436

422437
case class Init(storedChannels: Set[HasCommitments])
423-
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort]) {
438+
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef) {
424439
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
425440
}
426441
object Connect {
427-
def apply(uri: NodeURI): Connect = new Connect(uri.nodeId, Some(uri.address))
442+
def apply(uri: NodeURI, replyTo: ActorRef): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo)
428443
}
429444

430445
case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful

eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
135135
} else {
136136
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment()
137137
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
138-
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected)
138+
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected(self))
139139

140140
if (d.doSync) {
141141
self ! DoSync(replacePrevious = true)
@@ -517,13 +517,16 @@ object PeerConnection {
517517
object ConnectionResult {
518518
sealed trait Success extends ConnectionResult
519519
sealed trait Failure extends ConnectionResult
520+
sealed trait HasConnection extends ConnectionResult {
521+
val peerConnection: ActorRef
522+
}
520523

521524
case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" }
522525
case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
523526
case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
524527
case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
525-
case object AlreadyConnected extends ConnectionResult.Failure { override def toString: String = "already connected" }
526-
case object Connected extends ConnectionResult.Success { override def toString: String = "connected" }
528+
case class AlreadyConnected(peerConnection: ActorRef) extends ConnectionResult.Failure with HasConnection { override def toString: String = "already connected" }
529+
case class Connected(peerConnection: ActorRef) extends ConnectionResult.Success with HasConnection { override def toString: String = "connected" }
527530
}
528531

529532
case class DelayedRebroadcast(rebroadcast: Rebroadcast)

eclair-core/src/main/scala/fr/acinq/eclair/io/ReconnectionTask.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package fr.acinq.eclair.io
1818

19-
import java.net.InetSocketAddress
2019
import akka.actor.{ActorRef, Props}
2120
import akka.cluster.Cluster
2221
import akka.cluster.pubsub.DistributedPubSub
@@ -27,8 +26,9 @@ import fr.acinq.bitcoin.Crypto.PublicKey
2726
import fr.acinq.eclair.Logs.LogCategory
2827
import fr.acinq.eclair.db.{NetworkDb, PeersDb}
2928
import fr.acinq.eclair.io.Monitoring.Metrics
30-
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, NodeParams, TimestampMilli, TimestampSecond}
29+
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, NodeParams, TimestampMilli}
3130

31+
import java.net.InetSocketAddress
3232
import scala.concurrent.duration.{FiniteDuration, _}
3333
import scala.util.Random
3434

@@ -130,15 +130,15 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
130130

131131
case Event(TickReconnect, _) => stay()
132132

133-
case Event(Peer.Connect(_, hostAndPort_opt), _) =>
133+
case Event(Peer.Connect(_, hostAndPort_opt, replyTo), _) =>
134134
// manual connection requests happen completely independently of the automated reconnection process;
135135
// we initiate a connection but don't modify our state.
136136
// if we are already connecting/connected, the peer will kill any duplicate connections
137137
hostAndPort_opt
138138
.map(hostAndPort2InetSocketAddress)
139139
.orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match {
140-
case Some(address) => connect(address, origin = sender())
141-
case None => sender() ! PeerConnection.ConnectionResult.NoAddressFound
140+
case Some(address) => connect(address, origin = replyTo)
141+
case None => replyTo ! PeerConnection.ConnectionResult.NoAddressFound
142142
}
143143
stay()
144144
}

eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package fr.acinq.eclair.io
1818

19+
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps}
1920
import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy}
2021
import fr.acinq.bitcoin.Crypto.PublicKey
2122
import fr.acinq.eclair.NodeParams
2223
import fr.acinq.eclair.blockchain.OnChainAddressGenerator
2324
import fr.acinq.eclair.channel.Helpers.Closing
2425
import fr.acinq.eclair.channel._
26+
import fr.acinq.eclair.message.OnionMessages
2527
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
2628
import fr.acinq.eclair.router.Router.RouterConf
2729

@@ -57,12 +59,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
5759

5860
def normal(peersWithChannels: Set[PublicKey]): Receive = {
5961

60-
case Peer.Connect(publicKey, _) if publicKey == nodeParams.nodeId =>
62+
case Peer.Connect(publicKey, _, _) if publicKey == nodeParams.nodeId =>
6163
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
6264

63-
case c: Peer.Connect =>
65+
case Peer.Connect(nodeId, address_opt, replyTo) =>
6466
// we create a peer if it doesn't exist
65-
val peer = createOrGetPeer(c.nodeId, offlineChannels = Set.empty)
67+
val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty)
68+
val c = if (replyTo == ActorRef.noSender){
69+
Peer.Connect(nodeId, address_opt, sender())
70+
}else{
71+
Peer.Connect(nodeId, address_opt, replyTo)
72+
}
6673
peer forward c
6774

6875
case d: Peer.Disconnect =>
@@ -92,6 +99,10 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
9299
case Symbol("peers") => sender() ! context.children
93100

94101
case GetRouterPeerConf => sender() ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf)
102+
103+
case OnionMessages.SendMessage(nextNodeId, dataToRelay) =>
104+
val relay = context.spawnAnonymous(MessageRelay())
105+
relay ! MessageRelay.RelayMessage(self, nextNodeId, dataToRelay, sender().toTyped)
95106
}
96107

97108
/**
@@ -131,7 +142,7 @@ object Switchboard {
131142

132143
case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory) extends PeerFactory {
133144
override def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef =
134-
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory), name = peerActorName(remoteNodeId))
145+
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self), name = peerActorName(remoteNodeId))
135146
}
136147

137148
def props(nodeParams: NodeParams, peerFactory: PeerFactory) = Props(new Switchboard(nodeParams, peerFactory))

0 commit comments

Comments
 (0)