diff --git a/filter.go b/filter.go index e755e1c..0c5c60b 100644 --- a/filter.go +++ b/filter.go @@ -5,27 +5,29 @@ import ( "context" "fmt" "path" + "reflect" "github.com/0xsequence/ethwal/storage" "github.com/0xsequence/ethwal/storage/local" "github.com/RoaringBitmap/roaring/v2/roaring64" ) -type Filter interface { - Eval(ctx context.Context) FilterIterator -} +// Filter is an interface that defines the methods to filter blocks +// based on the index data. +type Filter[T any] interface { + // Filter blocks inner data based on the filter criteria. + Filter(block Block[T]) Block[T] + + // IndexIterator returns the iterator for the filter. + IndexIterator(ctx context.Context) *IndexIterator -type FilterIterator interface { - HasNext() bool - Next() (uint64, uint16) - Peek() (uint64, uint16) - Bitmap() *roaring64.Bitmap + bitmap(block Block[T]) *roaring64.Bitmap } -type FilterBuilder interface { - And(filters ...Filter) Filter - Or(filters ...Filter) Filter - Eq(index string, key string) Filter +type FilterBuilder[T any] interface { + And(filters ...Filter[T]) Filter[T] + Or(filters ...Filter[T]) Filter[T] + Eq(index string, key string) Filter[T] } type FilterBuilderOptions[T any] struct { @@ -47,7 +49,7 @@ type filterBuilder[T any] struct { fs storage.FS } -func NewFilterBuilder[T any](opt FilterBuilderOptions[T]) (FilterBuilder, error) { +func NewFilterBuilder[T any](opt FilterBuilderOptions[T]) (FilterBuilder[T], error) { // apply default options on uninitialized fields opt = opt.WithDefaults() @@ -60,29 +62,54 @@ func NewFilterBuilder[T any](opt FilterBuilderOptions[T]) (FilterBuilder, error) }, nil } -type filter struct { - resultSet func(ctx context.Context) *roaring64.Bitmap +type filter[T any] struct { + blockBitmap func(ctx context.Context) *roaring64.Bitmap + dataIndexBitmapFunc func(block Block[T]) *roaring64.Bitmap } -func (c *filter) Eval(ctx context.Context) FilterIterator { - if c.resultSet == nil { - c.resultSet = func(ctx context.Context) *roaring64.Bitmap { +func (c *filter[T]) IndexIterator(ctx context.Context) *IndexIterator { + if c.blockBitmap == nil { + c.blockBitmap = func(ctx context.Context) *roaring64.Bitmap { return roaring64.New() } } - return newFilterIterator(c.resultSet(ctx)) + return NewIndexIterator(c.blockBitmap(ctx)) +} + +func (c *filter[T]) Filter(block Block[T]) Block[T] { + dataIndexesBitmap := c.dataIndexBitmapFunc(block) + dataIndexes := dataIndexesBitmap.ToArray() + if len(dataIndexes) == 1 && dataIndexes[0] == IndexAllDataIndexes { + return block + } + + if dType := reflect.TypeOf(block.Data); dType.Kind() == reflect.Slice || dType.Kind() == reflect.Array { + newData := reflect.Indirect(reflect.New(dType)) + for _, dataIndex := range dataIndexes { + newData = reflect.Append(newData, reflect.ValueOf(block.Data).Index(int(dataIndex))) + } + block.Data = newData.Interface().(T) + } + return block +} + +func (c *filter[T]) bitmap(block Block[T]) *roaring64.Bitmap { + if c.dataIndexBitmapFunc == nil { + return roaring64.New() + } + return c.dataIndexBitmapFunc(block) } -func (c *filterBuilder[T]) And(filters ...Filter) Filter { - return &filter{ - resultSet: func(ctx context.Context) *roaring64.Bitmap { +func (c *filterBuilder[T]) And(conds ...Filter[T]) Filter[T] { + return &filter[T]{ + blockBitmap: func(ctx context.Context) *roaring64.Bitmap { var bmap *roaring64.Bitmap - for _, filter := range filters { - if filter == nil { + for _, cond := range conds { + if cond == nil { continue } - iter := filter.Eval(ctx) + iter := cond.IndexIterator(ctx) if bmap == nil { bmap = iter.Bitmap().Clone() } else { @@ -91,19 +118,31 @@ func (c *filterBuilder[T]) And(filters ...Filter) Filter { } return bmap }, + dataIndexBitmapFunc: func(block Block[T]) *roaring64.Bitmap { + var bmap *roaring64.Bitmap + for _, cond := range conds { + condBitmap := cond.bitmap(block) + if bmap == nil { + bmap = condBitmap.Clone() + } else { + bmap.And(condBitmap) + } + } + return bmap + }, } } -func (c *filterBuilder[T]) Or(filters ...Filter) Filter { - return &filter{ - resultSet: func(ctx context.Context) *roaring64.Bitmap { +func (c *filterBuilder[T]) Or(conds ...Filter[T]) Filter[T] { + return &filter[T]{ + blockBitmap: func(ctx context.Context) *roaring64.Bitmap { var bmap *roaring64.Bitmap - for _, filter := range filters { - if filter == nil { + for _, cond := range conds { + if cond == nil { continue } - iter := filter.Eval(ctx) + iter := cond.IndexIterator(ctx) if bmap == nil { bmap = iter.Bitmap().Clone() } else { @@ -112,13 +151,24 @@ func (c *filterBuilder[T]) Or(filters ...Filter) Filter { } return bmap }, + dataIndexBitmapFunc: func(block Block[T]) *roaring64.Bitmap { + var bmap *roaring64.Bitmap + for _, cond := range conds { + condBitmap := cond.bitmap(block) + if bmap == nil { + bmap = condBitmap.Clone() + } else { + bmap.Or(condBitmap) + } + } + return bmap + }, } } -func (c *filterBuilder[T]) Eq(index string, key string) Filter { - - return &filter{ - resultSet: func(ctx context.Context) *roaring64.Bitmap { +func (c *filterBuilder[T]) Eq(index string, key string) Filter[T] { + return &filter[T]{ + blockBitmap: func(ctx context.Context) *roaring64.Bitmap { // fetch the index file and include it in the result set index_ := IndexName(index).Normalize() idx, ok := c.indexes[index_] @@ -132,36 +182,19 @@ func (c *filterBuilder[T]) Eq(index string, key string) Filter { } return bitmap }, - } -} - -type filterIterator struct { - iter roaring64.IntPeekable64 - bitmap *roaring64.Bitmap -} + dataIndexBitmapFunc: func(block Block[T]) *roaring64.Bitmap { + index_ := IndexName(index).Normalize() + idx, ok := c.indexes[index_] + if !ok { + return roaring64.New() + } -func newFilterIterator(bmap *roaring64.Bitmap) FilterIterator { - return &filterIterator{ - iter: bmap.Iterator(), - bitmap: bmap, + indexUpdate, _ := idx.IndexBlock(context.Background(), nil, block) + bitmap, ok := indexUpdate.DataIndexBitmap[IndexedValue(key)] + if !ok { + return roaring64.New() + } + return bitmap + }, } } - -func (f *filterIterator) HasNext() bool { - return f.iter.HasNext() -} - -func (f *filterIterator) Next() (uint64, uint16) { - // TODO: how to handle if there's no next? - val := f.iter.Next() - return IndexCompoundID(val).Split() -} - -func (f *filterIterator) Peek() (uint64, uint16) { - val := f.iter.PeekNext() - return IndexCompoundID(val).Split() -} - -func (f *filterIterator) Bitmap() *roaring64.Bitmap { - return f.bitmap -} diff --git a/filter_test.go b/filter_test.go index f733895..c2074b1 100644 --- a/filter_test.go +++ b/filter_test.go @@ -182,7 +182,7 @@ func indexOnlyEvenBlocks(block Block[[]int]) (toIndex bool, indexValueMap map[In } if toIndex { - indexValueMap["true"] = []uint16{math.MaxUint16} + indexValueMap["true"] = []uint16{IndexAllDataIndexes} } return @@ -202,7 +202,7 @@ func indexOnlyOddBlocks(block Block[[]int]) (toIndex bool, indexValueMap map[Ind } if toIndex { - indexValueMap["true"] = []uint16{math.MaxUint16} + indexValueMap["true"] = []uint16{IndexAllDataIndexes} } return @@ -251,12 +251,6 @@ func indexNone(block Block[[]int]) (toIndex bool, indexValueMap map[IndexedValue return false, nil, nil } -func TestMaxMagicCompoundID(t *testing.T) { - id := NewIndexCompoundID(uint64(math.Exp2(48)-1), math.MaxUint16) - assert.Equal(t, uint64(math.Exp2(48)-1), id.BlockNumber()) - assert.Equal(t, uint16(math.MaxUint16), id.DataIndex()) -} - func TestIntMixFiltering(t *testing.T) { _, indexes, _, cleanup, err := setupMockData(generateMixedIntIndexes, generateMixedIntBlocks) assert.NoError(t, err) @@ -284,7 +278,7 @@ func TestIntMixFiltering(t *testing.T) { "555", "111", } - var numberFilter Filter + var numberFilter Filter[[]int] for _, number := range numbersIdxs { if numberFilter == nil { numberFilter = f.Eq("all", number) @@ -293,30 +287,27 @@ func TestIntMixFiltering(t *testing.T) { } } - onlyEvenResults := onlyEvenFilter.Eval(context.Background()) + onlyEvenResults := onlyEvenFilter.IndexIterator(context.Background()) assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20) - for _, id := range onlyEvenResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() + for _, block := range onlyEvenResults.Bitmap().ToArray() { assert.True(t, block <= 20) } - onlyOddResults := onlyOddFilter.Eval(context.Background()) + onlyOddResults := onlyOddFilter.IndexIterator(context.Background()) assert.Len(t, onlyOddResults.Bitmap().ToArray(), 20+20) - for _, id := range onlyOddResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() + for _, block := range onlyOddResults.Bitmap().ToArray() { assert.True(t, (block > 20 && block < 41) || (block > 50 && block < 71)) } - numberAllResults := numberFilter.Eval(context.Background()) - // 20*20 - assert.Len(t, numberAllResults.Bitmap().ToArray(), 400) - for _, id := range numberAllResults.Bitmap().ToArray() { - block, _ := IndexCompoundID(id).Split() + numberAllResults := numberFilter.IndexIterator(context.Background()) + // 20 + assert.Len(t, numberAllResults.Bitmap().ToArray(), 20) + for _, block := range numberAllResults.Bitmap().ToArray() { assert.True(t, block > 50 && block < 71) } allNumberAndOdd := f.And(numberFilter, oddFilter) - allNumberOddResults := allNumberAndOdd.Eval(context.Background()) + allNumberOddResults := allNumberAndOdd.IndexIterator(context.Background()) assert.ElementsMatch(t, numberAllResults.Bitmap().ToArray(), allNumberOddResults.Bitmap().ToArray()) } @@ -331,20 +322,20 @@ func TestFiltering(t *testing.T) { }) assert.NoError(t, err) assert.NotNil(t, f) - result := f.Or(f.And(f.Eq("all", "1"), f.Eq("all", "2")), f.Eq("all", "3")).Eval(context.Background()) + result := f.Or(f.And(f.Eq("all", "1"), f.Eq("all", "2")), f.Eq("all", "3")).IndexIterator(context.Background()) // result should contain block 1, 2, 3 assert.Len(t, result.Bitmap().ToArray(), 3) - block, _ := result.Next() + block := result.Next() assert.Equal(t, uint64(1), block) - block, _ = result.Next() + block = result.Next() assert.Equal(t, uint64(2), block) - block, _ = result.Next() + block = result.Next() assert.Equal(t, uint64(3), block) - result = f.And(f.Eq("all", "1"), f.Eq("all", "2")).Eval(context.Background()) + result = f.And(f.Eq("all", "1"), f.Eq("all", "2")).IndexIterator(context.Background()) // result should contain block 1 assert.Len(t, result.Bitmap().ToArray(), 1) - block, _ = result.Next() + block = result.Next() assert.Equal(t, uint64(1), block) } diff --git a/index.go b/index.go index 6530b2c..d3d76c6 100644 --- a/index.go +++ b/index.go @@ -25,25 +25,6 @@ const IndexAllDataIndexes = math.MaxUint16 // The function should return a map of index values to positions in the block. type IndexFunction[T any] func(block Block[T]) (toIndex bool, indexValueMap map[IndexedValue][]uint16, err error) -// IndexCompoundID is a compound ID for an index. It is a combination of the block number and the index within the block. -type IndexCompoundID uint64 - -func NewIndexCompoundID(blockNum uint64, dataIndex uint16) IndexCompoundID { - return IndexCompoundID(uint64(blockNum<<16 | uint64(dataIndex))) -} - -func (i IndexCompoundID) BlockNumber() uint64 { - return (uint64(i) & 0xFFFFFFFFFFFF0000) >> 16 -} - -func (i IndexCompoundID) DataIndex() uint16 { - return uint16(i) & 0xFFFF -} - -func (i IndexCompoundID) Split() (uint64, uint16) { - return i.BlockNumber(), i.DataIndex() -} - // IndexName is the name of an index. type IndexName string @@ -56,16 +37,17 @@ type IndexedValue string // IndexUpdate is a map of indexed values and their corresponding bitmaps. type IndexUpdate struct { - Data map[IndexedValue]*roaring64.Bitmap - LastBlockNum uint64 + BlockBitmap map[IndexedValue]*roaring64.Bitmap + DataIndexBitmap map[IndexedValue]*roaring64.Bitmap + LastBlockNum uint64 } func (u *IndexUpdate) Merge(update *IndexUpdate) { - for indexValue, bm := range update.Data { - if _, ok := u.Data[indexValue]; !ok { - u.Data[indexValue] = roaring64.New() + for indexValue, bm := range update.BlockBitmap { + if _, ok := u.BlockBitmap[indexValue]; !ok { + u.BlockBitmap[indexValue] = roaring64.New() } - u.Data[indexValue].Or(bm) + u.BlockBitmap[indexValue].Or(bm) } if u.LastBlockNum < update.LastBlockNum { @@ -109,13 +91,15 @@ func (i *Index[T]) Fetch(ctx context.Context, fs storage.FS, indexValue IndexedV } func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T]) (*IndexUpdate, error) { - numBlocksIndexed, err := i.LastBlockNumIndexed(ctx, fs) - if err != nil { - return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err) - } + if fs != nil { + numBlocksIndexed, err := i.LastBlockNumIndexed(ctx, fs) + if err != nil { + return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err) + } - if block.Number <= numBlocksIndexed { - return nil, nil + if block.Number <= numBlocksIndexed { + return nil, nil + } } toIndex, indexValueMap, err := i.indexFunc(block) @@ -126,31 +110,36 @@ func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T] return &IndexUpdate{LastBlockNum: block.Number}, nil } - indexValueCompoundMap := make(map[IndexedValue][]IndexCompoundID) - for indexValue, positions := range indexValueMap { - if _, ok := indexValueMap[indexValue]; !ok { - indexValueCompoundMap[indexValue] = make([]IndexCompoundID, 0) - } - for _, pos := range positions { - indexValueCompoundMap[indexValue] = append(indexValueCompoundMap[indexValue], NewIndexCompoundID(block.Number, pos)) + indexValueCompoundMap := make(map[IndexedValue]uint64) + for indexValue, _ := range indexValueMap { + if _, ok := indexValueCompoundMap[indexValue]; !ok { + indexValueCompoundMap[indexValue] = block.Number } } indexUpdate := &IndexUpdate{ - Data: make(map[IndexedValue]*roaring64.Bitmap), - LastBlockNum: block.Number, + BlockBitmap: make(map[IndexedValue]*roaring64.Bitmap), + DataIndexBitmap: make(map[IndexedValue]*roaring64.Bitmap), + LastBlockNum: block.Number, } - for indexValue, indexIDs := range indexValueCompoundMap { - bm, ok := indexUpdate.Data[indexValue] + for indexValue, blockNumber := range indexValueCompoundMap { + bm, ok := indexUpdate.BlockBitmap[indexValue] if !ok { bm = roaring64.New() - indexUpdate.Data[indexValue] = bm + indexUpdate.BlockBitmap[indexValue] = bm } + bm.Add(blockNumber) - for _, indexID := range indexIDs { - bm.Add(uint64(indexID)) + dataIndexBM, ok := indexUpdate.DataIndexBitmap[indexValue] + if !ok { + dataIndexBM = roaring64.New() + indexUpdate.DataIndexBitmap[indexValue] = dataIndexBM + } + for _, dataIndex := range indexValueMap[indexValue] { + dataIndexBM.Add(uint64(dataIndex)) } } + return indexUpdate, nil } @@ -163,7 +152,7 @@ func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexU return nil } - for indexValue, bmUpdate := range indexUpdate.Data { + for indexValue, bmUpdate := range indexUpdate.BlockBitmap { if bmUpdate.IsEmpty() { continue } @@ -270,3 +259,47 @@ func indexPath(index string, indexValue string) string { fmt.Sprintf("%s.idx", indexValue), // filename ) } + +type IndexIterator struct { + bm *roaring64.Bitmap + iter roaring64.IntIterable64 +} + +func NewIndexIterator(bitmap *roaring64.Bitmap) *IndexIterator { + return &IndexIterator{bm: bitmap, iter: bitmap.Iterator()} +} + +func (i *IndexIterator) First() bool { + i.iter = i.bm.Iterator() + if i.iter.HasNext() { + return true + } + return false +} + +func (i *IndexIterator) Last() bool { + i.iter = i.bm.ReverseIterator() + if i.iter.HasNext() { + return true + } + return false +} + +func (i *IndexIterator) HasNext() bool { + return i.iter.HasNext() +} + +func (i *IndexIterator) Next() uint64 { + return i.iter.Next() +} + +func (i *IndexIterator) Peek() uint64 { + if peekable, ok := i.iter.(roaring64.IntPeekable64); ok { + return peekable.PeekNext() + } + return math.MaxUint64 +} + +func (i *IndexIterator) Bitmap() *roaring64.Bitmap { + return i.bm +} diff --git a/indexer.go b/indexer.go index 6deeb1d..e385e0f 100644 --- a/indexer.go +++ b/indexer.go @@ -52,7 +52,7 @@ func NewIndexer[T any](ctx context.Context, opt IndexerOptions[T]) (*Indexer[T], return nil, fmt.Errorf("Indexer.NewIndexer: failed to get last block number indexed for %s: %w", index.Name(), err) } - indexMaps[index.name] = &IndexUpdate{Data: make(map[IndexedValue]*roaring64.Bitmap), LastBlockNum: lastBlockNum} + indexMaps[index.name] = &IndexUpdate{BlockBitmap: make(map[IndexedValue]*roaring64.Bitmap), LastBlockNum: lastBlockNum} } return &Indexer[T]{ @@ -88,7 +88,7 @@ func (i *Indexer[T]) EstimatedBatchSize() datasize.ByteSize { var size datasize.ByteSize = 0 for _, indexUpdate := range i.indexUpdates { - for _, bm := range indexUpdate.Data { + for _, bm := range indexUpdate.BlockBitmap { size += datasize.ByteSize(bm.GetSizeInBytes()) } } @@ -123,7 +123,7 @@ func (i *Indexer[T]) Flush(ctx context.Context) error { // clear indexUpdates for _, index := range i.indexes { - i.indexUpdates[index.name].Data = make(map[IndexedValue]*roaring64.Bitmap) + i.indexUpdates[index.name].BlockBitmap = make(map[IndexedValue]*roaring64.Bitmap) } return nil } diff --git a/reader_with_filter.go b/reader_with_filter.go index df906b3..1820778 100644 --- a/reader_with_filter.go +++ b/reader_with_filter.go @@ -3,19 +3,18 @@ package ethwal import ( "context" "io" - "reflect" ) type readerWithFilter[T any] struct { lastBlockNum uint64 reader Reader[T] - filter Filter - iterator FilterIterator + filter Filter[T] + iterator *IndexIterator } var _ Reader[any] = (*readerWithFilter[any])(nil) -func NewReaderWithFilter[T any](reader Reader[T], filter Filter) (Reader[T], error) { +func NewReaderWithFilter[T any](reader Reader[T], filter Filter[T]) (Reader[T], error) { return &readerWithFilter[T]{ reader: reader, filter: filter, @@ -31,9 +30,9 @@ func (c *readerWithFilter[T]) FileIndex() *FileIndex { } func (c *readerWithFilter[T]) Seek(ctx context.Context, blockNum uint64) error { - iter := c.filter.Eval(ctx) + iter := c.filter.IndexIterator(ctx) for iter.HasNext() { - nextBlock, _ := iter.Peek() + nextBlock := iter.Peek() if nextBlock >= blockNum { break } @@ -51,7 +50,7 @@ func (c *readerWithFilter[T]) BlockNum() uint64 { func (c *readerWithFilter[T]) Read(ctx context.Context) (Block[T], error) { // Lazy init iterator if c.iterator == nil { - c.iterator = c.filter.Eval(ctx) + c.iterator = c.filter.IndexIterator(ctx) } // Check if there are no more blocks to read @@ -60,19 +59,7 @@ func (c *readerWithFilter[T]) Read(ctx context.Context) (Block[T], error) { } // Collect all data indexes for the block - blockNum, dataIndex := c.iterator.Next() - dataIndexes := []uint16{dataIndex} - - doFilter := dataIndex != IndexAllDataIndexes - for c.iterator.HasNext() { - nextBlockNum, nextDataIndex := c.iterator.Peek() - if blockNum != nextBlockNum { - break - } - - _, _ = c.iterator.Next() - dataIndexes = append(dataIndexes, nextDataIndex) - } + blockNum := c.iterator.Next() // Seek to the block err := c.reader.Seek(ctx, blockNum) @@ -85,14 +72,8 @@ func (c *readerWithFilter[T]) Read(ctx context.Context) (Block[T], error) { return Block[T]{}, err } - // Filter the block data - if dType := reflect.TypeOf(block.Data); doFilter && (dType.Kind() == reflect.Slice || dType.Kind() == reflect.Array) { - newData := reflect.Indirect(reflect.New(dType)) - for _, dataIndex := range dataIndexes { - newData = reflect.Append(newData, reflect.ValueOf(block.Data).Index(int(dataIndex))) - } - block.Data = newData.Interface().(T) - } + // Filter + block = c.filter.Filter(block) c.lastBlockNum = blockNum return block, nil