diff --git a/.github/workflows/go-checks.yml b/.github/workflows/go-checks.yml index e6cddc3..a8d601c 100644 --- a/.github/workflows/go-checks.yml +++ b/.github/workflows/go-checks.yml @@ -23,5 +23,7 @@ jobs: with: version: latest skip-pkg-cache: true + - name : Test + run: go test -v ./... - name: Build run: go build -v ./... diff --git a/.gitignore b/.gitignore index 944744e..1fda42d 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ # vendor/ diagnostics + +# IDE configuration +.idea \ No newline at end of file diff --git a/README.md b/README.md index d249387..2185a04 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ - [Command line arguments](#command-line-arguments) - [Logs](#logs) - [Reorg scanner](#reorg-scanner) + - [Sync stages](#sync-stages) - [Block body download](#block-body-download) - [Ideas for Possible improvements](#ideas-for-possible-improvements) @@ -238,7 +239,7 @@ for the content fetched by the `fetchContent` javascript function and inserted i ## Flags Operator can look at the flags that are set in cli context by the user to launch Erigon node. The corresponding code in Erigon is in the file `diagnostics/flags.go`. This is particularly useful when user launches Erigon using a config file with `--config` and [Command line arguments](#command-line-arguments) cannot fully capture the true state of the 'launch setting'. The returned flags are the result after parsing command line argument and config file by Erigon. -The code on the side of the diagnostics system is spread across files `cmd/ui_handler.go` (invocation of `processFlags` function), `cmd/flags.go`, `assests/template/session.html` (html template the part where the button `Fetch Flags` is defined with the javascript handler), `assests/script/session.js` (function `fetchContent`), `assets/template/flags.html` (html template for the content fetched by the `fetchContent` javascript function and inserted into the HTML div element). +The code on the side of the diagnostics system is spread across files `cmd/ui_handler.go` (invocation of `processFlags` function), `cmd/flags.go`, `assets/template/session.html` (html template the part where the button `Fetch Flags` is defined with the javascript handler), `assets/script/session.js` (function `fetchContent`), `assets/template/flags.html` (html template for the content fetched by the `fetchContent` javascript function and inserted into the HTML div element). ![flags](/images/flags.png) @@ -278,6 +279,18 @@ one for each reorged block found). ![scan reorgs](/images/scan_reorgs.png) +## Sync stages + +This is another example of how the diagnostics system can access the Erigon node's database remotely, via `erigon support` tunnel. +This feature adds an ability to see the node's sync stage, by returning the number of synced blocks per stage. + + +The code on the side of the diagnostics system is spread across files `cmd/ui_handler.go` (invocation of `findSyncStages` function), `cmd/sync_stages.go`, `cmd/remote_db.org` (using the same remote database access logic as [Reorg Scanner](#reorg-scanner)), `assets/template/session.html` +(HTML template the part where the button `Fetch Sync Stages` is defined with the javascript handler), `assets/script/session.js` (function `fetchContent`), `assets/template/sync_stages.html` +(HTML template for the content fetched by the `fetchContent` javascript function and inserted into the HTML table). + +![sync_stage](/images/sync_stages.png) + ## Block Body Download This is the first crude example of monitoring an algorithms involving many items (in that case block bodies) transitioning through the series of states. diff --git a/assets/template/session.html b/assets/template/session.html index 67dfe48..0e93b35 100644 --- a/assets/template/session.html +++ b/assets/template/session.html @@ -52,6 +52,10 @@

SESSION: {{.SessionName}} PIN: {{.SessionPin}}

+
+ +
+
diff --git a/assets/template/sync_stages.html b/assets/template/sync_stages.html new file mode 100644 index 0000000..d683db7 --- /dev/null +++ b/assets/template/sync_stages.html @@ -0,0 +1,12 @@ + + + + + + {{range $key, $value := .}} + + + + + {{end}} +
StageProgress (blocks)
{{$key}}{{$value}}
diff --git a/cmd/bodies_download.go b/cmd/bodies_download.go index 0eb8382..7233715 100644 --- a/cmd/bodies_download.go +++ b/cmd/bodies_download.go @@ -49,17 +49,18 @@ func (uih *UiHandler) bodiesDownload(ctx context.Context, w http.ResponseWriter, default: } // First, fetch list of DB paths - success, result := uih.fetch(fmt.Sprintf("/block_body_download?sincetick=%d\n", tick), requestChannel) + success, result := uih.remoteApi.fetch(fmt.Sprintf("/block_body_download?sincetick=%d\n", tick), requestChannel) if !success { fmt.Fprintf(w, "Fetching list of changes: %s", result) return } - lines := strings.Split(result, "\n") - if len(lines) == 0 || !strings.HasPrefix(lines[0], successLine) { - fmt.Fprintf(w, "incorrect response (first line needs to be SUCCESS)\n") + + lines, resultExtractErr := uih.remoteApi.getResultLines(result) + if resultExtractErr != nil { + fmt.Fprintf(w, "incorrect response: %v\n", resultExtractErr) return } - lines = lines[1:] + var changesMode bool var err error changes := map[uint64]struct{}{} diff --git a/cmd/bridge_handler.go b/cmd/bridge_handler.go index f9b65b2..7642297 100644 --- a/cmd/bridge_handler.go +++ b/cmd/bridge_handler.go @@ -98,7 +98,7 @@ func (bh *BridgeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { request.response = nil request.err = fmt.Sprintf("writing metrics request: %v", err) request.retries++ - if request.retries < 16 { + if request.retries < MaxRequestRetries { select { case nodeSession.requestCh <- request: default: @@ -117,7 +117,7 @@ func (bh *BridgeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { request.response = nil request.err = fmt.Sprintf("reading size of metrics response: %v", err) request.retries++ - if request.retries < 16 { + if request.retries < MaxRequestRetries { select { case nodeSession.requestCh <- request: default: @@ -134,7 +134,7 @@ func (bh *BridgeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { request.response = nil request.err = fmt.Sprintf("reading metrics response: %v", err) request.retries++ - if request.retries < 16 { + if request.retries < MaxRequestRetries { select { case nodeSession.requestCh <- request: default: diff --git a/cmd/remote_api.go b/cmd/remote_api.go new file mode 100644 index 0000000..e9bc446 --- /dev/null +++ b/cmd/remote_api.go @@ -0,0 +1,62 @@ +package cmd + +import ( + "fmt" + "strings" + "time" +) + +type RemoteApiReader interface { + fetch(url string, requestChannel chan *NodeRequest) (bool, string) + getResultLines(result string) ([]string, error) +} + +type RemoteApi struct{} + +func (ra *RemoteApi) fetch(url string, requestChannel chan *NodeRequest) (bool, string) { + if requestChannel == nil { + return false, "ERROR: Node is not allocated\n" + } + // Request command line arguments + nodeRequest := &NodeRequest{url: url} + requestChannel <- nodeRequest + var sb strings.Builder + var success bool + for nodeRequest != nil { + nodeRequest.lock.Lock() + clear := nodeRequest.served + if nodeRequest.served { + if nodeRequest.err == "" { + sb.Reset() + sb.Write(nodeRequest.response) + success = true + } else { + success = false + fmt.Fprintf(&sb, "ERROR: %s\n", nodeRequest.err) + if nodeRequest.retries < MaxRequestRetries { + clear = false + } + } + } + nodeRequest.lock.Unlock() + if clear { + nodeRequest = nil + } else { + time.Sleep(100 * time.Millisecond) + } + } + return success, sb.String() +} + +func (ra *RemoteApi) getResultLines(result string) ([]string, error) { + lines := strings.Split(result, "\n") + if len(lines) == 0 || !strings.HasPrefix(lines[0], successLine) { + return nil, fmt.Errorf("incorrect response (first line needs to be SUCCESS): %s", result) + } + + if len(lines) > 0 && len(lines[len(lines)-1]) == 0 { + lines = lines[:len(lines)-1] + } + + return lines[1:], nil +} diff --git a/cmd/remote_api_test.go b/cmd/remote_api_test.go new file mode 100644 index 0000000..a2d4e6f --- /dev/null +++ b/cmd/remote_api_test.go @@ -0,0 +1,56 @@ +package cmd + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetResultLines(t *testing.T) { + tt := []struct { + name string + result string + assert func(lines []string) + wantErrMsg string + }{ + { + name: "should successfully get data lines from multi-line result", + result: "SUCCESS\nfirst_line", + assert: func(lines []string) { + assert.Equal(t, []string{"first_line"}, lines) + }, + }, + { + name: "should remove the last empty line from the result", + result: "SUCCESS\nfirst_line\n", + assert: func(lines []string) { + assert.Equal(t, []string{"first_line"}, lines) + }, + }, + { + name: "should return first line needs to be SUCCESS error", + result: "FAILURE\nfirst_line", + wantErrMsg: fmt.Sprintf("incorrect response (first line needs to be SUCCESS): %s", "FAILURE\nfirst_line"), + }, + { + name: "should return first line needs to be SUCCESS error when no lines are returned", + result: "", + wantErrMsg: fmt.Sprintf("incorrect response (first line needs to be SUCCESS): %s", ""), + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + remoteApi := &RemoteApi{} + + syncStageProgress, err := remoteApi.getResultLines(tc.result) + + if tc.wantErrMsg != "" { + assert.EqualErrorf(t, err, tc.wantErrMsg, "expected error %q, got %s", tc.wantErrMsg, err) + return + } + + tc.assert(syncStageProgress) + }) + } +} diff --git a/cmd/remote_db.go b/cmd/remote_db.go index 55fbfd4..36bd03c 100644 --- a/cmd/remote_db.go +++ b/cmd/remote_db.go @@ -6,35 +6,77 @@ import ( "strings" ) +type RemoteDbReader interface { + Init(db string, table string, initialKey []byte) error + Next() ([]byte, []byte, error) +} + type RemoteCursor struct { - uih *UiHandler + remoteApi RemoteApiReader requestChannel chan *NodeRequest dbPath string table string lines []string // Parsed response } -func NewRemoteCursor(dbPath string, table string, requestChannel chan *NodeRequest, initialKey []byte) (*RemoteCursor, error) { - rc := &RemoteCursor{dbPath: dbPath, table: table, requestChannel: requestChannel} +func NewRemoteCursor(remoteApi RemoteApiReader, requestChannel chan *NodeRequest) *RemoteCursor { + rc := &RemoteCursor{remoteApi: remoteApi, requestChannel: requestChannel} + + return rc +} + +func (rc *RemoteCursor) Init(db string, table string, initialKey []byte) error { + dbPath, dbPathErr := rc.findFullDbPath(db) + + if dbPathErr != nil { + return dbPathErr + } + + rc.dbPath = dbPath + rc.table = table + if err := rc.nextTableChunk(initialKey); err != nil { - return nil, err + return err + } + + return nil +} + +func (rc *RemoteCursor) findFullDbPath(db string) (string, error) { + success, dbListResponse := rc.remoteApi.fetch("/db/list\n", rc.requestChannel) + if !success { + return "", fmt.Errorf("unable to fetch database list: %s", dbListResponse) + } + + lines, err := rc.remoteApi.getResultLines(dbListResponse) + if err != nil { + return "", err + } + + var dbPath string + for _, line := range lines { + if strings.HasSuffix(line, fmt.Sprintf("/%s", db)) { + dbPath = line + } + } + + if dbPath == "" { + return "", fmt.Errorf("database %s not found: %v", db, dbListResponse) } - return rc, nil + + return dbPath, nil } func (rc *RemoteCursor) nextTableChunk(startKey []byte) error { - success, result := rc.uih.fetch(fmt.Sprintf("/db/read?path=%s&table=%s&key=%x\n", rc.dbPath, rc.table, startKey), rc.requestChannel) + success, result := rc.remoteApi.fetch(fmt.Sprintf("/db/read?path=%s&table=%s&key=%x\n", rc.dbPath, rc.table, startKey), rc.requestChannel) if !success { return fmt.Errorf("reading %s table: %s", rc.table, result) } - lines := strings.Split(result, "\n") - if len(lines) == 0 || !strings.HasPrefix(lines[0], successLine) { - return fmt.Errorf("incorrect response (first line needs to be SUCCESS): %v", lines) - } - lines = lines[1:] - if len(lines) > 0 && len(lines[len(lines)-1]) == 0 { - lines = lines[:len(lines)-1] + lines, err := rc.remoteApi.getResultLines(result) + if err != nil { + return err } + rc.lines = lines return nil } @@ -61,6 +103,10 @@ func advance(key []byte) []byte { } func (rc *RemoteCursor) Next() ([]byte, []byte, error) { + if rc.dbPath == "" || rc.table == "" { + return nil, nil, fmt.Errorf("cursor not initialized") + } + if len(rc.lines) == 0 { return nil, nil, nil } @@ -78,6 +124,7 @@ func (rc *RemoteCursor) Next() ([]byte, []byte, error) { return nil, nil, fmt.Errorf("could not parse the value [%s]: %v", line[sepIndex+3:], e) } rc.lines = rc.lines[1:] + if len(rc.lines) == 0 { if e = rc.nextTableChunk(advance(k)); e != nil { return k, v, e diff --git a/cmd/remote_db_test.go b/cmd/remote_db_test.go new file mode 100644 index 0000000..d9586bb --- /dev/null +++ b/cmd/remote_db_test.go @@ -0,0 +1,141 @@ +package cmd + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "testing" +) + +type mockRemoteApiReader struct { + mock.Mock +} + +func (ra *mockRemoteApiReader) fetch(url string, requestChannel chan *NodeRequest) (bool, string) { + args := ra.Called(url, requestChannel) + return args.Bool(0), args.String(1) +} +func (ra *mockRemoteApiReader) getResultLines(result string) ([]string, error) { + args := ra.Called(result) + return args.Get(0).([]string), args.Error(1) +} + +type remoteCursorDependencies struct { + remoteApi *mockRemoteApiReader + requestChannel chan *NodeRequest +} + +func TestInit(t *testing.T) { + var ( + db = "testDb" + table = "testTable" + initialKey []byte = nil + dbPath = fmt.Sprintf("/full/path/%s", db) + lineKey = "lineKey" + lineValue = "lineValue" + dependencyError = fmt.Errorf("error") + ) + + tt := []struct { + name string + on func(*remoteCursorDependencies) + assert func(rc *RemoteCursor) + wantErrMsg string + }{ + { + name: "should successfully initialize remote cursor", + on: func(df *remoteCursorDependencies) { + dbListResult := fmt.Sprintf("SUCCESS\n/full/path/%s", db) + tableLine := fmt.Sprintf("%s | %s", lineKey, lineValue) + tableLinesResult := fmt.Sprintf("SUCCESS\n%s", tableLine) + + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(true, dbListResult) + df.remoteApi.On("getResultLines", dbListResult).Return([]string{dbPath}, nil) + df.remoteApi.On("fetch", fmt.Sprintf("/db/read?path=%s&table=%s&key=%x\n", dbPath, table, initialKey), df.requestChannel).Return(true, tableLinesResult) + df.remoteApi.On("getResultLines", tableLinesResult).Return([]string{tableLine}, nil) + }, + assert: func(rc *RemoteCursor) { + assert.Equal(t, dbPath, rc.dbPath) + assert.Equal(t, table, rc.table) + assert.Equal(t, []string{fmt.Sprintf("%s | %s", lineKey, lineValue)}, rc.lines) + }, + }, + { + name: "should return database not found error", + on: func(df *remoteCursorDependencies) { + dbListResult := fmt.Sprintf("SUCCESS\n/full/path/%s", "notFoundDb") + + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(true, dbListResult) + df.remoteApi.On("getResultLines", dbListResult).Return([]string{"notFoundDb"}, nil) + }, + wantErrMsg: fmt.Sprintf("database %s not found: %s", db, fmt.Sprintf("SUCCESS\n/full/path/%s", "notFoundDb")), + }, + { + name: "should return unable to fetch database list error", + on: func(df *remoteCursorDependencies) { + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(false, dependencyError.Error()) + }, + wantErrMsg: fmt.Sprintf("unable to fetch database list: %s", dependencyError.Error()), + }, + { + name: "should return error when db list result can not be parsed", + on: func(df *remoteCursorDependencies) { + dbPathResult := fmt.Sprintf("FAILURE\n/full/path/%s", db) + + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(true, dbPathResult) + df.remoteApi.On("getResultLines", dbPathResult).Return([]string{}, dependencyError) + }, + wantErrMsg: dependencyError.Error(), + }, + { + name: "should return reading table error", + on: func(df *remoteCursorDependencies) { + dbListResult := fmt.Sprintf("SUCCESS\n/full/path/%s", db) + + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(true, dbListResult) + df.remoteApi.On("getResultLines", dbListResult).Return([]string{dbPath}, nil) + df.remoteApi.On("fetch", fmt.Sprintf("/db/read?path=%s&table=%s&key=%x\n", dbPath, table, initialKey), df.requestChannel).Return(false, "") + }, + wantErrMsg: fmt.Sprintf("reading %s table: %s", table, ""), + }, + { + name: "should return error when table result can not be parsed", + on: func(df *remoteCursorDependencies) { + dbListResult := fmt.Sprintf("SUCCESS\n/full/path/%s", db) + tableLinesResult := fmt.Sprintf("FAILURE\n%s", "") + + df.remoteApi.On("fetch", "/db/list\n", df.requestChannel).Return(true, dbListResult) + df.remoteApi.On("getResultLines", dbListResult).Return([]string{dbPath}, nil) + df.remoteApi.On("fetch", fmt.Sprintf("/db/read?path=%s&table=%s&key=%x\n", dbPath, table, initialKey), df.requestChannel).Return(true, tableLinesResult) + df.remoteApi.On("getResultLines", tableLinesResult).Return([]string{}, dependencyError) + }, + wantErrMsg: dependencyError.Error(), + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + remoteApi := &mockRemoteApiReader{} + requestChannel := make(chan *NodeRequest) + rc := NewRemoteCursor(remoteApi, requestChannel) + + if tc.on != nil { + df := &remoteCursorDependencies{ + remoteApi: remoteApi, + requestChannel: requestChannel, + } + + tc.on(df) + } + + err := rc.Init(db, table, initialKey) + + if tc.wantErrMsg != "" { + assert.EqualErrorf(t, err, tc.wantErrMsg, "expected error %q, got %s", tc.wantErrMsg, err) + return + } + + tc.assert(rc) + }) + } +} diff --git a/cmd/reorgs.go b/cmd/reorgs.go index 6d66e3c..d112580 100644 --- a/cmd/reorgs.go +++ b/cmd/reorgs.go @@ -7,44 +7,27 @@ import ( "fmt" "html/template" "net/http" - "strings" "time" ) // Demonstration of the working with the Erigon database remotely on the example of getting information // about past reorganisation of the chain +const headersDb = "chaindata" +const headersTable = "Header" func (uih *UiHandler) findReorgs(ctx context.Context, w http.ResponseWriter, templ *template.Template, requestChannel chan *NodeRequest) { start := time.Now() - // First, fetch list of DB paths - success, result := uih.fetch("/db/list\n", requestChannel) - if !success { - fmt.Fprintf(w, "Fetching list of db paths: %s", result) - return - } - lines := strings.Split(result, "\n") - if len(lines) == 0 || !strings.HasPrefix(lines[0], successLine) { - fmt.Fprintf(w, "Incorrect response (first line needs to be SUCCESS): %v", lines) - return - } - var chaindataPath string - for _, line := range lines[1:] { - if strings.HasSuffix(line, "/chaindata") { - chaindataPath = line - } - } - if chaindataPath == "" { - fmt.Fprintf(w, "DB path chaindata not found: %v", lines) + + rc := NewRemoteCursor(uih.remoteApi, requestChannel) + + if err := rc.Init(headersDb, headersTable, nil); err != nil { + fmt.Fprintf(w, "Create remote cursor: %v", err) return } + // Go through "Header" table and look for entries with the same block number but different hashes var prevK []byte reorgCount := 0 - rc, err := NewRemoteCursor(chaindataPath, "Header", requestChannel, nil) - if err != nil { - fmt.Fprintf(w, "Create remote cursor: %v", err) - return - } var k []byte var e error var count int diff --git a/cmd/root.go b/cmd/root.go index 47aae93..a42f7c0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -106,10 +106,12 @@ func webServer() error { return fmt.Errorf("failed to create uiSessions: %v", err) } + remoteApi := &RemoteApi{} uih := &UiHandler{ nodeSessions: ns, uiSessions: uis, uiTemplate: uiTemplate, + remoteApi: remoteApi, } mux.Handle("/script/", http.FileServer(http.FS(assets.Scripts))) mux.Handle("/ui/", uih) diff --git a/cmd/sync_stages.go b/cmd/sync_stages.go new file mode 100644 index 0000000..c0f0c74 --- /dev/null +++ b/cmd/sync_stages.go @@ -0,0 +1,80 @@ +package cmd + +import ( + "context" + "encoding/binary" + "fmt" + "html/template" + "net/http" + "strconv" +) + +// Demonstration of the working with the Erigon database remotely on the example of getting information +// about the progress of sync stages +type SyncStages struct { + rc RemoteDbReader +} + +type SyncStageProgress = map[string]string + +const syncStageDb = "chaindata" +const syncStageTable = "SyncStage" +const syncProgressBase = 10 + +func (uih *UiHandler) findSyncStages(ctx context.Context, w http.ResponseWriter, templ *template.Template, requestChannel chan *NodeRequest) { + rc := NewRemoteCursor(uih.remoteApi, requestChannel) + syncStages := &SyncStages{rc: rc} + + syncStageProgress, err := syncStages.fetchSyncStageProgress(ctx) + if err != nil { + fmt.Fprintf(w, "Unable to fetch sync stage progress: %v\n", err) + return + } + + if templateErr := templ.ExecuteTemplate(w, "sync_stages.html", syncStageProgress); templateErr != nil { + fmt.Fprintf(w, "Executing Sync stages template: %v\n", templateErr) + return + } +} + +func (ss *SyncStages) fetchSyncStageProgress(ctx context.Context) (SyncStageProgress, error) { + if cursorError := ss.rc.Init(syncStageDb, syncStageTable, nil); cursorError != nil { + return nil, fmt.Errorf("could not initialize remote cursor: %v", cursorError) + } + + syncStageProgress := make(SyncStageProgress) + + var k, v []byte + var e error + for k, v, e = ss.rc.Next(); e == nil && k != nil; k, v, e = ss.rc.Next() { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context channel interrupted") + default: + } + + syncStage := string(k) + syncProgress, unmarshalError := ss.unmarshal(v) + + if unmarshalError != nil { + return nil, fmt.Errorf("could not unmarshal sync stage data: %v", unmarshalError) + } + + syncStageProgress[syncStage] = strconv.FormatUint(syncProgress, syncProgressBase) + } + if e != nil { + return nil, fmt.Errorf("could not process remote cursor line: %v", e) + } + + return syncStageProgress, nil +} + +func (ss *SyncStages) unmarshal(data []byte) (uint64, error) { + if len(data) == 0 { + return 0, nil + } + if len(data) < 8 { + return 0, fmt.Errorf("value must be at least 8 bytes, got %d", len(data)) + } + return binary.BigEndian.Uint64(data[:8]), nil +} diff --git a/cmd/sync_stages_test.go b/cmd/sync_stages_test.go new file mode 100644 index 0000000..ee91cc3 --- /dev/null +++ b/cmd/sync_stages_test.go @@ -0,0 +1,138 @@ +package cmd + +import ( + "context" + "encoding/binary" + "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "testing" +) + +type mockRemoteCursor struct { + mock.Mock +} + +type syncStagesDep struct { + rc *mockRemoteCursor +} + +func (mrc *mockRemoteCursor) Init(db string, table string, initialKey []byte) error { + args := mrc.Called(db, table, initialKey) + return args.Error(0) +} +func (mrc *mockRemoteCursor) Next() ([]byte, []byte, error) { + args := mrc.Called() + return args.Get(0).([]byte), args.Get(1).([]byte), args.Error(2) +} + +func encodeBigEndian(n uint64) []byte { + if n == 0 { + return []byte{} + } + + var v [8]byte + binary.BigEndian.PutUint64(v[:], n) + return v[:] +} + +func TestFetchSyncStageProgress(t *testing.T) { + var ( + firstStageName = []byte("first_stage") + firstStageProgress = uint64(3458658) + secondStageName = []byte("second_stage") + secondStageProgress = uint64(0) + db = "chaindata" + table = "SyncStage" + depError = fmt.Errorf("error") + ) + + tt := []struct { + name string + ctx context.Context + on func(*syncStagesDep) + assert func(ssp SyncStageProgress) + wantErrMsg string + }{ + { + name: "should successfully fetch and return sync stages with progress", + ctx: context.Background(), + on: func(df *syncStagesDep) { + df.rc.On("Init", db, table, []byte(nil)).Return(nil) + df.rc.On("Next").Return(firstStageName, encodeBigEndian(firstStageProgress), nil).Once() + df.rc.On("Next").Return(secondStageName, encodeBigEndian(secondStageProgress), nil).Once() + df.rc.On("Next").Return([]byte(nil), []byte(nil), nil).Once() + }, + assert: func(ssp SyncStageProgress) { + exp := SyncStageProgress{ + string(firstStageName): fmt.Sprintf("%d", firstStageProgress), + string(secondStageName): fmt.Sprintf("%d", secondStageProgress), + } + + assert.Equal(t, exp, ssp) + }, + }, + { + name: "should return could not initialize remote cursor error", + ctx: context.Background(), + on: func(df *syncStagesDep) { + df.rc.On("Init", db, table, []byte(nil)).Return(depError) + }, + wantErrMsg: fmt.Sprintf("could not initialize remote cursor: %v", depError), + }, + { + name: "should return ctx channel interrupted error", + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }(), + on: func(df *syncStagesDep) { + df.rc.On("Init", db, table, []byte(nil)).Return(nil) + df.rc.On("Next").Return(firstStageName, encodeBigEndian(firstStageProgress), nil).Once() + }, + wantErrMsg: "context channel interrupted", + }, + { + name: "should return could not unmarshal sync stage data error", + ctx: context.Background(), + on: func(df *syncStagesDep) { + df.rc.On("Init", db, table, []byte(nil)).Return(nil) + df.rc.On("Next").Return(firstStageName, []byte{1}, nil).Once() + }, + wantErrMsg: "could not unmarshal sync stage data: value must be at least 8 bytes, got 1", + }, + { + name: "should return could not process remote cursor line error", + ctx: context.Background(), + on: func(df *syncStagesDep) { + df.rc.On("Init", db, table, []byte(nil)).Return(nil) + df.rc.On("Next").Return([]byte{}, []byte{}, depError).Once() + }, + wantErrMsg: fmt.Sprintf("could not process remote cursor line: %v", depError), + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + rc := &mockRemoteCursor{} + syncStages := SyncStages{rc: rc} + if tc.on != nil { + df := &syncStagesDep{ + rc: rc, + } + + tc.on(df) + } + + syncStageProgress, err := syncStages.fetchSyncStageProgress(tc.ctx) + + if tc.wantErrMsg != "" { + assert.EqualErrorf(t, err, tc.wantErrMsg, "expected error %q, got %s", tc.wantErrMsg, err) + return + } + + tc.assert(syncStageProgress) + }) + } +} diff --git a/cmd/ui_handler.go b/cmd/ui_handler.go index 92ee27a..96cde4f 100644 --- a/cmd/ui_handler.go +++ b/cmd/ui_handler.go @@ -4,6 +4,8 @@ import ( "crypto/rand" "encoding/base64" "fmt" + "github.com/google/btree" + lru "github.com/hashicorp/golang-lru/v2" "html/template" "io" "math/big" @@ -13,16 +15,12 @@ import ( "regexp" "strconv" "strings" - "time" - - "github.com/google/btree" - lru "github.com/hashicorp/golang-lru/v2" ) const sessionIdCookieName = "sessionId" const sessionIdCookieDuration = 30 * 24 * 3600 // 30 days -var uiRegex = regexp.MustCompile("^/ui/(cmd_line|flags|log_list|log_head|log_tail|log_download|versions|reorgs|bodies_download|)$") +var uiRegex = regexp.MustCompile("^/ui/(cmd_line|flags|log_list|log_head|log_tail|log_download|versions|reorgs|bodies_download|sync_stages|)$") func (uih *UiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { m := uiRegex.FindStringSubmatch(r.URL.Path) @@ -64,25 +62,25 @@ func (uih *UiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { sessionName := r.Form.Get("current_sessionname") switch m[1] { case "versions": - success, result := uih.fetch("/version\n", requestChannel) + success, result := uih.remoteApi.fetch("/version\n", requestChannel) processVersions(w, uih.uiTemplate, success, result) return case "cmd_line": - success, result := uih.fetch("/cmdline\n", requestChannel) + success, result := uih.remoteApi.fetch("/cmdline\n", requestChannel) processCmdLineArgs(w, uih.uiTemplate, success, result) return case "flags": - versionCallSuccess, versionCallResult := uih.fetch("/version\n", requestChannel) + versionCallSuccess, versionCallResult := uih.remoteApi.fetch("/version\n", requestChannel) versions := processVersions(w, uih.uiTemplate, versionCallSuccess, versionCallResult, true) - success, result := uih.fetch("/flags\n", requestChannel) + success, result := uih.remoteApi.fetch("/flags\n", requestChannel) processFlags(w, uih.uiTemplate, success, result, versions) return case "log_list": - success, result := uih.fetch("/logs/list\n", requestChannel) + success, result := uih.remoteApi.fetch("/logs/list\n", requestChannel) processLogList(w, uih.uiTemplate, success, uiSession.SessionName, result) return case "log_head": - success, result := uih.fetch(fmt.Sprintf("/logs/read?file=%s&offset=0\n", url.QueryEscape(filename)), requestChannel) + success, result := uih.remoteApi.fetch(fmt.Sprintf("/logs/read?file=%s&offset=0\n", url.QueryEscape(filename)), requestChannel) processLogPart(w, uih.uiTemplate, success, uiSession.SessionName, result) return case "log_tail": @@ -95,7 +93,7 @@ func (uih *UiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if size > 16*1024 { offset = size - 16*1024 } - success, result := uih.fetch(fmt.Sprintf("/logs/read?file=%s&offset=%d\n", url.QueryEscape(filename), offset), requestChannel) + success, result := uih.remoteApi.fetch(fmt.Sprintf("/logs/read?file=%s&offset=%d\n", url.QueryEscape(filename), offset), requestChannel) processLogPart(w, uih.uiTemplate, success, uiSession.SessionName, result) return case "log_download": @@ -112,6 +110,9 @@ func (uih *UiHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "bodies_download": uih.bodiesDownload(r.Context(), w, uih.uiTemplate, requestChannel) return + case "sync_stages": + uih.findSyncStages(r.Context(), w, uih.uiTemplate, requestChannel) + return } uiSession.lock.Lock() defer func() { @@ -211,6 +212,7 @@ type UiHandler struct { nodeSessions *lru.ARCCache[uint64, *NodeSession] uiSessions *lru.ARCCache[string, *UiSession] uiTemplate *template.Template + remoteApi RemoteApiReader } func (uih *UiHandler) allocateNewNodeSession() (uint64, *NodeSession, error) { @@ -268,41 +270,6 @@ func (uih *UiHandler) validSessionName(sessionName string, uiSession *UiSession) return true } -func (uih *UiHandler) fetch(url string, requestChannel chan *NodeRequest) (bool, string) { - if requestChannel == nil { - return false, "ERROR: Node is not allocated\n" - } - // Request command line arguments - nodeRequest := &NodeRequest{url: url} - requestChannel <- nodeRequest - var sb strings.Builder - var success bool - for nodeRequest != nil { - nodeRequest.lock.Lock() - clear := nodeRequest.served - if nodeRequest.served { - if nodeRequest.err == "" { - sb.Reset() - sb.Write(nodeRequest.response) - success = true - } else { - success = false - fmt.Fprintf(&sb, "ERROR: %s\n", nodeRequest.err) - if nodeRequest.retries < 16 { - clear = false - } - } - } - nodeRequest.lock.Unlock() - if clear { - nodeRequest = nil - } else { - time.Sleep(100 * time.Millisecond) - } - } - return success, sb.String() -} - func generatePIN() (uint64, error) { if insecure { return uint64(weakrand.Int63n(100_000_000)), nil diff --git a/go.mod b/go.mod index 5715a75..90909c6 100644 --- a/go.mod +++ b/go.mod @@ -7,20 +7,24 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.2 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.15.0 + github.com/stretchr/testify v1.8.1 golang.org/x/exp v0.0.0-20230420155640-133eef4313cb ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect diff --git a/go.sum b/go.sum index fe0931f..8a32f4b 100644 --- a/go.sum +++ b/go.sum @@ -167,6 +167,7 @@ github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/images/sync_stages.png b/images/sync_stages.png new file mode 100644 index 0000000..75232aa Binary files /dev/null and b/images/sync_stages.png differ