diff --git a/blockchain/claimtrie.go b/blockchain/claimtrie.go index f821537ad5..790c7e760d 100644 --- a/blockchain/claimtrie.go +++ b/blockchain/claimtrie.go @@ -35,7 +35,7 @@ func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view ht := block.Height() for _, tx := range block.Transactions() { - h := handler{ht, tx, view, map[string][]byte{}} + h := handler{ht, tx, view, map[change.ClaimID][]byte{}} if err := h.handleTxIns(b.claimTrie); err != nil { return err } @@ -67,7 +67,7 @@ type handler struct { ht int32 tx *btcutil.Tx view *UtxoViewpoint - spent map[string][]byte + spent map[change.ClaimID][]byte } func (h *handler) handleTxIns(ct *claimtrie.ClaimTrie) error { @@ -171,6 +171,9 @@ func (b *BlockChain) GetClaimsForName(height int32, name string) (string, *node. n, err := b.claimTrie.NodeAt(height, normalizedName) if err != nil { + if n != nil { + n.Close() + } return string(normalizedName), nil, err } diff --git a/claimtrie/change/claimid.go b/claimtrie/change/claimid.go index e7a9256577..b05f498310 100644 --- a/claimtrie/change/claimid.go +++ b/claimtrie/change/claimid.go @@ -39,8 +39,8 @@ func NewIDFromString(s string) (id ClaimID, err error) { } // Key is for in-memory maps -func (id ClaimID) Key() string { - return string(id[:]) +func (id ClaimID) Key() ClaimID { + return id } // String is for anything written to a DB diff --git a/claimtrie/claimtrie_test.go b/claimtrie/claimtrie_test.go index 61194f1ad8..ebd62f03cc 100644 --- a/claimtrie/claimtrie_test.go +++ b/claimtrie/claimtrie_test.go @@ -133,6 +133,7 @@ func TestNormalizationFork(t *testing.T) { r.NoError(err) r.NotNil(n.BestClaim) r.Equal(int32(1), n.TakenOverAt) + n.Close() o8 := wire.OutPoint{Hash: hash, Index: 8} err = ct.AddClaim([]byte("aN˃EJO"), o8, change.NewClaimID(o8), 8) @@ -150,6 +151,7 @@ func TestNormalizationFork(t *testing.T) { n, err = ct.nodeManager.NodeAt(ct.nodeManager.Height(), []byte("test")) r.NoError(err) r.Equal(int64(18), n.BestClaim.Amount+n.SupportSums[n.BestClaim.ClaimID.Key()]) + n.Close() } func TestActivationsOnNormalizationFork(t *testing.T) { @@ -229,6 +231,7 @@ func verifyBestIndex(t *testing.T, ct *ClaimTrie, name string, idx uint32, claim if claims > 0 { r.Equal(idx, n.BestClaim.OutPoint.Index) } + n.Close() } func TestRebuild(t *testing.T) { diff --git a/claimtrie/cmd/cmd/node.go b/claimtrie/cmd/cmd/node.go index 08112e9446..008340d2ff 100644 --- a/claimtrie/cmd/cmd/node.go +++ b/claimtrie/cmd/cmd/node.go @@ -52,10 +52,11 @@ func NewNodeDumpCommand() *cobra.Command { } defer repo.Close() - changes, err := repo.LoadChanges([]byte(name)) + changes, closer, err := repo.LoadChanges([]byte(name)) if err != nil { return errors.Wrapf(err, "load commands") } + defer closer() for _, chg := range changes { if chg.Height > height { @@ -107,6 +108,7 @@ func NewNodeReplayCommand() *cobra.Command { } showNode(n) + n.Close() return nil }, } diff --git a/claimtrie/node/cache.go b/claimtrie/node/cache.go index 0f556af1a2..1ae9caa7c0 100644 --- a/claimtrie/node/cache.go +++ b/claimtrie/node/cache.go @@ -3,6 +3,7 @@ package node import ( "container/list" "sync" + "sync/atomic" "github.com/lbryio/lbcd/claimtrie/change" ) @@ -27,8 +28,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) { nc.mtx.Lock() defer nc.mtx.Unlock() + atomic.AddInt32(&n.refcnt, 1) + existing := nc.nodes[key] if existing != nil { + existing.node.Close() existing.node = n existing.height = height existing.changes = nil @@ -38,8 +42,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) { for nc.order.Len() >= nc.limit { // TODO: maybe ensure that we don't remove nodes that have a lot of changes? - delete(nc.nodes, nc.order.Back().Value.(string)) + exp := nc.order.Back().Value.(string) + expired := nc.nodes[exp] + delete(nc.nodes, exp) nc.order.Remove(nc.order.Back()) + expired.node.Close() } element := nc.order.PushFront(key) @@ -55,6 +62,7 @@ func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32 existing := nc.nodes[key] if existing != nil && existing.height <= height { nc.order.MoveToFront(existing.element) + atomic.AddInt32(&existing.node.refcnt, 1) return existing.node, existing.changes, existing.height } return nil, nil, -1 @@ -84,6 +92,7 @@ func (nc *Cache) drop(names [][]byte) { // we can't roll it backwards because we don't know its previous height value; just toast it delete(nc.nodes, key) nc.order.Remove(existing.element) + existing.node.Close() } } } @@ -91,6 +100,9 @@ func (nc *Cache) drop(names [][]byte) { func (nc *Cache) clear() { nc.mtx.Lock() defer nc.mtx.Unlock() + for _, existing := range nc.nodes { + existing.node.Close() + } nc.nodes = map[string]*cacheLeaf{} nc.order = list.New() // we'll let the GC sort out the remains... diff --git a/claimtrie/node/claim.go b/claimtrie/node/claim.go index 09a7ed088f..87838f0f38 100644 --- a/claimtrie/node/claim.go +++ b/claimtrie/node/claim.go @@ -4,6 +4,7 @@ import ( "bytes" "strconv" "strings" + "sync" "github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/claimtrie/change" @@ -32,6 +33,12 @@ type Claim struct { Sequence int32 `msgpack:",omitempty"` } +func newClaim() interface{} { + return &Claim{} +} + +var claimPool = sync.Pool{New: newClaim} + func (c *Claim) setOutPoint(op wire.OutPoint) *Claim { c.OutPoint = op return c diff --git a/claimtrie/node/hashfork_manager.go b/claimtrie/node/hashfork_manager.go index bbd814eecb..f43a190c9f 100644 --- a/claimtrie/node/hashfork_manager.go +++ b/claimtrie/node/hashfork_manager.go @@ -15,6 +15,7 @@ func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32 if err != nil || n == nil { return nil, 0 } + defer n.Close() n.SortClaimsByBid() claimHashes := make([]*chainhash.Hash, 0, len(n.Claims)) diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 31ba0f1a20..924dacb250 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -53,10 +53,11 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { n, changes, oldHeight := nm.cache.fetch(name, height) if n == nil { - changes, err := nm.repo.LoadChanges(name) + changes, closer, err := nm.repo.LoadChanges(name) if err != nil { return nil, errors.Wrap(err, "in load changes") } + defer closer() if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block changes = append(changes, nm.tempChanges[string(name)]...) @@ -64,6 +65,9 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { n, err = nm.newNodeFromChanges(changes, height) if err != nil { + if n != nil { + n.Close() + } return nil, errors.Wrap(err, "in new node") } // TODO: how can we tell what needs to be cached? @@ -74,9 +78,14 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block changes = append(changes, nm.tempChanges[string(name)]...) } + old := n n = n.Clone() + old.Close() updated, err := nm.updateFromChanges(n, changes, height) if err != nil { + if n != nil { + n.Close() + } return nil, errors.Wrap(err, "in update from changes") } if !updated { @@ -121,6 +130,7 @@ func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, heigh delay := nm.getDelayForName(n, chg) err := n.ApplyChange(chg, delay) if err != nil { + n.Close() return false, errors.Wrap(err, "in apply change") } } @@ -128,6 +138,7 @@ func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, heigh if count <= 0 { // we applied no changes, which means we shouldn't exist if we had all the changes // or might mean nothing significant if we are applying a partial changeset + n.Close() return false, nil } lastChange := changes[count-1] @@ -417,10 +428,13 @@ func (nm *BaseManager) hasChildren(name []byte, height int32, spentChildren map[ return true // children that are spent in the same block cannot count as active children } n, _ := nm.newNodeFromChanges(changes, height) - if n != nil && n.HasActiveBestClaim() { - c[changes[0].Name[len(name)]] = true - if len(c) >= required { - return false + if n != nil { + defer n.Close() + if n.HasActiveBestClaim() { + c[changes[0].Name[len(name)]] = true + if len(c) >= required { + return false + } } } return true diff --git a/claimtrie/node/manager_test.go b/claimtrie/node/manager_test.go index c907bb4c3c..bb13296c63 100644 --- a/claimtrie/node/manager_test.go +++ b/claimtrie/node/manager_test.go @@ -147,6 +147,7 @@ func TestNodeSort(t *testing.T) { r.True(OutPointLess(*out1, *out3)) n := New() + defer n.Close() n.Claims = append(n.Claims, &Claim{OutPoint: *out1, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{1}}) n.Claims = append(n.Claims, &Claim{OutPoint: *out2, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{2}}) n.handleExpiredAndActivated(3) @@ -167,6 +168,7 @@ func TestClaimSort(t *testing.T) { param.ActiveParams.ExtendedClaimExpirationTime = 1000 n := New() + defer n.Close() n.Claims = append(n.Claims, &Claim{OutPoint: *out2, AcceptedAt: 3, Amount: 3, ClaimID: change.ClaimID{2}, Status: Activated}) n.Claims = append(n.Claims, &Claim{OutPoint: *out3, AcceptedAt: 3, Amount: 2, ClaimID: change.ClaimID{3}, Status: Activated}) n.Claims = append(n.Claims, &Claim{OutPoint: *out3, AcceptedAt: 4, Amount: 2, ClaimID: change.ClaimID{4}, Status: Activated}) diff --git a/claimtrie/node/node.go b/claimtrie/node/node.go index ff45fc11e1..4568cb6187 100644 --- a/claimtrie/node/node.go +++ b/claimtrie/node/node.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "sort" + "sync/atomic" "github.com/lbryio/lbcd/claimtrie/change" "github.com/lbryio/lbcd/claimtrie/param" @@ -14,18 +15,45 @@ type Node struct { TakenOverAt int32 // The height at when the current BestClaim took over. Claims ClaimList // List of all Claims. Supports ClaimList // List of all Supports, including orphaned ones. - SupportSums map[string]int64 + SupportSums map[change.ClaimID]int64 + refcnt int32 } // New returns a new node. func New() *Node { - return &Node{SupportSums: map[string]int64{}} + return &Node{SupportSums: map[change.ClaimID]int64{}, refcnt: 1} } func (n *Node) HasActiveBestClaim() bool { return n.BestClaim != nil && n.BestClaim.Status == Activated } +func (n *Node) close() { + n.BestClaim = nil + n.SupportSums = nil + + for i := range n.Claims { + claimPool.Put(n.Claims[i]) + } + n.Claims = nil + + for i := range n.Supports { + claimPool.Put(n.Supports[i]) + } + n.Supports = nil +} + +func (n *Node) Close() { + new := atomic.AddInt32(&n.refcnt, -1) + if new < 0 { + panic("node refcnt underflow") + } + if new > 0 { + return + } + n.close() +} + func (n *Node) ApplyChange(chg change.Change, delay int32) error { visibleAt := chg.VisibleHeight @@ -35,17 +63,19 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error { switch chg.Type { case change.AddClaim: - c := &Claim{ - OutPoint: chg.OutPoint, - Amount: chg.Amount, - ClaimID: chg.ClaimID, - // CreatedAt: chg.Height, - AcceptedAt: chg.Height, - ActiveAt: chg.Height + delay, - VisibleAt: visibleAt, - Sequence: int32(len(n.Claims)), - } - // old := n.Claims.find(byOut(chg.OutPoint)) // TODO: remove this after proving ResetHeight works + c := claimPool.Get().(*Claim) + // set all 8 fields on c as they aren't initialized to 0: + c.Status = Accepted + c.OutPoint = chg.OutPoint + c.Amount = chg.Amount + c.ClaimID = chg.ClaimID + // CreatedAt: chg.Height, + c.AcceptedAt = chg.Height + c.ActiveAt = chg.Height + delay + c.VisibleAt = visibleAt + c.Sequence = int32(len(n.Claims)) + // removed this after proving ResetHeight works: + // old := n.Claims.find(byOut(chg.OutPoint)) // if old != nil { // return errors.Errorf("CONFLICT WITH EXISTING TXO! Name: %s, Height: %d", chg.Name, chg.Height) // } @@ -63,7 +93,6 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error { // 'two' at 481100, 36a719a156a1df178531f3c712b8b37f8e7cc3b36eea532df961229d936272a1:0 case change.UpdateClaim: - // Find and remove the claim, which has just been spent. c := n.Claims.find(byID(chg.ClaimID)) if c != nil && c.Status == Deactivated { @@ -82,14 +111,18 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error { LogOnce(fmt.Sprintf("Updating claim but missing existing claim with ID %s", chg.ClaimID)) } case change.AddSupport: - n.Supports = append(n.Supports, &Claim{ - OutPoint: chg.OutPoint, - Amount: chg.Amount, - ClaimID: chg.ClaimID, - AcceptedAt: chg.Height, - ActiveAt: chg.Height + delay, - VisibleAt: visibleAt, - }) + s := claimPool.Get().(*Claim) + // set all 8 fields on s: + s.Status = Accepted + s.OutPoint = chg.OutPoint + s.Amount = chg.Amount + s.ClaimID = chg.ClaimID + s.AcceptedAt = chg.Height + s.ActiveAt = chg.Height + delay + s.VisibleAt = visibleAt + s.Sequence = int32(len(n.Supports)) + + n.Supports = append(n.Supports, s) case change.SpendSupport: s := n.Supports.find(byOut(chg.OutPoint)) @@ -166,7 +199,7 @@ func (n *Node) handleExpiredAndActivated(height int32) int { } changes := 0 - update := func(items ClaimList, sums map[string]int64) ClaimList { + update := func(items ClaimList, sums map[change.ClaimID]int64) ClaimList { for i := 0; i < len(items); i++ { c := items[i] if c.Status == Accepted && c.ActiveAt <= height && c.VisibleAt <= height { @@ -343,19 +376,19 @@ func (n *Node) SortClaimsByBid() { func (n *Node) Clone() *Node { clone := New() if n.SupportSums != nil { - clone.SupportSums = map[string]int64{} + clone.SupportSums = map[change.ClaimID]int64{} for key, value := range n.SupportSums { clone.SupportSums[key] = value } } clone.Supports = make(ClaimList, len(n.Supports)) for i, support := range n.Supports { - clone.Supports[i] = &Claim{} + clone.Supports[i] = claimPool.Get().(*Claim) *clone.Supports[i] = *support } clone.Claims = make(ClaimList, len(n.Claims)) for i, claim := range n.Claims { - clone.Claims[i] = &Claim{} + clone.Claims[i] = claimPool.Get().(*Claim) *clone.Claims[i] = *claim } clone.TakenOverAt = n.TakenOverAt diff --git a/claimtrie/node/noderepo/noderepo_test.go b/claimtrie/node/noderepo/noderepo_test.go index fb0a9764b8..c634ea5690 100644 --- a/claimtrie/node/noderepo/noderepo_test.go +++ b/claimtrie/node/noderepo/noderepo_test.go @@ -92,8 +92,9 @@ func testNodeRepo(t *testing.T, repo node.Repo, setup, cleanup func()) { err := repo.AppendChanges(tt.changes) r.NoError(err) - changes, err := repo.LoadChanges(testNodeName1) + changes, closer, err := repo.LoadChanges(testNodeName1) r.NoError(err) + defer closer() r.Equalf(tt.expected, changes[:len(tt.expected)], tt.name) cleanup() @@ -150,8 +151,9 @@ func testNodeRepo(t *testing.T, repo node.Repo, setup, cleanup func()) { r.NoError(err) } - changes, err := repo.LoadChanges(testNodeName1) + changes, closer, err := repo.LoadChanges(testNodeName1) r.NoError(err) + defer closer() r.Equalf(tt.expected, changes[:len(tt.expected)], tt.name) cleanup() diff --git a/claimtrie/node/noderepo/pebble.go b/claimtrie/node/noderepo/pebble.go index 32ca2c0412..63333a55d0 100644 --- a/claimtrie/node/noderepo/pebble.go +++ b/claimtrie/node/noderepo/pebble.go @@ -112,11 +112,11 @@ func (repo *Pebble) AppendChanges(changes []change.Change) error { return errors.Wrap(batch.Commit(pebble.NoSync), "in commit") } -func (repo *Pebble) LoadChanges(name []byte) ([]change.Change, error) { +func (repo *Pebble) LoadChanges(name []byte) ([]change.Change, func(), error) { data, closer, err := repo.db.Get(name) if err != nil && err != pebble.ErrNotFound { - return nil, errors.Wrapf(err, "in get %s", name) // does returning a name in an error expose too much? + return nil, nil, errors.Wrapf(err, "in get %s", name) // does returning a name in an error expose too much? } if closer != nil { defer closer.Close() @@ -125,9 +125,16 @@ func (repo *Pebble) LoadChanges(name []byte) ([]change.Change, error) { return unmarshalChanges(name, data) } -func unmarshalChanges(name, data []byte) ([]change.Change, error) { - // data is 84bytes+ per change - changes := make([]change.Change, 0, len(data)/84+1) // average is 5.1 changes +var changesPool = sync.Pool{ + New: func() interface{} { + return make([]change.Change, 0, 6) + }, +} + +func unmarshalChanges(name, data []byte) ([]change.Change, func(), error) { + // data is 84bytes+ per change, average is 5.1 changes + changes := changesPool.Get().([]change.Change)[:0] + closer := func() { changesPool.Put(changes) } buffer := bytes.NewBuffer(data) sortNeeded := false @@ -135,7 +142,8 @@ func unmarshalChanges(name, data []byte) ([]change.Change, error) { var chg change.Change err := chg.Unmarshal(buffer) if err != nil { - return nil, errors.Wrap(err, "in decode") + closer() + return nil, nil, errors.Wrap(err, "in decode") } chg.Name = name if len(changes) > 0 && chg.Height < changes[len(changes)-1].Height { @@ -150,14 +158,17 @@ func unmarshalChanges(name, data []byte) ([]change.Change, error) { return changes[i].Height < changes[j].Height }) } - return changes, nil + + return changes, closer, nil } func (repo *Pebble) DropChanges(name []byte, finalHeight int32) error { - changes, err := repo.LoadChanges(name) + changes, closer, err := repo.LoadChanges(name) if err != nil { return errors.Wrapf(err, "in load changes for %s", name) } + defer closer() + buffer := bytes.NewBuffer(nil) for i := 0; i < len(changes); i++ { // assuming changes are ordered by height if changes[i].Height > finalHeight { @@ -206,10 +217,11 @@ func (repo *Pebble) IterateChildren(name []byte, f func(changes []change.Change) for iter.First(); iter.Valid(); iter.Next() { // NOTE! iter.Key() is ephemeral! - changes, err := unmarshalChanges(iter.Key(), iter.Value()) + changes, closer, err := unmarshalChanges(iter.Key(), iter.Value()) if err != nil { return errors.Wrapf(err, "from unmarshaller at %s", iter.Key()) } + defer closer() if !f(changes) { break } diff --git a/claimtrie/node/normalizing_manager.go b/claimtrie/node/normalizing_manager.go index 604fa34d41..ba2ae922a5 100644 --- a/claimtrie/node/normalizing_manager.go +++ b/claimtrie/node/normalizing_manager.go @@ -72,6 +72,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3 if err != nil || n == nil { return true } + defer n.Close() for _, c := range n.Claims { nm.Manager.AppendChange(change.Change{ Type: change.AddClaim, diff --git a/claimtrie/node/repo.go b/claimtrie/node/repo.go index 4aaa65e894..a180db213c 100644 --- a/claimtrie/node/repo.go +++ b/claimtrie/node/repo.go @@ -13,7 +13,9 @@ type Repo interface { // LoadChanges loads changes of a node up to (includes) the specified height. // If no changes found, both returned slice and error will be nil. - LoadChanges(name []byte) ([]change.Change, error) + // The returned closer func() should be called to release changes after + // work on them is finished. + LoadChanges(name []byte) (changes []change.Change, closer func(), err error) DropChanges(name []byte, finalHeight int32) error diff --git a/rpcclaimtrie.go b/rpcclaimtrie.go index 11706920ea..6999f8cd2a 100644 --- a/rpcclaimtrie.go +++ b/rpcclaimtrie.go @@ -106,6 +106,7 @@ func handleGetClaimsForName(s *rpcServer, cmd interface{}, _ <-chan struct{}) (i Message: "Message: " + err.Error(), } } + defer n.Close() var results []btcjson.ClaimResult for i := range n.Claims { @@ -140,6 +141,7 @@ func handleGetClaimsForNameByID(s *rpcServer, cmd interface{}, _ <-chan struct{} Message: "Message: " + err.Error(), } } + defer n.Close() var results []btcjson.ClaimResult for i := 0; i < len(n.Claims); i++ { @@ -179,6 +181,7 @@ func handleGetClaimsForNameByBid(s *rpcServer, cmd interface{}, _ <-chan struct{ Message: "Message: " + err.Error(), } } + defer n.Close() var results []btcjson.ClaimResult for _, b := range c.Bids { // claims are already sorted in bid order @@ -215,6 +218,7 @@ func handleGetClaimsForNameBySeq(s *rpcServer, cmd interface{}, _ <-chan struct{ Message: "Message: " + err.Error(), } } + defer n.Close() sm := map[int32]bool{} for _, seq := range c.Sequences {