-
Notifications
You must be signed in to change notification settings - Fork 10
docs: event queue connection class design #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
86d5e0a
0e6162a
6da92d4
3d94df6
8055066
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,198 @@ | ||
| # Summary | ||
| [summary]: #summary | ||
|
|
||
| A new connection class that streams events from the fullnode event queue. | ||
|
|
||
| # Motivation | ||
| [motivation]: #motivation | ||
|
|
||
| The event queue is a way to sequentially stream events, making sure we do not miss any transaction or transaction update, we can also continue where we left in case of any connection issues, currently any connection issue makes it so we have to sync the entire wallet history from the beggining. | ||
|
|
||
| # Guide-level explanation | ||
| [guide-level-explanation]: #guide-level-explanation | ||
|
|
||
| The fullnode's event queue (a.k.a. reliable integration) has 2 ways to stream events, a websocket and a http api (not server-side events (SSE), just polling events with an event id based pagination). | ||
| We will focus on the websocket implementation but we can later create another class that uses the http api. | ||
|
|
||
| ## Events | ||
|
|
||
| The events from the event queue are not filtered in any way, meaning we have to implement our own filter. | ||
| The events that represent new transactions or blocks is called `NEW_VERTEX_ACCEPTED` and any updates on the transaction data is called `VERTEX_METADATA_CHANGED` these 2 events come with the current transaction data, we can use this data to filter for transaction of our wallet. | ||
|
|
||
| ## Address subscription | ||
|
|
||
| The current implementation uses the fullnode pubsub to listen only for addresses of our wallet, meaning that during startup we send a subscribe command for each address of our wallet. | ||
| Since the events will be filtered locally we can instead use the `subscribeAddresses` method to create a local list of addresses being listened to and use this list to filter the events. | ||
|
|
||
| The "list of addresses" will be an object where the address is the key, since determining if an object has a key is O(1) we can ensure that this does not become a bottleneck for wallets with many addresses. | ||
| Alternatively, we could use the storage `isAddressMine` method which is already O(1). | ||
|
|
||
| ## Event streaming | ||
|
|
||
| To get the full state of the wallet we would need to stream all events of the fullnode, but we can still use the address history api to fetch the balance and transactions of our addresses and start listening the newer events. | ||
|
|
||
| The best way to achieve this is to use the event api to fetch a single event, this will come with the latest event id. | ||
| Example response of `GET ${FULLNODE_URL}/v1a/event?size=1` | ||
|
|
||
| ```json | ||
| { | ||
| "events": [ | ||
| { | ||
| "peer_id": "ca084565aa4ac6f84c452cb0085c1bc03e64317b64e0761f119b389c34fcfede", | ||
| "id": 0, | ||
| "timestamp": 1686186579.306944, | ||
| "type": "LOAD_STARTED", | ||
| "data": {}, | ||
| "group_id": null | ||
| } | ||
| ], | ||
| "latest_event_id": 9038 | ||
| } | ||
| ``` | ||
|
|
||
| Then we save `latest_event_id` and sync the history with the address history api. | ||
| Once we have the wallet on the current state we can start streaming from the `latest_event_id`. | ||
| There can be transactions arriving during this process which would mean we add them during the history sync and during the event streaming, but this issue does not affect the balance or how we process the history. | ||
|
|
||
| ## Best block update | ||
|
|
||
| The fullnode pubsub sends updates whenever the best chain height changes, this is so our wallets can unlock our block rewards, the event queue does not send an update like this but we receive all block transactions as events, meaning we can listen for any transaction with `version` 0 or 3 (block or merged mining block) and with the metadata `voided_by` as `null` (this is because if a block is not voided, it is on the main chain) and derive the best chain height. | ||
|
|
||
| We will always expect the latest unvoided block to be the best chain newest block since during re-orgs (where the best chain changes) we will receive updates and the new best chain will be updated with `voided_by` as `null`. | ||
andreabadesso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ## EventQueueConnection class | ||
|
|
||
| The `EventQueueConnection` class will manage a websocket instance to the event queue api and emit a `wallet-update` event, this is to keep compatibility with the existing `Connection` class. | ||
|
|
||
| The `wallet-update` event will work with the schema: | ||
|
|
||
| ```ts | ||
| interface WalletUpdateEvent { | ||
| type: 'wallet:address_history', | ||
| history: IHistoryTx, | ||
| } | ||
|
|
||
| // Where IHistoryTx is defined as: | ||
|
|
||
| interface IHistoryTx { | ||
| tx_id: string; | ||
| signalBits: number; | ||
| version: number; | ||
| weight: number; | ||
| timestamp: number; | ||
| is_voided: boolean; | ||
| nonce: number, | ||
| inputs: IHistoryInput[]; | ||
| outputs: IHistoryOutput[]; | ||
| parents: string[]; | ||
| token_name?: string; | ||
| token_symbol?: string; | ||
| tokens: string[]; | ||
| height?: number; | ||
| } | ||
|
|
||
| export interface IHistoryInput { | ||
| value: number; | ||
| token_data: number; | ||
| script: string; | ||
| decoded: IHistoryOutputDecoded; | ||
| token: string; | ||
| tx_id: string; | ||
| index: number; | ||
| } | ||
|
|
||
| export interface IHistoryOutputDecoded { | ||
| type?: string; | ||
| address?: string; | ||
| timelock?: number | null; | ||
| data?: string; | ||
| } | ||
|
|
||
| export interface IHistoryOutput { | ||
| value: number; | ||
| token_data: number; | ||
| script: string; | ||
| decoded: IHistoryOutputDecoded; | ||
| token: string; | ||
| spent_by: string | null; | ||
| } | ||
| ``` | ||
|
|
||
| The transaction data from the event queue is in a different format as described below: | ||
|
|
||
| ```ts | ||
| interface EventQueueTxData { | ||
| hash: string; | ||
| version: number; | ||
| weight: number; | ||
| timestamp: number; | ||
| nonce?: number; | ||
| inputs: EventQueueTxInput[]; | ||
| outputs: EventQueueTxOutput[]; | ||
| parents: string[]; | ||
| token_name?: string; | ||
| token_symbol?: string; | ||
| tokens: string[]; | ||
| metadata: EventQueueTxMetadata; | ||
| aux_pow?: string; | ||
| } | ||
|
|
||
| interface EventQueueTxMetadata { | ||
| hash: string; | ||
| spent_outputs: EventQueueSpentOutput[]; | ||
| conflict_with: string[]; | ||
| voided_by: string[]; | ||
| received_by: string[]; | ||
| children: string[]; | ||
| twins: string[]; | ||
| accumulated_weight: number; | ||
| score: number; | ||
| first_block?: string; | ||
| height: number; | ||
| validation: string; | ||
| } | ||
|
|
||
| interface EventQueueTxInput { | ||
| tx_id: string; | ||
| index: number; | ||
| data: string; | ||
| } | ||
|
|
||
| interface EventQueueTxOutput { | ||
| value: number; | ||
| script: string; | ||
| token_data: number; | ||
| } | ||
|
|
||
| interface EventQueueSpentOutput { | ||
| index: number; | ||
| tx_ids: string[]; | ||
| } | ||
| ``` | ||
|
|
||
| ### Data conversion process | ||
|
|
||
| To keep compatibility with the current `Connection` class we need to convert the data with the following process: | ||
|
|
||
| 1. Convert `hash` to `tx_id` | ||
| 2. Assert `nonce` is valid | ||
andreabadesso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| 3. Assign `signalBits` from `version` | ||
| 4. Assign `is_voided` from metadata's `voided_by` | ||
| 5. Assign `height` from metadata's height | ||
| 6. Convert outputs | ||
| 7. Convert inputs | ||
| 8. remove `metadata` and `aux_pow` fields | ||
|
|
||
| Process to convert outputs: | ||
|
|
||
| 1. Derive `token` from `token_data` and `tx.tokens` | ||
| 2. Derive `spent_by` from the output index and `tx.metadata.spent_outputs` | ||
| 3. Derive `decoded` from the script | ||
|
|
||
| Process to convert inputs: | ||
|
|
||
| 1. Try to find the transaction with the input's `tx_id` in storage | ||
| 1. If not found we must fetch the tx from the fullnode api | ||
andreabadesso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| 2. Assign `value`, `token_data`, `script` and `token` from the spent output | ||
|
|
||
| Now that the data matches the current websocket transaction we can emit the data and all processes to manage history from the facade will work as intended. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| # Summary | ||
| [summary]: #summary | ||
|
|
||
| Organize the connection manager classes and move responsabilities from the wallet facade to the connection manager. | ||
|
|
||
| # Motivation | ||
| [motivation]: #motivation | ||
|
|
||
| The connection manager should be responsible for managing the state of the | ||
| connection and how the events from the connection are used, this way we can have | ||
| a multiple connection classes to handle the different behaviors but maintaining | ||
| a common interface. | ||
|
|
||
| # Guide-level explanation | ||
| [guide-level-explanation]: #guide-level-explanation | ||
|
|
||
|
|
||
| ## Current implementation | ||
|
|
||
| The current `Connection` class implementation is a wrapper around a `WebSocket` | ||
| class, the `WebSocket` class is responsible for managing the connection and | ||
| actually sending and receiving data. | ||
| The `Connection` class will interpret the events from the websocket instance and | ||
| emit events that are used by the facade, it also subscribes to specific events | ||
| and handles them differently depending on the type of event. | ||
|
|
||
| The layers of abstraction make the connection very easy to instantiate and use | ||
| but create a black box of events that are interpreted by the facade. | ||
|
|
||
| ## New connection classes | ||
|
|
||
| ### FullnodePubSubConnection and FullnodeEventQueueWSConnection | ||
|
|
||
| Will connect to the fullnode websocket and subscribe to the apropriate | ||
| events, meaning the best block updates and the transaction events. | ||
|
|
||
| Transaction events will be inserted into the storage and the history will be | ||
| processed (calculating metadata, balance, etc) by the connection instance. | ||
| Best block updates will be processed as usual. | ||
|
|
||
| #### Events | ||
|
|
||
| - `new-tx` and `update-tx` | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Semi-unrelated to line number, but remember to add a ping/pong mechanism
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The document of the event queue and the implementation did not have ping/pong anywhere but from what I understand it comes with the websocket lib we use, so I'll add the ping/pong to the design
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Autobahn answers the PING frame with a PONG and also disconnects when no messages are received in a window. It also doesn't send a PING, so the implementation must be done in the client |
||
| - The event data will have the transaction id. // or data | ||
| - `best-block-update` | ||
| - The event data will have the new height | ||
| - `conn_state` | ||
| - Event data is a `ConnectionState`, so the wallet can know if it is receiving events in real | ||
| time or not. | ||
| - `sync-history-partial-update` | ||
| - This is so the wallet facade can update the UI when we are waiting for the | ||
| wallet to load. | ||
| - Will have 3 stages: `syncing`, `processing` and `fetching-tokens`. | ||
| - `syncing` means that the wallet is still fetching the history from the | ||
| fullnode. | ||
| - `processing` means that we are calculating the balance of the wallet. | ||
| - `fetching-tokens` means we are fetching the token data for our tokens. | ||
|
|
||
| #### Methods | ||
|
|
||
| - subscribeAddresses, unsubscribeAddresse(s) | ||
| - Start/stop listening for transactions with the provided addresses. | ||
| - start | ||
| - Start the websocket and add listeners | ||
| - stop | ||
| - Close the websocket and remove listeners | ||
| - getter methods for the configuration | ||
| - getNetwork, getServerURL | ||
|
|
||
| The other methods will be internal. | ||
|
|
||
|
|
||
| #### pubsub vs event queue | ||
|
|
||
| These classes have the same methods and the same events because they are meant | ||
| to be used by the same wallet facade (i.e. `HathorWallet`). | ||
| The difference between them is the underlying websocket class and how it is | ||
| managed. | ||
|
|
||
| The events are also derived in a very different way, the pubsub will receive | ||
| only transactions from the wallet but the event queue will receive all events | ||
| from the fullnode and will have to filter them locally. | ||
|
|
||
| Even with the different implementations since we will expose a common interface | ||
| the facade will be able to work with both of them interchangeably. | ||
|
|
||
| ### WalletServiceConnection | ||
|
|
||
| The wallet-service connection does not have to handle transaction processing | ||
| since all of this is handled by the wallet-service. | ||
| The main concern of this connection is to re-emit events from the wallet-service | ||
| and to signal the wallet facade that it needs to update the "new addresses" list. | ||
|
|
||
| #### Events | ||
|
|
||
| - `new-tx` and `update-tx` | ||
| - The event data will have the transaction id. // or data | ||
| - `conn_state` | ||
| - Event data is a `ConnectionState`, so the wallet can know if it is receiving events in real | ||
| time or not. | ||
| - `reload-data` | ||
| - When the connection becomes available after it went offline this event will | ||
| be emitted to signal that the data needs to be reloaded. | ||
|
|
||
| #### Methods | ||
|
|
||
| - start | ||
| - stop | ||
| - setWalletId | ||
| - getter methods for the configuration | ||
|
|
||
| The other methods will be internal. | ||
|
|
||
| ### FullnodeDataConnection | ||
|
|
||
| This is the only connection class not meant to be used by a wallet facade but to | ||
| be used by an external client to get real-time events from the fullnode, an | ||
| example of how to use this is the explorer main page, to update with new | ||
| transactions and blocks. | ||
|
|
||
| #### Events | ||
|
|
||
| - `network-update` | ||
| - network events are related to peers of the connected node or when a new tx | ||
| is accepted by the network. | ||
| - `dashboard` | ||
| - Dashboard data from the fullnode. | ||
|
|
||
| ## Changes to the wallet facade | ||
|
|
||
| The wallet facade will be updated to use the new connection classes. | ||
| This means that instead of receiving a connection instance the wallet facade | ||
| will receive the connection params and options (i.e. `connection_params`) and | ||
| will instantiate the appropriate connection class. | ||
|
|
||
| The wallet-service facade can only use the `WalletServiceConnection`, but the | ||
| `HathorWallet` will need an aditional parameter to choose which connection class | ||
| to use. | ||
| The `connection_mode` argument will be introduced to resolve this issue, it will | ||
| be one of the following: | ||
|
|
||
| - `pubsub` | ||
| - Uses `FullnodePubSubConnection` | ||
| - `event_queue` | ||
| - Uses `FullnodeEventQueueConnection` | ||
| - `auto` | ||
|
|
||
| If not provided, the `pubsub` mode will be used, since this is the most common | ||
| connection used by our wallets. | ||
|
|
||
| Ideally the `auto` mode would decide to use either `FullnodePubSubConnection` or | ||
| `FullnodeEventQueueConnection` based on the size of the wallet and other | ||
| parameters, but until we have a better strategy it will simply use | ||
| `FullnodePubSubConnection`. | ||
|
|
||
| ## FullnodeEventQueueWSConnection connection managementstate | ||
|
|
||
| Now that the connection class is responsible for syncing the history, we can | ||
| implement a special logic for the event queue. | ||
| When we are reestablishing the connection we can start from the last | ||
| acknowledged event. | ||
|
|
||
| Usually when the connection is lost we will need to clean and sync the history | ||
| of transactions from the beginning since we may have lost some events, but the | ||
| event queue allows us to start streaming events from the moment we lost | ||
| connection. | ||
| This makes it so we don't have to clean the history and can just start the | ||
| stream from where we left off. | ||
| Although this is only applicable when we are connected to the same fullnode, so | ||
| we need to check if the fullnode we are connected to is the same and if it isn't | ||
| we will need to re-sync as usual. | ||
|
|
||
| To make sure we are connected to the same fullnode we will use the peer-id (32 byte identifier) and the `stream_id` (uuid4). | ||
Uh oh!
There was an error while loading. Please reload this page.