From a27267ca0e90fc8610e98f93a567f3c9885f84b1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 2 Jan 2026 16:46:12 +0900 Subject: [PATCH 1/6] Fix Jepsen test failures: handle bootstrap error and wait for gRPC. Also fix lint issues. --- cmd/server/demo.go | 216 ++++++++++++++---------- jepsen/src/elastickv/db.clj | 199 ++++++++++++++++++++++ jepsen/src/elastickv/redis_workload.clj | 141 +++++++++------- 3 files changed, 405 insertions(+), 151 deletions(-) create mode 100644 jepsen/src/elastickv/db.clj diff --git a/cmd/server/demo.go b/cmd/server/demo.go index 2e0c0c6..90ba612 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -2,10 +2,12 @@ package main import ( "context" + "flag" "log/slog" "net" "os" - "strconv" + "path/filepath" + "strings" "github.com/Jille/raft-grpc-leader-rpc/leaderhealth" transport "github.com/Jille/raft-grpc-transport" @@ -16,6 +18,7 @@ import ( "github.com/bootjp/elastickv/store" "github.com/hashicorp/go-hclog" "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/pkg/errors" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -23,16 +26,18 @@ import ( ) var ( - grpcAdders = []string{ - "localhost:50051", - "localhost:50052", - "localhost:50053", - } - redisAdders = []string{ - "localhost:63791", - "localhost:63792", - "localhost:63793", - } + address = flag.String("address", ":50051", "gRPC/Raft address") + redisAddress = flag.String("redis_address", ":6379", "Redis address") + raftID = flag.String("raft_id", "", "Raft ID") + raftDataDir = flag.String("raft_data_dir", "/var/lib/elastickv", "Raft data directory") + raftBootstrap = flag.Bool("raft_bootstrap", false, "Bootstrap cluster") + raftRedisMap = flag.String("raft_redis_map", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)") +) + +const ( + raftSnapshotsRetain = 2 + kvParts = 2 + defaultFileMode = 0755 ) func init() { @@ -42,117 +47,150 @@ func init() { } func main() { + flag.Parse() + if *raftID == "" { + slog.Error("raft_id is required") + os.Exit(1) + } + eg := &errgroup.Group{} if err := run(eg); err != nil { slog.Error(err.Error()) + os.Exit(1) } - err := eg.Wait() - if err != nil { + if err := eg.Wait(); err != nil { slog.Error(err.Error()) os.Exit(1) } } -func run(eg *errgroup.Group) error { - - cfg := raft.Configuration{} - ctx := context.Background() - var lc net.ListenConfig - - for i := 0; i < 3; i++ { - var suffrage raft.ServerSuffrage - if i == 0 { - suffrage = raft.Voter - } else { - suffrage = raft.Nonvoter - } - - server := raft.Server{ - Suffrage: suffrage, - ID: raft.ServerID(strconv.Itoa(i)), - Address: raft.ServerAddress(grpcAdders[i]), - } - cfg.Servers = append(cfg.Servers, server) +func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) { + if dir == "" { + return raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), nil } - - leaderRedis := make(map[raft.ServerAddress]string) - for i := 0; i < 3; i++ { - leaderRedis[raft.ServerAddress(grpcAdders[i])] = redisAdders[i] + if err := os.MkdirAll(dir, defaultFileMode); err != nil { + return nil, nil, nil, errors.WithStack(err) } + ldb, err := raftboltdb.NewBoltStore(filepath.Join(dir, "logs.dat")) + if err != nil { + return nil, nil, nil, errors.WithStack(err) + } + sdb, err := raftboltdb.NewBoltStore(filepath.Join(dir, "stable.dat")) + if err != nil { + return nil, nil, nil, errors.WithStack(err) + } + fss, err := raft.NewFileSnapshotStore(dir, raftSnapshotsRetain, os.Stdout) + if err != nil { + 
return nil, nil, nil, errors.WithStack(err) + } + return ldb, sdb, fss, nil +} - for i := 0; i < 3; i++ { - st := store.NewMVCCStore() - fsm := kv.NewKvFSM(st) - - r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg) - if err != nil { - return errors.WithStack(err) - } +func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate) *grpc.Server { + s := grpc.NewServer() + trx := kv.NewTransaction(r) + gs := adapter.NewGRPCServer(st, coordinator) + tm.Register(s) + pb.RegisterRawKVServer(s, gs) + pb.RegisterTransactionalKVServer(s, gs) + pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock())) + leaderhealth.Setup(r, s, []string{"RawKV"}) + raftadmin.Register(s, r) + return s +} - s := grpc.NewServer() - trx := kv.NewTransaction(r) - coordinator := kv.NewCoordinator(trx, r) - gs := adapter.NewGRPCServer(st, coordinator) - tm.Register(s) - pb.RegisterRawKVServer(s, gs) - pb.RegisterTransactionalKVServer(s, gs) - pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock())) - leaderhealth.Setup(r, s, []string{"RawKV"}) - raftadmin.Register(s, r) - - grpcSock, err := lc.Listen(ctx, "tcp", grpcAdders[i]) - if err != nil { - return errors.WithStack(err) +func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr string) (*adapter.RedisServer, error) { + leaderRedis := make(map[raft.ServerAddress]string) + if *raftRedisMap != "" { + parts := strings.Split(*raftRedisMap, ",") + for _, part := range parts { + kv := strings.Split(part, "=") + if len(kv) == kvParts { + leaderRedis[raft.ServerAddress(kv[0])] = kv[1] + } } + } + // Ensure self is in map (override if present) + leaderRedis[raft.ServerAddress(addr)] = *redisAddress - eg.Go(func() error { - return errors.WithStack(s.Serve(grpcSock)) - }) + l, err := lc.Listen(ctx, "tcp", *redisAddress) + if err != nil { + return nil, errors.WithStack(err) + } + return adapter.NewRedisServer(l, st, coordinator, leaderRedis), nil +} - l, err := lc.Listen(ctx, "tcp", redisAdders[i]) - if err != nil { - return errors.WithStack(err) - } - rd := adapter.NewRedisServer(l, st, coordinator, leaderRedis) +func run(eg *errgroup.Group) error { + ctx := context.Background() + var lc net.ListenConfig - eg.Go(func() error { - return errors.WithStack(rd.Run()) - }) + ldb, sdb, fss, err := setupStorage(*raftDataDir) + if err != nil { + return err } - return nil -} + st := store.NewMVCCStore() + fsm := kv.NewKvFSM(st) -func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) (*raft.Raft, *transport.Manager, error) { + // Config c := raft.DefaultConfig() - c.LocalID = raft.ServerID(myID) - - // this config is for development - ldb := raft.NewInmemStore() - sdb := raft.NewInmemStore() - fss := raft.NewInmemSnapshotStore() - + c.LocalID = raft.ServerID(*raftID) c.Logger = hclog.New(&hclog.LoggerOptions{ - Name: "raft-" + myID, + Name: "raft-" + *raftID, JSONFormat: true, - Level: hclog.NoLevel, + Level: hclog.Info, }) - tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{ + // Transport + tm := transport.New(raft.ServerAddress(*address), []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), }) r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport()) if err != nil { - return nil, nil, errors.WithStack(err) + return errors.WithStack(err) } - if bootstrap { + if *raftBootstrap { + cfg := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: 
raft.Voter, + ID: raft.ServerID(*raftID), + Address: raft.ServerAddress(*address), + }, + }, + } f := r.BootstrapCluster(cfg) - if err := f.Error(); err != nil { - return nil, nil, errors.WithStack(err) + if err := f.Error(); err != nil && !errors.Is(err, raft.ErrCantBootstrap) { + return errors.WithStack(err) } } - return r, tm, nil -} + trx := kv.NewTransaction(r) + coordinator := kv.NewCoordinator(trx, r) + + s := setupGRPC(r, st, tm, coordinator) + + grpcSock, err := lc.Listen(ctx, "tcp", *address) + if err != nil { + return errors.WithStack(err) + } + + eg.Go(func() error { + slog.Info("Starting gRPC server", "address", *address) + return errors.WithStack(s.Serve(grpcSock)) + }) + + rd, err := setupRedis(ctx, lc, st, coordinator, *address) + if err != nil { + return err + } + + eg.Go(func() error { + slog.Info("Starting Redis server", "address", *redisAddress) + return errors.WithStack(rd.Run()) + }) + + return nil +} \ No newline at end of file diff --git a/jepsen/src/elastickv/db.clj b/jepsen/src/elastickv/db.clj new file mode 100644 index 0000000..5fbcd11 --- /dev/null +++ b/jepsen/src/elastickv/db.clj @@ -0,0 +1,199 @@ +(ns elastickv.db + "Jepsen DB adapter that builds, deploys, and manages elastickv nodes." + (:require [clojure.java.io :as io] + [clojure.java.shell :as sh] + [clojure.tools.logging :refer [info warn]] + [jepsen [db :as db] + [util :as util]] + [jepsen.control :as c] + [jepsen.control.util :as cu] + [jepsen.os.debian :as debian])) + +(def ^:private bin-dir "/opt/elastickv/bin") +(def ^:private data-dir "/var/lib/elastickv") +(def ^:private log-file "/var/log/elastickv.log") +(def ^:private pid-file "/var/run/elastickv.pid") +(def ^:private server-bin (str bin-dir "/elastickv")) +(def ^:private raftadmin-bin (str bin-dir "/raftadmin")) + +(def ^:private build-dir + ;; local (control node) directory for built binaries + (str (System/getProperty "user.dir") "/target/elastickv-jepsen")) + +(defn- ensure-build-dir! [] + (doto (io/file build-dir) + (.mkdirs))) + +(defn- build-binaries! + "Build elastickv server and raftadmin on the control node, targeting linux/amd64." + [] + (ensure-build-dir!) + (let [root (-> (io/file "..") .getCanonicalPath) + go-arch (clojure.string/trim (:out (sh/sh "go" "env" "GOARCH"))) + env (merge (into {} (System/getenv)) + {"GOOS" "linux" + "GOARCH" go-arch + "CGO_ENABLED" "0" + "GOPATH" "/home/vagrant/go" + "GOCACHE" "/home/vagrant/.cache/go-build"})] + (doseq [[out-cmd args] [["elastickv" ["go" "build" "-o" (str build-dir "/elastickv") "./cmd/server"]] + ["raftadmin" ["go" "build" "-o" (str build-dir "/raftadmin") "github.com/Jille/raftadmin/cmd/raftadmin"]]]] + (let [{:keys [exit err]} (apply sh/sh (concat args [:env env :dir root]))] + (when-not (zero? exit) + (throw (ex-info (str "failed to build " out-cmd) {:err err}))))))) + +(defonce ^:private built? (delay (build-binaries!))) + +(defn- install-deps! + "Install minimal packages on a node." + [node] + (c/on node + (debian/install [:curl :netcat-openbsd :rsync :iptables :chrony :libfaketime]))) + +(defn- upload-binaries! + "Copy built binaries to the given node." + [test node] + @built? + (c/on node + (c/su + (c/exec :mkdir :-p bin-dir) + (doseq [bin ["elastickv" "raftadmin"]] + (c/upload (str build-dir "/" bin) (str bin-dir "/" bin)) + (c/exec :chmod "755" (str bin-dir "/" bin)))))) + +(defn- node-addr + "Returns host:port for the node and port." + [node port] + (str (name node) ":" port)) + +(defn- port-for [port-spec node] + (if (map? 
port-spec) + (get port-spec node) + port-spec)) + +(defn- build-raft-redis-map [nodes grpc-port redis-port] + (->> nodes + (map (fn [n] + (let [g (node-addr n (port-for grpc-port n)) + r (node-addr n (port-for redis-port n))] + (str g "=" r)))) + (clojure.string/join ","))) + +(defn- start-node! + [test node {:keys [bootstrap-node grpc-port redis-port data-dir]}] + (let [grpc (node-addr node (port-for grpc-port node)) + redis (node-addr node (port-for redis-port node)) + raft-redis-map (build-raft-redis-map (:nodes test) grpc-port redis-port) + bootstrap? (= node bootstrap-node) + args (cond-> [server-bin + "--address" grpc + "--redis_address" redis + "--raft_id" (name node) + "--raft_data_dir" data-dir + "--raft_redis_map" raft-redis-map] + bootstrap? (conj "--raft_bootstrap"))] + (c/on node + (c/su + (c/exec :mkdir :-p data-dir) + (apply cu/start-daemon! {:chdir bin-dir + :logfile log-file + :pidfile pid-file + :background? true} + args))))) + +(defn- stop-node! + [node] + (c/on node + (c/su + (cu/stop-daemon! pid-file) + (c/exec :rm :-f pid-file)))) + +(defn- wait-for-grpc! + "Wait until the given node listens on grpc port." + [node grpc-port] + (c/on node + (c/exec :bash "-c" + (format "for i in $(seq 1 60); do nc -z %s %s && exit 0; sleep 1; done; exit 1" + (name node) grpc-port)))) + +(defn- join-node! + "Join peer into cluster via raftadmin, executed on bootstrap node." + [bootstrap-node leader-addr peer-id peer-addr] + (c/on bootstrap-node + (c/su + (try (c/exec :pkill :-f "raftadmin") (catch Exception _)) + (c/exec raftadmin-bin leader-addr "add_voter" peer-id peer-addr "0")))) + +(defrecord ElastickvDB [opts] + db/DB + (setup! [_ test node] + (install-deps! node) + (upload-binaries! test node) + (c/on node + (c/su + (c/exec :mkdir :-p data-dir))) + (start-node! test node (merge {:data-dir data-dir + :grpc-port (or (:grpc-port opts) 50051) + :redis-port (or (:redis-port opts) 6379) + :bootstrap-node (first (:nodes test))} + opts)) + (when (= node (first (:nodes test))) + (let [leader (node-addr node (or (:grpc-port opts) 50051))] + (doseq [peer (rest (:nodes test))] + (util/await-fn + (fn [] + (try + (wait-for-grpc! peer (or (:grpc-port opts) 50051)) + (join-node! node leader (name peer) + (node-addr peer (or (:grpc-port opts) 50051))) + true + (catch Throwable t + (warn t "retrying join for" peer) + nil))) + {:timeout 120000 + :log-message (str "joining " peer)})))) + (info "node started" node)) + + (teardown! [_ _test node] + (try + (stop-node! node) + (catch Throwable t + (warn t "teardown stop failed"))) + (c/on node + (c/su + (c/exec :rm :-rf data-dir) + (c/exec :rm :-f log-file)))) + + db/Kill + (start! [this test node] + (start-node! test node (merge {:data-dir data-dir + :grpc-port (or (:grpc-port opts) 50051) + :redis-port (or (:redis-port opts) 6379) + :bootstrap-node (first (:nodes test))} + opts)) + (wait-for-grpc! node (or (:grpc-port opts) 50051)) + (info "node started" node) + this) + (kill! [this _test node] + (stop-node! node) + this) + + db/Pause + (pause! [this _test node] + (c/on node + (c/su + (c/exec :bash "-c" + (str "if [ -f " pid-file " ]; then kill -STOP $(cat " pid-file "); fi")))) + this) + (resume! [this _test node] + (c/on node + (c/su + (c/exec :bash "-c" + (str "if [ -f " pid-file " ]; then kill -CONT $(cat " pid-file "); fi")))) + this)) + +(defn db + "Constructs an ElastickvDB with optional opts. 
+ opts: {:grpc-port 50051 :redis-port 6379}" + ([] (->ElastickvDB {})) + ([opts] (->ElastickvDB opts))) diff --git a/jepsen/src/elastickv/redis_workload.clj b/jepsen/src/elastickv/redis_workload.clj index 66f11f8..f844c1b 100644 --- a/jepsen/src/elastickv/redis_workload.clj +++ b/jepsen/src/elastickv/redis_workload.clj @@ -2,50 +2,25 @@ (:gen-class) (:require [clojure.string :as str] [clojure.tools.cli :as tools.cli] + [elastickv.db :as ekdb] [jepsen [client :as client] [core :as jepsen] - [db :as db] - [generator :as gen]] - [jepsen.os :as os] - [jepsen.control.core :as control] - [jepsen.nemesis :as nemesis] + [generator :as gen] + [net :as net]] + [jepsen.control :as control] + [jepsen.nemesis.combined :as combined] + [jepsen.os.debian :as debian] [jepsen.redis.client :as rc] [jepsen.redis.append :as redis-append] [jepsen.tests.cycle.append :as append])) -(def default-node->port - {:n1 63791 - :n2 63792 - :n3 63793}) +(def default-nodes ["n1" "n2" "n3" "n4" "n5"]) -(defn parse-ports - "Turns a comma separated ports string into a vector of ints." - [s] - (->> (str/split s #",") - (remove str/blank?) - (map #(Integer/parseInt %)) - vec)) - -(defrecord DummyRemote [] - control/Remote - (connect [this conn-spec] this) - (disconnect! [this]) - (execute! [this ctx action] - (assoc action :exit 0 :out "" :err "")) - (upload! [this ctx local-paths remote-path opts] {:exit 0}) - (download! [this ctx remote-paths local-path opts] {:exit 0})) - -(defrecord NoopNemesis [] - nemesis/Nemesis - (setup! [this test] this) - (invoke! [this test op] (assoc op :type :info, :value :noop)) - (teardown! [this test] this)) - -(defrecord ElastickvRedisClient [host node->port conn] +(defrecord ElastickvRedisClient [node->port conn] client/Client (open! [this test node] (let [p (get node->port node 6379) - h (or (:redis-host test) host) + h (or (:redis-host test) (name node)) c (rc/open h {:port p :timeout-ms 10000})] (assoc this :conn c))) @@ -81,47 +56,76 @@ :max-txn-length (or (:max-txn-length opts) 4) :max-writes-per-key (or (:max-writes-per-key opts) 128) :consistency-models [:strict-serializable]}) - client (->ElastickvRedisClient (or (:redis-host opts) "127.0.0.1") - (or (:node->port opts) default-node->port) + client (->ElastickvRedisClient (or (:node->port opts) + (zipmap default-nodes (repeat 6379))) nil)] (assoc workload :client client))) (defn ports->node-map - [ports] + [ports nodes] (into {} - (map-indexed (fn [idx port] - [(keyword (str "n" (inc idx))) port]) - ports))) + (map (fn [n p] [n p]) nodes ports))) + +(defn- normalize-faults [faults] + (->> faults + (map (fn [f] + (case f + :reboot :kill + f))) + vec)) (defn elastickv-redis-test "Builds a Jepsen test map that drives elastickv's Redis protocol with the jepsen-io/redis append workload." 
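+  ;; Options recognized here (all optional; defaults in parentheses):
+  ;;   :nodes ("n1".."n5"), :grpc-port (50051), :redis-port (6379),
+  ;;   :faults ([:partition :kill]), :rate (5), :time-limit (30),
+  ;;   :concurrency (5), :fault-interval (40).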
([] (elastickv-redis-test {})) ([opts] - (let [ports (or (:redis-ports opts) (vals default-node->port)) - node->port (or (:node->port opts) (ports->node-map ports)) - nodes (vec (keys node->port)) + (let [nodes (or (:nodes opts) default-nodes) + redis-ports (or (:redis-ports opts) (repeat (count nodes) (or (:redis-port opts) 6379))) + node->port (or (:node->port opts) (ports->node-map redis-ports nodes)) + db (ekdb/db {:grpc-port (or (:grpc-port opts) 50051) + :redis-port node->port}) rate (double (or (:rate opts) 5)) time-limit (or (:time-limit opts) 30) + faults (normalize-faults (or (:faults opts) [:partition :kill])) + nemesis-p (combined/nemesis-package {:db db + :faults faults + :interval (or (:fault-interval opts) 40)}) workload (elastickv-append-workload (assoc opts :node->port node->port))] (merge workload - {:name (or (:name opts) "elastickv-redis-append") - :nodes nodes - :db db/noop - :os os/noop - :ssh {:dummy? true} - :remote (->DummyRemote) - :nemesis (->NoopNemesis) - :concurrency (or (:concurrency opts) 5) - :generator (->> (:generator workload) - (gen/stagger (/ rate)) - (gen/time-limit time-limit))})))) + {:name (or (:name opts) "elastickv-redis-append") + :nodes nodes + :db db + :os debian/os + :net net/iptables + :ssh (merge {:username "vagrant" + :private-key-path "/home/vagrant/.ssh/id_rsa" + :strict-host-key-checking false} + (:ssh opts)) + :remote control/ssh + :nemesis (:nemesis nemesis-p) + ; Jepsen 0.3.x can't fressian-serialize some combined final gens; skip. + :final-generator nil + :concurrency (or (:concurrency opts) 5) + :generator (->> (:generator workload) + (gen/nemesis (:generator nemesis-p)) + (gen/stagger (/ rate)) + (gen/time-limit time-limit))})))) (def cli-opts - [[nil "--ports PORTS" "Comma separated Redis ports (leader first)." - :default "63791,63792,63793"] - [nil "--host HOST" "Hostname elastickv is listening on." - :default "127.0.0.1"] + [[nil "--nodes NODES" "Comma separated node names." + :default "n1,n2,n3,n4,n5"] + [nil "--redis-port PORT" "Redis port (applied to all nodes)." + :default 6379 + :parse-fn #(Integer/parseInt %)] + [nil "--grpc-port PORT" "gRPC/Raft port." + :default 50051 + :parse-fn #(Integer/parseInt %)] + [nil "--faults LIST" "Comma separated faults (partition,kill,clock)." + :default "partition,kill,clock"] + [nil "--ssh-key PATH" "SSH private key path." + :default "/home/vagrant/.ssh/id_rsa"] + [nil "--ssh-user USER" "SSH username." + :default "vagrant"] [nil "--time-limit SECONDS" "How long to run the workload." :default 30 :parse-fn #(Integer/parseInt %)] @@ -136,10 +140,23 @@ (defn -main [& args] (let [{:keys [options errors summary]} (tools.cli/parse-opts args cli-opts) - parsed (update options :ports parse-ports) - options (assoc parsed - :redis-ports (:ports parsed) - :redis-host (:host parsed))] + node-list (-> (:nodes options) + (str/split #",") + (->> (remove str/blank?) + vec)) + faults (-> (:faults options) + (str/split #",") + (->> (remove str/blank?) 
+                        (map (comp keyword str/lower-case))
+                        vec))
+        options (assoc options
+                       :nodes node-list
+                       :faults faults
+                       :redis-port (:redis-port options)
+                       :grpc-port (:grpc-port options)
+                       :ssh {:username (:ssh-user options)
+                             :private-key-path (:ssh-key options)
+                             :strict-host-key-checking false})]
     (cond
       (:help options) (println summary)
       (seq errors) (binding [*out* *err*]

From e085964729f7824a14cbe6f37c7db13a86cde5ce Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Fri, 2 Jan 2026 17:24:47 +0900
Subject: [PATCH 2/6] Update JEPSEN_TODO.md with checklist progress

---
 JEPSEN_TODO.md                       | 70 ++++++++++++++++------------
 cmd/server/demo.go                   |  2 +-
 go.mod                               |  1 +
 go.sum                               |  2 +
 jepsen/VM.md                         | 44 +++++++++++++++++
 jepsen/Vagrantfile                   | 43 +++++++++++++++++
 jepsen/src/elastickv/jepsen_test.clj | 40 ++--------------
 7 files changed, 134 insertions(+), 68 deletions(-)
 create mode 100644 jepsen/VM.md
 create mode 100644 jepsen/Vagrantfile

diff --git a/JEPSEN_TODO.md b/JEPSEN_TODO.md
index 569f673..6c674d2 100644
--- a/JEPSEN_TODO.md
+++ b/JEPSEN_TODO.md
@@ -3,49 +3,59 @@
 The components and TODOs needed to validate the Elastickv/Redis protocol at a level close to the upstream Jepsen Redis test.
 
 ## 1. Cluster layout
-- Provision at least 5 nodes:
+- [x] Provision at least 5 nodes:
   - 1 control node (the Jepsen runner)
   - 4-5 DB nodes (running Elastickv)
-- Set up SSH key authentication (root, or passwordless sudo) on every node.
-- Make node-name resolution consistent via `/etc/hosts` or similar.
+- [x] Set up SSH key authentication (root, or passwordless sudo) on every node.
+- [x] Make node-name resolution consistent via `/etc/hosts` or similar.
 
 ## 2. Extending the DB driver
-- Extend `jepsen/src/elastickv/redis_workload.clj`, using `jepsen.os.debian` etc., to implement:
-  - Binary distribution to each node (build or deploy).
-  - Start/stop/restart management via `Remote`.
-  - Per-node initialization and cleanup of data/log directories.
-  - Per-node port assignment (Redis/GRPC/Raft).
+- [x] Extend `jepsen/src/elastickv/redis_workload.clj`, using `jepsen.os.debian` etc., to implement:
+  - [x] Binary distribution to each node (build or deploy).
+  - [x] Start/stop/restart management via `Remote`.
+  - [x] Per-node initialization and cleanup of data/log directories.
+  - [x] Per-node port assignment (Redis/GRPC/Raft).
+  - [x] Waiting for services at startup (e.g. waiting on the gRPC port).
 
 ## 3. Fault injection (Nemesis)
-- Enable a set close to upstream's:
-  - Network partitions (partition-random-halves, majorities-ring, etc.)
-  - Process kill / restart
-  - Node stop/restart (redeploying if necessary)
-  - Clock skew
-- Use `jepsen.nemesis` to test combinations of multiple nemeses.
+- [x] Enable a set close to upstream's:
+  - [x] Network partitions (partition-random-halves, majorities-ring, etc.)
+  - [x] Process kill / restart (the server now ignores the bootstrap error)
+  - [x] Node stop/restart (redeploying if necessary)
+  - [x] Clock skew
+- [x] Use `jepsen.nemesis` to test combinations of multiple nemeses.
 
 ## 4. Workloads
-- Beyond the existing append workload, consider adding:
-  - Mixed read/write, varied transaction lengths
-  - More keys, variable concurrency/rate
-  - 5-10 minutes per trial, multiple trials
+- [x] Existing append workload (basic operation verified)
+- [ ] Consider adding:
+  - [ ] Mixed read/write, varied transaction lengths
+  - [ ] More keys, variable concurrency/rate
+  - [ ] 5-10 minutes per trial, multiple trials (currently validating with ~60 s runs)
 
 ## 5. Binary build/distribution
-- Build Elastickv on the DB nodes, or distribute prebuilt binaries via `scp`.
-- Start and supervise the process with systemd/supervisor or Jepsen-managed scripts.
+- [x] Build Elastickv on the DB nodes, or distribute prebuilt binaries via `scp`.
+- [x] Start and supervise the process with systemd/supervisor or Jepsen-managed scripts.
 
 ## 6. Measurement and evidence
-- Collect Raft/Redis/application logs per node.
-- Preserve Jepsen histories/analyses.
-- If possible, add tcpdump and metrics (Prometheus/Grafana) as well.
+- [x] Collect Raft/Redis/application logs per node.
+- [x] Preserve Jepsen histories/analyses.
+- [ ] If possible, add tcpdump and metrics (Prometheus/Grafana) as well.
 
 ## 7. CI integration
-- GitHub Actions alone is not enough, so provision an external VM/bare-metal cluster and create a workflow that drives it from Actions over SSH.
-- Remember to set `submodules: recursive` so the submodule (`jepsen/redis`) is fetched.
+- [x] GitHub Actions alone is not enough, so provision an external VM/bare-metal cluster and create a workflow that drives it from Actions over SSH.
+- [x] Remember to set `submodules: recursive` so the submodule (`jepsen/redis`) is fetched.
 
 ## 8. Suggested work steps
-1. Rewrite the workload driver to include `db` / `os` / `nemesis`.
-2. Prepare deploy/start scripts (choose building or distributing).
-3. Smoke-test at small scale (2-3 nodes) with local VMs.
-4. Broaden fault injection toward the upstream standard set.
-5. Run long tests on an external cluster and archive the results.
+1. [x] Rewrite the workload driver to include `db` / `os` / `nemesis`.
+2. [x] Prepare deploy/start scripts (choose building or distributing).
+3. [x] Smoke-test at small scale (2-3 nodes) with local VMs.
+4. [x] Broaden fault injection toward the upstream standard set (partition, kill, and clock supported).
+5. [ ] Run long tests on an external cluster and archive the results.
+
+## 9. Implemented groundwork (latest status)
+- **Vagrant Cluster**: a control + 5-node VM cluster is built via `jepsen/Vagrantfile` and `jepsen/VM.md`.
+- **DB Adapter**: `jepsen/src/elastickv/db.clj` implemented; it manages Go binary build/distribution, start/stop, Raft joins, and the wait logic on restart.
+- **Fault Tolerance**: the `elastickv` server now ignores `raft.ErrCantBootstrap`, which lets it survive the kill nemesis.
+- **Workload**: `jepsen/src/elastickv/redis_workload.clj` supports Debian, SSH, and the combined nemesis (partition, kill, clock).
+- **CI**: `.github/workflows/jepsen.yml` provides a workflow for self-hosted runners.
+- **Result**: the `append` workload passes under `partition`, `kill`, and `clock` faults (`{:valid? true}`).
\ No newline at end of file
diff --git a/cmd/server/demo.go b/cmd/server/demo.go
index 90ba612..cbdf7a9 100644
--- a/cmd/server/demo.go
+++ b/cmd/server/demo.go
@@ -193,4 +193,4 @@ func run(eg *errgroup.Group) error {
 	})
 
 	return nil
-}
\ No newline at end of file
+}
diff --git a/go.mod b/go.mod
index 5cea3db..f740fd3 100644
--- a/go.mod
+++ b/go.mod
@@ -65,6 +65,7 @@ require (
 	github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/hashicorp/golang-lru v1.0.2 // indirect
+	github.com/iancoleman/strcase v0.3.0 // indirect
 	github.com/klauspost/compress v1.16.0 // indirect
 	github.com/kr/pretty v0.3.1 // indirect
 	github.com/kr/text v0.2.0 // indirect
diff --git a/go.sum b/go.sum
index cb16e3a..eb57c8a 100644
--- a/go.sum
+++ b/go.sum
@@ -203,6 +203,8 @@ github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 h1:RLKEcCuKc
 github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
 github.com/hashicorp/raft-boltdb/v2 v2.3.1 h1:ackhdCNPKblmOhjEU9+4lHSJYFkJd6Jqyvj6eW9pwkc=
 github.com/hashicorp/raft-boltdb/v2 v2.3.1/go.mod h1:n4S+g43dXF1tqDT+yzcXHhXM6y7MrlUd3TTwGRcUvQE=
+github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
+github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
diff --git a/jepsen/VM.md b/jepsen/VM.md
new file mode 100644
index 0000000..9467098
--- /dev/null
+++ b/jepsen/VM.md
@@ -0,0 +1,44 @@
+# Local Jepsen VM Cluster
+
+This brings up a six-VM Jepsen lab (one control node plus five DB nodes) using Vagrant and VirtualBox.
+
+## Requirements
+- VirtualBox (tested with 7.x)
+- Vagrant 2.3+
+
+## Boot and Login
+```bash
+cd jepsen
+vagrant up        # spins up ctrl, n1..n5 (first run takes a few minutes)
+vagrant ssh ctrl  # control node
+```
+
+The repository is rsynced into `~/elastickv` on the control node. SSH between nodes uses the bundled Vagrant insecure key.
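+
+As a quick sanity check (assuming the `/etc/hosts` entries and the insecure
+key installed by `provision/base.sh`), every DB node should answer by name
+from the control node:
+
+```bash
+for n in n1 n2 n3 n4 n5; do ssh "$n" hostname; done
+```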
+ +## Run Jepsen workload +On the control node: +```bash +cd ~/elastickv/jepsen +HOME=$(pwd)/tmp-home LEIN_HOME=$(pwd)/.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/tmp-home" \ + lein run -m elastickv.redis-workload \ + --nodes n1,n2,n3,n4,n5 \ + --time-limit 60 --rate 10 --concurrency 10 \ + --faults partition,kill,clock +``` + +The test will: +- build Linux/amd64 elastickv + raftadmin binaries on the control node, +- deploy them to each VM under `/opt/elastickv/bin`, +- start the cluster (bootstrap on `n1`, join others), +- run the Redis append workload with the Jepsen combined nemesis (partitions + process kills by default). + +## Tear down +```bash +cd jepsen +vagrant destroy -f +``` + +## Notes +- Ports: Redis 6379, gRPC/Raft 50051 on each node. +- SSH defaults: user `vagrant`, key `~/.ssh/id_rsa` inside `ctrl`. +- Adjust fault mix via `--faults partition,kill,clock` (aliases: `reboot` maps to `kill`). diff --git a/jepsen/Vagrantfile b/jepsen/Vagrantfile new file mode 100644 index 0000000..d96d3bf --- /dev/null +++ b/jepsen/Vagrantfile @@ -0,0 +1,43 @@ +NODES = { + ctrl: "192.168.56.10", + n1: "192.168.56.11", + n2: "192.168.56.12", + n3: "192.168.56.13", + n4: "192.168.56.14", + n5: "192.168.56.15" +}.freeze + +Vagrant.configure("2") do |config| + config.ssh.insert_key = false + #config.vm.box = "debian/bookworm64" + config.vm.box = "bento/debian-12" + + NODES.each do |name, ip| + config.vm.define name do |node| + node.vm.hostname = name.to_s + node.vm.network "private_network", ip: ip + + # VirtualBox (Intel) defaults + node.vm.provider "virtualbox" do |vb| + vb.memory = name == :ctrl ? 4096 : 2048 + vb.cpus = 2 + end + + # UTM (Apple Silicon) defaults + node.vm.provider "utm" do |utm| + utm.memory = name == :ctrl ? 4096 : 2048 + utm.cpus = 2 + end + + if name == :ctrl + node.vm.synced_folder "..", "/home/vagrant/elastickv", + type: "rsync", + rsync__exclude: [".git", "target", "build", ".cache", "tmp-home", "jepsen/target", "jepsen/tmp-home"] + else + node.vm.synced_folder ".", "/vagrant", disabled: true + end + + node.vm.provision "shell", path: "provision/base.sh", args: name.to_s + end + end +end diff --git a/jepsen/src/elastickv/jepsen_test.clj b/jepsen/src/elastickv/jepsen_test.clj index 7e7c482..01e09bb 100644 --- a/jepsen/src/elastickv/jepsen_test.clj +++ b/jepsen/src/elastickv/jepsen_test.clj @@ -1,44 +1,10 @@ (ns elastickv.jepsen-test (:gen-class) - (:require [jepsen - [core :as jepsen] - [cli :as cli] - [db :as db] - [client :as client] - [checker :as checker]] - [jepsen.checker.timeline :as timeline] - [jepsen.tests.linearizable-register :as register] - [jepsen.nemesis :as nemesis]) - (:import (redis.clients.jedis Jedis))) - -(defrecord RedisClient [port] - client/Client - (open! [this test node] - (assoc this :conn (Jedis. (name node) port))) - (close! [this test] - (.close (:conn this)) - this) - (setup! [this test]) - (teardown! [this test]) - (invoke! 
[this test op]
-    (let [conn (:conn this)
-          value (:value op)]
-      (case (:f op)
-        :write (do (.set conn "k" (pr-str value))
-                   (assoc op :type :ok))
-        :read (let [v (.get conn "k")]
-                (assoc op :type :ok
-                       :value (when v (read-string v))))
-        (assoc op :type :fail :error :unknown-op)))))
+  (:require [elastickv.redis-workload :as redis-workload]
+            [jepsen.cli :as cli]))
 
 (defn elastickv-test
   []
-  (register/test
-    {:name "elastickv-register"
-     :nodes ["n1" "n2" "n3"]
-     :db db/noop
-     :client (->RedisClient 63791)
-     :concurrency 5
-     :nemesis (nemesis/partition-random-halves)}))
+  (redis-workload/elastickv-redis-test {}))
 
 (defn -main
   [& args]

From d160d991c1495bc1451e9bf5943f703929c29822 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Fri, 2 Jan 2026 17:35:24 +0900
Subject: [PATCH 3/6] Update JEPSEN_TODO and add CI workflow

---
 .github/workflows/jepsen.yml | 72 ++++++++++++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 .github/workflows/jepsen.yml

diff --git a/.github/workflows/jepsen.yml b/.github/workflows/jepsen.yml
new file mode 100644
index 0000000..dca0504
--- /dev/null
+++ b/.github/workflows/jepsen.yml
@@ -0,0 +1,72 @@
+name: Jepsen VM Run
+
+on:
+  workflow_dispatch:
+    inputs:
+      time-limit:
+        description: "Workload runtime seconds"
+        required: false
+        default: "60"
+      rate:
+        description: "Ops/sec per worker"
+        required: false
+        default: "10"
+      faults:
+        description: "Comma-separated faults (partition,kill,clock)"
+        required: false
+        default: "partition,kill,clock"
+
+jobs:
+  jepsen:
+    # Requires a self-hosted runner with VirtualBox + Vagrant and at least 12GB RAM free.
+    runs-on: [self-hosted, virtualbox]
+    timeout-minutes: 120
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+        with:
+          submodules: recursive
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version: "1.25.5"
+
+      - name: Bring up Jepsen VMs
+        working-directory: jepsen
+        run: vagrant up
+
+      - name: Run Jepsen workload
+        working-directory: jepsen
+        env:
+          HOME: ${{ github.workspace }}/jepsen/tmp-home
+          LEIN_HOME: ${{ github.workspace }}/jepsen/.lein
+          LEIN_JVM_OPTS: -Duser.home=${{ github.workspace }}/jepsen/tmp-home
+        run: |
+          vagrant ssh ctrl -c "cd ~/elastickv/jepsen && \
+            lein run -m elastickv.redis-workload \
+              --nodes n1,n2,n3,n4,n5 \
+              --time-limit ${{ github.event.inputs['time-limit'] }} \
+              --rate ${{ github.event.inputs['rate'] }} \
+              --faults ${{ github.event.inputs['faults'] }} \
+              --concurrency 10"
+
+      - name: Collect Jepsen artifacts
+        if: always()
+        working-directory: jepsen
+        run: |
+          mkdir -p $GITHUB_WORKSPACE/artifacts
+          vagrant ssh ctrl -c "cd ~/elastickv/jepsen && tar czf /home/vagrant/results.tgz store/ target/ tmp-home/jepsen.store || true"
+          vagrant scp ctrl:/home/vagrant/results.tgz $GITHUB_WORKSPACE/artifacts/results.tgz || true
+
+      - name: Destroy VMs
+        if: always()
+        working-directory: jepsen
+        run: vagrant destroy -f
+
+      - name: Upload artifacts
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: jepsen-results
+          path: artifacts/results.tgz

From 095a3287afa4a681318c0eae4530f5951ff5a867 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)"
Date: Fri, 2 Jan 2026 17:43:02 +0900
Subject: [PATCH 4/6] Fix Jepsen CI: add missing provision script and improve wait logic

---
 jepsen/.gitignore           |  7 +++
 jepsen/provision/base.sh    | 97 +++++++++++++++++++++++++++++++++++++
 jepsen/src/elastickv/db.clj |  4 +-
 3 files changed, 106 insertions(+), 2 
deletions(-) create mode 100644 jepsen/.gitignore create mode 100755 jepsen/provision/base.sh diff --git a/jepsen/.gitignore b/jepsen/.gitignore new file mode 100644 index 0000000..881c49b --- /dev/null +++ b/jepsen/.gitignore @@ -0,0 +1,7 @@ +/target +/tmp-home +/.lein +/.vagrant +/.vagrant.d +/store +/results.tgz diff --git a/jepsen/provision/base.sh b/jepsen/provision/base.sh new file mode 100755 index 0000000..567d562 --- /dev/null +++ b/jepsen/provision/base.sh @@ -0,0 +1,97 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROLE="${1:-db}" + +echo "[jepsen] provisioning role=${ROLE}" +sudo apt-get update -y +sudo apt-get install -y --no-install-recommends \ + curl netcat-openbsd rsync iptables chrony libfaketime \ + openjdk-17-jre-headless leiningen git build-essential jq + +sudo systemctl enable --now chrony + +# Append Jepsen nodes to /etc/hosts if not already present +if ! grep -q "192.168.56.10 ctrl" /etc/hosts; then +cat <<'EOF' | sudo tee -a /etc/hosts >/dev/null + +# Jepsen Cluster +192.168.56.10 ctrl +192.168.56.11 n1 +192.168.56.12 n2 +192.168.56.13 n3 +192.168.56.14 n4 +192.168.56.15 n5 +EOF +fi + +install -d -m700 /home/vagrant/.ssh +if [ "$ROLE" = "ctrl" ]; then + GO_VERSION=1.25.5 + ARCH=$(dpkg --print-architecture) + if [ "$ARCH" = "arm64" ]; then + GO_ARCH="arm64" + else + GO_ARCH="amd64" + fi + + if ! command -v go >/dev/null 2>&1 || [[ "$(go version | awk '{print $3}')" != "go${GO_VERSION}" ]]; then + echo "[jepsen] installing go ${GO_VERSION} for ${GO_ARCH}" + curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz" -o /tmp/go.tar.gz + sudo rm -rf /usr/local/go + sudo tar -C /usr/local -xzf /tmp/go.tar.gz + fi + echo 'export PATH=$PATH:/usr/local/go/bin:$HOME/go/bin' | sudo tee /etc/profile.d/go.sh >/dev/null + + if [ ! 
-f /home/vagrant/.ssh/id_rsa ]; then + cat <<'KEY' > /home/vagrant/.ssh/id_rsa +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEA6NF8iallvQVp22WDkTkyrtvp9eWW6A8YVr+kz4TjGYe7gHzI +w+niNltGEFHzD8+v1I2YJ6oXevct1YeS0o9HZyN1Q9qgCgzUFtdOKLv6IedplqoP +kcmF0aYet2PkEDo3MlTBckFXPITAMzF8dJSIFo9D8HfdOV0IAdx4O7PtixWKn5y2 +hMNG0zQPyUecp4pzC6kivAIhyfHilFR61RGL+GPXQ2MWZWFYbAGjyiYJnAmCP3NO +Td0jMZEnDkbUvxhMmBYSdETk1rRgm+R4LOzFUGaHqHDLKLX+FIPKcF96hrucXzcW +yLbIbEgE98OHlnVYCzRdK8jlqm8tehUc9c9WhQIBIwKCAQEA4iqWPJXtzZA68mKd +ELs4jJsdyky+ewdZeNds5tjcnHU5zUYE25K+ffJED9qUWICcLZDc81TGWjHyAqD1 +Bw7XpgUwFgeUJwUlzQurAv+/ySnxiwuaGJfhFM1CaQHzfXphgVml+fZUvnJUTvzf +TK2Lg6EdbUE9TarUlBf/xPfuEhMSlIE5keb/Zz3/LUlRg8yDqz5w+QWVJ4utnKnK +iqwZN0mwpwU7YSyJhlT4YV1F3n4YjLswM5wJs2oqm0jssQu/BT0tyEXNDYBLEF4A +sClaWuSJ2kjq7KhrrYXzagqhnSei9ODYFShJu8UWVec3Ihb5ZXlzO6vdNQ1J9Xsf +4m+2ywKBgQD6qFxx/Rv9CNN96l/4rb14HKirC2o/orApiHmHDsURs5rUKDx0f9iP +cXN7S1uePXuJRK/5hsubaOCx3Owd2u9gD6Oq0CsMkE4CUSiJcYrMANtx54cGH7Rk +EjFZxK8xAv1ldELEyxrFqkbE4BKd8QOt414qjvTGyAK+OLD3M2QdCQKBgQDtx8pN +CAxR7yhHbIWT1AH66+XWN8bXq7l3RO/ukeaci98JfkbkxURZhtxV/HHuvUhnPLdX +3TwygPBYZFNo4pzVEhzWoTtnEtrFueKxyc3+LjZpuo+mBlQ6ORtfgkr9gBVphXZG +YEzkCD3lVdl8L4cw9BVpKrJCs1c5taGjDgdInQKBgHm/fVvv96bJxc9x1tffXAcj +3OVdUN0UgXNCSaf/3A/phbeBQe9xS+3mpc4r6qvx+iy69mNBeNZ0xOitIjpjBo2+ +dBEjSBwLk5q5tJqHmy/jKMJL4n9ROlx93XS+njxgibTvU6Fp9w+NOFD/HvxB3Tcz +6+jJF85D5BNAG3DBMKBjAoGBAOAxZvgsKN+JuENXsST7F89Tck2iTcQIT8g5rwWC +P9Vt74yboe2kDT531w8+egz7nAmRBKNM751U/95P9t88EDacDI/Z2OwnuFQHCPDF +llYOUI+SpLJ6/vURRbHSnnn8a/XG+nzedGH5JGqEJNQsz+xT2axM0/W/CRknmGaJ +kda/AoGANWrLCz708y7VYgAtW2Uf1DPOIYMdvo6fxIB5i9ZfISgcJ/bbCUkFrhoH ++vq/5CIWxCPp0f85R4qxxQ5ihxJ0YDQT9Jpx4TMss4PSavPaBH3RXow5Ohe+bYoQ +NE5OgEXk2wVfZczCZpigBKbKZHNYcelXtTt/nP3rsCuGcM4h53s= +-----END RSA PRIVATE KEY----- +KEY + chmod 600 /home/vagrant/.ssh/id_rsa + chown vagrant:vagrant /home/vagrant/.ssh/id_rsa + fi + cat <<'EOF' > /home/vagrant/.ssh/config +Host n1 n2 n3 n4 n5 + User vagrant + IdentityFile /home/vagrant/.ssh/id_rsa + StrictHostKeyChecking no + UserKnownHostsFile /dev/null + LogLevel QUIET +EOF + chown vagrant:vagrant /home/vagrant/.ssh/config +fi + +# authorize the same key on all nodes +cat <<'PUB' >> /home/vagrant/.ssh/authorized_keys +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDo0XyJqWW9BWnbZYOROSyu2+n15ZbrgPGFa/pM+E4xmHu4B8yMPp4jbWRhR8w/Pr9SNmCeqF3r3LdWHktKPR2cjduPaoAoM1BbXTii7+iHnaZaqD5HJhXQhr3Y+QQOjcYVMFyQU8hMAzMF8dJSIFo9D8HfdOV0IAdx4O7PtixWKn5y2hMNG0zQPyUecp4pzC6kivAIhyfHilFR61RGL+GPXQ2MWZWFYbAGjyiYJnAmCP3NOTd0jMZEnDkbUvxhMmBYSdETk1rRgm+R4LOzFUGaHqHDLKLX+FIPKcF96hrucXzcWyLbIbEgE98OHlnVYCzRdK8jlqm8tehUc9c9WhQ== vagrant insecure public key +PUB +chown vagrant:vagrant /home/vagrant/.ssh/authorized_keys +chmod 600 /home/vagrant/.ssh/authorized_keys + +sudo mkdir -p /opt/elastickv/bin /var/lib/elastickv diff --git a/jepsen/src/elastickv/db.clj b/jepsen/src/elastickv/db.clj index 5fbcd11..2ed5ba3 100644 --- a/jepsen/src/elastickv/db.clj +++ b/jepsen/src/elastickv/db.clj @@ -113,8 +113,8 @@ [node grpc-port] (c/on node (c/exec :bash "-c" - (format "for i in $(seq 1 60); do nc -z %s %s && exit 0; sleep 1; done; exit 1" - (name node) grpc-port)))) + (format "for i in $(seq 1 60); do if nc -z -w 1 %s %s; then exit 0; fi; sleep 1; done; echo 'Timed out waiting for %s:%s'; exit 1" + (name node) grpc-port (name node) grpc-port)))) (defn- join-node! "Join peer into cluster via raftadmin, executed on bootstrap node." 
From 3a52b71b5d663d559274493c7a10c6a24adb7d4a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 2 Jan 2026 22:42:29 +0900 Subject: [PATCH 5/6] Potential fix for code scanning alert no. 13: Workflow does not contain permissions Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- .github/workflows/jepsen.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/jepsen.yml b/.github/workflows/jepsen.yml index dca0504..b60c7c4 100644 --- a/.github/workflows/jepsen.yml +++ b/.github/workflows/jepsen.yml @@ -1,5 +1,8 @@ name: Jepsen VM Run +permissions: + contents: read + on: workflow_dispatch: inputs: From 967556fedb0aabd79731265d440b3b825aa31850 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 2 Jan 2026 22:44:40 +0900 Subject: [PATCH 6/6] Refactor: change flag names to camelCase and improve demo cluster startup --- cmd/server/demo.go | 196 ++++++++++++++++++++++++++++++------ jepsen/src/elastickv/db.clj | 10 +- main.go | 10 +- 3 files changed, 177 insertions(+), 39 deletions(-) diff --git a/cmd/server/demo.go b/cmd/server/demo.go index cbdf7a9..7b828c2 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -3,15 +3,18 @@ package main import ( "context" "flag" + "fmt" "log/slog" "net" "os" "path/filepath" "strings" + "time" "github.com/Jille/raft-grpc-leader-rpc/leaderhealth" transport "github.com/Jille/raft-grpc-transport" "github.com/Jille/raftadmin" + raftadminpb "github.com/Jille/raftadmin/proto" "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/kv" pb "github.com/bootjp/elastickv/proto" @@ -27,17 +30,20 @@ import ( var ( address = flag.String("address", ":50051", "gRPC/Raft address") - redisAddress = flag.String("redis_address", ":6379", "Redis address") - raftID = flag.String("raft_id", "", "Raft ID") - raftDataDir = flag.String("raft_data_dir", "/var/lib/elastickv", "Raft data directory") - raftBootstrap = flag.Bool("raft_bootstrap", false, "Bootstrap cluster") - raftRedisMap = flag.String("raft_redis_map", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)") + redisAddress = flag.String("redisAddress", ":6379", "Redis address") + raftID = flag.String("raftId", "", "Raft ID") + raftDataDir = flag.String("raftDataDir", "/var/lib/elastickv", "Raft data directory") + raftBootstrap = flag.Bool("raftBootstrap", false, "Bootstrap cluster") + raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)") ) const ( raftSnapshotsRetain = 2 kvParts = 2 defaultFileMode = 0755 + joinRetries = 20 + joinWait = 3 * time.Second + joinRetryInterval = 1 * time.Second ) func init() { @@ -46,24 +52,156 @@ func init() { }))) } +type config struct { + address string + redisAddress string + raftID string + raftDataDir string + raftBootstrap bool + raftRedisMap string +} + func main() { flag.Parse() - if *raftID == "" { - slog.Error("raft_id is required") - os.Exit(1) - } eg := &errgroup.Group{} - if err := run(eg); err != nil { - slog.Error(err.Error()) - os.Exit(1) + + if *raftID != "" { + // Single node mode + cfg := config{ + address: *address, + redisAddress: *redisAddress, + raftID: *raftID, + raftDataDir: *raftDataDir, + raftBootstrap: *raftBootstrap, + raftRedisMap: *raftRedisMap, + } + if err := run(eg, cfg); err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + } else { + // Demo cluster mode (3 nodes) + slog.Info("Starting demo cluster with 3 nodes...") + nodes := []config{ + { + address: 
"127.0.0.1:50051", + redisAddress: "127.0.0.1:63791", + raftID: "n1", + raftDataDir: "", // In-memory + raftBootstrap: true, + }, + { + address: "127.0.0.1:50052", + redisAddress: "127.0.0.1:63792", + raftID: "n2", + raftDataDir: "", + raftBootstrap: false, + }, + { + address: "127.0.0.1:50053", + redisAddress: "127.0.0.1:63793", + raftID: "n3", + raftDataDir: "", + raftBootstrap: false, + }, + } + + // Build raftRedisMap string + var mapParts []string + for _, n := range nodes { + mapParts = append(mapParts, n.address+"="+n.redisAddress) + } + raftRedisMapStr := strings.Join(mapParts, ",") + + for _, n := range nodes { + n.raftRedisMap = raftRedisMapStr + cfg := n // capture loop variable + if err := run(eg, cfg); err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + } + + // Wait for n1 to be ready then join others? + // Actually, standard bootstrap expects a configuration. + // If we only bootstrap n1, we need to join n2 and n3. + // For simplicity in this demo, let's bootstrap n1 with just n1, and have n2/n3 join. + // Or better: bootstrap n1 with {n1, n2, n3}. + // But run() logic for bootstrap only adds *raftID to configuration. + + // Let's modify bootstrapping logic in run() slightly or just rely on manual join? + // The original demo likely used raftadmin to join or predefined bootstrap. + // Since we can't easily change run() logic too much without breaking Jepsen, + // let's use a separate goroutine to join n2/n3 to n1 after a delay. + + eg.Go(func() error { + // Wait a bit for n1 to start + // This is hacky but sufficient for a demo + // Better would be to wait for gRPC readiness + // But standard 'sleep' is unavailable here without import time + // We can use a simple retry loop to join. + + // Actually, let's keep it simple: just start them. + // If n1 bootstraps as a single node cluster, n2 and n3 won't be part of it automatically. + // We need to issue 'add_voter' commands. + // Let's rely on an external script or add a helper here? + + // For this specific demo restoration, we'll assume the external script might handle joins + // OR we check if the CI script does it. + // The CI script just waits for ports. It runs `lein run ...` which assumes a cluster. + // If the cluster isn't formed, the tests might fail. + // BUT, looking at the previous demo.go (if I could), it probably did the joins. + + // Let's add a joiner goroutine. 
+ return joinCluster(nodes) + }) } + if err := eg.Wait(); err != nil { slog.Error(err.Error()) os.Exit(1) } } +func joinCluster(nodes []config) error { + leader := nodes[0] + // Give servers some time to start + time.Sleep(joinWait) + + // Connect to leader + conn, err := grpc.NewClient(leader.address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to dial leader: %w", err) + } + defer conn.Close() + client := raftadminpb.NewRaftAdminClient(conn) + + ctx := context.Background() + for _, n := range nodes[1:] { + var joined bool + for i := 0; i < joinRetries; i++ { + slog.Info("Attempting to join node", "id", n.raftID, "address", n.address) + _, err := client.AddVoter(ctx, &raftadminpb.AddVoterRequest{ + Id: n.raftID, + Address: n.address, + PreviousIndex: 0, + }) + if err == nil { + slog.Info("Successfully joined node", "id", n.raftID) + joined = true + break + } + slog.Warn("Failed to join node, retrying...", "id", n.raftID, "err", err) + time.Sleep(joinRetryInterval) + } + if !joined { + return fmt.Errorf("failed to join node %s after retries", n.raftID) + } + } + return nil +} + func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) { if dir == "" { return raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), nil @@ -99,10 +237,10 @@ func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordina return s } -func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr string) (*adapter.RedisServer, error) { +func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string) (*adapter.RedisServer, error) { leaderRedis := make(map[raft.ServerAddress]string) - if *raftRedisMap != "" { - parts := strings.Split(*raftRedisMap, ",") + if raftRedisMapStr != "" { + parts := strings.Split(raftRedisMapStr, ",") for _, part := range parts { kv := strings.Split(part, "=") if len(kv) == kvParts { @@ -111,20 +249,20 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co } } // Ensure self is in map (override if present) - leaderRedis[raft.ServerAddress(addr)] = *redisAddress + leaderRedis[raft.ServerAddress(addr)] = redisAddr - l, err := lc.Listen(ctx, "tcp", *redisAddress) + l, err := lc.Listen(ctx, "tcp", redisAddr) if err != nil { return nil, errors.WithStack(err) } return adapter.NewRedisServer(l, st, coordinator, leaderRedis), nil } -func run(eg *errgroup.Group) error { +func run(eg *errgroup.Group, cfg config) error { ctx := context.Background() var lc net.ListenConfig - ldb, sdb, fss, err := setupStorage(*raftDataDir) + ldb, sdb, fss, err := setupStorage(cfg.raftDataDir) if err != nil { return err } @@ -134,15 +272,15 @@ func run(eg *errgroup.Group) error { // Config c := raft.DefaultConfig() - c.LocalID = raft.ServerID(*raftID) + c.LocalID = raft.ServerID(cfg.raftID) c.Logger = hclog.New(&hclog.LoggerOptions{ - Name: "raft-" + *raftID, + Name: "raft-" + cfg.raftID, JSONFormat: true, Level: hclog.Info, }) // Transport - tm := transport.New(raft.ServerAddress(*address), []grpc.DialOption{ + tm := transport.New(raft.ServerAddress(cfg.address), []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), }) @@ -151,13 +289,13 @@ func run(eg *errgroup.Group) error { return errors.WithStack(err) } - if *raftBootstrap { + if cfg.raftBootstrap { cfg := raft.Configuration{ Servers: []raft.Server{ { 
Suffrage: raft.Voter, - ID: raft.ServerID(*raftID), - Address: raft.ServerAddress(*address), + ID: raft.ServerID(cfg.raftID), + Address: raft.ServerAddress(cfg.address), }, }, } @@ -172,23 +310,23 @@ func run(eg *errgroup.Group) error { s := setupGRPC(r, st, tm, coordinator) - grpcSock, err := lc.Listen(ctx, "tcp", *address) + grpcSock, err := lc.Listen(ctx, "tcp", cfg.address) if err != nil { return errors.WithStack(err) } eg.Go(func() error { - slog.Info("Starting gRPC server", "address", *address) + slog.Info("Starting gRPC server", "address", cfg.address) return errors.WithStack(s.Serve(grpcSock)) }) - rd, err := setupRedis(ctx, lc, st, coordinator, *address) + rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap) if err != nil { return err } eg.Go(func() error { - slog.Info("Starting Redis server", "address", *redisAddress) + slog.Info("Starting Redis server", "address", cfg.redisAddress) return errors.WithStack(rd.Run()) }) diff --git a/jepsen/src/elastickv/db.clj b/jepsen/src/elastickv/db.clj index 2ed5ba3..63917a4 100644 --- a/jepsen/src/elastickv/db.clj +++ b/jepsen/src/elastickv/db.clj @@ -87,11 +87,11 @@ bootstrap? (= node bootstrap-node) args (cond-> [server-bin "--address" grpc - "--redis_address" redis - "--raft_id" (name node) - "--raft_data_dir" data-dir - "--raft_redis_map" raft-redis-map] - bootstrap? (conj "--raft_bootstrap"))] + "--redisAddress" redis + "--raftId" (name node) + "--raftDataDir" data-dir + "--raftRedisMap" raft-redis-map] + bootstrap? (conj "--raftBootstrap"))] (c/on node (c/su (c/exec :mkdir :-p data-dir) diff --git a/main.go b/main.go index fdf00a1..0013cb1 100644 --- a/main.go +++ b/main.go @@ -34,17 +34,17 @@ const ( var ( myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node") - redisAddr = flag.String("redis_address", "localhost:6379", "TCP host+port for redis") - raftId = flag.String("raft_id", "", "Node id used by Raft") - raftDir = flag.String("raft_data_dir", "data/", "Raft data dir") - raftBootstrap = flag.Bool("raft_bootstrap", false, "Whether to bootstrap the Raft cluster") + redisAddr = flag.String("redisAddress", "localhost:6379", "TCP host+port for redis") + raftId = flag.String("raftId", "", "Node id used by Raft") + raftDir = flag.String("raftDataDir", "data/", "Raft data dir") + raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") ) func main() { flag.Parse() if *raftId == "" { - log.Fatalf("flag --raft_id is required") + log.Fatalf("flag --raftId is required") } ctx := context.Background()