feat: add active peer probing and a cached addr book #90
Merged
Commits (80):
3896470 feat: add cached peer book with higher ttls (2color)
7dd33ca feat: initial implementation of active peer probing (2color)
0e86ea4 feat: use the cached router (2color)
ec2a67a chore: go mod tidy (2color)
fe68140 feat: log probe duration (2color)
06c2d0c chore: log in probe loop (2color)
fc76783 fix: update peer state if doesn't exist (2color)
e904c3e fix: add addresses to cached address book (2color)
814ae58 fix: wrap with cached router only if available (2color)
a4d6456 feat: make everything a little bit better (2color)
81feca7 chore: small refinements (2color)
e75992f test: add test for cached addr book (2color)
a20a4c3 chore: rename files (2color)
c5f1d62 feat: add options to cached addr book (2color)
e678be8 feat: add instrumentation (2color)
a0965bc fix: thread safety (2color)
d82ad0f docs: update changelog (2color)
a84d5f6 fix: small fixes (2color)
9ab02e1 fix: simplify cached router (2color)
9658af8 feat(metric): cached_router_peer_addr_lookups (lidel)
7cdb5be Apply suggestions from code review (2color)
762136e Update CHANGELOG.md (2color)
2cf46d4 chore: use service name for namespace (2color)
a0d5c62 fix: type errors and missing imports (2color)
75f1bf2 feat: add queue probe (2color)
4cbaa91 Revert "feat: add queue probe" (2color)
d038301 chore: simplify composite literal (2color)
796e94f fix: implement custom cache fallback iterator (2color)
2e4d12c fix: add cancel and simplify (2color)
811dce8 fix: move select to Val function (2color)
b4da9cd fix: concurrency bug from the ongoingLookups (2color)
d00fcb4 chore: clean up comments (2color)
6219804 fix: add lint ignores (2color)
662f0d4 docs: update changelog (2color)
c812cf4 fix: increase bucket sizes for probe duration (2color)
8646f38 chore: remove unused peer state fields (2color)
46a74a3 feat: enable caching for FindPeer in cached router (2color)
d9601e4 fix: handle peer not found case (2color)
986b010 Apply suggestions from code review (2color)
ecd0757 fix: wait longer during cleanup function (2color)
a0443d0 test: remove bitswap record test (2color)
22aacd7 refactor: extract connectedness checks to a func (2color)
fe372ac fix: set ttl for both signed and unsigned addrs (2color)
03a4078 fix: prevent race condition (2color)
84393fd feat: use 2q-lru cache for peer state (2color)
d466dc7 chore: remove return count (2color)
8078cb5 test: improve reliability of tests (2color)
7decf6c fix: record failed connections (2color)
b536e82 feat: add exponential backoff for probes/peer lookups (2color)
7182699 fix: return peers with no addrs that wont probe (2color)
b0b24e0 fix: brittle test (2color)
697457d feat: add probed peers counter (2color)
7fcf45f fix: adjust probe duration metric buckets (2color)
1718215 fix: prevent race conditions (2color)
dc57e9f feat: increase cache size and add max backoff (2color)
c5abeec fix: omit providers whose peer cannot be found (2color)
0cc76f9 chore: remove unused function (2color)
f0e0bd4 deps: upgrade go-libp2p (2color)
2211aae fix: avoid using the cache in FindPeers (2color)
be5958a fix: do not return cached results for FindPeers (2color)
af7c3a8 refactor: small optimisation (2color)
62c0d9f chore: re-add comment (2color)
8b36b0c Apply suggestions from code review (2color)
b58b50d Apply suggestions from code review (2color)
41922af fix: use separate context for dispatched jobs (2color)
06cef21 fix: ensure proper cleanup of cache fallback iter (2color)
7a2160a Update main.go (2color)
84bc4f7 fix: formatting (2color)
0c28c6b fix: let consumer handle cleanup (2color)
e0a601f fix: remove from address book when removed from peer state (2color)
7f0ec50 fix: use normal lru cache instead of 2Q (2color)
2e025eb fix: update the metric when removing from the peer cache (2color)
6b4b40d fix: increase max backoff to 48 hours (2color)
fe7ad54 feat: add env var for recently connected ttl (2color)
49efe9b feat: add env var to control active probing (2color)
8ca4d19 fix: bug from closing the iterator twice (2color)
317ccb7 docs: update comment (2color)
327f9cb docs: improve changelog (2color)
48e1943 test: fix background test (2color)
c1ac41b feat(metrics): track online vs offline probe ratio (lidel)
The new file added by this PR, the cached address book implementation (+354 lines):

```go
package main

import (
	"context"
	"io"
	"sync"
	"sync/atomic"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/ipfs/boxo/routing/http/types"
	"github.com/libp2p/go-libp2p-kad-dht/amino"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
	"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
	Subsystem = "cached_addr_book"

	// The default TTL to keep recently connected peers' multiaddrs for
	DefaultRecentlyConnectedAddrTTL = amino.DefaultProvideValidity

	// Connected peers don't expire until they disconnect
	ConnectedAddrTTL = peerstore.ConnectedAddrTTL

	// How long to wait since last connection before probing a peer again
	PeerProbeThreshold = time.Hour

	// How often to run the probe peers loop
	ProbeInterval = time.Minute * 15

	// How many concurrent probes to run at once
	MaxConcurrentProbes = 20

	// How long to wait for a connect in a probe to complete.
	// The worst case is a peer behind a relay, so we use the relay connect timeout.
	ConnectTimeout = relay.ConnectTimeout

	// How many peers to cache in the peer state cache.
	// 1_000_000 is 10x the default number of signed peer records cached by the memory address book.
	PeerCacheSize = 1_000_000

	// Maximum backoff duration for probing a peer. After this duration, we will stop
	// trying to connect to the peer and remove it from the cache.
	MaxBackoffDuration = amino.DefaultProvideValidity

	probeResult        = "result"
	probeResultOnline  = "online"
	probeResultOffline = "offline"
)

var (
	probeDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:      "probe_duration_seconds",
		Namespace: name,
		Subsystem: Subsystem,
		Help:      "Duration of peer probing operations in seconds",
		// Buckets cover probe durations from 5 seconds to 15 minutes
		Buckets: []float64{5, 10, 30, 60, 120, 300, 600, 900},
	})

	probedPeersCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Name:      "probed_peers",
		Subsystem: Subsystem,
		Namespace: name,
		Help:      "Number of peers probed",
	},
		[]string{probeResult},
	)

	peerStateSize = promauto.NewGauge(prometheus.GaugeOpts{
		Name:      "peer_state_size",
		Subsystem: Subsystem,
		Namespace: name,
		Help:      "Number of peer objects currently in the peer state",
	})
)
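// Note: promauto/prometheus joins Namespace, Subsystem, and Name with underscores,
// so these metrics are exported as e.g. "<name>_cached_addr_book_probe_duration_seconds",
// where the Namespace comes from the package-level `name` variable (the service
// name, defined elsewhere in this package).
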
type peerState struct {
	lastConnTime       time.Time // last time we successfully connected to this peer
	lastFailedConnTime time.Time // last time we failed to find or connect to this peer
	connectFailures    uint      // number of times we've failed to connect to this peer
}

type cachedAddrBook struct {
	addrBook             peerstore.AddrBook             // memory address book
	peerCache            *lru.Cache[peer.ID, peerState] // LRU cache with additional metadata about peer
	probingEnabled       bool
	isProbing            atomic.Bool
	allowPrivateIPs      bool // for testing
	recentlyConnectedTTL time.Duration
}

type AddrBookOption func(*cachedAddrBook) error

func WithAllowPrivateIPs() AddrBookOption {
	return func(cab *cachedAddrBook) error {
		cab.allowPrivateIPs = true
		return nil
	}
}

func WithRecentlyConnectedTTL(ttl time.Duration) AddrBookOption {
	return func(cab *cachedAddrBook) error {
		cab.recentlyConnectedTTL = ttl
		return nil
	}
}

func WithActiveProbing(enabled bool) AddrBookOption {
	return func(cab *cachedAddrBook) error {
		cab.probingEnabled = enabled
		return nil
	}
}

func newCachedAddrBook(opts ...AddrBookOption) (*cachedAddrBook, error) {
	peerCache, err := lru.New[peer.ID, peerState](PeerCacheSize)
	if err != nil {
		return nil, err
	}

	cab := &cachedAddrBook{
		peerCache:            peerCache,
		addrBook:             pstoremem.NewAddrBook(),
		recentlyConnectedTTL: DefaultRecentlyConnectedAddrTTL, // set default value
	}

	for _, opt := range opts {
		err := opt(cab)
		if err != nil {
			return nil, err
		}
	}
	logger.Infof("Using TTL of %s for recently connected peers", cab.recentlyConnectedTTL)
	logger.Infof("Probing enabled: %t", cab.probingEnabled)
	return cab, nil
}

func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) {
	sub, err := host.EventBus().Subscribe([]interface{}{
		&event.EvtPeerIdentificationCompleted{},
		&event.EvtPeerConnectednessChanged{},
	})
	if err != nil {
		logger.Errorf("failed to subscribe to peer identification events: %v", err)
		return
	}
	defer sub.Close()

	probeTicker := time.NewTicker(ProbeInterval)
	defer probeTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			cabCloser, ok := cab.addrBook.(io.Closer)
			if ok {
				errClose := cabCloser.Close()
				if errClose != nil {
					logger.Warnf("failed to close addr book: %v", errClose)
				}
			}
			return
		case ev := <-sub.Out():
			switch ev := ev.(type) {
			case event.EvtPeerIdentificationCompleted:
				pState, exists := cab.peerCache.Peek(ev.Peer)
				if !exists {
					pState = peerState{}
				}
				pState.lastConnTime = time.Now()
				pState.lastFailedConnTime = time.Time{} // reset failed connection time
				pState.connectFailures = 0              // reset connect failures on successful connection
				cab.peerCache.Add(ev.Peer, pState)
				peerStateSize.Set(float64(cab.peerCache.Len())) // update metric

				ttl := cab.getTTL(host.Network().Connectedness(ev.Peer))
				if ev.SignedPeerRecord != nil {
					logger.Debug("Caching signed peer record")
					cab, ok := peerstore.GetCertifiedAddrBook(cab.addrBook)
					if ok {
						_, err := cab.ConsumePeerRecord(ev.SignedPeerRecord, ttl)
						if err != nil {
							logger.Warnf("failed to consume signed peer record: %v", err)
						}
					}
				} else {
					logger.Debug("No signed peer record, caching listen addresses")
					// We don't have a signed peer record, so we use the listen addresses
					cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl)
				}
			case event.EvtPeerConnectednessChanged:
				// If the peer is no longer connected or limited, we update the TTL
				if !hasValidConnectedness(ev.Connectedness) {
					cab.addrBook.UpdateAddrs(ev.Peer, ConnectedAddrTTL, cab.recentlyConnectedTTL)
				}
			}
		case <-probeTicker.C:
			if !cab.probingEnabled {
				logger.Debug("Probing disabled, skipping")
				continue
			}
			if cab.isProbing.Load() {
				logger.Debug("Skipping peer probe, previous run still in progress")
				continue
			}
			logger.Debug("Starting to probe peers")
			cab.isProbing.Store(true)
			go cab.probePeers(ctx, host)
		}
	}
}

// Loops over all peers with addresses and probes them if they haven't been probed recently
func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) {
	defer cab.isProbing.Store(false)

	start := time.Now()
	defer func() {
		duration := time.Since(start).Seconds()
		probeDurationHistogram.Observe(duration)
		logger.Debugf("Finished probing peers in %.2fs", duration)
	}()

	var wg sync.WaitGroup
	// semaphore channel to limit the number of concurrent probes
	semaphore := make(chan struct{}, MaxConcurrentProbes)

	for i, p := range cab.addrBook.PeersWithAddrs() {
		if hasValidConnectedness(host.Network().Connectedness(p)) {
			continue // don't probe connected peers
		}

		if !cab.ShouldProbePeer(p) {
			continue
		}

		addrs := cab.addrBook.Addrs(p)

		if !cab.allowPrivateIPs {
			addrs = ma.FilterAddrs(addrs, manet.IsPublicAddr)
		}

		if len(addrs) == 0 {
			continue // no addresses to probe
		}

		wg.Add(1)
		semaphore <- struct{}{} // acquire semaphore slot
		go func() {
			defer func() {
				<-semaphore // release semaphore slot
				wg.Done()
			}()
			ctx, cancel := context.WithTimeout(ctx, ConnectTimeout)
			defer cancel()
			logger.Debugf("Probe %d: PeerID: %s, Addrs: %v", i+1, p, addrs)
			// If connect succeeds and identify runs, the background loop will
			// take care of updating the peer state and cache
			err := host.Connect(ctx, peer.AddrInfo{
				ID:    p,
				Addrs: addrs,
			})
			if err != nil {
				logger.Debugf("failed to connect to peer %s: %v", p, err)
				cab.RecordFailedConnection(p)
				probedPeersCounter.WithLabelValues(probeResultOffline).Inc()
			} else {
				probedPeersCounter.WithLabelValues(probeResultOnline).Inc()
			}
		}()
	}
	wg.Wait()
}
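// Concurrency note: the buffered semaphore channel above caps in-flight
// Connect attempts at MaxConcurrentProbes (20), and wg.Wait() ensures the
// histogram observation covers the entire probing pass, not just dispatch.
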
// Returns the cached addresses for a peer
func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr {
	cachedAddrs := cab.addrBook.Addrs(p)

	if len(cachedAddrs) == 0 {
		return nil
	}

	result := make([]types.Multiaddr, 0, len(cachedAddrs)) // convert to local Multiaddr type 🙃
	for _, addr := range cachedAddrs {
		result = append(result, types.Multiaddr{Multiaddr: addr})
	}
	return result
}

// Update the peer cache with information about a failed connection.
// This should be called when a connection attempt to a peer fails.
func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) {
	pState, exists := cab.peerCache.Peek(p)
	if !exists {
		pState = peerState{}
	}
	now := time.Now()
	// Once probing of an offline peer has reached MaxBackoffDuration and still failed,
	// we opportunistically remove the dead peer from the cache to save time on probing it further
	if exists && pState.connectFailures > 1 && now.Sub(pState.lastFailedConnTime) > MaxBackoffDuration {
		cab.peerCache.Remove(p)
		peerStateSize.Set(float64(cab.peerCache.Len())) // update metric
		// Remove the peer from the addr book. Otherwise it will be probed again in the probe loop
		cab.addrBook.ClearAddrs(p)
		return
	}
	pState.lastFailedConnTime = now
	pState.connectFailures++
	cab.peerCache.Add(p, pState)
}

// Returns true if we should probe a peer (either by dialing known addresses or by dispatching a FindPeer)
// based on the last failed connection time and the number of connection failures
func (cab *cachedAddrBook) ShouldProbePeer(p peer.ID) bool {
	pState, exists := cab.peerCache.Peek(p)
	if !exists {
		return true // default to probing if the peer is not in the cache
	}

	var backoffDuration time.Duration
	if pState.connectFailures > 0 {
		// Calculate backoff only if we have failures.
		// This is effectively 2^(connectFailures - 1) * PeerProbeThreshold:
		// a single failure results in a 1 hour backoff and each additional failure doubles the backoff
		backoffDuration = PeerProbeThreshold * time.Duration(1<<(pState.connectFailures-1))
		backoffDuration = min(backoffDuration, MaxBackoffDuration) // clamp to max backoff duration
	} else {
		backoffDuration = PeerProbeThreshold
	}

	// Only dispatch if we've waited long enough based on the backoff
	return time.Since(pState.lastFailedConnTime) > backoffDuration
}
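// Worked example of the backoff schedule above, assuming PeerProbeThreshold = 1h
// and MaxBackoffDuration = amino.DefaultProvideValidity (48h at the time of this PR):
// failures=1 -> 1h, 2 -> 2h, 3 -> 4h, 4 -> 8h, 5 -> 16h, 6 -> 32h,
// failures>=7 -> 64h+, clamped to 48h. Once a new failure lands more than
// MaxBackoffDuration after the previous one, RecordFailedConnection evicts the
// peer from the cache and address book instead of backing off further.
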
func hasValidConnectedness(connectedness network.Connectedness) bool {
	return connectedness == network.Connected || connectedness == network.Limited
}

func (cab *cachedAddrBook) getTTL(connectedness network.Connectedness) time.Duration {
	if hasValidConnectedness(connectedness) {
		return ConnectedAddrTTL
	}
	return cab.recentlyConnectedTTL
}
```
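For orientation, here is a minimal sketch of how the pieces above fit together: construct the book with options, run `background` against a libp2p host, and consult the cache when a lookup returns a peer without addresses. It lives in the same `package main` as the file above (plus a `github.com/libp2p/go-libp2p` import); the host construction, context handling, and option values are illustrative assumptions, not code from this PR's main.go.

```go
// Hypothetical wiring, for illustration only (not from this PR's main.go).
func exampleWiring(ctx context.Context, pid peer.ID) error {
	h, err := libp2p.New() // the real service reuses its existing host
	if err != nil {
		return err
	}
	defer h.Close()

	cab, err := newCachedAddrBook(
		WithActiveProbing(true),                // the PR gates this behind an env var
		WithRecentlyConnectedTTL(48*time.Hour), // overrides DefaultRecentlyConnectedAddrTTL
	)
	if err != nil {
		return err
	}
	go cab.background(ctx, h) // caches identify results and runs the probe loop

	// When a routing result has no addresses, fall back to the cache:
	if cachedAddrs := cab.GetCachedAddrs(pid); len(cachedAddrs) > 0 {
		_ = cachedAddrs // return these to the client instead of dropping the result
	}
	return nil
}
```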