@@ -105,6 +105,7 @@ impl UtxoFuture {
105105 pub fn resolve < L : Deref > ( & self , graph : & NetworkGraph < L > , result : Result < TxOut , UtxoLookupError > )
106106 where L :: Target : Logger {
107107 let announcement = {
108+ let mut pending_checks = graph. pending_checks . internal . lock ( ) . unwrap ( ) ;
108109 let mut async_messages = self . state . lock ( ) . unwrap ( ) ;
109110
110111 if async_messages. channel_announce . is_none ( ) {
@@ -114,6 +115,12 @@ impl UtxoFuture {
114115 async_messages. complete = Some ( result) ;
115116 return ;
116117 }
118+ let announcement_msg = match async_messages. channel_announce . as_ref ( ) . unwrap ( ) {
119+ ChannelAnnouncement :: Full ( signed_msg) => & signed_msg. contents ,
120+ ChannelAnnouncement :: Unsigned ( msg) => & msg,
121+ } ;
122+
123+ pending_checks. lookup_completed ( announcement_msg, & Arc :: downgrade ( & self . state ) ) ;
117124
118125 async_messages. channel_announce . take ( ) . unwrap ( )
119126 } ;
@@ -134,13 +141,87 @@ impl UtxoFuture {
134141 }
135142}
136143
144+ struct PendingChecksContext {
145+ channels : HashMap < u64 , Weak < Mutex < UtxoMessages > > > ,
146+ }
147+
148+ impl PendingChecksContext {
149+ fn lookup_completed ( & mut self ,
150+ msg : & msgs:: UnsignedChannelAnnouncement , completed_state : & Weak < Mutex < UtxoMessages > >
151+ ) {
152+ if let hash_map:: Entry :: Occupied ( e) = self . channels . entry ( msg. short_channel_id ) {
153+ if Weak :: ptr_eq ( e. get ( ) , & completed_state) {
154+ e. remove ( ) ;
155+ }
156+ }
157+ }
158+ }
159+
137160/// A set of messages which are pending UTXO lookups for processing.
138161pub ( super ) struct PendingChecks {
162+ internal : Mutex < PendingChecksContext > ,
139163}
140164
141165impl PendingChecks {
142166 pub ( super ) fn new ( ) -> Self {
143- PendingChecks { }
167+ PendingChecks { internal : Mutex :: new ( PendingChecksContext {
168+ channels : HashMap :: new ( ) ,
169+ } ) }
170+ }
171+
172+ fn check_replace_previous_entry ( msg : & msgs:: UnsignedChannelAnnouncement ,
173+ full_msg : Option < & msgs:: ChannelAnnouncement > , replacement : Option < Weak < Mutex < UtxoMessages > > > ,
174+ pending_channels : & mut HashMap < u64 , Weak < Mutex < UtxoMessages > > >
175+ ) -> Result < ( ) , msgs:: LightningError > {
176+ match pending_channels. entry ( msg. short_channel_id ) {
177+ hash_map:: Entry :: Occupied ( mut e) => {
178+ // There's already a pending lookup for the given SCID. Check if the messages
179+ // are the same and, if so, return immediately (don't bother spawning another
180+ // lookup if we haven't gotten that far yet).
181+ match Weak :: upgrade ( & e. get ( ) ) {
182+ Some ( pending_msgs) => {
183+ let pending_matches = match & pending_msgs. lock ( ) . unwrap ( ) . channel_announce {
184+ Some ( ChannelAnnouncement :: Full ( pending_msg) ) => Some ( pending_msg) == full_msg,
185+ Some ( ChannelAnnouncement :: Unsigned ( pending_msg) ) => pending_msg == msg,
186+ None => {
187+ // This shouldn't actually be reachable. We set the
188+ // `channel_announce` field under the same lock as setting the
189+ // channel map entry. Still, we can just treat it as
190+ // non-matching and let the new request fly.
191+ debug_assert ! ( false ) ;
192+ false
193+ } ,
194+ } ;
195+ if pending_matches {
196+ return Err ( LightningError {
197+ err : "Channel announcement is already being checked" . to_owned ( ) ,
198+ action : ErrorAction :: IgnoreDuplicateGossip ,
199+ } ) ;
200+ } else {
201+ // The earlier lookup is a different message. If we have another
202+ // request in-flight now replace the original.
203+ // Note that in the replace case whether to replace is somewhat
204+ // arbitrary - both results will be handled, we're just updating the
205+ // value that will be compared to future lookups with the same SCID.
206+ if let Some ( item) = replacement {
207+ * e. get_mut ( ) = item;
208+ }
209+ }
210+ } ,
211+ None => {
212+ // The earlier lookup already resolved. We can't be sure its the same
213+ // so just remove/replace it and move on.
214+ if let Some ( item) = replacement {
215+ * e. get_mut ( ) = item;
216+ } else { e. remove ( ) ; }
217+ } ,
218+ }
219+ } ,
220+ hash_map:: Entry :: Vacant ( v) => {
221+ if let Some ( item) = replacement { v. insert ( item) ; }
222+ } ,
223+ }
224+ Ok ( ( ) )
144225 }
145226
146227 pub ( super ) fn check_channel_announcement < U : Deref > ( & self ,
@@ -177,6 +258,9 @@ impl PendingChecks {
177258 }
178259 } ;
179260
261+ Self :: check_replace_previous_entry ( msg, full_msg, None ,
262+ & mut self . internal . lock ( ) . unwrap ( ) . channels ) ?;
263+
180264 match utxo_lookup {
181265 & None => {
182266 // Tentatively accept, potentially exposing us to DoS attacks
@@ -186,12 +270,15 @@ impl PendingChecks {
186270 match utxo_lookup. get_utxo ( & msg. chain_hash , msg. short_channel_id ) {
187271 UtxoResult :: Sync ( res) => handle_result ( res) ,
188272 UtxoResult :: Async ( future) => {
273+ let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
189274 let mut async_messages = future. state . lock ( ) . unwrap ( ) ;
190275 if let Some ( res) = async_messages. complete . take ( ) {
191276 // In the unlikely event the future resolved before we managed to get it,
192277 // handle the result in-line.
193278 handle_result ( res)
194279 } else {
280+ Self :: check_replace_previous_entry ( msg, full_msg,
281+ Some ( Arc :: downgrade ( & future. state ) ) , & mut pending_checks. channels ) ?;
195282 async_messages. channel_announce = Some (
196283 if let Some ( msg) = full_msg { ChannelAnnouncement :: Full ( msg. clone ( ) ) }
197284 else { ChannelAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
0 commit comments