From 865b7aec5f292ea3482aa964f7888b5689b66323 Mon Sep 17 00:00:00 2001 From: Nikola Novakovic Date: Mon, 1 Dec 2025 14:02:31 -0500 Subject: [PATCH] Fix EOF errors when reading at file rotation boundaries When reading from completed files at maxBytesPerFileRead boundary, the reader would attempt to read the message header before checking if it should rotate to the next file. This caused EOF errors and incorrectly marked valid files as corrupted (.bad). Added pre-rotation check in readOne() that rotates to the next file before attempting to read when readPos >= maxBytesPerFileRead. The check only applies to completed files (readFileNum < writeFileNum) to avoid advancing past the current write file. Tests added: - TestLargeMessageBoundary: Verifies file rotation with large messages - TestReadCurrentWriteFile: Ensures reader doesn't advance past write file --- diskqueue.go | 17 +++++++++++ diskqueue_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 2b54bd4..98d2f6c 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -316,6 +316,23 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } + // check if we should rotate to the next file before attempting to read + // only rotate if we're reading a completed file (not the current write file) + if d.readFileNum < d.writeFileNum && d.readPos >= d.maxBytesPerFileRead { + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + + d.readFileNum++ + d.readPos = 0 + d.nextReadFileNum = d.readFileNum + d.nextReadPos = 0 + + // recursively call readOne to open and read from the next file + return d.readOne() + } + err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() diff --git a/diskqueue_test.go b/diskqueue_test.go index c5374c3..3d6d1b0 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -847,3 +847,75 @@ func TestWriteRollReadEOF(t *testing.T) { return 
err }) } + +// TestLargeMessageBoundary verifies that file rotation works correctly when messages are large relative to maxBytesPerFile, +// ensuring no EOF errors occur at boundaries and all messages can be read back without corruption (checked via message length and the absence of .bad files). +func TestLargeMessageBoundary(t *testing.T) { + l := NewTestLogger(t) + tmpDir, _ := ioutil.TempDir("", "nsq-test") + defer os.RemoveAll(tmpDir) + + // Use smaller sizes to test the same behavior more efficiently + // 5KB file limit, 4KB max message (same 10:8 ratio as 50MB:40MB in production) + maxBytesPerFile := int64(5 * 1024) + maxMsgSize := int32(4 * 1024) + + dq := New("test_large_msg", tmpDir, maxBytesPerFile, 1, maxMsgSize, 1000, 2*time.Second, l) + defer dq.Close() + + // Create messages that will cause multiple rotations + largeMsg := make([]byte, 4000) // ~4KB message, just under maxMsgSize (4096) + for i := 0; i < 15; i++ { // ~60KB total, should rotate cleanly across multiple files + err := dq.Put(largeMsg) + Nil(t, err) + } + + // Read all messages back; each receive blocks until a message is available, so a lost message shows up as a hang rather than a failed assertion + for i := 0; i < 15; i++ { + msg := <-dq.ReadChan() + Equal(t, len(largeMsg), len(msg)) + } + + // Verify no .bad files were created (the error returned by filepath.Walk itself is not checked) + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.Fatalf("Found corrupted file: %s", path) + } + return err + }) +} + +// TestReadCurrentWriteFile verifies that when reading the current write file, +// the reader doesn't try to rotate past the write file when reaching maxBytesPerFileRead +func TestReadCurrentWriteFile(t *testing.T) { + l := NewTestLogger(t) + tmpDir, _ := ioutil.TempDir("", "nsq-test") + defer os.RemoveAll(tmpDir) + + // Small file limit to trigger boundary easily + maxBytesPerFile := int64(1024) + dq := New("test_current_file", tmpDir, maxBytesPerFile, 4, 1024, 1000, 2*time.Second, l) + defer dq.Close() + + // Write 60 small messages (12-byte payload each) + msg := []byte("test message") + for i := 0; i < 60; i++ { // NOTE(review): 60 x (12 + length prefix) bytes is about 960 if the prefix is 4 bytes, i.e. under the 1024-byte limit - confirm whether a second file is ever started +
Nil(t, err) + } + + // Read all messages back - this tests reading from current write file + // without trying to advance past it; each receive blocks until a message is available + for i := 0; i < 60; i++ { + readMsg := <-dq.ReadChan() + Equal(t, msg, readMsg) + } + + // Verify no .bad files were created (the error returned by filepath.Walk itself is not checked) + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.Fatalf("Found corrupted file: %s", path) + } + return err + }) +}