Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit eaaf5c6

Browse files
Merge pull request #3492 from input-output-hk/feature/cbr-381-restart-submission-worker
[CBR-381] Initialise Submission layer on wallet startup
2 parents ae06f3a + d12d981 commit eaaf5c6

File tree

2 files changed

+155
-21
lines changed

2 files changed

+155
-21
lines changed

wallet-new/docs/RelatingWalletSpecToCardano.md

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,9 @@ tick :: WalletSubmission
12791279

12801280
### 10.1 Interface
12811281

1282-
* "call addPending on newPending"
1282+
#### Adding and removing pending transactions
1283+
* "call `addPending` on `newPending` and (possibly) on `rollback`"
1284+
* "call `remPending` on `applyBlock` and `cancel`"
12831285

12841286
After adding a new transaction (`newPending`/`newForeign`) to the wallet state, we notify the submission layer of the new transaction:
12851287

@@ -1290,7 +1292,7 @@ submitTx = modifyMVar_ (walletPassive ^. walletSubmission) $
12901292
```
12911293
[submitTx](https://github.com/input-output-hk/cardano-sl/blob/6659d8501c727714a7861ad2e527a337e0a11b86/wallet-new/src/Cardano/Wallet/Kernel/Pending.hs#L114-L116)
12921294

1293-
* "call remPending during applyBlock"
1295+
In `applyBlock` we remove relevant pending transactions:
12941296

12951297
```haskell
12961298
-- | Notify all the wallets in the PassiveWallet of a new block
@@ -1307,10 +1309,34 @@ applyBlock pw@PassiveWallet{..} b = do
13071309
```
13081310
[BListener.applyBlock](https://github.com/input-output-hk/cardano-sl/blob/6659d8501c727714a7861ad2e527a337e0a11b86/wallet-new/src/Cardano/Wallet/Kernel/BListener.hs#L54-L64)
13091311

1312+
In `switchToFork` we _add_ / _remove_ the relevant pending transactions to/from the submission layer (which corresponds with the Wallet Spec on possibly adding pendings during `rollback` and removing pendings during `applyBlock`):
1313+
1314+
```haskell
1315+
switchToFork :: PassiveWallet
1316+
-> Int -- ^ Number of blocks to roll back
1317+
-> [ResolvedBlock] -- ^ Blocks in the new fork
1318+
-> IO (Either RollbackDuringRestoration ())
1319+
switchToFork pw@PassiveWallet{..} n bs = do
1320+
k <- Node.getSecurityParameter _walletNode
1321+
blocksAndMeta <- mapM (prefilterBlock' pw) bs
1322+
let (blockssByAccount, metas) = unzip blocksAndMeta
1323+
res <- update' _wallets $ SwitchToFork k n blockssByAccount
1324+
case res of
1325+
Left err -> return $ Left err
1326+
Right changes -> do mapM_ (putTxMeta _walletMeta) $ concat metas
1327+
modifyMVar_ _walletSubmission $
1328+
return . Submission.addPendings (fst <$> changes)
1329+
modifyMVar_ _walletSubmission $
1330+
return . Submission.remPending (snd <$> changes)
1331+
return $ Right ()
1332+
```
1333+
[switchToFork](https://github.com/input-output-hk/cardano-sl/blob/6659d8501c727714a7861ad2e527a337e0a11b86/wallet-new/src/Cardano/Wallet/Kernel/BListener.hs#L74-L94)
1334+
1335+
#### Tick function
1336+
13101337
* "must be a thread that periodically calls tick, to give the submission layer a chance to resubmit transactions that haven’t made it into the blockchain yet. The set of transactions returned by tick ... the wallet should remove such transactions from its pending set"
13111338

1312-
The submission layer resource is managed in `bracketActiveWallet`.
1313-
The resubmission function `tickFunction` calls `tick` and cancels any pending transactions returned:
1339+
The submission layer resource is managed in `bracketActiveWallet` and is initialised with `tickFunction`, which calls `tick` and cancels any pending transactions returned:
13141340

13151341
```haskell
13161342
tickFunction :: MVar WalletSubmission -> IO ()
@@ -1328,6 +1354,12 @@ tickFunction submissionLayer = do
13281354
#### cancelPending
13291355

13301356
```haskell
1357+
-- | Cancel a pending transaction
1358+
--
1359+
-- NOTE: This gets called in response to events /from/ the wallet submission
1360+
-- layer, so we shouldn't be notifying the submission in return here.
1361+
--
1362+
-- This removes the transaction from either pending or foreign.
13311363
cancelPending :: forall c. IsCheckpoint c
13321364
=> Set Txp.TxId
13331365
-> NewestFirst StrictNonEmpty c -> NewestFirst StrictNonEmpty c
@@ -1337,6 +1369,89 @@ cancelPending txids = map (cpPending %~ Pending.delete txids)
13371369

13381370
### 10.2 Implementation
13391371

1372+
* The `Schedule` time slots are modeled as a collection of `ScheduleEvents` per `slotId`
1373+
1374+
```haskell
1375+
data Schedule = Schedule {
1376+
_ssScheduled :: IntMap ScheduleEvents
1377+
, _ssUnsentNursery :: [ScheduleSend]
1378+
}
1379+
1380+
-- | A type representing an item (in this context, a transaction) scheduled
1381+
-- to be regularly sent in a given slot (computed by a given 'RetryPolicy').
1382+
data ScheduleSend = ScheduleSend HdAccountId Txp.TxId Txp.TxAux SubmissionCount deriving Eq
1383+
1384+
-- | A type representing an item (in this context, a transaction @ID@) which
1385+
-- needs to be checked against the blockchain for inclusion. In other terms,
1386+
-- we need to confirm that indeed the transaction identified by the given 'TxId' has
1387+
-- been adopted, i.e. it's not in the local pending set anymore.
1388+
data ScheduleEvictIfNotConfirmed = ScheduleEvictIfNotConfirmed HdAccountId Txp.TxId deriving Eq
1389+
1390+
-- | All the events we can schedule for a given 'Slot', partitioned into
1391+
-- 'ScheduleSend' and 'ScheduleEvictIfNotConfirmed'.
1392+
data ScheduleEvents = ScheduleEvents {
1393+
_seToSend :: [ScheduleSend]
1394+
-- ^ A list of transactions which we need to send.
1395+
, _seToConfirm :: [ScheduleEvictIfNotConfirmed]
1396+
-- ^ A list of transactions which we need to check if they have been
1397+
-- confirmed (i.e. adopted) by the blockchain.
1398+
}
1399+
1400+
```
1401+
1402+
* "When the submission layer is notified of new pending transactions, it adds those to its pending set and schedules them to be submitted in the next slot, recording an initial submission count of 0"
1403+
1404+
```haskell
1405+
-- | Schedule the full list of pending transactions.
1406+
-- The transactions will be scheduled immediately in the next 'Slot'.
1407+
schedulePending :: HdAccountId
1408+
-> Pending
1409+
-> WalletSubmission
1410+
-> WalletSubmission
1411+
schedulePending accId pending ws =
1412+
let currentSlot = ws ^. wsState . wssCurrentSlot
1413+
in addToSchedule ws (mapSlot succ currentSlot) toSend mempty
1414+
where
1415+
toEntry :: (Txp.TxId, Txp.TxAux) -> ScheduleSend
1416+
toEntry (txId, txAux) = ScheduleSend accId txId txAux (SubmissionCount 0)
1417+
1418+
toSend :: [ScheduleSend]
1419+
toSend = map toEntry (Pending.toList pending)
1420+
```
1421+
1422+
* "The submission layer is parameterised over a ‘resubmission function’ ρ"
1423+
* "If desired, the submission count can be used to implement exponential back-off"
1424+
1425+
```haskell
1426+
1427+
-- see initPassiveWallet:
1428+
1429+
submission <- newMVar (newWalletSubmission rho)
1430+
...
1431+
rho = defaultResubmitFunction (exponentialBackoff 255 1.25)
1432+
```
1433+
[initPassiveWallet](https://github.com/input-output-hk/cardano-sl/blob/6659d8501c727714a7861ad2e527a337e0a11b86/wallet-new/src/Cardano/Wallet/Kernel.hs#L105)
1434+
1435+
### 10.3 Persistence
1436+
1437+
* "The state of the submission layer does not need to be persisted. If the wallet is shutdown for some period of time, the submission layer can simply be re-initialised from the state of the wallet, starting the submission process afresh for any transactions that the wallet still reports as pending."
1438+
1439+
```haskell
1440+
1441+
-- see initPassiveWallet...
1442+
1443+
initSubmission :: PassiveWallet -> IO ()
1444+
initSubmission pw = do
1445+
pendings <- pendingByAccount <$> getWalletSnapshot pw
1446+
modifyMVar_ (_walletSubmission pw) $
1447+
return . addPendings pendings
1448+
```
1449+
Note: the above code was added shortly after this document was pinned to a git hash (hence no link).
1450+
1451+
### 10.4 Transactions with TTL
1452+
1453+
Cardano does not implement transaction TTL yet and instead relies on submission count and a maximum retry limit.
1454+
13401455
**Fig 14 - Wallet Spec**
13411456

13421457
```haskell

wallet-new/src/Cardano/Wallet/Kernel.hs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Cardano.Wallet.Kernel (
2121
import Universum hiding (State, init)
2222

2323
import Control.Concurrent.Async (async, cancel)
24-
import Control.Concurrent.MVar (modifyMVar)
24+
import Control.Concurrent.MVar (modifyMVar, modifyMVar_)
2525
import Data.Acid (AcidState)
2626
import Data.Acid.Memory (openMemoryState)
2727
import qualified Data.Map.Strict as Map
@@ -31,14 +31,16 @@ import Pos.Core.Txp (TxAux (..))
3131
import Pos.Util.Wlog (Severity (..))
3232

3333
import Cardano.Wallet.Kernel.DB.AcidState (DB, defDB)
34+
import Cardano.Wallet.Kernel.DB.Read (pendingByAccount)
3435
import Cardano.Wallet.Kernel.DB.TxMeta
3536
import Cardano.Wallet.Kernel.Diffusion (WalletDiffusion (..))
3637
import Cardano.Wallet.Kernel.Internal
3738
import Cardano.Wallet.Kernel.Keystore (Keystore)
3839
import Cardano.Wallet.Kernel.NodeStateAdaptor (NodeStateAdaptor)
3940
import Cardano.Wallet.Kernel.Pending (cancelPending)
41+
import Cardano.Wallet.Kernel.Read (getWalletSnapshot)
4042
import Cardano.Wallet.Kernel.Submission (WalletSubmission,
41-
defaultResubmitFunction, exponentialBackoff,
43+
addPendings, defaultResubmitFunction, exponentialBackoff,
4244
newWalletSubmission, tick)
4345
import Cardano.Wallet.Kernel.Submission.Worker (tickSubmissionLayer)
4446

@@ -85,26 +87,43 @@ handlesClose (Handles _ meta) = closeMetaDB meta
8587
Wallet Initialisers
8688
-------------------------------------------------------------------------------}
8789

88-
-- | Initialise Passive Wallet with empty Wallets collection
90+
-- | Initialise Passive Wallet
8991
initPassiveWallet :: (Severity -> Text -> IO ())
9092
-> Keystore
9193
-> WalletHandles
9294
-> NodeStateAdaptor IO
9395
-> IO PassiveWallet
94-
initPassiveWallet logMessage keystore Handles{..} node = do
95-
submission <- newMVar (newWalletSubmission rho)
96-
restore <- newMVar Map.empty
97-
return PassiveWallet {
98-
_walletLogMessage = logMessage
99-
, _walletKeystore = keystore
100-
, _wallets = hAcid
101-
, _walletMeta = hMeta
102-
, _walletNode = node
103-
, _walletSubmission = submission
104-
, _walletRestorationTask = restore
105-
}
106-
where
107-
rho = defaultResubmitFunction (exponentialBackoff 255 1.25)
96+
initPassiveWallet logMessage keystore handles node = do
97+
pw <- preparePassiveWallet
98+
initSubmission pw
99+
return pw
100+
where
101+
-- | Prepare Passive Wallet for initialisation.
102+
-- NOTE: the Submission Layer is not initialised yet since that would require
103+
-- access to the PassiveWallet state
104+
preparePassiveWallet :: IO PassiveWallet
105+
preparePassiveWallet = do
106+
submission <- newMVar (newWalletSubmission rho)
107+
restore <- newMVar Map.empty
108+
return PassiveWallet {
109+
_walletLogMessage = logMessage
110+
, _walletKeystore = keystore
111+
, _wallets = hAcid handles
112+
, _walletMeta = hMeta handles
113+
, _walletNode = node
114+
, _walletSubmission = submission
115+
, _walletRestorationTask = restore
116+
}
117+
where
118+
rho = defaultResubmitFunction (exponentialBackoff 255 1.25)
119+
120+
-- | Since the submission layer state is not persisted, we need to initialise
121+
-- the submission layer with all pending transactions present in the wallet state.
122+
initSubmission :: PassiveWallet -> IO ()
123+
initSubmission pw_ = do
124+
pendings <- pendingByAccount <$> getWalletSnapshot pw_
125+
modifyMVar_ (_walletSubmission pw_) $
126+
return . addPendings pendings
108127

109128
-- | Initialize the Passive wallet (specified by the ESK) with the given Utxo
110129
--

0 commit comments

Comments
 (0)