diff --git a/README.md b/README.md index ac4e44ab..220632e0 100644 --- a/README.md +++ b/README.md @@ -195,7 +195,7 @@ FlyDB is released under the Apache license. For details, see LICENSE file. ## Thanks To JetBrains -> Thanks to `JetBrains` for the free open source license. +> Thanks to `JetBrains` for the free open source license. FlyDB-logo diff --git a/engine/benchmark/benchmark_test.go b/engine/benchmark/benchmark_test.go index b8708ebf..6dc2356d 100644 --- a/engine/benchmark/benchmark_test.go +++ b/engine/benchmark/benchmark_test.go @@ -8,7 +8,7 @@ import ( "github.com/ByteStorage/FlyDB/flydb" _const "github.com/ByteStorage/FlyDB/lib/const" "math/rand" - "path/filepath" + "os" "testing" "time" ) @@ -34,18 +34,17 @@ func GetValue() []byte { return str.Bytes() } -func init() { +func Benchmark_PutValue_FlyDB(b *testing.B) { opts := config.DefaultOptions - opts.DirPath = filepath.Join("benchmark", "flydbtest") + dir, _ := os.MkdirTemp("", "flydbtest") + opts.DirPath = dir FlyDB, err = flydb.NewFlyDB(opts) defer FlyDB.Clean() if err != nil { panic(err) } -} -func Benchmark_PutValue_FlyDB(b *testing.B) { b.ResetTimer() b.ReportAllocs() @@ -58,6 +57,15 @@ func Benchmark_PutValue_FlyDB(b *testing.B) { } func Benchmark_GetValue_FlyDB(b *testing.B) { + opts := config.DefaultOptions + opts.DirPath = "/tmp/FlyDB" + + FlyDB, err = flydb.NewFlyDB(opts) + defer FlyDB.Close() + if err != nil { + panic(err) + } + for i := 0; i < 500000; i++ { err = FlyDB.Put(GetKey(i), GetValue()) if err != nil { @@ -74,5 +82,4 @@ func Benchmark_GetValue_FlyDB(b *testing.B) { panic(err) } } - } diff --git a/engine/data/data_file.go b/engine/data/data_file.go index 2c26a6fe..5db26ee3 100644 --- a/engine/data/data_file.go +++ b/engine/data/data_file.go @@ -110,11 +110,10 @@ func (df *DataFile) ReadLogRecord(offset int64) (*LogRecord, int64, error) { } func (df *DataFile) Write(buf []byte) error { - size, err := df.IoManager.Write(buf) + _, err := df.IoManager.Write(buf) if err != nil { return err } - df.WriteOff += int64(size) return nil } diff --git a/engine/db.go b/engine/db.go index 4f3115eb..2e47c4b8 100644 --- a/engine/db.go +++ b/engine/db.go @@ -192,7 +192,9 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst, // Write data coding encRecord, size := data2.EncodeLogRecord(logRecord) - if db.activeFile.WriteOff+size > db.options.DataFileSize { + var writeOff int64 + var ok bool + for writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size); !ok; writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size) { // Persisting data files to ensure that existing data is persisted to disk if err := db.activeFile.Sync(); err != nil { return nil, err @@ -207,7 +209,6 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst, } } - writeOff := db.activeFile.WriteOff if err := db.activeFile.Write(encRecord); err != nil { return nil, err } @@ -243,6 +244,9 @@ func (db *DB) setActiveDataFile() error { return err } db.activeFile = dataFile + + size, _ := dataFile.IoManager.Size() + SingleOffset().AddNew(initialFileID, size) return nil } @@ -466,7 +470,7 @@ func (db *DB) loadIndexFromDataFiles() error { var currentSeqNo = nonTransactionSeqNo // Iterate through all file ids, processing records in the file - for i, fid := range db.fileIds { + for _, fid := range db.fileIds { var fileID = uint32(fid) // If the id is smaller than that of the file that did not participate in the merge recently, // the hint file has been loaded @@ -531,10 +535,7 @@ func (db *DB) loadIndexFromDataFiles() error { offset += size } - // If it is a current active file, update writeOff for this file - if i == len(db.fileIds)-1 { - db.activeFile.WriteOff = offset - } + SingleOffset().AddNew(fileID, offset) } // Update the transaction sequence number to the database field diff --git a/engine/fileio/mmap_io.go b/engine/fileio/mmap_io.go index 0593b3cb..88adc43d 100644 --- a/engine/fileio/mmap_io.go +++ b/engine/fileio/mmap_io.go @@ -1,107 +1,29 @@ package fileio import ( - "errors" - "github.com/edsrzf/mmap-go" - "os" + "sync" ) -type MMapIO struct { - fd *os.File // system file descriptor - data mmap.MMap // the mapping area corresponding to the file - dirty bool // has changed - offset int64 // next write location - fileSize int64 // max file size +type mmapFileController struct { + lock sync.Mutex + files map[string]*MMapIO } -// NewMMapIOManager Initialize Mmap IO -func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { - mmapIO := &MMapIO{fileSize: fileSize} - - fd, err := os.OpenFile( - fileName, - os.O_CREATE|os.O_RDWR|os.O_APPEND, - DataFilePerm, - ) - if err != nil { - return nil, err - } - info, _ := fd.Stat() - - // Expand files to maximum file size, crop when saving - if err := fd.Truncate(fileSize); err != nil { - return nil, err - } - - // Building mappings between memory and disk files - b, err := mmap.Map(fd, mmap.RDWR, 0) - if err != nil { - return nil, err - } - - mmapIO.fd = fd - mmapIO.data = b - mmapIO.offset = info.Size() - return mmapIO, nil -} - -// Read Copy data from the mapping area to byte slice -func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { - return copy(b, mio.data[offset:]), nil -} - -// Write Copy data from byte slice to the mapping area -func (mio *MMapIO) Write(b []byte) (int, error) { - oldOffset := mio.offset - newOffset := mio.offset + int64(len(b)) - if newOffset > mio.fileSize { - return 0, errors.New("exceed file max content length") - } - - mio.offset = newOffset - mio.dirty = true - return copy(mio.data[oldOffset:], b), nil +var controller = mmapFileController{ + lock: sync.Mutex{}, + files: map[string]*MMapIO{}, } -// Sync Synchronize data from memory to disk -func (mio *MMapIO) Sync() error { - if !mio.dirty { - return nil - } - - if err := mio.data.Flush(); err != nil { - return err - } - - mio.dirty = false - return nil -} +// NewMMapIOManager Initialize Mmap IO +func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { + controller.lock.Lock() + defer controller.lock.Unlock() -// Close file -func (mio *MMapIO) Close() (err error) { - if err = mio.fd.Truncate(mio.offset); err != nil { - return err - } - if err = mio.Sync(); err != nil { - return err + if v, ok := controller.files[fileName]; ok { + v.AddCount() + return v, nil } - if err = mio.UnMap(); err != nil { - panic(err) - } - return mio.fd.Close() -} -// Size return the size of current file -func (mio *MMapIO) Size() (int64, error) { - return mio.offset, nil -} - -// UnMap Unmapping between memory and files -func (mio *MMapIO) UnMap() error { - if mio.data == nil { - return nil - } - err := mio.data.Unmap() - mio.data = nil - return err + manager, err := (&MMapIO{fileName: fileName, fileSize: fileSize}).Init() + return manager, err } diff --git a/engine/fileio/mmap_io_unix.go b/engine/fileio/mmap_io_unix.go new file mode 100644 index 00000000..711d2db3 --- /dev/null +++ b/engine/fileio/mmap_io_unix.go @@ -0,0 +1,125 @@ +//go:build linux + +package fileio + +import ( + "errors" + atomic2 "go.uber.org/atomic" + "os" + "syscall" + "unsafe" +) + +type MMapIO struct { + fd *os.File // system file descriptor + data []byte // the mapping area corresponding to the file + offset int64 // next write location + fileSize int64 // max file size + fileName string + count atomic2.Int32 // the count of dbs using this mmap io +} + +func (mio *MMapIO) Init() (*MMapIO, error) { + fd, err := os.OpenFile(mio.fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, DataFilePerm) + if err != nil { + return nil, err + } + mio.fd = fd + + info, _ := fd.Stat() + mio.offset = info.Size() + + // Expand files to maximum file size, crop when saving + if err := fd.Truncate(mio.fileSize); err != nil { + return nil, err + } + + // Building mappings between memory and disk files + b, err := syscall.Mmap(int(mio.fd.Fd()), 0, int(mio.fileSize), + syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, err + } + mio.data = b + + return mio, nil +} + +// UnMapUnix Unmapping between memory and files +func (mio *MMapIO) unmap() error { + if mio.data == nil { + return nil + } + + err := syscall.Munmap(mio.data) + mio.data = nil + + return err +} + +// Read Copy data from the mapping area to byte slice +func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { + return copy(b, mio.data[offset:]), nil +} + +// Write Copy data from byte slice to the mapping area +func (mio *MMapIO) Write(b []byte) (int, error) { + oldOffset := mio.offset + newOffset := mio.offset + int64(len(b)) + if newOffset > mio.fileSize { + return 0, errors.New("exceed file max content length") + } + + mio.offset = newOffset + return copy(mio.data[oldOffset:newOffset], b), nil +} + +// Sync Synchronize data from memory to disk +func (mio *MMapIO) Sync() error { + _, _, err := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&mio.data[0])), uintptr(mio.offset), uintptr(syscall.MS_SYNC)) + if err != 0 { + return err + } + return nil +} + +// Size return the size of current file +func (mio *MMapIO) Size() (int64, error) { + return mio.offset, nil +} + +// Close file +func (mio *MMapIO) Close() (err error) { + controller.lock.Lock() + defer controller.lock.Unlock() + + mio.SubCount() + if mio.GetCount() > 0 { + return nil + } + + delete(controller.files, mio.fileName) + + if err = mio.fd.Truncate(mio.offset); err != nil { + return err + } + if err = mio.Sync(); err != nil { + return err + } + if err = mio.unmap(); err != nil { + panic(err) + } + return mio.fd.Close() +} + +func (mio *MMapIO) GetCount() int32 { + return mio.count.Load() +} + +func (mio *MMapIO) AddCount() { + mio.count.Add(1) +} + +func (mio *MMapIO) SubCount() { + mio.count.Add(-1) +} diff --git a/engine/fileio/mmap_io_windows.go b/engine/fileio/mmap_io_windows.go new file mode 100644 index 00000000..58de9440 --- /dev/null +++ b/engine/fileio/mmap_io_windows.go @@ -0,0 +1,130 @@ +//go:build windows + +package fileio + +import ( + "errors" + "os" + "syscall" + "unsafe" +) + +type MMapIO struct { + fd *os.File // system file descriptor + handle syscall.Handle + data []byte // the mapping area corresponding to the file + offset int64 // next write location + fileSize int64 // max file size + fileName string + count atomic.Int32 // the count of dbs using this mmap io +} + +func (mio *MMapIO) Init() (*MMapIO, error) { + fd, err := os.OpenFile(mio.fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, DataFilePerm) + if err != nil { + return nil, err + } + mio.fd = fd + + info, _ := fd.Stat() + mio.offset = info.Size() + + // Expand files to maximum file size, crop when saving + if err := fd.Truncate(mio.fileSize); err != nil { + return nil, err + } + + // Building mappings between memory and disk files + h, err := syscall.CreateFileMapping(syscall.Handle(mio.fd.Fd()), nil, + syscall.PAGE_READWRITE, 0, uint32(mio.fileSize), nil) + if err != nil { + return nil, err + } + mio.handle = h + + addr, _ := syscall.MapViewOfFile(h, syscall.FILE_MAP_WRITE, 0, + 0, uintptr(mio.fileSize)) + if err != nil { + return nil, err + } + mio.data = *(*[]byte)(unsafe.Pointer(addr)) + + return mio, nil +} + +func (mio *MMapIO) unmap() error { + if mio.data == nil { + return nil + } + + addr := (uintptr)(unsafe.Pointer(&mio.data[0])) + err := syscall.UnmapViewOfFile(addr) + mio.data = nil + + return err +} + +// Read Copy data from the mapping area to byte slice +func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { + return copy(b, mio.data[offset:]), nil +} + +// Write Copy data from byte slice to the mapping area +func (mio *MMapIO) Write(b []byte) (int, error) { + oldOffset := mio.offset + newOffset := mio.offset + int64(len(b)) + if newOffset > mio.fileSize { + return 0, errors.New("exceed file max content length") + } + + mio.offset = newOffset + return copy(mio.data[oldOffset:newOffset], b), nil +} + +func (mio *MMapIO) Sync() error { + err := syscall.FlushFileBuffers(mio.handle) + if err != nil { + return err + } + return nil +} + +// Size return the size of current file +func (mio *MMapIO) Size() (int64, error) { + return mio.offset, nil +} + +func (mio *MMapIO) Close() (err error) { + controller.lock.Lock() + defer controller.lock.Unlock() + + mio.SubCount() + if mio.GetCount() > 0 { + return nil + } + + delete(controller.files, mio.fileName) + + if err = mio.fd.Truncate(mio.offset); err != nil { + return err + } + if err = mio.Sync(); err != nil { + return err + } + if err = mio.unmap(); err != nil { + panic(err) + } + return syscall.CloseHandle(mio.handle) +} + +func (mio *MMapIO) GetCount() int32 { + return mio.count.Load() +} + +func (mio *MMapIO) AddCount() { + mio.count.Add(1) +} + +func (mio *MMapIO) SubCount() { + mio.count.Add(-1) +} diff --git a/engine/offset_controller.go b/engine/offset_controller.go new file mode 100644 index 00000000..89641e33 --- /dev/null +++ b/engine/offset_controller.go @@ -0,0 +1,46 @@ +package engine + +import "sync" + +type OffsetController struct { + m sync.Mutex + offset map[uint32]int64 +} + +var Controller = &OffsetController{ + m: sync.Mutex{}, + offset: map[uint32]int64{}, +} + +func SingleOffset() *OffsetController { + return Controller +} + +func (c *OffsetController) CanWrite(fileId uint32, filesize int64, size int64) (int64, bool) { + c.m.Lock() + defer c.m.Unlock() + + if v, ok := c.offset[fileId]; ok { + if v+size <= filesize { + c.offset[fileId] = v + size + return v, true + } + } + return 0, false +} + +func (c *OffsetController) AddNew(fileId uint32, offset int64) int64 { + c.m.Lock() + defer c.m.Unlock() + if _, ok := c.offset[fileId]; ok { + return c.offset[fileId] + } + c.offset[fileId] = offset + return offset +} + +func (c *OffsetController) ChangeOffset(fileId uint32, offset int64) { + c.m.Lock() + defer c.m.Unlock() + c.offset[fileId] = offset +} diff --git a/go.mod b/go.mod index 7458ff5f..9ad78fe0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/bwmarrin/snowflake v0.3.0 github.com/chen3feng/stl4go v0.1.1 github.com/desertbit/grumble v1.1.3 - github.com/edsrzf/mmap-go v1.1.0 github.com/fatih/color v1.13.0 github.com/golang/protobuf v1.5.3 github.com/google/btree v1.1.2 @@ -55,7 +54,7 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.8.0 // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.8.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index fc1015ee..20a11a80 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,6 @@ github.com/desertbit/grumble v1.1.3 h1:gbdgVGWsHmNraJ7Gn6Q4TiUEIHU/UHfbc1KUSbBlg github.com/desertbit/grumble v1.1.3/go.mod h1:r7j3ShNy5EmOsegRD2DzTutIaGiLiA3M5yBTXXeLwcs= github.com/desertbit/readline v1.5.1 h1:/wOIZkWYl1s+IvJm/5bOknfUgs6MhS9svRNZpFM53Os= github.com/desertbit/readline v1.5.1/go.mod h1:pHQgTsCFs9Cpfh5mlSUFi9Xa5kkL4d8L1Jo4UVWzPw0= -github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= -github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= @@ -238,8 +236,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= diff --git a/lib/proto/gstring/db.proto b/lib/proto/gstring/db.proto index 84bb8a6f..400063c8 100644 --- a/lib/proto/gstring/db.proto +++ b/lib/proto/gstring/db.proto @@ -258,5 +258,5 @@ message MSetNXRequest { } message MSetNXResponse { - bool success = 1; + bool success = 1; } \ No newline at end of file