@@ -69,10 +69,48 @@ enum ChannelAnnouncement {
6969 Full ( msgs:: ChannelAnnouncement ) ,
7070 Unsigned ( msgs:: UnsignedChannelAnnouncement ) ,
7171}
72+ impl ChannelAnnouncement {
73+ fn node_id_1 ( & self ) -> & NodeId {
74+ match self {
75+ ChannelAnnouncement :: Full ( msg) => & msg. contents . node_id_1 ,
76+ ChannelAnnouncement :: Unsigned ( msg) => & msg. node_id_1 ,
77+ }
78+ }
79+ }
80+
81+ enum NodeAnnouncement {
82+ Full ( msgs:: NodeAnnouncement ) ,
83+ Unsigned ( msgs:: UnsignedNodeAnnouncement ) ,
84+ }
85+ impl NodeAnnouncement {
86+ fn timestamp ( & self ) -> u32 {
87+ match self {
88+ NodeAnnouncement :: Full ( msg) => msg. contents . timestamp ,
89+ NodeAnnouncement :: Unsigned ( msg) => msg. timestamp ,
90+ }
91+ }
92+ }
93+
94+ enum ChannelUpdate {
95+ Full ( msgs:: ChannelUpdate ) ,
96+ Unsigned ( msgs:: UnsignedChannelUpdate ) ,
97+ }
98+ impl ChannelUpdate {
99+ fn timestamp ( & self ) -> u32 {
100+ match self {
101+ ChannelUpdate :: Full ( msg) => msg. contents . timestamp ,
102+ ChannelUpdate :: Unsigned ( msg) => msg. timestamp ,
103+ }
104+ }
105+ }
72106
73107struct UtxoMessages {
74108 complete : Option < Result < TxOut , UtxoLookupError > > ,
75109 channel_announce : Option < ChannelAnnouncement > ,
110+ latest_node_announce_a : Option < NodeAnnouncement > ,
111+ latest_node_announce_b : Option < NodeAnnouncement > ,
112+ latest_channel_update_a : Option < ChannelUpdate > ,
113+ latest_channel_update_b : Option < ChannelUpdate > ,
76114}
77115
78116/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
@@ -98,13 +136,17 @@ impl UtxoFuture {
98136 Self { state : Arc :: new ( Mutex :: new ( UtxoMessages {
99137 complete : None ,
100138 channel_announce : None ,
139+ latest_node_announce_a : None ,
140+ latest_node_announce_b : None ,
141+ latest_channel_update_a : None ,
142+ latest_channel_update_b : None ,
101143 } ) ) }
102144 }
103145
104146 /// Resolves this future against the given `graph` and with the given `result`.
105147 pub fn resolve < L : Deref > ( & self , graph : & NetworkGraph < L > , result : Result < TxOut , UtxoLookupError > )
106148 where L :: Target : Logger {
107- let announcement = {
149+ let ( announcement, node_a , node_b , update_a , update_b ) = {
108150 let mut pending_checks = graph. pending_checks . internal . lock ( ) . unwrap ( ) ;
109151 let mut async_messages = self . state . lock ( ) . unwrap ( ) ;
110152
@@ -115,14 +157,19 @@ impl UtxoFuture {
115157 async_messages. complete = Some ( result) ;
116158 return ;
117159 }
160+
118161 let announcement_msg = match async_messages. channel_announce . as_ref ( ) . unwrap ( ) {
119162 ChannelAnnouncement :: Full ( signed_msg) => & signed_msg. contents ,
120163 ChannelAnnouncement :: Unsigned ( msg) => & msg,
121164 } ;
122165
123166 pending_checks. lookup_completed ( announcement_msg, & Arc :: downgrade ( & self . state ) ) ;
124167
125- async_messages. channel_announce . take ( ) . unwrap ( )
168+ ( async_messages. channel_announce . take ( ) . unwrap ( ) ,
169+ async_messages. latest_node_announce_a . take ( ) ,
170+ async_messages. latest_node_announce_b . take ( ) ,
171+ async_messages. latest_channel_update_a . take ( ) ,
172+ async_messages. latest_channel_update_b . take ( ) )
126173 } ;
127174
128175 // Now that we've updated our internal state, pass the pending messages back through the
@@ -138,11 +185,36 @@ impl UtxoFuture {
138185 let _ = graph. update_channel_from_unsigned_announcement ( & msg, & Some ( & resolver) ) ;
139186 } ,
140187 }
188+
189+ for announce in core:: iter:: once ( node_a) . chain ( core:: iter:: once ( node_b) ) {
190+ match announce {
191+ Some ( NodeAnnouncement :: Full ( signed_msg) ) => {
192+ let _ = graph. update_node_from_announcement ( & signed_msg) ;
193+ } ,
194+ Some ( NodeAnnouncement :: Unsigned ( msg) ) => {
195+ let _ = graph. update_node_from_unsigned_announcement ( & msg) ;
196+ } ,
197+ None => { } ,
198+ }
199+ }
200+
201+ for update in core:: iter:: once ( update_a) . chain ( core:: iter:: once ( update_b) ) {
202+ match update {
203+ Some ( ChannelUpdate :: Full ( signed_msg) ) => {
204+ let _ = graph. update_channel ( & signed_msg) ;
205+ } ,
206+ Some ( ChannelUpdate :: Unsigned ( msg) ) => {
207+ let _ = graph. update_channel_unsigned ( & msg) ;
208+ } ,
209+ None => { } ,
210+ }
211+ }
141212 }
142213}
143214
144215struct PendingChecksContext {
145216 channels : HashMap < u64 , Weak < Mutex < UtxoMessages > > > ,
217+ nodes : HashMap < NodeId , Vec < Weak < Mutex < UtxoMessages > > > > ,
146218}
147219
148220impl PendingChecksContext {
@@ -154,6 +226,15 @@ impl PendingChecksContext {
154226 e. remove ( ) ;
155227 }
156228 }
229+
230+ if let hash_map:: Entry :: Occupied ( mut e) = self . nodes . entry ( msg. node_id_1 ) {
231+ e. get_mut ( ) . retain ( |elem| !Weak :: ptr_eq ( & elem, & completed_state) ) ;
232+ if e. get ( ) . is_empty ( ) { e. remove ( ) ; }
233+ }
234+ if let hash_map:: Entry :: Occupied ( mut e) = self . nodes . entry ( msg. node_id_2 ) {
235+ e. get_mut ( ) . retain ( |elem| !Weak :: ptr_eq ( & elem, & completed_state) ) ;
236+ if e. get ( ) . is_empty ( ) { e. remove ( ) ; }
237+ }
157238 }
158239}
159240
@@ -165,10 +246,98 @@ pub(super) struct PendingChecks {
165246impl PendingChecks {
166247 pub ( super ) fn new ( ) -> Self {
167248 PendingChecks { internal : Mutex :: new ( PendingChecksContext {
168- channels : HashMap :: new ( ) ,
249+ channels : HashMap :: new ( ) , nodes : HashMap :: new ( ) ,
169250 } ) }
170251 }
171252
253+ /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
254+ /// and, if so, stores the channel message for handling later and returns an `Err`.
255+ pub ( super ) fn check_hold_pending_channel_update (
256+ & self , msg : & msgs:: UnsignedChannelUpdate , full_msg : Option < & msgs:: ChannelUpdate >
257+ ) -> Result < ( ) , LightningError > {
258+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
259+ if let hash_map:: Entry :: Occupied ( e) = pending_checks. channels . entry ( msg. short_channel_id ) {
260+ let is_from_a = ( msg. flags & 1 ) == 1 ;
261+ match Weak :: upgrade ( e. get ( ) ) {
262+ Some ( msgs_ref) => {
263+ let mut messages = msgs_ref. lock ( ) . unwrap ( ) ;
264+ let latest_update = if is_from_a {
265+ & mut messages. latest_channel_update_a
266+ } else {
267+ & mut messages. latest_channel_update_b
268+ } ;
269+ if latest_update. is_none ( ) || latest_update. as_ref ( ) . unwrap ( ) . timestamp ( ) < msg. timestamp {
270+ // If the messages we got has a higher timestamp, just blindly assume the
271+ // signatures on the new message are correct and drop the old message. This
272+ // may cause us to end up dropping valid `channel_update`s if a peer is
273+ // malicious, but we should get the correct ones when the node updates them.
274+ * latest_update = Some (
275+ if let Some ( msg) = full_msg { ChannelUpdate :: Full ( msg. clone ( ) ) }
276+ else { ChannelUpdate :: Unsigned ( msg. clone ( ) ) } ) ;
277+ }
278+ return Err ( LightningError {
279+ err : "Awaiting channel_announcement validation to accept channel_update" . to_owned ( ) ,
280+ action : ErrorAction :: IgnoreAndLog ( Level :: Gossip ) ,
281+ } ) ;
282+ } ,
283+ None => { e. remove ( ) ; } ,
284+ }
285+ }
286+ Ok ( ( ) )
287+ }
288+
289+ /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
290+ /// given node and, if so, stores the channel message for handling later and returns an `Err`.
291+ pub ( super ) fn check_hold_pending_node_announcement (
292+ & self , msg : & msgs:: UnsignedNodeAnnouncement , full_msg : Option < & msgs:: NodeAnnouncement >
293+ ) -> Result < ( ) , LightningError > {
294+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
295+ if let hash_map:: Entry :: Occupied ( mut e) = pending_checks. nodes . entry ( msg. node_id ) {
296+ let mut found_at_least_one_chan = false ;
297+ e. get_mut ( ) . retain ( |node_msgs| {
298+ match Weak :: upgrade ( & node_msgs) {
299+ Some ( chan_mtx) => {
300+ let mut chan_msgs = chan_mtx. lock ( ) . unwrap ( ) ;
301+ if let Some ( chan_announce) = & chan_msgs. channel_announce {
302+ let latest_announce =
303+ if * chan_announce. node_id_1 ( ) == msg. node_id {
304+ & mut chan_msgs. latest_node_announce_a
305+ } else {
306+ & mut chan_msgs. latest_node_announce_b
307+ } ;
308+ if latest_announce. is_none ( ) ||
309+ latest_announce. as_ref ( ) . unwrap ( ) . timestamp ( ) < msg. timestamp
310+ {
311+ // If the messages we got has a higher timestamp, just blindly
312+ // assume the signatures on the new message are correct and drop
313+ // the old message. This may cause us to end up dropping valid
314+ // `node_announcement`s if a peer is malicious, but we should get
315+ // the correct ones when the node updates them.
316+ * latest_announce = Some (
317+ if let Some ( msg) = full_msg { NodeAnnouncement :: Full ( msg. clone ( ) ) }
318+ else { NodeAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
319+ }
320+ found_at_least_one_chan = true ;
321+ true
322+ } else {
323+ debug_assert ! ( false , "channel_announce is set before struct is added to node map" ) ;
324+ false
325+ }
326+ } ,
327+ None => false ,
328+ }
329+ } ) ;
330+ if e. get ( ) . is_empty ( ) { e. remove ( ) ; }
331+ if found_at_least_one_chan {
332+ return Err ( LightningError {
333+ err : "Awaiting channel_announcement validation to accept node_announcement" . to_owned ( ) ,
334+ action : ErrorAction :: IgnoreAndLog ( Level :: Gossip ) ,
335+ } ) ;
336+ }
337+ }
338+ Ok ( ( ) )
339+ }
340+
172341 fn check_replace_previous_entry ( msg : & msgs:: UnsignedChannelAnnouncement ,
173342 full_msg : Option < & msgs:: ChannelAnnouncement > , replacement : Option < Weak < Mutex < UtxoMessages > > > ,
174343 pending_channels : & mut HashMap < u64 , Weak < Mutex < UtxoMessages > > >
@@ -282,6 +451,10 @@ impl PendingChecks {
282451 async_messages. channel_announce = Some (
283452 if let Some ( msg) = full_msg { ChannelAnnouncement :: Full ( msg. clone ( ) ) }
284453 else { ChannelAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
454+ pending_checks. nodes . entry ( msg. node_id_1 )
455+ . or_insert ( Vec :: new ( ) ) . push ( Arc :: downgrade ( & future. state ) ) ;
456+ pending_checks. nodes . entry ( msg. node_id_2 )
457+ . or_insert ( Vec :: new ( ) ) . push ( Arc :: downgrade ( & future. state ) ) ;
285458 Err ( LightningError {
286459 err : "Channel being checked async" . to_owned ( ) ,
287460 action : ErrorAction :: IgnoreAndLog ( Level :: Gossip ) ,
0 commit comments