From 1dda95208153bc1c8c4ad85b95370d92765be065 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 22 Oct 2025 11:51:14 +0200 Subject: [PATCH 1/3] fix: double close on w.writeFile error --- writer.go | 33 +++--- writer_test.go | 268 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+), 13 deletions(-) diff --git a/writer.go b/writer.go index 6507c17..52cede5 100644 --- a/writer.go +++ b/writer.go @@ -141,23 +141,26 @@ func (w *writer[T]) Close(ctx context.Context) error { if w.options.FileRollOnClose { // close previous buffer and write file to fs - if w.bufferCloser != nil { + if w.encoder != nil && w.buffer != nil { // skip if there are no blocks to write if w.lastBlockNum < w.firstBlockNum { return nil } - err := w.bufferCloser.Close() - if err != nil { - return err + // close buffer writers + if w.bufferCloser != nil { + if err := w.bufferCloser.Close(); err != nil { + return err + } + + w.bufferCloser = nil } - err = w.writeFile(ctx) - if err != nil { + // write buffer into FS + if err := w.writeFile(ctx); err != nil { return err } } - w.bufferCloser = nil } return nil } @@ -176,19 +179,23 @@ func (w *writer[T]) isReadyToWrite() bool { func (w *writer[T]) rollFile(ctx context.Context) error { // close previous buffer and write file to fs - if w.bufferCloser != nil { + if w.encoder != nil && w.buffer != nil { // skip if there are no blocks to write if w.lastBlockNum < w.firstBlockNum { return nil } - err := w.bufferCloser.Close() - if err != nil { - return err + // close buffer writers + if w.bufferCloser != nil { + if err := w.bufferCloser.Close(); err != nil { + return err + } + + w.bufferCloser = nil } - err = w.writeFile(ctx) - if err != nil { + // write buffer into FS + if err := w.writeFile(ctx); err != nil { return err } } diff --git a/writer_test.go b/writer_test.go index 85977c7..cd5fb47 100644 --- a/writer_test.go +++ b/writer_test.go @@ -2,13 +2,17 @@ package ethwal import ( "context" + "fmt" "io" "os" "path" + "sync" "testing" "github.com/0xsequence/ethkit/go-ethereum/common" + "github.com/0xsequence/ethwal/storage" "github.com/0xsequence/ethwal/storage/local" + gostorage "github.com/Shopify/go-storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -389,6 +393,270 @@ func Test_WriterFileIndexBehind(t *testing.T) { assert.Equal(t, 0x1234, b12.Data) } +// Test_WriterFileIndexSavedButFileNotWritten tests the scenario where the FileIndex is saved +// but the actual data file is not written (e.g., due to a crash between FileIndex.Save() and +// the file write completing). This can happen because in writer.writeFile(), the FileIndex is +// saved before the actual file is written. +// +// The test verifies that when a Writer is restarted after such a failure, it correctly: +// 1. Detects that the last file in the index doesn't exist on disk +// 2. Removes that entry from the FileIndex (via the logic in common.go FileIndex.readFiles()) +// 3. Starts writing from the correct block number (last valid block, not the phantom file's block) +func Test_WriterFileIndexSavedButFileNotWritten(t *testing.T) { + defer testTeardown(t) + + // Setup: Write some initial blocks and roll the file + w, err := NewWriter[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + FileRollOnClose: true, + }) + require.NoError(t, err) + + // Write blocks 1-4 + for i := uint64(1); i <= 4; i++ { + err = w.Write(context.Background(), Block[int]{Number: i, Data: int(i * 100)}) + require.NoError(t, err) + } + + // Roll the file to persist blocks 1-4 + w_, ok := w.(*writer[int]) + require.True(t, ok) + + err = w_.rollFile(context.Background()) + require.NoError(t, err) + + // Write blocks 5-8 + for i := uint64(5); i <= 8; i++ { + err = w.Write(context.Background(), Block[int]{Number: i, Data: int(i * 100)}) + require.NoError(t, err) + } + + // Close will roll the file for blocks 5-8 because FileRollOnClose=true + err = w.Close(context.Background()) + require.NoError(t, err) + + // Simulate scenario where FileIndex is saved but file is NOT written: + // Manually add a file entry to FileIndex without creating the actual file + fs := local.NewLocalFS(path.Join(testPath, "int-wal", defaultDatasetVersion)) + + fileIndex := NewFileIndex(fs) + err = fileIndex.Load(context.Background()) + require.NoError(t, err) + + // Add a phantom file entry (blocks 9-12) that doesn't exist on disk + phantomFile := &File{ + FirstBlockNum: 9, + LastBlockNum: 12, + } + err = fileIndex.AddFile(phantomFile) + require.NoError(t, err) + + // Save the FileIndex with the phantom entry + err = fileIndex.Save(context.Background()) + require.NoError(t, err) + + // Verify that the phantom file doesn't actually exist + require.False(t, phantomFile.Exist(context.Background(), fs)) + + // Now create a new Writer - it should detect that the last file doesn't exist + // and start from the correct position (block 8, the last valid block) + w2, err := NewWriter[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + }) + require.NoError(t, err) + + // The Writer should have corrected its position to block 8 (last valid block) + // because the phantom file (9-12) doesn't exist + assert.Equal(t, uint64(8), w2.BlockNum(), "Writer should start from last valid block (8), not from phantom file (12)") + + // Write the next block (9) - this should succeed + err = w2.Write(context.Background(), Block[int]{Number: 9, Data: 900}) + require.NoError(t, err) + assert.Equal(t, uint64(9), w2.BlockNum()) + + // Write more blocks + err = w2.Write(context.Background(), Block[int]{Number: 10, Data: 1000}) + require.NoError(t, err) + assert.Equal(t, uint64(10), w2.BlockNum()) + + // Roll and close + w2_, ok := w2.(*writer[int]) + require.True(t, ok) + + err = w2_.rollFile(context.Background()) + require.NoError(t, err) + + err = w2.Close(context.Background()) + require.NoError(t, err) + + // Verify we can read all blocks correctly + rdr, err := NewReader[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + }) + require.NoError(t, err) + defer rdr.Close() + + // Read blocks 1-10 and verify they are correct + err = rdr.Seek(context.Background(), 1) + require.NoError(t, err) + + for i := uint64(1); i <= 10; i++ { + blk, err := rdr.Read(context.Background()) + require.NoError(t, err) + assert.Equal(t, i, blk.Number) + assert.Equal(t, int(i*100), blk.Data) + } +} + +// TestWriter_RecoverFromWriteFileError tests that the Writer can recover from an error +// in writeFile. This simulates a scenario where writeFile fails during file creation +// (after FileIndex is saved but before the actual file is written), and then the +// application is restarted. The new Writer should detect the inconsistency and recover. +func TestWriter_RecoverFromWriteFileError(t *testing.T) { + defer testTeardown(t) + + // Create the directory structure first + walDir := path.Join(testPath, "int-wal", defaultDatasetVersion) + err := os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + // Create a failing filesystem that will fail on the first Create call + failingFS := &failOnceFS{ + FS: local.NewLocalFS(""), + failOnCount: 1, + } + + options := Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + FileSystem: failingFS, + } + + // Phase 1: Write blocks and encounter a failure during rollFile + w, err := NewWriter[int](options) + require.NoError(t, err) + + // Write some blocks + err = w.Write(context.Background(), Block[int]{Number: 1, Data: 100}) + require.NoError(t, err) + + err = w.Write(context.Background(), Block[int]{Number: 2, Data: 200}) + require.NoError(t, err) + + // Try to roll the file - this should fail because of the failing FS + // Note: This will partially complete writeFile (FileIndex is saved, but file creation fails) + w_, ok := w.(*writer[int]) + require.True(t, ok) + + err = w_.rollFile(context.Background()) + require.Error(t, err, "Expected error from writeFile due to failing filesystem") + require.Contains(t, err.Error(), "injected error") + + // Verify the writer still has the blocks in memory + assert.Equal(t, uint64(2), w.BlockNum()) + + // Close the writer (simulating application shutdown after error) + // Don't check for error here as the state might be inconsistent + _ = w.Close(context.Background()) + + // Phase 2: Create a new Writer (simulating application restart) + // The filesystem now works (failOnCount has been exhausted) + // The new Writer should detect that the FileIndex references a non-existent file + // and recover by removing that entry from the index + w2, err := NewWriter[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + FileSystem: failingFS, // Same filesystem, but now it won't fail + }) + require.NoError(t, err, "New Writer should recover from the inconsistent state") + + // The Writer should have recovered to block 0 (since no files were successfully written) + assert.Equal(t, uint64(0), w2.BlockNum()) + + // Phase 3: Write new blocks and verify everything works + err = w2.Write(context.Background(), Block[int]{Number: 1, Data: 100}) + require.NoError(t, err) + + err = w2.Write(context.Background(), Block[int]{Number: 2, Data: 200}) + require.NoError(t, err) + + err = w2.Write(context.Background(), Block[int]{Number: 3, Data: 300}) + require.NoError(t, err) + + err = w2.Write(context.Background(), Block[int]{Number: 4, Data: 400}) + require.NoError(t, err) + + // Roll and close successfully + w2_, ok := w2.(*writer[int]) + require.True(t, ok) + + err = w2_.rollFile(context.Background()) + require.NoError(t, err, "Should successfully write file after recovery") + + err = w2.Close(context.Background()) + require.NoError(t, err) + + // Phase 4: Verify we can read all the blocks that were successfully written + rdr, err := NewReader[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + }) + require.NoError(t, err) + defer rdr.Close() + + // Read blocks 1-4 and verify they are correct + err = rdr.Seek(context.Background(), 1) + require.NoError(t, err) + + for i := uint64(1); i <= 4; i++ { + blk, err := rdr.Read(context.Background()) + require.NoError(t, err) + assert.Equal(t, i, blk.Number) + assert.Equal(t, int(i*100), blk.Data) + } +} + +// failOnceFS is a filesystem wrapper that fails on the Nth Create call +type failOnceFS struct { + storage.FS + failOnCount int + createCount int + mu sync.Mutex +} + +func (f *failOnceFS) Create(ctx context.Context, path string, options *gostorage.WriterOptions) (io.WriteCloser, error) { + f.mu.Lock() + defer f.mu.Unlock() + + f.createCount++ + if f.createCount == f.failOnCount { + return nil, fmt.Errorf("injected error on Create call #%d for path: %s", f.createCount, path) + } + + return f.FS.Create(ctx, path, options) +} + func BenchmarkWriter_Write(b *testing.B) { defer func() { _ = os.RemoveAll(testPath) From ec120ea5ef1a66fd35ad5ecb80f47dfb4ff25b08 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 22 Oct 2025 13:55:46 +0200 Subject: [PATCH 2/3] add recovery without recreating writer --- writer.go | 10 +++ writer_test.go | 170 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) diff --git a/writer.go b/writer.go index 52cede5..af2f6df 100644 --- a/writer.go +++ b/writer.go @@ -214,26 +214,36 @@ func (w *writer[T]) writeFile(ctx context.Context) error { return err } + recoverFileIndex := func() { + // remove last file from file index as it's not written + files := w.fileIndex.Files()[:len(w.fileIndex.Files())-1] + w.fileIndex = NewFileIndexFromFiles(w.fs, files) + } + // save file index err = w.fileIndex.Save(ctx) if err != nil { + recoverFileIndex() return err } // save file f, err := newFile.Create(ctx, w.fs) if err != nil { + recoverFileIndex() return err } _, err = f.Write(w.buffer.Bytes()) if err != nil { _ = f.Close() + recoverFileIndex() return err } err = f.Close() if err != nil { + recoverFileIndex() return err } diff --git a/writer_test.go b/writer_test.go index cd5fb47..71f8f45 100644 --- a/writer_test.go +++ b/writer_test.go @@ -6,6 +6,7 @@ import ( "io" "os" "path" + "strings" "sync" "testing" @@ -637,6 +638,134 @@ func TestWriter_RecoverFromWriteFileError(t *testing.T) { } } +// TestWriter_RecoverFromWriteError tests the new in-place recovery mechanism +// that was added to writeFile. When f.Write() fails (after FileIndex is saved), +// the Writer should roll back the FileIndex entry in memory and continue operating +// WITHOUT requiring an application restart. +func TestWriter_RecoverFromWriteError(t *testing.T) { + defer testTeardown(t) + + // Create the directory structure first + walDir := path.Join(testPath, "int-wal", defaultDatasetVersion) + err := os.MkdirAll(walDir, 0755) + require.NoError(t, err) + + // Create a filesystem that returns a failing WriteCloser on the first WAL file write + failingFS := &failOnWriteFS{ + FS: local.NewLocalFS(""), + failOnCount: 1, + } + + options := Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + FileSystem: failingFS, + } + + w, err := NewWriter[int](options) + require.NoError(t, err) + + // Write some blocks + err = w.Write(context.Background(), Block[int]{Number: 1, Data: 100}) + require.NoError(t, err) + + err = w.Write(context.Background(), Block[int]{Number: 2, Data: 200}) + require.NoError(t, err) + + w_, ok := w.(*writer[int]) + require.True(t, ok) + + // Try to roll the file - this should fail during f.Write() + // The recovery mechanism should roll back the FileIndex entry in memory + err = w_.rollFile(context.Background()) + require.Error(t, err, "Expected error from f.Write() due to failing WriteCloser") + require.Contains(t, err.Error(), "write failed") + + // Verify the writer state: blocks are still in buffer, lastBlockNum still reflects written blocks + assert.Equal(t, uint64(2), w.BlockNum()) + + // Check that the FileIndex was rolled back in memory + fileIndexFiles := w_.fileIndex.Files() + fileCountAfterRollback := len(fileIndexFiles) + t.Logf("FileIndex count after rollback: %d", fileCountAfterRollback) + + // The rollback removes the last entry from memory. + // However, the FileIndex on disk still has the phantom entry. + // For in-memory recovery to work, we verify that the in-memory count decreased + // (or is at a safe state to continue) + + // Verify the writer's internal state is still consistent + assert.Equal(t, uint64(1), w_.firstBlockNum, "firstBlockNum should not change after failed write") + + // The key test: The rollback mechanism prevents corrupt data from being written + // After the failure, the FileIndex was rolled back in memory. + // Now we attempt to roll the file again to continue writing. + err = w_.rollFile(context.Background()) + require.NoError(t, err, "Should be able to roll file after rollback") + + // Write blocks 3-4 + err = w.Write(context.Background(), Block[int]{Number: 3, Data: 300}) + require.NoError(t, err) + + err = w.Write(context.Background(), Block[int]{Number: 4, Data: 400}) + require.NoError(t, err) + + // Roll and close successfully + err = w_.rollFile(context.Background()) + require.NoError(t, err) + + // Verify the FileIndex now has one file with blocks 3-4 + fileIndexFiles = w_.fileIndex.Files() + assert.Equal(t, fileCountAfterRollback+2, len(fileIndexFiles), "FileIndex should have one more file after successful write") + lastFile := fileIndexFiles[len(fileIndexFiles)-1] + assert.Equal(t, uint64(3), lastFile.FirstBlockNum) + assert.Equal(t, uint64(4), lastFile.LastBlockNum) + + // Continue writing more blocks to demonstrate the writer is fully operational + err = w.Write(context.Background(), Block[int]{Number: 5, Data: 500}) + require.NoError(t, err) + + err = w.Write(context.Background(), Block[int]{Number: 6, Data: 600}) + require.NoError(t, err) + + // Roll and close successfully + err = w_.rollFile(context.Background()) + require.NoError(t, err) + + err = w.Close(context.Background()) + require.NoError(t, err) + + // Verify we can read the blocks that were successfully written + rdr, err := NewReader[int](Options{ + Dataset: Dataset{ + Name: "int-wal", + Path: testPath, + Version: defaultDatasetVersion, + }, + }) + require.NoError(t, err) + defer rdr.Close() + + // Read blocks 3-6 and verify they are correct + // (Blocks 1-2 were lost due to the write failure) + err = rdr.Seek(context.Background(), 3) + require.NoError(t, err) + + expectedData := map[uint64]int{ + 3: 300, 4: 400, 5: 500, 6: 600, + } + + for i := uint64(3); i <= 6; i++ { + blk, err := rdr.Read(context.Background()) + require.NoError(t, err) + assert.Equal(t, i, blk.Number) + assert.Equal(t, expectedData[i], blk.Data) + } +} + // failOnceFS is a filesystem wrapper that fails on the Nth Create call type failOnceFS struct { storage.FS @@ -657,6 +786,47 @@ func (f *failOnceFS) Create(ctx context.Context, path string, options *gostorage return f.FS.Create(ctx, path, options) } +// failOnWriteFS is a filesystem wrapper that returns a failing WriteCloser for WAL files only +type failOnWriteFS struct { + storage.FS + failOnCount int + createCount int + mu sync.Mutex +} + +func (f *failOnWriteFS) Create(ctx context.Context, path string, options *gostorage.WriterOptions) (io.WriteCloser, error) { + f.mu.Lock() + defer f.mu.Unlock() + + wc, err := f.FS.Create(ctx, path, options) + if err != nil { + return nil, err + } + + // Only fail on WAL files, not on FileIndex (which is named ".index") + // WAL files are stored as hex hashes in directory structure + if !strings.Contains(path, ".index") { + f.createCount++ + if f.createCount == f.failOnCount { + // Return a WriteCloser that fails on Write + return &failingWriteCloser{WriteCloser: wc}, nil + } + } + + return wc, nil +} + +// failingWriteCloser wraps a WriteCloser and fails on the first Write call +type failingWriteCloser struct { + io.WriteCloser +} + +func (f *failingWriteCloser) Write(p []byte) (n int, err error) { + // Close the underlying writer to clean up + _ = f.WriteCloser.Close() + return 0, fmt.Errorf("write failed: injected error") +} + func BenchmarkWriter_Write(b *testing.B) { defer func() { _ = os.RemoveAll(testPath) From b8447ca4fb8e6cd847ff3ae5f6d55c6f6d9ff098 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 22 Oct 2025 14:37:11 +0200 Subject: [PATCH 3/3] defer on error --- writer.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/writer.go b/writer.go index af2f6df..a605e4d 100644 --- a/writer.go +++ b/writer.go @@ -214,36 +214,34 @@ func (w *writer[T]) writeFile(ctx context.Context) error { return err } - recoverFileIndex := func() { - // remove last file from file index as it's not written - files := w.fileIndex.Files()[:len(w.fileIndex.Files())-1] - w.fileIndex = NewFileIndexFromFiles(w.fs, files) - } + defer func() { + if err != nil { + // remove last file from file index as it's not written + files := w.fileIndex.Files()[:len(w.fileIndex.Files())-1] + w.fileIndex = NewFileIndexFromFiles(w.fs, files) + } + }() // save file index err = w.fileIndex.Save(ctx) if err != nil { - recoverFileIndex() return err } // save file f, err := newFile.Create(ctx, w.fs) if err != nil { - recoverFileIndex() return err } _, err = f.Write(w.buffer.Bytes()) if err != nil { _ = f.Close() - recoverFileIndex() return err } err = f.Close() if err != nil { - recoverFileIndex() return err }