From c86203535352fad715aa3c04a5de971ac580d177 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 20:06:17 +0900 Subject: [PATCH 1/7] Add support for list storage and operations --- adapter/dynamodb_transcoder.go | 69 ++++- adapter/redis.go | 543 ++++++++++++++++++++++++++------- store/list_store.go | 272 +++++++++++++++++ store/list_store_test.go | 82 +++++ 4 files changed, 840 insertions(+), 126 deletions(-) create mode 100644 store/list_store.go create mode 100644 store/list_store_test.go diff --git a/adapter/dynamodb_transcoder.go b/adapter/dynamodb_transcoder.go index 3b6734a..8e995b6 100644 --- a/adapter/dynamodb_transcoder.go +++ b/adapter/dynamodb_transcoder.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" ) @@ -14,7 +15,8 @@ func newDynamoDBTranscoder() *dynamodbTranscoder { // create new transcoder } type attributeValue struct { - S string `json:"S"` + S string `json:"S,omitempty"` + L []attributeValue `json:"L,omitempty"` } type putItemInput struct { @@ -43,16 +45,8 @@ func (t *dynamodbTranscoder) PutItemToRequest(b []byte) (*kv.OperationGroup[kv.O if !ok { return nil, errors.New("missing value attribute") } - return &kv.OperationGroup[kv.OP]{ - IsTxn: false, - Elems: []*kv.Elem[kv.OP]{ - { - Op: kv.Put, - Key: []byte(keyAttr.S), - Value: []byte(valAttr.S), - }, - }, - }, nil + + return t.valueAttrToOps([]byte(keyAttr.S), valAttr) } func (t *dynamodbTranscoder) TransactWriteItemsToRequest(b []byte) (*kv.OperationGroup[kv.OP], error) { @@ -74,11 +68,12 @@ func (t *dynamodbTranscoder) TransactWriteItemsToRequest(b []byte) (*kv.Operatio if !ok { return nil, errors.New("missing value attribute") } - elems = append(elems, &kv.Elem[kv.OP]{ - Op: kv.Put, - Key: []byte(keyAttr.S), - Value: []byte(valAttr.S), - }) + + ops, err := t.valueAttrToOps([]byte(keyAttr.S), valAttr) + if err != nil { + return nil, err + } + elems = append(elems, ops.Elems...) } return &kv.OperationGroup[kv.OP]{ @@ -86,3 +81,45 @@ func (t *dynamodbTranscoder) TransactWriteItemsToRequest(b []byte) (*kv.Operatio Elems: elems, }, nil } + +func (t *dynamodbTranscoder) valueAttrToOps(key []byte, val attributeValue) (*kv.OperationGroup[kv.OP], error) { + // List handling: only lists of scalar strings are supported. 
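+	// e.g. an L value of [{"S":"a"},{"S":"b"}] for key k expands into Put ops on
+	// store.ListItemKey(k, 0) and store.ListItemKey(k, 1), plus a store.ListMetaKey(k)
+	// record {Head: 0, Tail: 2, Len: 2}, all emitted as a single transaction.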
+ if len(val.L) > 0 { + var elems []*kv.Elem[kv.OP] + for i, item := range val.L { + if item.S == "" { + return nil, errors.New("only string list items are supported") + } + elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: store.ListItemKey(key, int64(i)), + Value: []byte(item.S), + }) + } + meta := store.ListMeta{ + Head: 0, + Tail: int64(len(val.L)), + Len: int64(len(val.L)), + } + b, err := json.Marshal(meta) + if err != nil { + return nil, errors.WithStack(err) + } + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: store.ListMetaKey(key), Value: b}) + + return &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems}, nil + } + + // Default: simple string + if val.S == "" { + return nil, errors.New("unsupported attribute type (only S or L of S)") + } + return &kv.OperationGroup[kv.OP]{ + IsTxn: false, + Elems: []*kv.Elem[kv.OP]{{ + Op: kv.Put, + Key: key, + Value: []byte(val.S), + }}, + }, nil +} diff --git a/adapter/redis.go b/adapter/redis.go index c71de71..0207b56 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -41,6 +41,7 @@ type RedisServer struct { store store.ScanStore coordinator kv.Coordinator redisTranscoder *redisTranscoder + listStore *store.ListStore // TODO manage membership from raft log leaderRedis map[raft.ServerAddress]string @@ -72,12 +73,15 @@ type redisResult struct { err error } +func store2list(st store.ScanStore) *store.ListStore { return store.NewListStore(st) } + func NewRedisServer(listen net.Listener, store store.ScanStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer { r := &RedisServer{ listen: listen, store: store, coordinator: coordinate, redisTranscoder: newRedisTranscoder(), + listStore: store2list(store), leaderRedis: leaderRedis, } @@ -172,6 +176,20 @@ func (r *RedisServer) ping(conn redcon.Conn, _ redcon.Command) { } func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { + // Prevent overwriting list keys with string values without cleanup. 
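+	// A key counts as a list when its !lst|meta| record exists (see store.ListStore.IsList).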
+ isList, err := r.isListKey(context.Background(), cmd.Args[1]) + if err != nil { + conn.WriteError(err.Error()) + return + } + if isList { + // delete list metadata so this becomes a plain string key + if err := r.deleteList(context.Background(), cmd.Args[1]); err != nil { + conn.WriteError(err.Error()) + return + } + } + res, err := r.redisTranscoder.SetToRequest(cmd.Args[1], cmd.Args[2]) if err != nil { conn.WriteError(err.Error()) @@ -188,6 +206,14 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { + if ok, err := r.isListKey(context.Background(), cmd.Args[1]); err != nil { + conn.WriteError(err.Error()) + return + } else if ok { + conn.WriteError("WRONGTYPE Operation against a key holding the wrong kind of value") + return + } + if r.coordinator.IsLeader() { v, err := r.store.Get(context.Background(), cmd.Args[1]) if err != nil { @@ -218,6 +244,18 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) { + if ok, err := r.isListKey(context.Background(), cmd.Args[1]); err != nil { + conn.WriteError(err.Error()) + return + } else if ok { + if err := r.deleteList(context.Background(), cmd.Args[1]); err != nil { + conn.WriteError(err.Error()) + return + } + conn.WriteInt(1) + return + } + res, err := r.redisTranscoder.DeleteToRequest(cmd.Args[1]) if err != nil { conn.WriteError(err.Error()) @@ -234,6 +272,14 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) { + if ok, err := r.isListKey(context.Background(), cmd.Args[1]); err != nil { + conn.WriteError(err.Error()) + return + } else if ok { + conn.WriteInt(1) + return + } + ok, err := r.store.Exists(context.Background(), cmd.Args[1]) if err != nil { conn.WriteError(err.Error()) @@ -276,39 +322,61 @@ func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) { } func (r *RedisServer) localKeys(pattern []byte) ([][]byte, error) { - // If an asterisk (*) is not included, the match will be exact, - // so check if the key exists. 
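+	// Exact patterns (no '*') reduce to a single existence check; wildcard patterns fall back to a scan.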
if !bytes.Contains(pattern, []byte("*")) { - res, err := r.store.Exists(context.Background(), pattern) - if err != nil { - return nil, errors.WithStack(err) - } - if res { - return [][]byte{bytes.Clone(pattern)}, nil - } - return [][]byte{}, nil + return r.localKeysExact(pattern) } + return r.localKeysPattern(pattern) +} - var start []byte - switch { - case bytes.Equal(pattern, []byte("*")): - start = nil - default: - start = bytes.ReplaceAll(pattern, []byte("*"), nil) +func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) { + res, err := r.store.Exists(context.Background(), pattern) + if err != nil { + return nil, errors.WithStack(err) } + if res { + return [][]byte{bytes.Clone(pattern)}, nil + } + return [][]byte{}, nil +} + +func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) { + start := r.patternStart(pattern) keys, err := r.store.Scan(context.Background(), start, nil, math.MaxInt) if err != nil { return nil, errors.WithStack(err) } - out := make([][]byte, 0, len(keys)) - for _, kvPair := range keys { - out = append(out, kvPair.Key) + keyset := r.collectUserKeys(keys) + + out := make([][]byte, 0, len(keyset)) + for _, v := range keyset { + out = append(out, v) } return out, nil } +func (r *RedisServer) patternStart(pattern []byte) []byte { + if bytes.Equal(pattern, []byte("*")) { + return nil + } + return bytes.ReplaceAll(pattern, []byte("*"), nil) +} + +func (r *RedisServer) collectUserKeys(kvs []*store.KVPair) map[string][]byte { + keyset := map[string][]byte{} + for _, kvPair := range kvs { + if store.IsListMetaKey(kvPair.Key) || store.IsListItemKey(kvPair.Key) { + if userKey := store.ExtractListUserKey(kvPair.Key); userKey != nil { + keyset[string(userKey)] = userKey + } + continue + } + keyset[string(kvPair.Key)] = kvPair.Key + } + return keyset +} + func (r *RedisServer) proxyKeys(pattern []byte) ([]string, error) { leader := r.coordinator.RaftLeader() if leader == "" { @@ -374,16 +442,22 @@ func (r *RedisServer) exec(conn redcon.Conn, _ redcon.Command) { type txnValue struct { raw []byte - list []string - isList bool deleted bool dirty bool loaded bool } type txnContext struct { - server *RedisServer - working map[string]*txnValue + server *RedisServer + working map[string]*txnValue + listStates map[string]*listTxnState +} + +type listTxnState struct { + meta store.ListMeta + metaExists bool + appends [][]byte + deleted bool } func (t *txnContext) load(key []byte) (*txnValue, error) { @@ -402,6 +476,30 @@ func (t *txnContext) load(key []byte) (*txnValue, error) { return tv, nil } +func (t *txnContext) loadListState(key []byte) (*listTxnState, error) { + k := string(key) + if st, ok := t.listStates[k]; ok { + return st, nil + } + + meta, exists, err := t.server.loadListMeta(context.Background(), key) + if err != nil { + return nil, err + } + + st := &listTxnState{ + meta: meta, + metaExists: exists, + appends: [][]byte{}, + } + t.listStates[k] = st + return st, nil +} + +func (t *txnContext) listLength(st *listTxnState) int64 { + return st.meta.Len + int64(len(st.appends)) +} + func (t *txnContext) apply(cmd redcon.Command) (redisResult, error) { switch strings.ToUpper(string(cmd.Args[0])) { case "SET": @@ -422,18 +520,36 @@ func (t *txnContext) apply(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) { + if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + return redisResult{}, err + } else if isList { + return redisResult{typ: resultError, 
err: errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")}, nil + } + tv, err := t.load(cmd.Args[1]) if err != nil { return redisResult{}, err } tv.raw = cmd.Args[2] - tv.isList = false tv.deleted = false tv.dirty = true return redisResult{typ: resultString, str: "OK"}, nil } func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) { + // handle list delete separately + if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + return redisResult{}, err + } else if isList { + st, err := t.loadListState(cmd.Args[1]) + if err != nil { + return redisResult{}, err + } + st.deleted = true + st.appends = nil + return redisResult{typ: resultInt, integer: 1}, nil + } + tv, err := t.load(cmd.Args[1]) if err != nil { return redisResult{}, err @@ -444,6 +560,12 @@ func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applyGet(cmd redcon.Command) (redisResult, error) { + if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + return redisResult{}, err + } else if isList { + return redisResult{typ: resultError, err: errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")}, nil + } + tv, err := t.load(cmd.Args[1]) if err != nil { return redisResult{}, err @@ -455,6 +577,12 @@ func (t *txnContext) applyGet(cmd redcon.Command) (redisResult, error) { } func (t *txnContext) applyExists(cmd redcon.Command) (redisResult, error) { + if isList, err := t.server.isListKey(context.Background(), cmd.Args[1]); err != nil { + return redisResult{}, err + } else if isList { + return redisResult{typ: resultInt, integer: 1}, nil + } + tv, err := t.load(cmd.Args[1]) if err != nil { return redisResult{}, err @@ -465,63 +593,101 @@ func (t *txnContext) applyExists(cmd redcon.Command) (redisResult, error) { return redisResult{typ: resultInt, integer: 1}, nil } -func (t *txnContext) ensureList(tv *txnValue) error { - if tv.isList { - return nil - } - list, err := decodeList(tv.raw) - if err != nil { - return err - } - tv.list = list - tv.isList = true - return nil -} - func (t *txnContext) applyRPush(cmd redcon.Command) (redisResult, error) { - tv, err := t.load(cmd.Args[1]) + st, err := t.loadListState(cmd.Args[1]) if err != nil { return redisResult{}, err } - if err := t.ensureList(tv); err != nil { - return redisResult{}, err - } + for _, v := range cmd.Args[2:] { - tv.list = append(tv.list, string(v)) + st.appends = append(st.appends, bytes.Clone(v)) } - tv.dirty = true - tv.deleted = false - return redisResult{typ: resultInt, integer: int64(len(tv.list))}, nil + + return redisResult{typ: resultInt, integer: t.listLength(st)}, nil } func (t *txnContext) applyLRange(cmd redcon.Command) (redisResult, error) { - tv, err := t.load(cmd.Args[1]) + st, err := t.loadListState(cmd.Args[1]) if err != nil { return redisResult{}, err } - if err := t.ensureList(tv); err != nil { + + s, e, err := parseRangeBounds(cmd.Args[2], cmd.Args[3], int(t.listLength(st))) + if err != nil { + return redisResult{}, err + } + if e < s { + return redisResult{typ: resultArray, arr: []string{}}, nil + } + + out, err := t.listRangeValues(cmd.Args[1], st, s, e) + if err != nil { return redisResult{}, err } - start, err := strconv.Atoi(string(cmd.Args[2])) + + return redisResult{typ: resultArray, arr: out}, nil +} + +func parseRangeBounds(startRaw, endRaw []byte, total int) (int, int, error) { + start, err := strconv.Atoi(string(startRaw)) if err != nil { - return redisResult{}, 
errors.WithStack(err) + return 0, 0, errors.WithStack(err) } - end, err := strconv.Atoi(string(cmd.Args[3])) + end, err := strconv.Atoi(string(endRaw)) if err != nil { - return redisResult{}, errors.WithStack(err) + return 0, 0, errors.WithStack(err) } - s, e := clampRange(start, end, len(tv.list)) - if e < s { - return redisResult{typ: resultArray, arr: []string{}}, nil + s, e := clampRange(start, end, total) + return s, e, nil +} + +func (t *txnContext) listRangeValues(key []byte, st *listTxnState, s, e int) ([]string, error) { + persistedLen := int(st.meta.Len) + + switch { + case e < persistedLen: + return t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(e)) + case s >= persistedLen: + return appendValues(st.appends, s-persistedLen, e-persistedLen), nil + default: + head, err := t.server.fetchListRange(context.Background(), key, st.meta, int64(s), int64(persistedLen-1)) + if err != nil { + return nil, err + } + tail := appendValues(st.appends, 0, e-persistedLen) + return append(head, tail...), nil } - return redisResult{typ: resultArray, arr: tv.list[s : e+1]}, nil +} + +func appendValues(buf [][]byte, start, end int) []string { + out := make([]string, 0, end-start+1) + for i := start; i <= end; i++ { + out = append(out, string(buf[i])) + } + return out } func (t *txnContext) commit() error { - if len(t.working) == 0 { + elems := t.buildKeyElems() + + listElems, err := t.buildListElems() + if err != nil { + return err + } + + elems = append(elems, listElems...) + if len(elems) == 0 { return nil } + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems} + if _, err := t.server.coordinator.Dispatch(group); err != nil { + return errors.WithStack(err) + } + return nil +} + +func (t *txnContext) buildKeyElems() []*kv.Elem[kv.OP] { keys := make([]string, 0, len(t.working)) for k := range t.working { keys = append(keys, k) @@ -539,34 +705,56 @@ func (t *txnContext) commit() error { elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: key}) continue } - var val []byte - if tv.isList { - enc, err := encodeList(tv.list) - if err != nil { - return err - } - val = enc - } else { - val = tv.raw - } - elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: key, Value: val}) + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: key, Value: tv.raw}) } + return elems +} - if len(elems) == 0 { - return nil +func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) { + listKeys := make([]string, 0, len(t.listStates)) + for k := range t.listStates { + listKeys = append(listKeys, k) } + sort.Strings(listKeys) - group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems} - if _, err := t.server.coordinator.Dispatch(group); err != nil { - return errors.WithStack(err) + var elems []*kv.Elem[kv.OP] + for _, k := range listKeys { + st := t.listStates[k] + userKey := []byte(k) + + if st.deleted { + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(userKey)}) + continue + } + if len(st.appends) == 0 { + continue + } + + startSeq := st.meta.Head + st.meta.Len + for i, v := range st.appends { + elems = append(elems, &kv.Elem[kv.OP]{ + Op: kv.Put, + Key: listItemKey(userKey, startSeq+int64(i)), + Value: v, + }) + } + + st.meta.Len += int64(len(st.appends)) + st.meta.Tail = st.meta.Head + st.meta.Len + metaBytes, err := json.Marshal(st.meta) + if err != nil { + return nil, errors.WithStack(err) + } + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: listMetaKey(userKey), Value: metaBytes}) } - return nil + return elems, nil } func (r *RedisServer) 
runTransaction(queue []redcon.Command) ([]redisResult, error) { ctx := &txnContext{ - server: r, - working: map[string]*txnValue{}, + server: r, + working: map[string]*txnValue{}, + listStates: map[string]*listTxnState{}, } results := make([]redisResult, 0, len(queue)) @@ -610,21 +798,14 @@ func (r *RedisServer) writeResults(conn redcon.Conn, results []redisResult) { } } -// list helpers -func decodeList(b []byte) ([]string, error) { - if b == nil { - return []string{}, nil - } - var out []string - if err := json.Unmarshal(b, &out); err != nil { - return nil, errors.WithStack(err) - } - return out, nil +// --- list helpers ---------------------------------------------------- + +func listMetaKey(userKey []byte) []byte { + return store.ListMetaKey(userKey) } -func encodeList(list []string) ([]byte, error) { - b, err := json.Marshal(list) - return b, errors.WithStack(err) +func listItemKey(userKey []byte, seq int64) []byte { + return store.ListItemKey(userKey, seq) } func clampRange(start, end, length int) (int, int) { @@ -646,15 +827,115 @@ func clampRange(start, end, length int) (int, int) { return start, end } -func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, error) { - val, err := r.getValue(key) - if err != nil && !errors.Is(err, store.ErrKeyNotFound) { - return nil, errors.WithStack(err) +func (r *RedisServer) loadListMeta(ctx context.Context, key []byte) (store.ListMeta, bool, error) { + meta, exists, err := r.listStore.LoadMeta(ctx, key) + return meta, exists, errors.WithStack(err) +} + +func (r *RedisServer) isListKey(ctx context.Context, key []byte) (bool, error) { + isList, err := r.listStore.IsList(ctx, key) + return isList, errors.WithStack(err) +} + +func (r *RedisServer) buildRPushOps(meta store.ListMeta, key []byte, values [][]byte) ([]*kv.Elem[kv.OP], store.ListMeta, error) { + if len(values) == 0 { + return nil, meta, nil + } + + elems := make([]*kv.Elem[kv.OP], 0, len(values)+1) + seq := meta.Head + meta.Len + for _, v := range values { + vCopy := bytes.Clone(v) + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: listItemKey(key, seq), Value: vCopy}) + seq++ + } + + meta.Len += int64(len(values)) + meta.Tail = meta.Head + meta.Len + + b, err := json.Marshal(meta) + if err != nil { + return nil, meta, errors.WithStack(err) + } + + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Put, Key: listMetaKey(key), Value: b}) + return elems, meta, nil +} + +func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte) (int64, error) { + meta, _, err := r.loadListMeta(ctx, key) + if err != nil { + return 0, err + } + + ops, newMeta, err := r.buildRPushOps(meta, key, values) + if err != nil { + return 0, err + } + if len(ops) == 0 { + return newMeta.Len, nil + } + + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: ops} + if _, err := r.coordinator.Dispatch(group); err != nil { + return 0, errors.WithStack(err) + } + return newMeta.Len, nil +} + +func (r *RedisServer) deleteList(ctx context.Context, key []byte) error { + _, exists, err := r.loadListMeta(ctx, key) + if err != nil { + return err } - list, err := decodeList(val) + if !exists { + return nil + } + + ops := []*kv.Elem[kv.OP]{ + {Op: kv.Del, Key: listMetaKey(key)}, + } + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: ops} + _, err = r.coordinator.Dispatch(group) + return errors.WithStack(err) +} + +func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store.ListMeta, startIdx, endIdx int64) ([]string, error) { + if endIdx < startIdx { + 
return []string{}, nil + } + + startSeq := meta.Head + startIdx + endSeq := meta.Head + endIdx + + startKey := listItemKey(key, startSeq) + endKey := listItemKey(key, endSeq+1) // exclusive + + kvs, err := r.store.Scan(ctx, startKey, endKey, int(endIdx-startIdx+1)) if err != nil { return nil, errors.WithStack(err) } + + out := make([]string, 0, len(kvs)) + for _, kvp := range kvs { + out = append(out, string(kvp.Value)) + } + return out, nil +} + +func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, error) { + if !r.coordinator.IsLeader() { + return r.proxyLRange(key, startRaw, endRaw) + } + + meta, exists, err := r.loadListMeta(context.Background(), key) + if err != nil { + return nil, err + } + if !exists || meta.Len == 0 { + return []string{}, nil + } + start, err := strconv.Atoi(string(startRaw)) if err != nil { return nil, errors.WithStack(err) @@ -663,11 +944,57 @@ func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, if err != nil { return nil, errors.WithStack(err) } - s, e := clampRange(start, end, len(list)) + + s, e := clampRange(start, end, int(meta.Len)) if e < s { return []string{}, nil } - return list[s : e+1], nil + + return r.fetchListRange(context.Background(), key, meta, int64(s), int64(e)) +} + +func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string, error) { + leader := r.coordinator.RaftLeader() + if leader == "" { + return nil, ErrLeaderNotFound + } + leaderAddr, ok := r.leaderRedis[leader] + if !ok || leaderAddr == "" { + return nil, errors.WithStack(errors.Newf("leader redis address unknown for %s", leader)) + } + + cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) + defer func() { _ = cli.Close() }() + + res, err := cli.LRange(context.Background(), string(key), parseInt(startRaw), parseInt(endRaw)).Result() + return res, errors.WithStack(err) +} + +func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) { + leader := r.coordinator.RaftLeader() + if leader == "" { + return 0, ErrLeaderNotFound + } + leaderAddr, ok := r.leaderRedis[leader] + if !ok || leaderAddr == "" { + return 0, errors.WithStack(errors.Newf("leader redis address unknown for %s", leader)) + } + + cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) + defer func() { _ = cli.Close() }() + + args := make([]interface{}, 0, len(values)) + for _, v := range values { + args = append(args, string(v)) + } + + res, err := cli.RPush(context.Background(), string(key), args...).Result() + return res, errors.WithStack(err) +} + +func parseInt(b []byte) int64 { + i, _ := strconv.ParseInt(string(b), 10, 64) + return i } // tryLeaderGet proxies a GET to the current Raft leader, returning the value and @@ -705,25 +1032,21 @@ func (r *RedisServer) getValue(key []byte) ([]byte, error) { } func (r *RedisServer) rpush(conn redcon.Conn, cmd redcon.Command) { - results, err := r.runTransaction([]redcon.Command{cmd}) + ctx := context.Background() + + var length int64 + var err error + if r.coordinator.IsLeader() { + length, err = r.listRPush(ctx, cmd.Args[1], cmd.Args[2:]) + } else { + length, err = r.proxyRPush(cmd.Args[1], cmd.Args[2:]) + } + if err != nil { conn.WriteError(err.Error()) return } - if len(results) != 1 { - conn.WriteError("ERR internal error: rpush should have one result") - return - } - res := results[0] - if res.err != nil { - conn.WriteError(res.err.Error()) - return - } - if res.typ != resultInt { - conn.WriteError("ERR internal error: rpush result should be an integer") - return - } - 
conn.WriteInt64(res.integer) + conn.WriteInt64(length) } func (r *RedisServer) lrange(conn redcon.Conn, cmd redcon.Command) { diff --git a/store/list_store.go b/store/list_store.go new file mode 100644 index 0000000..2525121 --- /dev/null +++ b/store/list_store.go @@ -0,0 +1,272 @@ +package store + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math" + + "github.com/cockroachdb/errors" +) + +// Wide-column style list storage using per-element keys. +// Item keys: !lst|itm||%020d +// Meta key : !lst|meta| -> {"h":head,"t":tail,"l":len} + +const ( + ListMetaPrefix = "!lst|meta|" + ListItemPrefix = "!lst|itm|" + ListSeqWidth = 20 +) + +type ListMeta struct { + Head int64 `json:"h"` + Tail int64 `json:"t"` + Len int64 `json:"l"` +} + +// ListStore requires ScanStore to fetch ranges efficiently. +type ListStore struct { + store ScanStore +} + +func NewListStore(base ScanStore) *ListStore { + return &ListStore{store: base} +} + +// IsList reports whether the given key has list metadata. +func (s *ListStore) IsList(ctx context.Context, key []byte) (bool, error) { + _, exists, err := s.LoadMeta(ctx, key) + return exists, err +} + +// PutList replaces the entire list. +func (s *ListStore) PutList(ctx context.Context, key []byte, list []string) error { + // delete existing meta/items (best-effort) + if err := s.deleteList(ctx, key); err != nil { + return err + } + + meta := ListMeta{Head: 0, Tail: int64(len(list)), Len: int64(len(list))} + metaBytes, err := json.Marshal(meta) + if err != nil { + return errors.WithStack(err) + } + + return errors.WithStack(s.store.Txn(ctx, func(ctx context.Context, txn Txn) error { + for i, v := range list { + if err := txn.Put(ctx, ListItemKey(key, int64(i)), []byte(v)); err != nil { + return errors.WithStack(err) + } + } + if err := txn.Put(ctx, ListMetaKey(key), metaBytes); err != nil { + return errors.WithStack(err) + } + return nil + })) +} + +// GetList returns the whole list. It reconstructs via Scan; avoid for huge lists. +func (s *ListStore) GetList(ctx context.Context, key []byte) ([]string, error) { + meta, exists, err := s.LoadMeta(ctx, key) + if err != nil { + return nil, err + } + if !exists || meta.Len == 0 { + return nil, ErrKeyNotFound + } + return s.Range(ctx, key, 0, int(meta.Len)-1) +} + +// RPush appends values and returns new length. +func (s *ListStore) RPush(ctx context.Context, key []byte, values ...string) (int, error) { + if len(values) == 0 { + return 0, nil + } + + newLen := 0 + + err := s.store.Txn(ctx, func(ctx context.Context, txn Txn) error { + // load meta inside txn for correctness + meta, exists, err := s.loadMetaTxn(ctx, txn, key) + if err != nil { + return err + } + if !exists { + meta = ListMeta{Head: 0, Tail: 0, Len: 0} + } + + startSeq := meta.Head + meta.Len + + for i, v := range values { + seq := startSeq + int64(i) + if err := txn.Put(ctx, ListItemKey(key, seq), []byte(v)); err != nil { + return errors.WithStack(err) + } + } + meta.Len += int64(len(values)) + meta.Tail = meta.Head + meta.Len + metaBytes, err := json.Marshal(meta) + if err != nil { + return errors.WithStack(err) + } + newLen = int(meta.Len) + return errors.WithStack(txn.Put(ctx, ListMetaKey(key), metaBytes)) + }) + if err != nil { + return 0, errors.WithStack(err) + } + + return newLen, nil +} + +// Range returns elements between start and end (inclusive). +// Negative indexes follow Redis semantics. 
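+// For example, Range(ctx, key, 0, -1) returns the whole list and Range(ctx, key, -2, -1) the last two elements.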
+func (s *ListStore) Range(ctx context.Context, key []byte, start, end int) ([]string, error) { + meta, exists, err := s.LoadMeta(ctx, key) + if err != nil { + return nil, err + } + if !exists || meta.Len == 0 { + return nil, ErrKeyNotFound + } + + si, ei := clampRange(start, end, int(meta.Len)) + if ei < si { + return []string{}, nil + } + + startSeq := meta.Head + int64(si) + endSeq := meta.Head + int64(ei) + startKey := ListItemKey(key, startSeq) + endKey := ListItemKey(key, endSeq+1) // exclusive + + kvs, err := s.store.Scan(ctx, startKey, endKey, ei-si+1) + if err != nil { + return nil, errors.WithStack(err) + } + + out := make([]string, 0, len(kvs)) + for _, kvp := range kvs { + out = append(out, string(kvp.Value)) + } + return out, nil +} + +// --- helpers --- + +// LoadMeta returns metadata and whether the list exists. +func (s *ListStore) LoadMeta(ctx context.Context, key []byte) (ListMeta, bool, error) { + val, err := s.store.Get(ctx, ListMetaKey(key)) + if err != nil { + if errors.Is(err, ErrKeyNotFound) { + return ListMeta{}, false, nil + } + return ListMeta{}, false, errors.WithStack(err) + } + if len(val) == 0 { + return ListMeta{}, false, nil + } + var meta ListMeta + if err := json.Unmarshal(val, &meta); err != nil { + return ListMeta{}, false, errors.WithStack(err) + } + return meta, true, nil +} + +func (s *ListStore) loadMetaTxn(ctx context.Context, txn Txn, key []byte) (ListMeta, bool, error) { + val, err := txn.Get(ctx, ListMetaKey(key)) + if err != nil { + if errors.Is(err, ErrKeyNotFound) { + return ListMeta{}, false, nil + } + return ListMeta{}, false, errors.WithStack(err) + } + if len(val) == 0 { + return ListMeta{}, false, nil + } + var meta ListMeta + if err := json.Unmarshal(val, &meta); err != nil { + return ListMeta{}, false, errors.WithStack(err) + } + return meta, true, nil +} + +func (s *ListStore) deleteList(ctx context.Context, key []byte) error { + start := ListItemKey(key, mathMinInt64) // use smallest seq + end := ListItemKey(key, mathMaxInt64) + + items, err := s.store.Scan(ctx, start, end, math.MaxInt) + if err != nil && !errors.Is(err, ErrKeyNotFound) { + return errors.WithStack(err) + } + + return errors.WithStack(s.store.Txn(ctx, func(ctx context.Context, txn Txn) error { + for _, kvp := range items { + if err := txn.Delete(ctx, kvp.Key); err != nil { + return errors.WithStack(err) + } + } + _ = txn.Delete(ctx, ListMetaKey(key)) + return nil + })) +} + +// ListMetaKey builds the metadata key for a user key. +func ListMetaKey(userKey []byte) []byte { + return append([]byte(ListMetaPrefix), userKey...) +} + +// ListItemKey builds the item key for a user key and sequence number. +func ListItemKey(userKey []byte, seq int64) []byte { + return []byte(fmt.Sprintf("%s%s|%0*d", ListItemPrefix, userKey, ListSeqWidth, seq)) +} + +func clampRange(start, end, length int) (int, int) { + if start < 0 { + start = length + start + } + if end < 0 { + end = length + end + } + if start < 0 { + start = 0 + } + if end >= length { + end = length - 1 + } + if end < start { + return 0, -1 + } + return start, end +} + +// sentinel seq for scan bounds +const ( + mathMinInt64 = -1 << 63 + mathMaxInt64 = 1<<63 - 1 +) + +// Exported helpers for other packages (e.g., Redis adapter). +func IsListMetaKey(key []byte) bool { return bytes.HasPrefix(key, []byte(ListMetaPrefix)) } + +func IsListItemKey(key []byte) bool { return bytes.HasPrefix(key, []byte(ListItemPrefix)) } + +// ExtractListUserKey returns the logical user key from a list meta or item key. 
+// If the key is not a list key, it returns nil. +func ExtractListUserKey(key []byte) []byte { + switch { + case IsListMetaKey(key): + return bytes.TrimPrefix(key, []byte(ListMetaPrefix)) + case IsListItemKey(key): + trimmed := bytes.TrimPrefix(key, []byte(ListItemPrefix)) + idx := bytes.LastIndexByte(trimmed, '|') + if idx == -1 { + return trimmed + } + return trimmed[:idx] + default: + return nil + } +} diff --git a/store/list_store_test.go b/store/list_store_test.go new file mode 100644 index 0000000..0be4c01 --- /dev/null +++ b/store/list_store_test.go @@ -0,0 +1,82 @@ +package store + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestListStore_PutGet(t *testing.T) { + t.Parallel() + + ctx := context.Background() + ls := NewListStore(NewRbMemoryStore()) + + in := []string{"a", "b", "c"} + assert.NoError(t, ls.PutList(ctx, []byte("k"), in)) + + out, err := ls.GetList(ctx, []byte("k")) + assert.NoError(t, err) + assert.Equal(t, in, out) +} + +func TestListStore_GetList_NotFound(t *testing.T) { + t.Parallel() + + _, err := NewListStore(NewRbMemoryStore()).GetList(context.Background(), []byte("missing")) + assert.ErrorIs(t, err, ErrKeyNotFound) +} + +func TestListStore_RPushAndRange(t *testing.T) { + t.Parallel() + + ctx := context.Background() + ls := NewListStore(NewRbMemoryStore()) + + n, err := ls.RPush(ctx, []byte("numbers"), "zero", "one", "two", "three", "four") + assert.NoError(t, err) + assert.Equal(t, 5, n) + + // Range with positive indexes. + res, err := ls.Range(ctx, []byte("numbers"), 1, 3) + assert.NoError(t, err) + assert.Equal(t, []string{"one", "two", "three"}, res) + + // Range with negative end index. + res, err = ls.Range(ctx, []byte("numbers"), 2, -1) + assert.NoError(t, err) + assert.Equal(t, []string{"two", "three", "four"}, res) +} + +func TestListStore_Range_NotFound(t *testing.T) { + t.Parallel() + + _, err := NewListStore(NewRbMemoryStore()).Range(context.Background(), []byte("nope"), 0, -1) + assert.ErrorIs(t, err, ErrKeyNotFound) +} + +func TestListStore_RPushConcurrent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + ls := NewListStore(NewRbMemoryStore()) + + wg := &sync.WaitGroup{} + const n = 50 + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _, err := ls.RPush(ctx, []byte("k"), fmt.Sprintf("v-%d", i)) + assert.NoError(t, err) + }(i) + } + wg.Wait() + + list, err := ls.GetList(ctx, []byte("k")) + assert.NoError(t, err) + assert.Len(t, list, n) +} From 74c078f614914ffdb0f8519fd071449a20d74740 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 20:16:48 +0900 Subject: [PATCH 2/7] Handle port conflicts and track first error --- adapter/test_util.go | 58 ++++++++++++++++++++++++++++++++------------ kv/shard_router.go | 9 ++++++- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/adapter/test_util.go b/adapter/test_util.go index fd38dad..396dff0 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "golang.org/x/sys/unix" + "github.com/Jille/raft-grpc-leader-rpc/leaderhealth" transport "github.com/Jille/raft-grpc-transport" "github.com/Jille/raftadmin" @@ -258,17 +260,50 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c var nodes []Node var lc net.ListenConfig - leaderRedis := make(map[raft.ServerAddress]string, n) - for i := 0; i < n; i++ { - leaderRedis[raft.ServerAddress(ports[i].raftAddress)] = ports[i].redisAddress - } - for i := 
0; i < n; i++ { st := store.NewRbMemoryStore() trxSt := store.NewMemoryStoreDefaultTTL() fsm := kv.NewKvFSM(st, trxSt) - port := ports[i] + var port portsAdress + var grpcSock, redisSock, dynamoSock net.Listener + + for { + port = ports[i] + var err error + + grpcSock, err = lc.Listen(ctx, "tcp", port.grpcAddress) + if err != nil && errors.Is(err, unix.EADDRINUSE) { + ports[i] = portAssigner() + cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) + continue + } + require.NoError(t, err) + + redisSock, err = lc.Listen(ctx, "tcp", port.redisAddress) + if err != nil && errors.Is(err, unix.EADDRINUSE) { + _ = grpcSock.Close() + ports[i] = portAssigner() + cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) + continue + } + require.NoError(t, err) + + dynamoSock, err = lc.Listen(ctx, "tcp", port.dynamoAddress) + if err != nil && errors.Is(err, unix.EADDRINUSE) { + _ = grpcSock.Close() + _ = redisSock.Close() + ports[i] = portAssigner() + cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) + continue + } + require.NoError(t, err) + break + } + + leaderRedis := map[raft.ServerAddress]string{ + raft.ServerAddress(ports[i].raftAddress): ports[i].redisAddress, + } // リーダーが先に投票を開始させる electionTimeout := leaderElectionTimeout @@ -291,25 +326,18 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c leaderhealth.Setup(r, s, []string{"Example"}) raftadmin.Register(s, r) - grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress) - require.NoError(t, err) - grpcAdders = append(grpcAdders, port.grpcAddress) redisAdders = append(redisAdders, port.redisAddress) go func(srv *grpc.Server, lis net.Listener) { assert.NoError(t, srv.Serve(lis)) }(s, grpcSock) - l, err := lc.Listen(ctx, "tcp", port.redisAddress) - require.NoError(t, err) - rd := NewRedisServer(l, st, coordinator, leaderRedis) + rd := NewRedisServer(redisSock, st, coordinator, leaderRedis) go func(server *RedisServer) { assert.NoError(t, server.Run()) }(rd) - dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress) - assert.NoError(t, err) - ds := NewDynamoDBServer(dl, st, coordinator) + ds := NewDynamoDBServer(dynamoSock, st, coordinator) go func() { assert.NoError(t, ds.Run()) }() diff --git a/kv/shard_router.go b/kv/shard_router.go index 556860d..3068707 100644 --- a/kv/shard_router.go +++ b/kv/shard_router.go @@ -58,6 +58,7 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re return nil, errors.WithStack(err) } + var firstErr error var max uint64 for gid, rs := range grouped { g, ok := s.getGroup(gid) @@ -66,12 +67,18 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re } r, err := fn(g, rs) if err != nil { - return nil, errors.WithStack(err) + if firstErr == nil { + firstErr = errors.WithStack(err) + } + continue } if r.CommitIndex > max { max = r.CommitIndex } } + if firstErr != nil { + return nil, firstErr + } return &TransactionResponse{CommitIndex: max}, nil } From 378c31cc31232de0fc5dbb2e766b39a843a489bf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 20:33:45 +0900 Subject: [PATCH 3/7] Improve error handling in list operations --- adapter/dynamodb_transcoder.go | 8 ++--- adapter/redis.go | 59 ++++++++++++++++++++++++---------- adapter/test_util.go | 3 +- store/list_store.go | 31 ++++++++++++++++-- 4 files changed, 75 insertions(+), 26 deletions(-) diff --git a/adapter/dynamodb_transcoder.go b/adapter/dynamodb_transcoder.go index 8e995b6..68e9f44 100644 --- 
a/adapter/dynamodb_transcoder.go +++ b/adapter/dynamodb_transcoder.go @@ -87,8 +87,8 @@ func (t *dynamodbTranscoder) valueAttrToOps(key []byte, val attributeValue) (*kv if len(val.L) > 0 { var elems []*kv.Elem[kv.OP] for i, item := range val.L { - if item.S == "" { - return nil, errors.New("only string list items are supported") + if len(item.L) > 0 { + return nil, errors.New("nested lists are not supported") } elems = append(elems, &kv.Elem[kv.OP]{ Op: kv.Put, @@ -110,8 +110,8 @@ func (t *dynamodbTranscoder) valueAttrToOps(key []byte, val attributeValue) (*kv return &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems}, nil } - // Default: simple string - if val.S == "" { + // Default: simple string (allow empty string). Reject only when both S and L are absent. + if val.S == "" && len(val.L) == 0 { return nil, errors.New("unsupported attribute type (only S or L of S)") } return &kv.OperationGroup[kv.OP]{ diff --git a/adapter/redis.go b/adapter/redis.go index 0207b56..6cc04f8 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -183,11 +183,8 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) { return } if isList { - // delete list metadata so this becomes a plain string key - if err := r.deleteList(context.Background(), cmd.Args[1]); err != nil { - conn.WriteError(err.Error()) - return - } + conn.WriteError("WRONGTYPE Operation against a key holding the wrong kind of value") + return } res, err := r.redisTranscoder.SetToRequest(cmd.Args[1], cmd.Args[2]) @@ -629,15 +626,15 @@ func (t *txnContext) applyLRange(cmd redcon.Command) (redisResult, error) { } func parseRangeBounds(startRaw, endRaw []byte, total int) (int, int, error) { - start, err := strconv.Atoi(string(startRaw)) + start, err := parseInt(startRaw) if err != nil { - return 0, 0, errors.WithStack(err) + return 0, 0, err } - end, err := strconv.Atoi(string(endRaw)) + end, err := parseInt(endRaw) if err != nil { - return 0, 0, errors.WithStack(err) + return 0, 0, err } - s, e := clampRange(start, end, total) + s, e := clampRange(int(start), int(end), total) return s, e, nil } @@ -723,6 +720,10 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) { userKey := []byte(k) if st.deleted { + // delete all persisted list items + for seq := st.meta.Head; seq < st.meta.Tail; seq++ { + elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listItemKey(userKey, seq)}) + } elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(userKey)}) continue } @@ -884,7 +885,7 @@ func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte } func (r *RedisServer) deleteList(ctx context.Context, key []byte) error { - _, exists, err := r.loadListMeta(ctx, key) + meta, exists, err := r.loadListMeta(ctx, key) if err != nil { return err } @@ -892,9 +893,24 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error { return nil } - ops := []*kv.Elem[kv.OP]{ - {Op: kv.Del, Key: listMetaKey(key)}, + start := listItemKey(key, math.MinInt64) + end := listItemKey(key, math.MaxInt64) + + kvs, err := r.store.Scan(ctx, start, end, math.MaxInt) + if err != nil { + return errors.WithStack(err) + } + + ops := make([]*kv.Elem[kv.OP], 0, len(kvs)+1) + for _, kvp := range kvs { + ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: kvp.Key}) } + // delete meta last + ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(key)}) + + // ensure meta bounds consistent even if scan missed (in case of empty list) + _ = meta + group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: ops} _, err = 
r.coordinator.Dispatch(group) return errors.WithStack(err) @@ -966,7 +982,16 @@ func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string cli := redis.NewClient(&redis.Options{Addr: leaderAddr}) defer func() { _ = cli.Close() }() - res, err := cli.LRange(context.Background(), string(key), parseInt(startRaw), parseInt(endRaw)).Result() + start, err := parseInt(startRaw) + if err != nil { + return nil, err + } + end, err := parseInt(endRaw) + if err != nil { + return nil, err + } + + res, err := cli.LRange(context.Background(), string(key), start, end).Result() return res, errors.WithStack(err) } @@ -992,9 +1017,9 @@ func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) { return res, errors.WithStack(err) } -func parseInt(b []byte) int64 { - i, _ := strconv.ParseInt(string(b), 10, 64) - return i +func parseInt(b []byte) (int64, error) { + i, err := strconv.ParseInt(string(b), 10, 64) + return i, errors.WithStack(err) } // tryLeaderGet proxies a GET to the current Raft leader, returning the value and diff --git a/adapter/test_util.go b/adapter/test_util.go index 396dff0..a9b6cdf 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "golang.org/x/sys/unix" - "github.com/Jille/raft-grpc-leader-rpc/leaderhealth" transport "github.com/Jille/raft-grpc-transport" "github.com/Jille/raftadmin" @@ -23,6 +21,7 @@ import ( "github.com/hashicorp/raft" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) diff --git a/store/list_store.go b/store/list_store.go index 2525121..e2ad6d2 100644 --- a/store/list_store.go +++ b/store/list_store.go @@ -3,8 +3,8 @@ package store import ( "bytes" "context" + "encoding/hex" "encoding/json" - "fmt" "math" "github.com/cockroachdb/errors" @@ -220,7 +220,32 @@ func ListMetaKey(userKey []byte) []byte { // ListItemKey builds the item key for a user key and sequence number. func ListItemKey(userKey []byte, seq int64) []byte { - return []byte(fmt.Sprintf("%s%s|%0*d", ListItemPrefix, userKey, ListSeqWidth, seq)) + // Offset sign bit (seq ^ minInt64) to preserve order, then big-endian encode and hex. + var raw [8]byte + encodeSortableInt64(raw[:], seq) + hexSeq := make([]byte, hex.EncodedLen(len(raw))) + hex.Encode(hexSeq, raw[:]) + + buf := make([]byte, 0, len(ListItemPrefix)+len(userKey)+1+len(hexSeq)) + buf = append(buf, ListItemPrefix...) + buf = append(buf, userKey...) + buf = append(buf, '|') + buf = append(buf, hexSeq...) + return buf +} + +// encodeSortableInt64 writes seq with sign bit flipped (seq ^ minInt64) in big-endian order. 
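+// Flipping the sign bit maps the signed int64 order onto unsigned byte order, so the
+// hex-encoded item keys for negative sequence numbers still sort before non-negative ones.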
+const sortableInt64Bytes = 8 + +func encodeSortableInt64(dst []byte, seq int64) { + if len(dst) < sortableInt64Bytes { + return + } + sortable := seq ^ math.MinInt64 + for i := sortableInt64Bytes - 1; i >= 0; i-- { + dst[i] = byte(sortable) + sortable >>= 8 + } } func clampRange(start, end, length int) (int, int) { @@ -263,7 +288,7 @@ func ExtractListUserKey(key []byte) []byte { trimmed := bytes.TrimPrefix(key, []byte(ListItemPrefix)) idx := bytes.LastIndexByte(trimmed, '|') if idx == -1 { - return trimmed + return nil } return trimmed[:idx] default: From 52337a3752ed5eb3108748d8a47bcedfe17e0cd5 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 20:53:02 +0900 Subject: [PATCH 4/7] Add defaultTxnLockTTLSeconds constant and use it --- kv/coordinator.go | 6 ++++++ kv/fsm.go | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index 95b9265..0031295 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -121,6 +121,8 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request { panic("unreachable") } +const defaultTxnLockTTLSeconds = uint64(30) + func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request { switch req.Op { case Put: @@ -128,6 +130,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request { { IsTxn: true, Phase: pb.Phase_PREPARE, + Ts: defaultTxnLockTTLSeconds, Mutations: []*pb.Mutation{ { Key: req.Key, @@ -138,6 +141,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request { { IsTxn: true, Phase: pb.Phase_COMMIT, + Ts: defaultTxnLockTTLSeconds, Mutations: []*pb.Mutation{ { Key: req.Key, @@ -152,6 +156,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request { { IsTxn: true, Phase: pb.Phase_PREPARE, + Ts: defaultTxnLockTTLSeconds, Mutations: []*pb.Mutation{ { Key: req.Key, @@ -161,6 +166,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request { { IsTxn: true, Phase: pb.Phase_COMMIT, + Ts: defaultTxnLockTTLSeconds, Mutations: []*pb.Mutation{ { Key: req.Key, diff --git a/kv/fsm.go b/kv/fsm.go index 485e201..26c585e 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -209,7 +209,13 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { } if !ok { - return errors.WithStack(ErrKeyNotLocked) + // Lock missing (e.g., expired). Try to reacquire to make progress. 
+ err := f.lockStore.TxnWithTTL(ctx, func(ctx context.Context, ttlTxn store.TTLTxn) error { + return f.lock(ttlTxn, mut.Key, r.Ts) + }) + if err != nil { + return errors.WithStack(err) + } } err = f.commit(ctx, txn, mut) From 91a29886dc450f14be869bfda293eb773c99b8e9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 20:57:45 +0900 Subject: [PATCH 5/7] Update Redis functions to use int for range parsing --- adapter/redis.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/adapter/redis.go b/adapter/redis.go index 6cc04f8..3cb594b 100644 --- a/adapter/redis.go +++ b/adapter/redis.go @@ -634,7 +634,7 @@ func parseRangeBounds(startRaw, endRaw []byte, total int) (int, int, error) { if err != nil { return 0, 0, err } - s, e := clampRange(int(start), int(end), total) + s, e := clampRange(start, end, total) return s, e, nil } @@ -991,7 +991,7 @@ func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string return nil, err } - res, err := cli.LRange(context.Background(), string(key), start, end).Result() + res, err := cli.LRange(context.Background(), string(key), int64(start), int64(end)).Result() return res, errors.WithStack(err) } @@ -1017,8 +1017,8 @@ func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) { return res, errors.WithStack(err) } -func parseInt(b []byte) (int64, error) { - i, err := strconv.ParseInt(string(b), 10, 64) +func parseInt(b []byte) (int, error) { + i, err := strconv.Atoi(string(b)) return i, errors.WithStack(err) } From 55ff1069d0455cfb0f820a6dd29e05161f4a4d7a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 21:11:20 +0900 Subject: [PATCH 6/7] Handle missing lock as conflict and abort transaction --- kv/fsm.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/kv/fsm.go b/kv/fsm.go index 26c585e..63d34ff 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -209,13 +209,8 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { } if !ok { - // Lock missing (e.g., expired). Try to reacquire to make progress. - err := f.lockStore.TxnWithTTL(ctx, func(ctx context.Context, ttlTxn store.TTLTxn) error { - return f.lock(ttlTxn, mut.Key, r.Ts) - }) - if err != nil { - return errors.WithStack(err) - } + // Lock already gone: treat as conflict and abort. 
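+		// The lock was written during PREPARE with a TTL (defaultTxnLockTTLSeconds); if it
+		// has expired (or was never taken), the commit is rejected rather than applied unlocked.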
+ return errors.WithStack(ErrKeyNotLocked) } err = f.commit(ctx, txn, mut) From d6c4fd7fc81c8620ac40e07cc9ee1b5dcaf4a958 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 27 Dec 2025 21:22:54 +0900 Subject: [PATCH 7/7] Refactor node setup with listener binding --- adapter/test_util.go | 107 ++++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/adapter/test_util.go b/adapter/test_util.go index a9b6cdf..5d023a4 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -136,8 +136,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { ctx := context.Background() ports := assignPorts(n) - cfg := buildRaftConfig(n, ports) - nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg) + nodes, grpcAdders, redisAdders, cfg := setupNodes(t, ctx, n, ports) waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval) waitForConfigReplication(t, cfg, nodes, waitTimeout, waitInterval) @@ -146,6 +145,47 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { return nodes, grpcAdders, redisAdders } +type listeners struct { + grpc net.Listener + redis net.Listener + dynamo net.Listener +} + +func bindListeners(ctx context.Context, lc *net.ListenConfig, port portsAdress) (portsAdress, listeners, bool, error) { + grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress) + if err != nil { + if errors.Is(err, unix.EADDRINUSE) { + return port, listeners{}, true, nil + } + return port, listeners{}, false, errors.WithStack(err) + } + + redisSock, err := lc.Listen(ctx, "tcp", port.redisAddress) + if err != nil { + _ = grpcSock.Close() + if errors.Is(err, unix.EADDRINUSE) { + return port, listeners{}, true, nil + } + return port, listeners{}, false, errors.WithStack(err) + } + + dynamoSock, err := lc.Listen(ctx, "tcp", port.dynamoAddress) + if err != nil { + _ = grpcSock.Close() + _ = redisSock.Close() + if errors.Is(err, unix.EADDRINUSE) { + return port, listeners{}, true, nil + } + return port, listeners{}, false, errors.WithStack(err) + } + + return port, listeners{ + grpc: grpcSock, + redis: redisSock, + dynamo: dynamoSock, + }, false, nil +} + func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) { t.Helper() d := &net.Dialer{Timeout: time.Second} @@ -252,53 +292,44 @@ func buildRaftConfig(n int, ports []portsAdress) raft.Configuration { const leaderElectionTimeout = 0 * time.Second -func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) { +func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ([]Node, []string, []string, raft.Configuration) { t.Helper() var grpcAdders []string var redisAdders []string var nodes []Node - var lc net.ListenConfig - + lc := net.ListenConfig{} + lis := make([]listeners, n) for i := 0; i < n; i++ { - st := store.NewRbMemoryStore() - trxSt := store.NewMemoryStoreDefaultTTL() - fsm := kv.NewKvFSM(st, trxSt) - - var port portsAdress - var grpcSock, redisSock, dynamoSock net.Listener - + var ( + bound portsAdress + l listeners + retry bool + err error + ) for { - port = ports[i] - var err error - - grpcSock, err = lc.Listen(ctx, "tcp", port.grpcAddress) - if err != nil && errors.Is(err, unix.EADDRINUSE) { - ports[i] = portAssigner() - cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) - continue - } + bound, l, retry, err = bindListeners(ctx, &lc, ports[i]) require.NoError(t, err) - - 
redisSock, err = lc.Listen(ctx, "tcp", port.redisAddress) - if err != nil && errors.Is(err, unix.EADDRINUSE) { - _ = grpcSock.Close() - ports[i] = portAssigner() - cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) - continue - } - require.NoError(t, err) - - dynamoSock, err = lc.Listen(ctx, "tcp", port.dynamoAddress) - if err != nil && errors.Is(err, unix.EADDRINUSE) { - _ = grpcSock.Close() - _ = redisSock.Close() + if retry { ports[i] = portAssigner() - cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress) continue } - require.NoError(t, err) + ports[i] = bound + lis[i] = l break } + } + + cfg := buildRaftConfig(n, ports) + + for i := 0; i < n; i++ { + st := store.NewRbMemoryStore() + trxSt := store.NewMemoryStoreDefaultTTL() + fsm := kv.NewKvFSM(st, trxSt) + + port := ports[i] + grpcSock := lis[i].grpc + redisSock := lis[i].redis + dynamoSock := lis[i].dynamo leaderRedis := map[raft.ServerAddress]string{ raft.ServerAddress(ports[i].raftAddress): ports[i].redisAddress, @@ -354,7 +385,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c )) } - return nodes, grpcAdders, redisAdders + return nodes, grpcAdders, redisAdders, cfg } func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration, electionTimeout time.Duration) (*raft.Raft, *transport.Manager, error) {