Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path"
"sort"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/c2h5oh/datasize"
)

const NoBlockNum = uint64(math.MaxUint64)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This choice might not represent a risk in terms of feasibility, because it is such a high number that it won't be reached naturally for a very long time, but it's hard to manage: in order to write proper conditions we'll have to remember things like NoBlockNum > 0, and anyLegitimateBlock < NoBlockNum, or 0 <= anyLegitimateBlock < NoBlockNum

We would have to do mental gymnastics to use it properly:

if blockNum != NoBlockNum {  // ok, no problem, this is readable
 
if blockNum > startBlock {  // you forgot to include `blockNum != NoBlockNum`
    
if blockNum != NoBlockNum && blockNum > startBlock { // this would be the right way to check
    
// simple conditions would become complex
if blockNum >= minBlock && blockNum <= maxBlock { // this actually works, but by accident because we're not checking `NoBlockNum`

if currentBlock - blockNum < 10 { // no NoBlockNum check, and might even lead to underflow

If the intent is to represent that a block number hasn’t been set, maybe we could use a nil pointer, or a new descriptive type:

type BlockNumber struct{}

func (b BlockNumber) IsValid() bool { … }
func (b BlockNumber) Uint64() uint64 { … }

// or just

func (b BlockNumber) Value() (uint64, bool) { … }

My point is that this technically works, but the cost is extra cognitive load because it's counterintuitive, I'd suggest to rethink this proposal.


type Dataset struct {
Name string
Version string
Expand Down
2 changes: 0 additions & 2 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func (o FilterBuilderOptions[T]) WithDefaults() FilterBuilderOptions[T] {
}

type filterBuilder[T any] struct {
ctx context.Context

indexes map[IndexName]Index[T]
fs storage.FS
}
Expand Down
6 changes: 3 additions & 3 deletions filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func generateMixedIntBlocks() []Block[[]int] {
// 45-49 generate 5 blocks with no data
// 50-69 generate 20 blocks with random but repeating huge numbers

for i := 1; i <= 20; i++ {
for i := 0; i <= 20; i++ {
blocks = append(blocks, Block[[]int]{
Hash: common.BytesToHash([]byte{byte(i)}),
Number: uint64(i),
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestIntMixFiltering(t *testing.T) {
}

onlyEvenResults := onlyEvenFilter.IndexIterator(context.Background())
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 20)
assert.Len(t, onlyEvenResults.Bitmap().ToArray(), 21)
for _, block := range onlyEvenResults.Bitmap().ToArray() {
assert.True(t, block <= 20)
}
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestLowestIndexedBlockNum(t *testing.T) {
})
assert.NoError(t, err)
lowestBlockIndexed = indexer.BlockNum()
assert.Equal(t, uint64(0), lowestBlockIndexed)
assert.Equal(t, NoBlockNum, lowestBlockIndexed)
blocks := generateIntBlocks()
for _, block := range blocks[:50] {
err = indexer.Index(context.Background(), block)
Expand Down
14 changes: 7 additions & 7 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (u *IndexUpdate) Merge(update *IndexUpdate) {
u.BlockBitmap[indexValue].Or(bm)
}

if u.LastBlockNum < update.LastBlockNum {
if u.LastBlockNum == NoBlockNum || u.LastBlockNum < update.LastBlockNum {
u.LastBlockNum = update.LastBlockNum
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ func (i *Index[T]) IndexBlock(ctx context.Context, fs storage.FS, block Block[T]
return nil, fmt.Errorf("unexpected: failed to get number of blocks indexed: %w", err)
}

if block.Number <= numBlocksIndexed {
if numBlocksIndexed != NoBlockNum && block.Number <= numBlocksIndexed {
return nil, nil
}
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (i *Index[T]) Store(ctx context.Context, fs storage.FS, indexUpdate *IndexU
if err != nil {
return fmt.Errorf("failed to get number of blocks indexed: %w", err)
}
if lastBlockNumIndexed >= indexUpdate.LastBlockNum {
if lastBlockNumIndexed != NoBlockNum && lastBlockNumIndexed >= indexUpdate.LastBlockNum {
return nil
}

Expand Down Expand Up @@ -191,19 +191,19 @@ func (i *Index[T]) LastBlockNumIndexed(ctx context.Context, fs storage.FS) (uint
file, err := fs.Open(ctx, indexedBlockNumFilePath(string(i.name)), nil)
if err != nil {
// file doesn't exist
return 0, nil
return NoBlockNum, nil
}
defer file.Close()

buf, err := io.ReadAll(file)
if err != nil {
return 0, fmt.Errorf("failed to read IndexBlock file: %w", err)
return NoBlockNum, fmt.Errorf("failed to read IndexBlock file: %w", err)
}

var numBlocksIndexed uint64
err = binary.Read(bytes.NewReader(buf), binary.BigEndian, &numBlocksIndexed)
if err != nil {
return 0, fmt.Errorf("failed to unmarshal bitmap: %w", err)
return NoBlockNum, fmt.Errorf("failed to unmarshal bitmap: %w", err)
}

i.numBlocksIndexed = &atomic.Uint64{}
Expand All @@ -219,7 +219,7 @@ func (i *Index[T]) storeLastBlockNumIndexed(ctx context.Context, fs storage.FS,
prevBlockIndexed = blocksIndexed
}

if prevBlockIndexed >= numBlocksIndexed {
if prevBlockIndexed != NoBlockNum && prevBlockIndexed >= numBlocksIndexed {
return nil
}

Expand Down
11 changes: 8 additions & 3 deletions indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,21 @@ func (i *Indexer[T]) BlockNum() uint64 {
i.mu.Lock()
defer i.mu.Unlock()

// initialize lowestBlockNum to math.MaxUint64
var lowestBlockNum uint64 = math.MaxUint64
for _, indexUpdate := range i.indexUpdates {
// if no blocks have been indexed, return NoBlockNum
if indexUpdate.LastBlockNum == NoBlockNum {
return NoBlockNum
}

// update lowestBlockNum if the current indexUpdate has a lower last block number
if indexUpdate.LastBlockNum < lowestBlockNum {
lowestBlockNum = indexUpdate.LastBlockNum
}
}

if lowestBlockNum == math.MaxUint64 {
return 0
}
// return the lowest block number indexed by all indexes
return lowestBlockNum
}

Expand Down
28 changes: 18 additions & 10 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"sync"
"time"
Expand All @@ -29,10 +30,9 @@ type Reader[T any] interface {
}

type reader[T any] struct {
options Options
path string
fs storage.FS
useCompression bool
options Options
path string
fs storage.FS

closer io.Closer

Expand Down Expand Up @@ -97,10 +97,11 @@ func NewReader[T any](opt Options) (Reader[T], error) {
}

return &reader[T]{
options: opt,
path: datasetPath,
fs: fs,
fileIndex: fileIndex,
options: opt,
path: datasetPath,
fs: fs,
fileIndex: fileIndex,
lastBlockNum: uint64(math.MaxUint64),
}, nil
}

Expand Down Expand Up @@ -140,8 +141,11 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
}
}

var block Block[T]
for structs.IsZero(block) || block.Number <= r.lastBlockNum {
var (
block Block[T]
hasSuccessfullyReadBlock bool
)
for !hasSuccessfullyReadBlock || block.Number <= r.lastBlockNum && r.lastBlockNum != math.MaxUint64 {
select {
case <-ctx.Done():
return Block[T]{}, ctx.Err()
Expand All @@ -168,6 +172,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
r.lastBlockNum = block.Number
}

hasSuccessfullyReadBlock = true

if !r.isBlockWithin(block) {
currentFile := r.fileIndex.At(r.currFileIndex)
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",
Expand All @@ -181,6 +187,8 @@ func (r *reader[T]) Read(ctx context.Context) (Block[T], error) {
return Block[T]{}, fmt.Errorf("failed to decode file data: %w", err)
}

hasSuccessfullyReadBlock = true

if !r.isBlockWithin(block) {
currentFile := r.fileIndex.At(r.currFileIndex)
return Block[T]{}, fmt.Errorf("block number %d is out of file block %d-%d range",
Expand Down
17 changes: 15 additions & 2 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"sync"

Expand Down Expand Up @@ -77,18 +78,26 @@ func NewWriter[T any](opt Options) (Writer[T], error) {
return nil, fmt.Errorf("failed to load file index: %w", err)
}

var firstBlockNum uint64
var lastBlockNum uint64
var fileIndexFileList = fileIndex.Files()
if len(fileIndexFileList) > 0 {
lastBlockNum = fileIndexFileList[len(fileIndexFileList)-1].LastBlockNum
}

if lastBlockNum == 0 {
firstBlockNum = NoBlockNum
lastBlockNum = NoBlockNum
} else {
firstBlockNum = lastBlockNum + 1
}

// create new writer
return &writer[T]{
options: opt,
path: datasetPath,
fs: fs,
firstBlockNum: lastBlockNum + 1,
firstBlockNum: firstBlockNum,
lastBlockNum: lastBlockNum,
fileIndex: fileIndex,
buffer: bytes.NewBuffer(make([]byte, 0, defaultFileSize)),
Expand All @@ -103,7 +112,7 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
w.mu.Lock()
defer w.mu.Unlock()

if w.lastBlockNum >= b.Number {
if w.lastBlockNum != math.MaxUint64 && w.lastBlockNum >= b.Number {
return nil
}

Expand All @@ -118,6 +127,10 @@ func (w *writer[T]) Write(ctx context.Context, b Block[T]) error {
return fmt.Errorf("failed to encode file data: %w", err)
}

if w.firstBlockNum == NoBlockNum {
w.firstBlockNum = b.Number
}

w.lastBlockNum = b.Number
w.options.FileRollPolicy.onBlockProcessed(w.lastBlockNum)
return nil
Expand Down
5 changes: 3 additions & 2 deletions writer_no_gap.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type noGapWriter[T any] struct {
}

func NewWriterNoGap[T any](w Writer[T]) Writer[T] {
return &noGapWriter[T]{w: w}
return &noGapWriter[T]{w: w, lastBlockNum: w.BlockNum()}
}

func (n *noGapWriter[T]) FileSystem() storage.FS {
Expand All @@ -24,7 +24,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
defer func() { n.lastBlockNum = b.Number }()

// skip if block number is less than or equal to last block number
if b.Number <= n.lastBlockNum {
if n.lastBlockNum != NoBlockNum && b.Number <= n.lastBlockNum {
return nil
}

Expand All @@ -40,6 +40,7 @@ func (n *noGapWriter[T]) Write(ctx context.Context, b Block[T]) error {
return err
}
}

return n.w.Write(ctx, b)
}

Expand Down
64 changes: 60 additions & 4 deletions writer_no_gap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func TestWriterNoGap(t *testing.T) {
ngw := NewWriterNoGap[int](w)
require.NotNil(t, w)

err = ngw.Write(context.Background(), Block[int]{Number: 0})
require.NoError(t, err)

err = ngw.Write(context.Background(), Block[int]{Number: 1})
require.NoError(t, err)

Expand All @@ -46,7 +49,7 @@ func TestWriterNoGap(t *testing.T) {
require.NoError(t, err)

walData, err := os.ReadFile(
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 1, LastBlockNum: 3}).Path()),
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 3}).Path()),
)
require.NoError(t, err)

Expand All @@ -59,7 +62,57 @@ func TestWriterNoGap(t *testing.T) {
blockCount++
}

require.Equal(t, 3, blockCount)
require.Equal(t, 4, blockCount)
})

t.Run("gap_first_block", func(t *testing.T) {
defer testTeardown(t)

opt := Options{
Dataset: Dataset{
Name: "int-wal",
Path: testPath,
Version: defaultDatasetVersion,
},
NewEncoder: NewJSONEncoder,
}.WithDefaults()

w, err := NewWriter[int](opt)
require.NoError(t, err)

ngw := NewWriterNoGap[int](w)
require.NotNil(t, w)

err = ngw.Write(context.Background(), Block[int]{Number: 1})
require.NoError(t, err)

err = ngw.Write(context.Background(), Block[int]{Number: 2})
require.NoError(t, err)

err = ngw.Write(context.Background(), Block[int]{Number: 3})
require.NoError(t, err)

err = (w.(*writer[int])).rollFile(context.Background())
require.NoError(t, err)

err = ngw.Close(context.Background())
require.NoError(t, err)

walData, err := os.ReadFile(
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 3}).Path()),
)
require.NoError(t, err)

d := NewJSONDecoder(bytes.NewBuffer(walData))

var b Block[int]
var blockCount int
for d.Decode(&b) != io.EOF {
require.NoError(t, err)
blockCount++
}

require.Equal(t, 4, blockCount)
})

t.Run("gap_3_10", func(t *testing.T) {
Expand All @@ -80,6 +133,9 @@ func TestWriterNoGap(t *testing.T) {
ngw := NewWriterNoGap[int](w)
require.NotNil(t, w)

err = ngw.Write(context.Background(), Block[int]{Number: 0})
require.NoError(t, err)

err = ngw.Write(context.Background(), Block[int]{Number: 1})
require.NoError(t, err)

Expand All @@ -98,7 +154,7 @@ func TestWriterNoGap(t *testing.T) {
require.NoError(t, err)

walData, err := os.ReadFile(
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 1, LastBlockNum: 10}).Path()),
path.Join(buildETHWALPath(opt.Dataset.Name, opt.Dataset.Version, opt.Dataset.Path), (&File{FirstBlockNum: 0, LastBlockNum: 10}).Path()),
)
require.NoError(t, err)

Expand All @@ -111,6 +167,6 @@ func TestWriterNoGap(t *testing.T) {
blockCount++
}

require.Equal(t, 10, blockCount)
require.Equal(t, 11, blockCount)
})
}
Loading
Loading