From be41a83a5dd177f84caeccf85630b83261ea9508 Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Sun, 23 Jul 2023 01:44:42 +0800 Subject: [PATCH 1/7] add offset_controller.go to control multi goroutine write to one file && add mmap file reuse --- engine/db.go | 6 +++-- engine/fileio/mmap_io.go | 37 +++++++++++++++++++++-------- engine/offset_controller.go | 46 +++++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 12 deletions(-) create mode 100644 engine/offset_controller.go diff --git a/engine/db.go b/engine/db.go index 4f3115eb..dec262db 100644 --- a/engine/db.go +++ b/engine/db.go @@ -192,7 +192,9 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst, // Write data coding encRecord, size := data2.EncodeLogRecord(logRecord) - if db.activeFile.WriteOff+size > db.options.DataFileSize { + var writeOff int64 + var ok bool + for writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size); !ok; writeOff, ok = SingleOffset().CanWrite(db.activeFile.FileID, db.options.DataFileSize, size) { // Persisting data files to ensure that existing data is persisted to disk if err := db.activeFile.Sync(); err != nil { return nil, err @@ -207,7 +209,6 @@ func (db *DB) appendLogRecord(logRecord *data2.LogRecord) (*data2.LogRecordPst, } } - writeOff := db.activeFile.WriteOff if err := db.activeFile.Write(encRecord); err != nil { return nil, err } @@ -243,6 +244,7 @@ func (db *DB) setActiveDataFile() error { return err } db.activeFile = dataFile + SingleOffset().AddNew(initialFileID, 0) return nil } diff --git a/engine/fileio/mmap_io.go b/engine/fileio/mmap_io.go index d458db0e..62bc0af3 100644 --- a/engine/fileio/mmap_io.go +++ b/engine/fileio/mmap_io.go @@ -3,6 +3,7 @@ package fileio import ( "errors" "os" + "sync" "syscall" "unsafe" ) @@ -10,14 +11,33 @@ import ( type MMapIO struct { fd *os.File // system file descriptor data []byte // the mapping area corresponding to the file - dirty bool // has changed offset int64 // next write location fileSize int64 // max file size + count int +} + +type mmapFileController struct { + lock sync.Mutex + files map[string]*MMapIO +} + +var controller = mmapFileController{ + lock: sync.Mutex{}, + files: map[string]*MMapIO{}, } // NewMMapIOManager Initialize Mmap IO func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { - mmapIO := &MMapIO{fileSize: fileSize} + controller.lock.Lock() + defer controller.lock.Unlock() + + if v, ok := controller.files[fileName]; ok { + v.count++ + return v, nil + } + + mmapIO := &MMapIO{fileSize: fileSize, count: 1} + controller.files[fileName] = mmapIO fd, err := os.OpenFile( fileName, @@ -60,27 +80,24 @@ func (mio *MMapIO) Write(b []byte) (int, error) { } mio.offset = newOffset - mio.dirty = true - return copy(mio.data[oldOffset:], b), nil + return copy(mio.data[oldOffset:newOffset], b), nil } // Sync Synchronize data from memory to disk func (mio *MMapIO) Sync() error { - if !mio.dirty { - return nil - } - _, _, err := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&mio.data[0])), uintptr(mio.offset), uintptr(syscall.MS_SYNC)) if err != 0 { return err } - - mio.dirty = false return nil } // Close file func (mio *MMapIO) Close() (err error) { + mio.count-- + if mio.count > 0 { + return nil + } if err = mio.fd.Truncate(mio.offset); err != nil { return err } diff --git a/engine/offset_controller.go b/engine/offset_controller.go new file mode 100644 index 00000000..89641e33 --- /dev/null +++ b/engine/offset_controller.go @@ -0,0 +1,46 @@ +package engine + +import "sync" + +type OffsetController struct { + m sync.Mutex + offset map[uint32]int64 +} + +var Controller = &OffsetController{ + m: sync.Mutex{}, + offset: map[uint32]int64{}, +} + +func SingleOffset() *OffsetController { + return Controller +} + +func (c *OffsetController) CanWrite(fileId uint32, filesize int64, size int64) (int64, bool) { + c.m.Lock() + defer c.m.Unlock() + + if v, ok := c.offset[fileId]; ok { + if v+size <= filesize { + c.offset[fileId] = v + size + return v, true + } + } + return 0, false +} + +func (c *OffsetController) AddNew(fileId uint32, offset int64) int64 { + c.m.Lock() + defer c.m.Unlock() + if _, ok := c.offset[fileId]; ok { + return c.offset[fileId] + } + c.offset[fileId] = offset + return offset +} + +func (c *OffsetController) ChangeOffset(fileId uint32, offset int64) { + c.m.Lock() + defer c.m.Unlock() + c.offset[fileId] = offset +} From 36d66857dcc9decfe7988edb15f1626185cb3b96 Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Wed, 26 Jul 2023 01:48:13 +0800 Subject: [PATCH 2/7] hot fix mmap --- engine/fileio/mmap_io.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/engine/fileio/mmap_io.go b/engine/fileio/mmap_io.go index 62bc0af3..fa41f592 100644 --- a/engine/fileio/mmap_io.go +++ b/engine/fileio/mmap_io.go @@ -13,7 +13,8 @@ type MMapIO struct { data []byte // the mapping area corresponding to the file offset int64 // next write location fileSize int64 // max file size - count int + fileName string + count int // the count of dbs using this mmap io } type mmapFileController struct { @@ -36,7 +37,7 @@ func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { return v, nil } - mmapIO := &MMapIO{fileSize: fileSize, count: 1} + mmapIO := &MMapIO{fileSize: fileSize, fileName: fileName, count: 1} controller.files[fileName] = mmapIO fd, err := os.OpenFile( @@ -94,10 +95,16 @@ func (mio *MMapIO) Sync() error { // Close file func (mio *MMapIO) Close() (err error) { + controller.lock.Lock() + defer controller.lock.Unlock() + mio.count-- if mio.count > 0 { return nil } + + delete(controller.files, mio.fileName) + if err = mio.fd.Truncate(mio.offset); err != nil { return err } From 1b0289e51ce878e0acd695bb0ef5e7660021135c Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Wed, 26 Jul 2023 02:12:51 +0800 Subject: [PATCH 3/7] merge error fix --- engine/grpc/client/client.go | 10 ++++++++++ lib/proto/glist/db.pb.go | 28 ++++++++++++++-------------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/engine/grpc/client/client.go b/engine/grpc/client/client.go index 40f8e940..0c9900da 100644 --- a/engine/grpc/client/client.go +++ b/engine/grpc/client/client.go @@ -2,6 +2,7 @@ package client import ( "github.com/ByteStorage/FlyDB/lib/proto/ghash" + "github.com/ByteStorage/FlyDB/lib/proto/glist" "github.com/ByteStorage/FlyDB/lib/proto/gstring" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -32,3 +33,12 @@ func newHashGrpcClient(addr string) (ghash.GHashServiceClient, error) { client := ghash.NewGHashServiceClient(conn) return client, nil } + +func newListGrpcClient(addr string) (glist.GListServiceClient, error) { + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + client := glist.NewGListServiceClient(conn) + return client, nil +} diff --git a/lib/proto/glist/db.pb.go b/lib/proto/glist/db.pb.go index 33766a3c..8b0dfc9a 100644 --- a/lib/proto/glist/db.pb.go +++ b/lib/proto/glist/db.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 -// source: db.proto +// protoc-gen-go v1.26.0 +// protoc v3.14.0 +// source: lib/proto/glist/db.proto package glist @@ -41,7 +41,7 @@ type GListLPushRequest struct { func (x *GListLPushRequest) Reset() { *x = GListLPushRequest{} if protoimpl.UnsafeEnabled { - mi := &file_db_proto_msgTypes[0] + mi := &file_lib_proto_glist_db_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -54,7 +54,7 @@ func (x *GListLPushRequest) String() string { func (*GListLPushRequest) ProtoMessage() {} func (x *GListLPushRequest) ProtoReflect() protoreflect.Message { - mi := &file_db_proto_msgTypes[0] + mi := &file_lib_proto_glist_db_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -67,7 +67,7 @@ func (x *GListLPushRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GListLPushRequest.ProtoReflect.Descriptor instead. func (*GListLPushRequest) Descriptor() ([]byte, []int) { - return file_db_proto_rawDescGZIP(), []int{0} + return file_lib_proto_glist_db_proto_rawDescGZIP(), []int{0} } func (x *GListLPushRequest) GetKey() string { @@ -216,7 +216,7 @@ func (x *GListLPushResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GListLPushResponse.ProtoReflect.Descriptor instead. func (*GListLPushResponse) Descriptor() ([]byte, []int) { - return file_db_proto_rawDescGZIP(), []int{1} + return file_lib_proto_glist_db_proto_rawDescGZIP(), []int{1} } func (x *GListLPushResponse) GetOk() bool { @@ -238,7 +238,7 @@ type GListLPushsRequest struct { func (x *GListLPushsRequest) Reset() { *x = GListLPushsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_db_proto_msgTypes[2] + mi := &file_lib_proto_glist_db_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -251,7 +251,7 @@ func (x *GListLPushsRequest) String() string { func (*GListLPushsRequest) ProtoMessage() {} func (x *GListLPushsRequest) ProtoReflect() protoreflect.Message { - mi := &file_db_proto_msgTypes[2] + mi := &file_lib_proto_glist_db_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -264,7 +264,7 @@ func (x *GListLPushsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GListLPushsRequest.ProtoReflect.Descriptor instead. func (*GListLPushsRequest) Descriptor() ([]byte, []int) { - return file_db_proto_rawDescGZIP(), []int{2} + return file_lib_proto_glist_db_proto_rawDescGZIP(), []int{2} } func (x *GListLPushsRequest) GetKey() string { @@ -301,7 +301,7 @@ type Value struct { func (x *Value) Reset() { *x = Value{} if protoimpl.UnsafeEnabled { - mi := &file_db_proto_msgTypes[3] + mi := &file_lib_proto_glist_db_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -314,7 +314,7 @@ func (x *Value) String() string { func (*Value) ProtoMessage() {} func (x *Value) ProtoReflect() protoreflect.Message { - mi := &file_db_proto_msgTypes[3] + mi := &file_lib_proto_glist_db_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -867,7 +867,7 @@ func (x *GListLPopResponse) String() string { func (*GListLPopResponse) ProtoMessage() {} func (x *GListLPopResponse) ProtoReflect() protoreflect.Message { - mi := &file_db_proto_msgTypes[10] + mi := &file_lib_proto_glist_db_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -880,7 +880,7 @@ func (x *GListLPopResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GListLPopResponse.ProtoReflect.Descriptor instead. func (*GListLPopResponse) Descriptor() ([]byte, []int) { - return file_db_proto_rawDescGZIP(), []int{10} + return file_lib_proto_glist_db_proto_rawDescGZIP(), []int{10} } func (m *GListLPopResponse) GetValue() isGListLPopResponse_Value { From 82f717d0398a2242dfc03b89eef6ed32505cf799 Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Wed, 26 Jul 2023 02:43:00 +0800 Subject: [PATCH 4/7] merge error fix --- README.md | 10 +- README_CN.md | 8 +- cmd/client/root.go | 264 ++++++++++++++++++++++++++++++++++ cmd/client/string.go | 45 ++++++ engine/grpc/base.go | 8 ++ engine/grpc/client/client.go | 2 - engine/grpc/client/string.go | 96 +++++++++++++ engine/grpc/service/string.go | 77 ++++++++++ lib/proto/glist/db.proto | 3 +- lib/proto/glist/db_grpc.pb.go | 6 +- lib/proto/gstring/db.proto | 41 ++++++ structure/list.go | 7 +- 12 files changed, 555 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index f777489f..e16e7e55 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,12 @@ English | [中文](https://github.com/ByteStorage/flydb/blob/master/README_CN.md **FlyDB** is a high-performance key-value (KV) storage engine based on the efficient bitcask model. It offers fast and reliable data retrieval and storage capabilities. By leveraging the simplicity and effectiveness of the bitcask model, **FlyDB** ensures efficient read and write operations, resulting in improved overall performance. It provides a streamlined approach to storing and accessing key-value pairs, making it an excellent choice for scenarios that require fast and responsive data access. **FlyDB's** focus on speed and simplicity makes it a valuable alternative for applications that prioritize performance while balancing storage costs.  -## 🏁 Fast Start: FlyDB +## 🏁 Fast Start: FlyDB You can install FlyDB using the Go command line tool: ```GO -go get github.com/ByteStorage/FlyDB@v1.0.6 +go get github.com/ByteStorage/FlyDB@v1.0.10 ``` Or clone this project from github: @@ -172,6 +172,10 @@ V1.0.7: Short-term support version v1.0.7 supports data structures of BitMap typ V1.0.8: Short-term support version v1.0.8 Support for a new in-memory index, SkipList. Some codes are optimized. +V1.0.9: Short-term support version v1.0.9 supports data structures of Sort Set type. Some codes are optimized. + +V1.0.10: Short-term support version v1.0.10 supports data structures of Set type. Some codes are optimized. + ## 👀 Contributor @@ -191,7 +195,7 @@ FlyDB is released under the Apache license. For details, see LICENSE file. ## Thanks To JetBrains -> Thanks to `JetBrains` for the free open source license. +> Thanks to `JetBrains` for the free open source license. FlyDB-logo diff --git a/README_CN.md b/README_CN.md index e07e7fbe..5b8fe8fa 100644 --- a/README_CN.md +++ b/README_CN.md @@ -16,7 +16,7 @@ 您可以使用Go命令行工具安装FlyDB: ```GO -go get github.com/ByteStorage/FlyDB@v1.0.6 +go get github.com/ByteStorage/FlyDB@v1.0.10 ``` 或者从github克隆这个项目: @@ -165,7 +165,7 @@ PASS V1.0.4: 短期支持版本v1.0.4使用mmap来优化索引部分,与v1.0.3版本相比,显著提高了读写性能。50w数据写入速度从1.35秒加快到0.56秒,读取速度从1.06秒加快到0.355秒。 -V1.0.5: 短期支持版本v1.0.5支持String、List类型的数据结构。并对部分代码进行了优化。 +V1.0.5: 短期支持版本v1.0.5支持String、List类型的数据结构。并对部分代码进行了优化。 V1.0.6: 短期支持版本v1.0.6支持Hash类型的数据结构。并对部分代码进行了优化。 @@ -173,6 +173,10 @@ V1.0.7: 短期支持版本v1.0.7支持BitMap类型的数据结构。并对部分 V1.0.8: 短期支持版本v1.0.8支持新的内存索引SkipList。对部分代码进行了优化。 +V1.0.9: 短期支持版本v1.0.9支持Sort Set类型的数据结构。并对部分代码进行了优化。 + +V1.0.10: 短期支持版本v1.0.10支持Set类型的数据结构。并对部分代码进行了优化。 + ## 👀 贡献者列表 diff --git a/cmd/client/root.go b/cmd/client/root.go index 306a8bda..0df7187b 100644 --- a/cmd/client/root.go +++ b/cmd/client/root.go @@ -31,6 +31,148 @@ func register(app *grumble.App) { }, }) + app.AddCommand(&grumble.Command{ + Name: "strlen", + Help: "get the length of the value stored in a key in string-structure", + Run: stringStrLen, + Args: func(a *grumble.Args) { + a.String("key", "The key whose value length to retrieve", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "type", + Help: "get the type of the value stored in a key in string-structure", + Run: stringGetType, + Args: func(a *grumble.Args) { + a.String("key", "The key whose value type to retrieve", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "getset", + Help: "set the value of a key and return its old value in string-structure", + Run: stringGetSet, + Args: func(a *grumble.Args) { + a.String("key", "The key to set", grumble.Default("")) + a.String("value", "The new value to set", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "append", + Help: "append a value to a key in string-structure", + Run: stringAppend, + Args: func(a *grumble.Args) { + a.String("key", "The key to append to", grumble.Default("")) + a.String("value", "The value to append", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "incr", + Help: "increment the integer value of a key in string-structure", + Run: stringIncr, + Args: func(a *grumble.Args) { + a.String("key", "The key to increment", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "incrby", + Help: "increment the integer value of a key by a specific amount in string-structure", + Run: stringIncrBy, + Args: func(a *grumble.Args) { + a.String("key", "The key to increment", grumble.Default("")) + a.Int64("amount", "The amount to increment by", grumble.Default(1)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "incrbyfloat", + Help: "increment the float value of a key by a specific amount in string-structure", + Run: stringIncrByFloat, + Args: func(a *grumble.Args) { + a.String("key", "The key to increment", grumble.Default("")) + a.Float64("amount", "The amount to increment by", grumble.Default(1.0)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "decr", + Help: "decrement the integer value of a key in string-structure", + Run: stringDecr, + Args: func(a *grumble.Args) { + a.String("key", "The key to decrement", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "decrby", + Help: "decrement the integer value of a key by a specific amount in string-structure", + Run: stringDecrBy, + Args: func(a *grumble.Args) { + a.String("key", "The key to decrement", grumble.Default("")) + a.Int64("amount", "The amount to decrement by", grumble.Default(1)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "exists", + Help: "check if a key exists in string-structure", + Run: stringExists, + Args: func(a *grumble.Args) { + a.String("key", "The key to check for existence", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "expire", + Help: "set a timeout on a key in string-structure", + Run: stringExpire, + Args: func(a *grumble.Args) { + a.String("key", "The key to set a timeout on", grumble.Default("")) + a.Int64("ttl", "The time-to-live (TTL) in seconds", grumble.Default(0)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "persist", + Help: "remove the timeout on a key, making it persist in string-structure", + Run: stringPersist, + Args: func(a *grumble.Args) { + a.String("key", "The key to make persistent", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "mget", + Help: "get the values of multiple keys in string-structure", + Run: stringMGet, + Args: func(a *grumble.Args) { + a.StringList("key", "The keys to get values for", grumble.Default([]string{})) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "mset", + Help: "Set multiple key-value pairs in string-structure", + Run: stringMSet, + Args: func(a *grumble.Args) { + a.StringList("key-value", "key-value pairs (e.g., key1 value1 key2 value2)", grumble.Default("")) + }, + }) + + // Command for stringMSetNX + app.AddCommand(&grumble.Command{ + Name: "msetnx", + Help: "Set multiple key-value pairs if the keys do not exist in string-structure", + Run: stringMSetNX, + Args: func(a *grumble.Args) { + a.StringList("key-value", "key-value pairs (e.g., key1 value1 key2 value2)", grumble.Default("")) + }, + }) + app.AddCommand(&grumble.Command{ Name: "HSet", Help: "put data in hash-structure", @@ -166,4 +308,126 @@ func register(app *grumble.App) { a.String("field", "field", grumble.Default("")) }, }) + + app.AddCommand(&grumble.Command{ + Name: "LPush", + Help: "Inserts a value at the head of a list in list-structure", + Run: stringLPushData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.String("value", "value", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LPushs", + Help: "Inserts multiple values at the head of a list in list-structure", + Run: stringLPushsData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.StringList("values", "values", grumble.Default([]string{})) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "RPush", + Help: "Inserts a value at the tail of a list in list-structure", + Run: stringRPushData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.String("value", "value", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "RPushs", + Help: "Push elements to the end of a list in list-structure", + Run: stringRPushsData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.StringList("values", "values", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LPop", + Help: "Removes and returns the first element of a list in list-structure", + Run: stringLPopData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "RPop", + Help: "Removes and returns the last element of a list in list-structure", + Run: stringRPopData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LRange", + Help: "Returns a range of elements from a list in list-structure", + Run: stringLRangeData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.Int("start", "start", grumble.Default(0)) + a.Int("stop", "stop", grumble.Default(-1)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LLen", + Help: "Returns the length of a list in list-structure", + Run: stringLLenData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LRem", + Help: "Remove elements from a list in list-structure", + Run: stringLRemData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.Int("count", "count", grumble.Default(0)) + a.String("value", "value", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LIndex", + Help: "Get the element at a specific index in a list in list-structure", + Run: stringLIndexData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.Int("index", "index", grumble.Default(0)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LSet", + Help: "Set the value of an element at a specific index in a list in list-structure", + Run: stringLSetData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.Int("index", "index", grumble.Default(0)) + a.String("value", "value", grumble.Default("")) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "LTrim", + Help: "Trim a list to a specified range of elements in list-structure", + Run: stringLTrimData, + Args: func(a *grumble.Args) { + a.String("key", "key", grumble.Default("")) + a.Int("start", "start", grumble.Default(0)) + a.Int("stop", "stop", grumble.Default(0)) + }, + }) + } diff --git a/cmd/client/string.go b/cmd/client/string.go index e8df7058..a8ace719 100644 --- a/cmd/client/string.go +++ b/cmd/client/string.go @@ -1,6 +1,7 @@ package client import ( + "errors" "fmt" "github.com/ByteStorage/FlyDB/engine/grpc/client" "github.com/desertbit/grumble" @@ -234,3 +235,47 @@ func stringMGet(c *grumble.Context) error { fmt.Println(values) return nil } + +func stringMSet(c *grumble.Context) error { + keyValuePairs := c.Args.StringList("key-value") + if len(keyValuePairs) == 0 || len(keyValuePairs)%2 != 0 { + return errors.New("invalid number of arguments, must provide key-value pairs") + } + + var pairs []interface{} + for i := 0; i < len(keyValuePairs); i += 2 { + key := keyValuePairs[i] + value := keyValuePairs[i+1] + pairs = append(pairs, key, value) + } + + err := newClient().MSet(pairs...) + if err != nil { + fmt.Println("set data error:", err) + return err + } + fmt.Println("Data successfully set.") + return nil +} + +func stringMSetNX(c *grumble.Context) error { + keyValuePairs := c.Args.StringList("key-value") + if len(keyValuePairs) == 0 || len(keyValuePairs)%2 != 0 { + return errors.New("invalid number of arguments, must provide key-value pairs") + } + + var pairs []interface{} + for i := 0; i < len(keyValuePairs); i += 2 { + key := keyValuePairs[i] + value := keyValuePairs[i+1] + pairs = append(pairs, key, value) + } + + err := newClient().MSetNX(pairs...) + if err != nil { + fmt.Println("set data error:", err) + return err + } + fmt.Println("Data successfully set.") + return nil +} diff --git a/engine/grpc/base.go b/engine/grpc/base.go index 55709e9f..dcfa366a 100644 --- a/engine/grpc/base.go +++ b/engine/grpc/base.go @@ -5,6 +5,7 @@ import ( "github.com/ByteStorage/FlyDB/config" "github.com/ByteStorage/FlyDB/engine/grpc/service" "github.com/ByteStorage/FlyDB/lib/proto/ghash" + "github.com/ByteStorage/FlyDB/lib/proto/glist" "github.com/ByteStorage/FlyDB/lib/proto/gstring" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -53,6 +54,13 @@ func NewService(options config.Options, addr string) (Base, error) { baseService.RegisterService(hashService) ghash.RegisterGHashServiceServer(baseService.server, hashService) + listService, err := service.NewListService(options) + if err != nil { + return nil, err + } + baseService.RegisterService(listService) + glist.RegisterGListServiceServer(baseService.server, listService) + return baseService, nil } diff --git a/engine/grpc/client/client.go b/engine/grpc/client/client.go index 0c9900da..437d6408 100644 --- a/engine/grpc/client/client.go +++ b/engine/grpc/client/client.go @@ -19,7 +19,6 @@ func newGrpcClient(addr string) (gstring.GStringServiceClient, error) { if err != nil { return nil, err } - //client := gstring.NewGStringServiceClient(conn) client := gstring.NewGStringServiceClient(conn) return client, nil } @@ -29,7 +28,6 @@ func newHashGrpcClient(addr string) (ghash.GHashServiceClient, error) { if err != nil { return nil, err } - //client := gstring.NewGStringServiceClient(conn) client := ghash.NewGHashServiceClient(conn) return client, nil } diff --git a/engine/grpc/client/string.go b/engine/grpc/client/string.go index 10792cb3..2d570a20 100644 --- a/engine/grpc/client/string.go +++ b/engine/grpc/client/string.go @@ -327,3 +327,99 @@ func (c *Client) MGet(keys []string) ([]interface{}, error) { } return values, nil } + +func (c *Client) MSet(pairs ...interface{}) error { + client, err := newGrpcClient(c.Addr) + if err != nil { + return err + } + + var values []*gstring.MSetRequest_KeyValue + // The input should be a map[string]interface{} + if len(pairs)%2 != 0 { + return errors.New("invalid number of arguments") + } + + for i := 0; i < len(pairs); i += 2 { + key, ok := pairs[i].(string) + if !ok { + return errors.New("invalid key type") + } + + value := pairs[i+1] + switch v := value.(type) { + case string: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_StringValue{StringValue: v}}) + case int32: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_Int32Value{Int32Value: v}}) + case int64: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_Int64Value{Int64Value: v}}) + case float32: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_Float32Value{Float32Value: v}}) + case float64: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_Float64Value{Float64Value: v}}) + case bool: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_BoolValue{BoolValue: v}}) + case []byte: + values = append(values, &gstring.MSetRequest_KeyValue{Key: key, Value: &gstring.MSetRequest_KeyValue_BytesValue{BytesValue: v}}) + default: + return errors.New("unsupported value type") + } + } + + _, err = client.MSet(context.Background(), &gstring.MSetRequest{Pairs: values}) + if err != nil { + return err + } + + return nil +} + +func (c *Client) MSetNX(pairs ...interface{}) error { + client, err := newGrpcClient(c.Addr) + if err != nil { + return err + } + + if len(pairs)%2 != 0 { + return errors.New("invalid number of arguments, must provide key-value pairs") + } + + var values []*gstring.MSetNXRequest_KeyValue + + for i := 0; i < len(pairs); i += 2 { + key, ok := pairs[i].(string) + if !ok { + return errors.New("invalid key type") + } + + value := pairs[i+1] + switch v := value.(type) { + case string: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_StringValue{StringValue: v}}) + case int32: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_Int32Value{Int32Value: v}}) + case int64: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_Int64Value{Int64Value: v}}) + case float32: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_Float32Value{Float32Value: v}}) + case float64: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_Float64Value{Float64Value: v}}) + case bool: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_BoolValue{BoolValue: v}}) + case []byte: + values = append(values, &gstring.MSetNXRequest_KeyValue{Key: key, Value: &gstring.MSetNXRequest_KeyValue_BytesValue{BytesValue: v}}) + default: + return errors.New("unsupported value type") + } + } + + resp, err := client.MSetNX(context.Background(), &gstring.MSetNXRequest{Pairs: values}) + if err != nil { + return err + } + if !resp.Success { + return errors.New("MSetNX failed any key has existed") + } + return nil +} diff --git a/engine/grpc/service/string.go b/engine/grpc/service/string.go index ad3b66ff..80009b5b 100644 --- a/engine/grpc/service/string.go +++ b/engine/grpc/service/string.go @@ -2,6 +2,7 @@ package service import ( "context" + "errors" "fmt" "github.com/ByteStorage/FlyDB/config" "github.com/ByteStorage/FlyDB/lib/proto/gstring" @@ -286,3 +287,79 @@ func (s *str) MGet(ctx context.Context, req *gstring.MGetRequest) (*gstring.MGet return resp, nil } + +func (s *str) MSet(ctx context.Context, req *gstring.MSetRequest) (*gstring.MSetResponse, error) { + // Extract the key-value pairs from the request and populate the `values` slice. + var values []interface{} + for _, keyValue := range req.GetPairs() { + switch v := keyValue.Value.(type) { + case *gstring.MSetRequest_KeyValue_StringValue: + values = append(values, keyValue.GetKey(), v.StringValue) + case *gstring.MSetRequest_KeyValue_Int32Value: + values = append(values, keyValue.GetKey(), v.Int32Value) + case *gstring.MSetRequest_KeyValue_Int64Value: + values = append(values, keyValue.GetKey(), v.Int64Value) + case *gstring.MSetRequest_KeyValue_Float32Value: + values = append(values, keyValue.GetKey(), v.Float32Value) + case *gstring.MSetRequest_KeyValue_Float64Value: + values = append(values, keyValue.GetKey(), v.Float64Value) + case *gstring.MSetRequest_KeyValue_BoolValue: + values = append(values, keyValue.GetKey(), v.BoolValue) + case *gstring.MSetRequest_KeyValue_BytesValue: + values = append(values, keyValue.GetKey(), v.BytesValue) + default: + return nil, errors.New("unsupported value type") + } + } + //print(values) + // Use the `MSet` method of the store to set the key-value pairs. + err := s.dbs.MSet(values...) + if err != nil { + return nil, err + } + + // Create the response indicating success. + response := &gstring.MSetResponse{ + Success: true, + } + + return response, nil +} + +func (s *str) MSetNX(ctx context.Context, req *gstring.MSetNXRequest) (*gstring.MSetNXResponse, error) { + // Extract the key-value pairs from the request and populate the `values` slice. + var values []interface{} + for _, keyValue := range req.GetPairs() { + switch v := keyValue.Value.(type) { + case *gstring.MSetNXRequest_KeyValue_StringValue: + values = append(values, keyValue.GetKey(), v.StringValue) + case *gstring.MSetNXRequest_KeyValue_Int32Value: + values = append(values, keyValue.GetKey(), v.Int32Value) + case *gstring.MSetNXRequest_KeyValue_Int64Value: + values = append(values, keyValue.GetKey(), v.Int64Value) + case *gstring.MSetNXRequest_KeyValue_Float32Value: + values = append(values, keyValue.GetKey(), v.Float32Value) + case *gstring.MSetNXRequest_KeyValue_Float64Value: + values = append(values, keyValue.GetKey(), v.Float64Value) + case *gstring.MSetNXRequest_KeyValue_BoolValue: + values = append(values, keyValue.GetKey(), v.BoolValue) + case *gstring.MSetNXRequest_KeyValue_BytesValue: + values = append(values, keyValue.GetKey(), v.BytesValue) + default: + return nil, errors.New("unsupported value type") + } + } + //print(values) + // Use the `MSet` method of the store to set the key-value pairs. + exists, err := s.dbs.MSetNX(values...) + if err != nil { + return nil, err + } + + // Create the response indicating success. + response := &gstring.MSetNXResponse{ + Success: exists, + } + + return response, nil +} diff --git a/lib/proto/glist/db.proto b/lib/proto/glist/db.proto index c1e19836..1446c75d 100644 --- a/lib/proto/glist/db.proto +++ b/lib/proto/glist/db.proto @@ -1,7 +1,8 @@ syntax = "proto3"; package glist; -option go_package = ".;glist"; + +option go_package = "flydb/lib/proto/glist"; service GListService { // example diff --git a/lib/proto/glist/db_grpc.pb.go b/lib/proto/glist/db_grpc.pb.go index c0dc9f2e..21d2cdac 100644 --- a/lib/proto/glist/db_grpc.pb.go +++ b/lib/proto/glist/db_grpc.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.6.1 -// source: db.proto +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.14.0 +// source: lib/proto/glist/db.proto package glist diff --git a/lib/proto/gstring/db.proto b/lib/proto/gstring/db.proto index f164558b..400063c8 100644 --- a/lib/proto/gstring/db.proto +++ b/lib/proto/gstring/db.proto @@ -21,6 +21,8 @@ service GStringService { rpc Expire(ExpireRequest) returns (ExpireResponse) {} rpc Persist(PersistRequest) returns (PersistResponse) {} rpc MGet(MGetRequest) returns (MGetResponse) {} + rpc MSet(MSetRequest) returns (MSetResponse) {} + rpc MSetNX(MSetNXRequest) returns (MSetNXResponse) {} } message FlyDBOption { @@ -219,3 +221,42 @@ message MGetValue { bytes bytes_value = 7; } } +message MSetRequest { + repeated KeyValue pairs = 1; + + message KeyValue { + string key = 1; + oneof value { + string string_value = 2; + int32 int32_value = 3; + int64 int64_value = 4; + float float32_value = 5; + double float64_value = 6; + bool bool_value = 7; + bytes bytes_value = 8; + } + } +} +message MSetResponse { + bool success = 1; +} +message MSetNXRequest { + repeated KeyValue pairs = 1; + + message KeyValue { + string key = 1; + oneof value { + string string_value = 2; + int32 int32_value = 3; + int64 int64_value = 4; + float float32_value = 5; + double float64_value = 6; + bool bool_value = 7; + bytes bytes_value = 8; + } + } +} + +message MSetNXResponse { + bool success = 1; +} \ No newline at end of file diff --git a/structure/list.go b/structure/list.go index 31cff58f..01ebd2e0 100644 --- a/structure/list.go +++ b/structure/list.go @@ -212,7 +212,7 @@ func (l *ListStructure) RPop(key string) (interface{}, error) { // Find the new tail newTail := lst.Head - for i := 0; i < lst.Length-1; i++ { + for i := 0; i < lst.Length-2; i++ { newTail = newTail.Next } popValue := newTail.Next.Value @@ -622,3 +622,8 @@ func (l *ListStructure) decodeList(value []byte) (*list, error) { return &lst, nil } + +func (s *ListStructure) Stop() error { + err := s.db.Close() + return err +} From 4ef9e85b4156f55c6af3750c34498504d82e615b Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Sat, 29 Jul 2023 19:18:15 +0800 Subject: [PATCH 5/7] provide windows mmap --- engine/fileio/mmap_io.go | 110 +------------------------- engine/fileio/mmap_io_unix.go | 125 +++++++++++++++++++++++++++++ engine/fileio/mmap_io_windows.go | 131 +++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 2 + 5 files changed, 262 insertions(+), 108 deletions(-) create mode 100644 engine/fileio/mmap_io_unix.go create mode 100644 engine/fileio/mmap_io_windows.go diff --git a/engine/fileio/mmap_io.go b/engine/fileio/mmap_io.go index fa41f592..88adc43d 100644 --- a/engine/fileio/mmap_io.go +++ b/engine/fileio/mmap_io.go @@ -1,22 +1,9 @@ package fileio import ( - "errors" - "os" "sync" - "syscall" - "unsafe" ) -type MMapIO struct { - fd *os.File // system file descriptor - data []byte // the mapping area corresponding to the file - offset int64 // next write location - fileSize int64 // max file size - fileName string - count int // the count of dbs using this mmap io -} - type mmapFileController struct { lock sync.Mutex files map[string]*MMapIO @@ -33,101 +20,10 @@ func NewMMapIOManager(fileName string, fileSize int64) (*MMapIO, error) { defer controller.lock.Unlock() if v, ok := controller.files[fileName]; ok { - v.count++ + v.AddCount() return v, nil } - mmapIO := &MMapIO{fileSize: fileSize, fileName: fileName, count: 1} - controller.files[fileName] = mmapIO - - fd, err := os.OpenFile( - fileName, - os.O_CREATE|os.O_RDWR|os.O_APPEND, - DataFilePerm, - ) - if err != nil { - return nil, err - } - info, _ := fd.Stat() - - // Expand files to maximum file size, crop when saving - if err := fd.Truncate(fileSize); err != nil { - return nil, err - } - - // Building mappings between memory and disk files - b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileSize), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) - if err != nil { - return nil, err - } - - mmapIO.fd = fd - mmapIO.data = b - mmapIO.offset = info.Size() - return mmapIO, nil -} - -// Read Copy data from the mapping area to byte slice -func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { - return copy(b, mio.data[offset:]), nil -} - -// Write Copy data from byte slice to the mapping area -func (mio *MMapIO) Write(b []byte) (int, error) { - oldOffset := mio.offset - newOffset := mio.offset + int64(len(b)) - if newOffset > mio.fileSize { - return 0, errors.New("exceed file max content length") - } - - mio.offset = newOffset - return copy(mio.data[oldOffset:newOffset], b), nil -} - -// Sync Synchronize data from memory to disk -func (mio *MMapIO) Sync() error { - _, _, err := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&mio.data[0])), uintptr(mio.offset), uintptr(syscall.MS_SYNC)) - if err != 0 { - return err - } - return nil -} - -// Close file -func (mio *MMapIO) Close() (err error) { - controller.lock.Lock() - defer controller.lock.Unlock() - - mio.count-- - if mio.count > 0 { - return nil - } - - delete(controller.files, mio.fileName) - - if err = mio.fd.Truncate(mio.offset); err != nil { - return err - } - if err = mio.Sync(); err != nil { - return err - } - if err = mio.UnMap(); err != nil { - panic(err) - } - return mio.fd.Close() -} - -// Size return the size of current file -func (mio *MMapIO) Size() (int64, error) { - return mio.offset, nil -} - -// UnMap Unmapping between memory and files -func (mio *MMapIO) UnMap() error { - if mio.data == nil { - return nil - } - err := syscall.Munmap(mio.data) - mio.data = nil - return err + manager, err := (&MMapIO{fileName: fileName, fileSize: fileSize}).Init() + return manager, err } diff --git a/engine/fileio/mmap_io_unix.go b/engine/fileio/mmap_io_unix.go new file mode 100644 index 00000000..96aa862f --- /dev/null +++ b/engine/fileio/mmap_io_unix.go @@ -0,0 +1,125 @@ +//go:build linux + +package fileio + +import ( + "errors" + "os" + "sync/atomic" + "syscall" + "unsafe" +) + +type MMapIO struct { + fd *os.File // system file descriptor + data []byte // the mapping area corresponding to the file + offset int64 // next write location + fileSize int64 // max file size + fileName string + count atomic.Int32 // the count of dbs using this mmap io +} + +func (mio *MMapIO) Init() (*MMapIO, error) { + fd, err := os.OpenFile(mio.fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, DataFilePerm) + if err != nil { + return nil, err + } + mio.fd = fd + + info, _ := fd.Stat() + mio.offset = info.Size() + + // Expand files to maximum file size, crop when saving + if err := fd.Truncate(mio.fileSize); err != nil { + return nil, err + } + + // Building mappings between memory and disk files + b, err := syscall.Mmap(int(mio.fd.Fd()), 0, int(mio.fileSize), + syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, err + } + mio.data = b + + return mio, nil +} + +// UnMapUnix Unmapping between memory and files +func (mio *MMapIO) unmap() error { + if mio.data == nil { + return nil + } + + err := syscall.Munmap(mio.data) + mio.data = nil + + return err +} + +// Read Copy data from the mapping area to byte slice +func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { + return copy(b, mio.data[offset:]), nil +} + +// Write Copy data from byte slice to the mapping area +func (mio *MMapIO) Write(b []byte) (int, error) { + oldOffset := mio.offset + newOffset := mio.offset + int64(len(b)) + if newOffset > mio.fileSize { + return 0, errors.New("exceed file max content length") + } + + mio.offset = newOffset + return copy(mio.data[oldOffset:newOffset], b), nil +} + +// Sync Synchronize data from memory to disk +func (mio *MMapIO) Sync() error { + _, _, err := syscall.Syscall(syscall.SYS_MSYNC, uintptr(unsafe.Pointer(&mio.data[0])), uintptr(mio.offset), uintptr(syscall.MS_SYNC)) + if err != 0 { + return err + } + return nil +} + +// Size return the size of current file +func (mio *MMapIO) Size() (int64, error) { + return mio.offset, nil +} + +// Close file +func (mio *MMapIO) Close() (err error) { + controller.lock.Lock() + defer controller.lock.Unlock() + + mio.SubCount() + if mio.GetCount() > 0 { + return nil + } + + delete(controller.files, mio.fileName) + + if err = mio.fd.Truncate(mio.offset); err != nil { + return err + } + if err = mio.Sync(); err != nil { + return err + } + if err = mio.unmap(); err != nil { + panic(err) + } + return mio.fd.Close() +} + +func (mio *MMapIO) GetCount() int32 { + return mio.count.Load() +} + +func (mio *MMapIO) AddCount() { + mio.count.Add(1) +} + +func (mio *MMapIO) SubCount() { + mio.count.Add(-1) +} diff --git a/engine/fileio/mmap_io_windows.go b/engine/fileio/mmap_io_windows.go new file mode 100644 index 00000000..3b950388 --- /dev/null +++ b/engine/fileio/mmap_io_windows.go @@ -0,0 +1,131 @@ +//go:build windows + +package fileio + +import ( + "errors" + "os" + "sync/atomic" + "syscall" + "unsafe" +) + +type MMapIO struct { + fd *os.File // system file descriptor + handle syscall.Handle + data []byte // the mapping area corresponding to the file + offset int64 // next write location + fileSize int64 // max file size + fileName string + count atomic.Uint32 // the count of dbs using this mmap io +} + +func (mio *MMapIO) Init() (*MMapIO, error) { + fd, err := os.OpenFile(mio.fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, DataFilePerm) + if err != nil { + return nil, err + } + mio.fd = fd + + info, _ := fd.Stat() + mio.offset = info.Size() + + // Expand files to maximum file size, crop when saving + if err := fd.Truncate(mio.fileSize); err != nil { + return nil, err + } + + // Building mappings between memory and disk files + h, err := syscall.CreateFileMapping(syscall.Handle(mio.fd.Fd()), nil, + syscall.PAGE_READWRITE, 0, uint32(mio.fileSize), nil) + if err != nil { + return nil, err + } + mio.handle = h + + addr, _ := syscall.MapViewOfFile(h, syscall.FILE_MAP_WRITE, 0, + 0, uintptr(mio.fileSize)) + if err != nil { + return nil, err + } + mio.data = *(*[]byte)(unsafe.Pointer(addr)) + + return mio, nil +} + +func (mio *MMapIO) unmap() error { + if mio.data == nil { + return nil + } + + addr := (uintptr)(unsafe.Pointer(&mio.data[0])) + err := syscall.UnmapViewOfFile(addr) + mio.data = nil + + return err +} + +// Read Copy data from the mapping area to byte slice +func (mio *MMapIO) Read(b []byte, offset int64) (int, error) { + return copy(b, mio.data[offset:]), nil +} + +// Write Copy data from byte slice to the mapping area +func (mio *MMapIO) Write(b []byte) (int, error) { + oldOffset := mio.offset + newOffset := mio.offset + int64(len(b)) + if newOffset > mio.fileSize { + return 0, errors.New("exceed file max content length") + } + + mio.offset = newOffset + return copy(mio.data[oldOffset:newOffset], b), nil +} + +func (mio *MMapIO) Sync() error { + err := syscall.FlushFileBuffers(mio.handle) + if err != nil { + return err + } + return nil +} + +// Size return the size of current file +func (mio *MMapIO) Size() (int64, error) { + return mio.offset, nil +} + +func (mio *MMapIO) Close() (err error) { + controller.lock.Lock() + defer controller.lock.Unlock() + + mio.SubCount() + if mio.GetCount() > 0 { + return nil + } + + delete(controller.files, mio.fileName) + + if err = mio.fd.Truncate(mio.offset); err != nil { + return err + } + if err = mio.Sync(); err != nil { + return err + } + if err = mio.unmap(); err != nil { + panic(err) + } + return syscall.CloseHandle(mio.handle) +} + +func (mio *MMapIO) GetCount() uint32 { + return mio.count.Load() +} + +func (mio *MMapIO) AddCount() { + mio.count.Add(1) +} + +func (mio *MMapIO) SubCount() { + mio.count.Add(-1) +} diff --git a/go.mod b/go.mod index e4f14199..55514983 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.8.0 // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.8.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index 5e8e2daf..f90ae34f 100644 --- a/go.sum +++ b/go.sum @@ -236,6 +236,8 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= From 9b2e23809462bc7bc0347ac1b3ede2c95c3b58ca Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Sat, 29 Jul 2023 21:28:42 +0800 Subject: [PATCH 6/7] controll the activate file offset --- engine/benchmark/benchmark_test.go | 19 +++++++++++++------ engine/data/data_file.go | 3 +-- engine/db.go | 11 +++++------ 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/engine/benchmark/benchmark_test.go b/engine/benchmark/benchmark_test.go index b8708ebf..6dc2356d 100644 --- a/engine/benchmark/benchmark_test.go +++ b/engine/benchmark/benchmark_test.go @@ -8,7 +8,7 @@ import ( "github.com/ByteStorage/FlyDB/flydb" _const "github.com/ByteStorage/FlyDB/lib/const" "math/rand" - "path/filepath" + "os" "testing" "time" ) @@ -34,18 +34,17 @@ func GetValue() []byte { return str.Bytes() } -func init() { +func Benchmark_PutValue_FlyDB(b *testing.B) { opts := config.DefaultOptions - opts.DirPath = filepath.Join("benchmark", "flydbtest") + dir, _ := os.MkdirTemp("", "flydbtest") + opts.DirPath = dir FlyDB, err = flydb.NewFlyDB(opts) defer FlyDB.Clean() if err != nil { panic(err) } -} -func Benchmark_PutValue_FlyDB(b *testing.B) { b.ResetTimer() b.ReportAllocs() @@ -58,6 +57,15 @@ func Benchmark_PutValue_FlyDB(b *testing.B) { } func Benchmark_GetValue_FlyDB(b *testing.B) { + opts := config.DefaultOptions + opts.DirPath = "/tmp/FlyDB" + + FlyDB, err = flydb.NewFlyDB(opts) + defer FlyDB.Close() + if err != nil { + panic(err) + } + for i := 0; i < 500000; i++ { err = FlyDB.Put(GetKey(i), GetValue()) if err != nil { @@ -74,5 +82,4 @@ func Benchmark_GetValue_FlyDB(b *testing.B) { panic(err) } } - } diff --git a/engine/data/data_file.go b/engine/data/data_file.go index 2c26a6fe..5db26ee3 100644 --- a/engine/data/data_file.go +++ b/engine/data/data_file.go @@ -110,11 +110,10 @@ func (df *DataFile) ReadLogRecord(offset int64) (*LogRecord, int64, error) { } func (df *DataFile) Write(buf []byte) error { - size, err := df.IoManager.Write(buf) + _, err := df.IoManager.Write(buf) if err != nil { return err } - df.WriteOff += int64(size) return nil } diff --git a/engine/db.go b/engine/db.go index dec262db..2e47c4b8 100644 --- a/engine/db.go +++ b/engine/db.go @@ -244,7 +244,9 @@ func (db *DB) setActiveDataFile() error { return err } db.activeFile = dataFile - SingleOffset().AddNew(initialFileID, 0) + + size, _ := dataFile.IoManager.Size() + SingleOffset().AddNew(initialFileID, size) return nil } @@ -468,7 +470,7 @@ func (db *DB) loadIndexFromDataFiles() error { var currentSeqNo = nonTransactionSeqNo // Iterate through all file ids, processing records in the file - for i, fid := range db.fileIds { + for _, fid := range db.fileIds { var fileID = uint32(fid) // If the id is smaller than that of the file that did not participate in the merge recently, // the hint file has been loaded @@ -533,10 +535,7 @@ func (db *DB) loadIndexFromDataFiles() error { offset += size } - // If it is a current active file, update writeOff for this file - if i == len(db.fileIds)-1 { - db.activeFile.WriteOff = offset - } + SingleOffset().AddNew(fileID, offset) } // Update the transaction sequence number to the database field From 9c1eeb8ffe0f2a535c3821cbb2c8ce7d679e4ffe Mon Sep 17 00:00:00 2001 From: ZhaoShuai <202122280606@std.uestc.edu.cn> Date: Sat, 29 Jul 2023 22:05:52 +0800 Subject: [PATCH 7/7] fix --- README.md | 4 ++-- README_CN.md | 2 +- engine/fileio/mmap_io_unix.go | 4 ++-- engine/fileio/mmap_io_windows.go | 5 ++--- go.mod | 1 - go.sum | 4 ---- structure/string.go | 4 ++-- structure/string_test.go | 7 +++++-- 8 files changed, 14 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index e16e7e55..220632e0 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ English | [中文](https://github.com/ByteStorage/flydb/blob/master/README_CN.md **FlyDB** is a high-performance key-value (KV) storage engine based on the efficient bitcask model. It offers fast and reliable data retrieval and storage capabilities. By leveraging the simplicity and effectiveness of the bitcask model, **FlyDB** ensures efficient read and write operations, resulting in improved overall performance. It provides a streamlined approach to storing and accessing key-value pairs, making it an excellent choice for scenarios that require fast and responsive data access. **FlyDB's** focus on speed and simplicity makes it a valuable alternative for applications that prioritize performance while balancing storage costs.  -## 🏁 Fast Start: FlyDB +## 🏁 Fast Start: FlyDB You can install FlyDB using the Go command line tool: @@ -195,7 +195,7 @@ FlyDB is released under the Apache license. For details, see LICENSE file. ## Thanks To JetBrains -> Thanks to `JetBrains` for the free open source license. +> Thanks to `JetBrains` for the free open source license. FlyDB-logo diff --git a/README_CN.md b/README_CN.md index 5b8fe8fa..9709dcd7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -165,7 +165,7 @@ PASS V1.0.4: 短期支持版本v1.0.4使用mmap来优化索引部分,与v1.0.3版本相比,显著提高了读写性能。50w数据写入速度从1.35秒加快到0.56秒,读取速度从1.06秒加快到0.355秒。 -V1.0.5: 短期支持版本v1.0.5支持String、List类型的数据结构。并对部分代码进行了优化。 +V1.0.5: 短期支持版本v1.0.5支持String、List类型的数据结构。并对部分代码进行了优化。 V1.0.6: 短期支持版本v1.0.6支持Hash类型的数据结构。并对部分代码进行了优化。 diff --git a/engine/fileio/mmap_io_unix.go b/engine/fileio/mmap_io_unix.go index 96aa862f..711d2db3 100644 --- a/engine/fileio/mmap_io_unix.go +++ b/engine/fileio/mmap_io_unix.go @@ -4,8 +4,8 @@ package fileio import ( "errors" + atomic2 "go.uber.org/atomic" "os" - "sync/atomic" "syscall" "unsafe" ) @@ -16,7 +16,7 @@ type MMapIO struct { offset int64 // next write location fileSize int64 // max file size fileName string - count atomic.Int32 // the count of dbs using this mmap io + count atomic2.Int32 // the count of dbs using this mmap io } func (mio *MMapIO) Init() (*MMapIO, error) { diff --git a/engine/fileio/mmap_io_windows.go b/engine/fileio/mmap_io_windows.go index 3b950388..58de9440 100644 --- a/engine/fileio/mmap_io_windows.go +++ b/engine/fileio/mmap_io_windows.go @@ -5,7 +5,6 @@ package fileio import ( "errors" "os" - "sync/atomic" "syscall" "unsafe" ) @@ -17,7 +16,7 @@ type MMapIO struct { offset int64 // next write location fileSize int64 // max file size fileName string - count atomic.Uint32 // the count of dbs using this mmap io + count atomic.Int32 // the count of dbs using this mmap io } func (mio *MMapIO) Init() (*MMapIO, error) { @@ -118,7 +117,7 @@ func (mio *MMapIO) Close() (err error) { return syscall.CloseHandle(mio.handle) } -func (mio *MMapIO) GetCount() uint32 { +func (mio *MMapIO) GetCount() int32 { return mio.count.Load() } diff --git a/go.mod b/go.mod index b9e6e372..9ad78fe0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/bwmarrin/snowflake v0.3.0 github.com/chen3feng/stl4go v0.1.1 github.com/desertbit/grumble v1.1.3 - github.com/edsrzf/mmap-go v1.1.0 github.com/fatih/color v1.13.0 github.com/golang/protobuf v1.5.3 github.com/google/btree v1.1.2 diff --git a/go.sum b/go.sum index 4c53d6e4..20a11a80 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,6 @@ github.com/desertbit/grumble v1.1.3 h1:gbdgVGWsHmNraJ7Gn6Q4TiUEIHU/UHfbc1KUSbBlg github.com/desertbit/grumble v1.1.3/go.mod h1:r7j3ShNy5EmOsegRD2DzTutIaGiLiA3M5yBTXXeLwcs= github.com/desertbit/readline v1.5.1 h1:/wOIZkWYl1s+IvJm/5bOknfUgs6MhS9svRNZpFM53Os= github.com/desertbit/readline v1.5.1/go.mod h1:pHQgTsCFs9Cpfh5mlSUFi9Xa5kkL4d8L1Jo4UVWzPw0= -github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= -github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= @@ -238,8 +236,6 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/structure/string.go b/structure/string.go index 7362ef62..792464e3 100644 --- a/structure/string.go +++ b/structure/string.go @@ -182,13 +182,13 @@ func (s *StringStructure) Append(key string, v interface{}, ttl time.Duration) e } // Convert the old value to a byte slice - oldValueType := oldValue.([]byte) + oldValueType := []byte(oldValue.(string)) // Append the value newValue := append(oldValueType, value...) // Set the value - return s.Set(key, newValue, ttl) + return s.Set(key, string(newValue), ttl) } // Incr increments the integer value of a key by 1 diff --git a/structure/string_test.go b/structure/string_test.go index 34e2967a..c0dc009d 100644 --- a/structure/string_test.go +++ b/structure/string_test.go @@ -101,11 +101,14 @@ func TestStringStructure_Append(t *testing.T) { str, _ := initdb() defer str.db.Clean() - err = str.Set("1", randkv.RandomValue(10), 0) + err = str.Set("1", "msg", 0) assert.Nil(t, err) - err = str.Append("1", randkv.RandomValue(5), 0) + err = str.Append("1", "123", 0) assert.Nil(t, err) + + value, _ := str.Get("1") + assert.Equal(t, value, "msg123") } func TestStringStructure_Incr(t *testing.T) {