From 0e1c33b87b3c2322dc5a18fc99402af3fec97edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AB=A5=E5=89=91?= <1045931706@qq.com> Date: Tue, 4 Nov 2025 21:04:36 +0800 Subject: [PATCH 1/4] cluster support router MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 童剑 <1045931706@qq.com> --- components/cluster/command/upgrade.go | 4 +- components/playground/command.go | 5 + components/playground/instance/instance.go | 2 +- components/playground/instance/pd.go | 18 + components/playground/main.go | 7 + components/playground/playground.go | 37 +++ embed/templates/config/prometheus.yml.tpl | 15 + embed/templates/scripts/run_router.sh.tpl | 22 ++ pkg/cluster/api/pdapi.go | 96 +++++- pkg/cluster/manager/manager_test.go | 2 + pkg/cluster/manager/transfer_test.go | 14 + pkg/cluster/operation/upgrade.go | 4 +- pkg/cluster/spec/instance.go | 1 + pkg/cluster/spec/monitoring.go | 7 + pkg/cluster/spec/router.go | 363 +++++++++++++++++++++ pkg/cluster/spec/spec.go | 11 +- pkg/cluster/spec/validate.go | 17 + pkg/cluster/task/update_meta.go | 9 + pkg/cluster/template/config/prometheus.go | 7 + pkg/cluster/template/scripts/pdms_test.go | 29 ++ pkg/cluster/template/scripts/router.go | 58 ++++ 21 files changed, 719 insertions(+), 9 deletions(-) create mode 100644 embed/templates/scripts/run_router.sh.tpl create mode 100644 pkg/cluster/spec/router.go create mode 100644 pkg/cluster/template/scripts/router.go diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index 3c398b19e1..3033ead1d4 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -24,7 +24,8 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false ignoreVersionCheck := false - var tidbVer, tikvVer, pdVer, tsoVer, schedulingVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string + var tidbVer, tikvVer, pdVer, tsoVer, 
schedulingVer, routerVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, + alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string var restartTimeout time.Duration cmd := &cobra.Command{ @@ -49,6 +50,7 @@ func newUpgradeCmd() *cobra.Command { spec.ComponentPD: pdVer, spec.ComponentTSO: tsoVer, spec.ComponentScheduling: schedulingVer, + spec.ComponentRouter: routerVer, spec.ComponentTiFlash: tiflashVer, spec.ComponentTiKVCDC: kvcdcVer, spec.ComponentCDC: cdcVer, diff --git a/components/playground/command.go b/components/playground/command.go index 62a1b9232b..879e7ea254 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -53,6 +53,7 @@ func buildCommands(tp CommandType, opt *BootOptions) (cmds []Command) { {"pd", opt.PD}, {"tso", opt.TSO}, {"scheduling", opt.Scheduling}, + {"router", opt.Router}, {"tikv", opt.TiKV}, {"pump", opt.Pump}, {"tiflash", opt.TiFlash}, @@ -104,6 +105,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().IntVarP(&opt.PD.Num, "pd", "", opt.PD.Num, "PD instance number") cmd.Flags().IntVarP(&opt.TSO.Num, "tso", "", opt.TSO.Num, "TSO instance number") cmd.Flags().IntVarP(&opt.Scheduling.Num, "scheduling", "", opt.Scheduling.Num, "Scheduling instance number") + cmd.Flags().IntVarP(&opt.Router.Num, "router", "", opt.Router.Num, "Router instance number") cmd.Flags().IntVarP(&opt.TiFlash.Num, "tiflash", "", opt.TiFlash.Num, "TiFlash instance number") cmd.Flags().IntVarP(&opt.TiProxy.Num, "tiproxy", "", opt.TiProxy.Num, "TiProxy instance number") cmd.Flags().IntVarP(&opt.TiCDC.Num, "ticdc", "", opt.TiCDC.Num, "TiCDC instance number") @@ -114,6 +116,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.PD.Host, "pd.host", "", opt.PD.Host, "Playground PD host. If not provided, PD will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.TSO.Host, "tso.host", "", opt.TSO.Host, "Playground TSO host. 
If not provided, TSO will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.Scheduling.Host, "scheduling.host", "", opt.Scheduling.Host, "Playground Scheduling host. If not provided, Scheduling will still use `host` flag as its host") + cmd.Flags().StringVarP(&opt.Router.Host, "router.host", "", opt.Router.Host, "Playground router host. If not provided, Router will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.TiProxy.Host, "tiproxy.host", "", opt.PD.Host, "Playground TiProxy host. If not provided, TiProxy will still use `host` flag as its host") cmd.Flags().IntVarP(&opt.DMMaster.Num, "dm-master", "", opt.DMMaster.Num, "DM-master instance number") cmd.Flags().IntVarP(&opt.DMWorker.Num, "dm-worker", "", opt.DMWorker.Num, "DM-worker instance number") @@ -123,6 +126,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.PD.ConfigPath, "pd.config", "", opt.PD.ConfigPath, "PD instance configuration file") cmd.Flags().StringVarP(&opt.TSO.ConfigPath, "tso.config", "", opt.TSO.ConfigPath, "TSO instance configuration file") cmd.Flags().StringVarP(&opt.Scheduling.ConfigPath, "scheduling.config", "", opt.Scheduling.ConfigPath, "Scheduling instance configuration file") + cmd.Flags().StringVarP(&opt.Router.ConfigPath, "router.config", "", opt.Router.ConfigPath, "Router instance configuration file") cmd.Flags().StringVarP(&opt.TiFlash.ConfigPath, "tiflash.config", "", opt.TiFlash.ConfigPath, "TiFlash instance configuration file") cmd.Flags().StringVarP(&opt.TiProxy.ConfigPath, "tiproxy.config", "", opt.TiProxy.ConfigPath, "TiProxy instance configuration file") cmd.Flags().StringVarP(&opt.Pump.ConfigPath, "pump.config", "", opt.Pump.ConfigPath, "Pump instance configuration file") @@ -135,6 +139,7 @@ func newScaleOut() *cobra.Command { cmd.Flags().StringVarP(&opt.PD.BinPath, "pd.binpath", "", opt.PD.BinPath, "PD instance binary path") cmd.Flags().StringVarP(&opt.TSO.BinPath, "tso.binpath", "", opt.TSO.BinPath, "TSO instance binary 
path") cmd.Flags().StringVarP(&opt.Scheduling.BinPath, "scheduling.binpath", "", opt.Scheduling.BinPath, "Scheduling instance binary path") + cmd.Flags().StringVarP(&opt.Router.BinPath, "router.binpath", "", opt.Router.BinPath, "Router instance binary path") cmd.Flags().StringVarP(&opt.TiFlash.BinPath, "tiflash.binpath", "", opt.TiFlash.BinPath, "TiFlash instance binary path") cmd.Flags().StringVarP(&opt.TiProxy.BinPath, "tiproxy.binpath", "", opt.TiProxy.BinPath, "TiProxy instance binary path") cmd.Flags().StringVarP(&opt.TiCDC.BinPath, "ticdc.binpath", "", opt.TiCDC.BinPath, "TiCDC instance binary path") diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 8a3d88c57e..4bfb23f151 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -200,7 +200,7 @@ func AdvertiseHost(listen string) string { func pdEndpoints(pds []*PDInstance, isHTTP bool) []string { var endpoints []string for _, pd := range pds { - if pd.Role() == PDRoleTSO || pd.Role() == PDRoleScheduling { + if pd.Role() == PDRoleTSO || pd.Role() == PDRoleScheduling || pd.Role() == PDRoleRouter { continue } if isHTTP { diff --git a/components/playground/instance/pd.go b/components/playground/instance/pd.go index de5974d41b..374430f5d1 100644 --- a/components/playground/instance/pd.go +++ b/components/playground/instance/pd.go @@ -36,6 +36,8 @@ const ( PDRoleTSO PDRole = "tso" // PDRoleScheduling is the role of PD scheduling PDRoleScheduling PDRole = "scheduling" + // PDRoleRouter is the role of PD router + PDRoleRouter PDRole = "router" ) // PDInstance represent a running pd-server @@ -91,6 +93,8 @@ func (inst *PDInstance) Name() string { return fmt.Sprintf("tso-%d", inst.ID) case PDRoleScheduling: return fmt.Sprintf("scheduling-%d", inst.ID) + case PDRoleRouter: + return fmt.Sprintf("router-%d", inst.ID) default: return fmt.Sprintf("pd-%d", inst.ID) } @@ -169,6 +173,20 @@ func (inst *PDInstance) 
Start(ctx context.Context) error { if tidbver.PDSupportMicroservicesWithName(inst.Version.String()) { args = append(args, fmt.Sprintf("--name=%s", uid)) } + case PDRoleRouter: + endpoints := pdEndpoints(inst.pds, true) + args = []string{ + "services", + "router", + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), + fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", inst.LogFile()), + fmt.Sprintf("--config=%s", configPath), + } + if tidbver.PDSupportMicroservicesWithName(inst.Version.String()) { + args = append(args, fmt.Sprintf("--name=%s", uid)) + } } return inst.PrepareProcess(ctx, inst.BinPath, args, nil, inst.Dir) diff --git a/components/playground/main.go b/components/playground/main.go index fb325a11df..f993ca06c4 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -55,6 +55,7 @@ type BootOptions struct { PD instance.Config `yaml:"pd"` // will change to api when pd_mode == ms TSO instance.Config `yaml:"tso"` // Only available when pd_mode == ms Scheduling instance.Config `yaml:"scheduling"` // Only available when pd_mode == ms + Router instance.Config `yaml:"router"` TiProxy instance.Config `yaml:"tiproxy"` TiDB instance.Config `yaml:"tidb"` TiDBSystem instance.Config `yaml:"tidb.system"` @@ -287,6 +288,7 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. 
rootCmd.Flags().IntVar(&options.PD.Num, "pd", 0, "PD instance number") rootCmd.Flags().IntVar(&options.TSO.Num, "tso", 0, "TSO instance number") rootCmd.Flags().IntVar(&options.Scheduling.Num, "scheduling", 0, "Scheduling instance number") + rootCmd.Flags().IntVar(&options.Router.Num, "router", 0, "Router instance number") rootCmd.Flags().IntVar(&options.TiProxy.Num, "tiproxy", 0, "TiProxy instance number") rootCmd.Flags().IntVar(&options.TiFlash.Num, "tiflash", 0, fmt.Sprintf("TiFlash instance number, when --mode=%s or --mode=%s this will set instance number for both Write Node and Compute Node", instance.ModeCSE, instance.ModeDisAgg)) @@ -333,6 +335,7 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. rootCmd.Flags().StringVar(&options.PD.ConfigPath, "pd.config", "", "PD instance configuration file") rootCmd.Flags().StringVar(&options.TSO.ConfigPath, "tso.config", "", "TSO instance configuration file") rootCmd.Flags().StringVar(&options.Scheduling.ConfigPath, "scheduling.config", "", "Scheduling instance configuration file") + rootCmd.Flags().StringVar(&options.Router.ConfigPath, "router.config", "", "Router instance configuration file") rootCmd.Flags().StringVar(&options.TiProxy.ConfigPath, "tiproxy.config", "", "TiProxy instance configuration file") rootCmd.Flags().StringVar(&options.TiFlash.ConfigPath, "tiflash.config", "", fmt.Sprintf("TiFlash instance configuration file, when --mode=%s or --mode=%s, this will set config file for both Write Node and Compute Node", instance.ModeCSE, instance.ModeDisAgg)) @@ -354,6 +357,7 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset]. 
rootCmd.Flags().StringVar(&options.PD.BinPath, "pd.binpath", "", "PD instance binary path") rootCmd.Flags().StringVar(&options.TSO.BinPath, "tso.binpath", "", "TSO instance binary path") rootCmd.Flags().StringVar(&options.Scheduling.BinPath, "scheduling.binpath", "", "Scheduling instance binary path") + rootCmd.Flags().StringVar(&options.Router.BinPath, "router.binpath", "", "Router instance binary path") rootCmd.Flags().StringVar(&options.TiProxy.BinPath, "tiproxy.binpath", "", "TiProxy instance binary path") rootCmd.Flags().StringVar(&options.TiProxy.Version, "tiproxy.version", "", "TiProxy instance version") rootCmd.Flags().StringVar(&options.TiFlash.BinPath, "tiflash.binpath", "", @@ -445,6 +449,9 @@ func populateDefaultOpt(flagSet *pflag.FlagSet) error { defaultInt(&options.Scheduling.Num, "scheduling", 1) defaultStr(&options.Scheduling.BinPath, "scheduling.binpath", options.PD.BinPath) defaultStr(&options.Scheduling.ConfigPath, "scheduling.config", options.PD.ConfigPath) + defaultInt(&options.Router.Num, "router", 1) + defaultStr(&options.Router.BinPath, "router.binpath", options.PD.BinPath) + defaultStr(&options.Router.ConfigPath, "router.config", options.PD.ConfigPath) default: return errors.Errorf("Unknown --pd.mode %s", options.ShOpt.PDMode) } diff --git a/components/playground/playground.go b/components/playground/playground.go index 7c837f0aa2..6b16e5fdf4 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -65,6 +65,7 @@ type Playground struct { pds []*instance.PDInstance tsos []*instance.PDInstance schedulings []*instance.PDInstance + routers []*instance.PDInstance tikvs []*instance.TiKVInstance tikvWorkers []*instance.TiKVWorkerInstance tidbs []*instance.TiDBInstance @@ -306,6 +307,13 @@ func (p *Playground) handleScaleIn(w io.Writer, pid int) error { p.schedulings = slices.Delete(p.schedulings, i, i+1) } } + case spec.ComponentRouter: + for i := 0; i < len(p.routers); i++ { + if p.routers[i].Process().Pid() 
== pid { + p.routers = slices.Delete(p.routers, i, i+1) + } + } + case spec.ComponentTiKV: for i := 0; i < len(p.tikvs); i++ { if p.tikvs[i].Process().Pid() == pid { @@ -480,6 +488,8 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e return p.sanitizeConfig(p.bootOptions.TSO, cfg) case spec.ComponentScheduling: return p.sanitizeConfig(p.bootOptions.Scheduling, cfg) + case spec.ComponentRouter: + return p.sanitizeConfig(p.bootOptions.Router, cfg) case spec.ComponentTiKV: return p.sanitizeConfig(p.bootOptions.TiKV, cfg) case spec.ComponentTiKVWorker: @@ -698,6 +708,13 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst } } + for _, ins := range p.routers { + err := fn(spec.ComponentRouter, ins) + if err != nil { + return err + } + } + for _, ins := range p.tikvs { err := fn(spec.ComponentTiKV, ins) if err != nil { @@ -831,6 +848,8 @@ func (p *Playground) addInstance(componentID string, role string, cfg instance.C p.tsos = append(p.tsos, inst) } else if role == instance.PDRoleScheduling { p.schedulings = append(p.schedulings, inst) + } else if role == instance.PDRoleRouter { + p.routers = append(p.routers, inst) } case spec.ComponentTSO: inst := instance.NewPDInstance(instance.PDRoleTSO, p.bootOptions.ShOpt, cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, cfg.Port, p.bootOptions.TiKV.Num == 1) @@ -840,6 +859,10 @@ func (p *Playground) addInstance(componentID string, role string, cfg instance.C inst := instance.NewPDInstance(instance.PDRoleScheduling, p.bootOptions.ShOpt, cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, cfg.Port, p.bootOptions.TiKV.Num == 1) ins = inst p.schedulings = append(p.schedulings, inst) + case spec.ComponentRouter: + inst := instance.NewPDInstance(instance.PDRoleRouter, p.bootOptions.ShOpt, cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, cfg.Port, p.bootOptions.TiKV.Num == 1) + ins = inst + p.routers = append(p.routers, inst) case spec.ComponentTiDB: inst := 
instance.NewTiDBInstance(p.bootOptions.ShOpt, cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, dataDir, p.enableBinlog(), role) ins = inst @@ -1071,6 +1094,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme &options.PD, &options.TSO, &options.Scheduling, + &options.Router, &options.TiProxy, &options.TiDB, &options.TiKV, @@ -1174,6 +1198,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme InstancePair{spec.ComponentPD, instance.PDRoleAPI, options.PD}, InstancePair{spec.ComponentPD, instance.PDRoleTSO, options.TSO}, InstancePair{spec.ComponentPD, instance.PDRoleScheduling, options.Scheduling}, + InstancePair{spec.ComponentPD, instance.PDRoleRouter, options.Router}, ) } @@ -1363,6 +1388,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme tsoAddr []string apiAddr []string schedulingAddr []string + routerAddr []string ) for _, api := range p.pds { apiAddr = append(apiAddr, api.Addr()) @@ -1374,12 +1400,18 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme schedulingAddr = append(schedulingAddr, scheduling.Addr()) } + for _, router := range p.routers { + routerAddr = append(routerAddr, router.Addr()) + } + fmt.Printf("PD API Endpoints: ") colorCmd.Printf("%s\n", strings.Join(apiAddr, ",")) fmt.Printf("PD TSO Endpoints: ") colorCmd.Printf("%s\n", strings.Join(tsoAddr, ",")) fmt.Printf("PD Scheduling Endpoints: ") colorCmd.Printf("%s\n", strings.Join(schedulingAddr, ",")) + fmt.Printf("PD router Endpoints: ") + colorCmd.Printf("%s\n", strings.Join(routerAddr, ",")) } else { var pdAddrs []string for _, pd := range p.pds { @@ -1540,6 +1572,11 @@ func (p *Playground) terminate(sig syscall.Signal) { kill(inst.Name(), inst.Process().Pid(), inst.Wait) } } + for _, inst := range p.routers { + if inst.Process() != nil && inst.Process().Cmd() != nil && inst.Process().Cmd().Process != nil { + kill(inst.Name(), inst.Process().Pid(), 
inst.Wait) + } + } for _, inst := range p.tiproxys { if inst.Process() != nil && inst.Process().Cmd() != nil && inst.Process().Cmd().Process != nil { kill(inst.Name(), inst.Process().Pid(), inst.Wait) diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index b4d4329863..b711b5ca65 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -190,6 +190,21 @@ scrape_configs: - targets: {{- range .SchedulingAddrs}} - '{{.}}' +{{- end}} + - job_name: "router" + honor_labels: true # don't overwrite job & instance labels +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .RouterAddrs}} + - '{{.}}' {{- end}} {{- if .TiFlashStatusAddrs}} - job_name: "tiflash" diff --git a/embed/templates/scripts/run_router.sh.tpl b/embed/templates/scripts/run_router.sh.tpl new file mode 100644 index 0000000000..4ea0595fbb --- /dev/null +++ b/embed/templates/scripts/run_router.sh.tpl @@ -0,0 +1,22 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! 
+DEPLOY_DIR={{.DeployDir}} + +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services router \ +{{- else}} +exec env GODEBUG=madvdontneed=1 bin/pd-server services router \ +{{- end}} +{{- if .Name}} + --name="{{.Name}}" \ +{{- end}} + --backend-endpoints="{{.BackendEndpoints}}" \ + --listen-addr="{{.ListenURL}}" \ + --advertise-listen-addr="{{.AdvertiseListenURL}}" \ + --config=conf/router.toml \ + --log-file="{{.LogDir}}/router.log" 2>> "{{.LogDir}}/router_stderr.log" diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index 8c71c148c9..33f4b1d3db 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -1050,6 +1050,7 @@ func (pc *PDClient) SetLeaderPriority(name string, value int32) error { const ( tsoStatusURI = "status" schedulingStatusURI = "status" + routerStatusURI = "status" ) // TSOClient is an HTTP client of the TSO server @@ -1198,10 +1199,6 @@ func NewSchedulingClient( return cli } -// func (tc *SchedulingClient) l() *logprinter.Logger { -// return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) -// } - func (tc *SchedulingClient) tryIdentifyVersion() { endpoints := tc.getEndpoints(schedulingStatusURI) response := map[string]string{} @@ -1255,3 +1252,94 @@ func (tc *SchedulingClient) CheckHealth() error { return nil } + +// RouterClient is an HTTP client of the router server +type RouterClient struct { + version string + addrs []string + tlsEnabled bool + httpClient *utils.HTTPClient + ctx context.Context +} + +// NewRouterClient returns a new RouterClient, the context must have +// a *logprinter.Logger as value of "logger" +func NewRouterClient( + ctx context.Context, + addrs []string, + timeout time.Duration, + tlsConfig *tls.Config, +) *RouterClient { + enableTLS := false + if tlsConfig != nil { + enableTLS = true + } + + if _, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); !ok { + 
panic("the context must have logger inside") + } + + cli := &RouterClient{ + addrs: addrs, + tlsEnabled: enableTLS, + httpClient: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } + + cli.tryIdentifyVersion() + return cli +} + +func (tc *RouterClient) tryIdentifyVersion() { + endpoints := tc.getEndpoints(routerStatusURI) + response := map[string]string{} + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + if err == nil { + tc.version = response["version"] + } +} + +// GetURL builds the client URL of RouterClient +func (tc *RouterClient) GetURL(addr string) string { + httpPrefix := "http" + if tc.tlsEnabled { + httpPrefix = "https" + } + return fmt.Sprintf("%s://%s", httpPrefix, addr) +} + +func (tc *RouterClient) getEndpoints(uri string) (endpoints []string) { + for _, addr := range tc.addrs { + endpoint := fmt.Sprintf("%s/%s", tc.GetURL(addr), uri) + endpoints = append(endpoints, endpoint) + } + + return +} + +// CheckHealth checks the health of router node. 
+func (tc *RouterClient) CheckHealth() error { + endpoints := tc.getEndpoints(routerStatusURI) + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, nil + }) + + if err != nil { + return err + } + + return nil +} diff --git a/pkg/cluster/manager/manager_test.go b/pkg/cluster/manager/manager_test.go index bef3756f09..4341210a3b 100644 --- a/pkg/cluster/manager/manager_test.go +++ b/pkg/cluster/manager/manager_test.go @@ -96,6 +96,8 @@ tso_servers: - host: 172.16.5.53 scheduling_servers: - host: 172.16.5.54 +router_servers: + - host: 172.16.5.55 `), &topo) assert.Nil(err) err = validateNewTopo(&topo) diff --git a/pkg/cluster/manager/transfer_test.go b/pkg/cluster/manager/transfer_test.go index d4b4581f6a..097d49bcc4 100644 --- a/pkg/cluster/manager/transfer_test.go +++ b/pkg/cluster/manager/transfer_test.go @@ -81,4 +81,18 @@ func TestRenderSpec(t *testing.T) { dir, err = renderSpec("{{.DataDir}}", s, "test-scheduling") assert.Nil(t, err) assert.NotEmpty(t, dir) + + s = &spec.RouterInstance{BaseInstance: spec.BaseInstance{ + InstanceSpec: &spec.RouterSpec{ + Host: "172.16.5.140", + SSHPort: 22, + Name: "router-1", + DeployDir: "/home/test/deploy/router-3379", + DataDir: "/home/test/deploy/router-3379/data", + }, + }} + // s.BaseInstance.InstanceSpec + dir, err = renderSpec("{{.DataDir}}", s, "test-router") + assert.Nil(t, err) + assert.NotEmpty(t, dir) } diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index f645e1db7b..16c4461a0b 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -143,7 +143,7 @@ func Upgrade( // Usage within the switch statement switch component.Name() { - case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling: + case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling, spec.ComponentRouter: // defer PD related leader/primary to 
be upgraded after others isLeader, err := checkAndDeferPDLeader(ctx, topo, int(options.APITimeout), tlsCfg, instance) if err != nil { @@ -242,6 +242,8 @@ func checkAndDeferPDLeader(ctx context.Context, topo spec.Topology, apiTimeout i isLeader, err = instance.(*spec.SchedulingInstance).IsPrimary(ctx, topo, tlsCfg) case spec.ComponentTSO: isLeader, err = instance.(*spec.TSOInstance).IsPrimary(ctx, topo, tlsCfg) + case spec.ComponentRouter: + isLeader, err = instance.(*spec.RouterInstance).IsPrimary(ctx, topo, tlsCfg) } if err != nil { return false, err diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 4ed6542a17..3a3a81f5ef 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -43,6 +43,7 @@ const ( ComponentPD = "pd" ComponentTSO = "tso" ComponentScheduling = "scheduling" + ComponentRouter = "router" ComponentTiFlash = "tiflash" ComponentTiProxy = "tiproxy" ComponentGrafana = "grafana" diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 731eea011a..4df9e7e479 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -345,6 +345,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddScheduling(scheduling.Host, uint64(scheduling.Port)) } } + if servers, found := topoHasField("RouterServers"); found { + for i := 0; i < servers.Len(); i++ { + router := servers.Index(i).Interface().(*RouterSpec) + uniqueHosts.Insert(router.Host) + cfig.AddRouter(router.Host, uint64(router.Port)) + } + } if servers, found := topoHasField("TiKVServers"); found { for i := 0; i < servers.Len(); i++ { kv := servers.Index(i).Interface().(*TiKVSpec) diff --git a/pkg/cluster/spec/router.go b/pkg/cluster/spec/router.go new file mode 100644 index 0000000000..a2119172d0 --- /dev/null +++ b/pkg/cluster/spec/router.go @@ -0,0 +1,363 @@ +// Copyright 2025 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/tidbver" + "github.com/pingcap/tiup/pkg/utils" +) + +var routerService = "router" + +// RouterSpec represents the router specification in topology.yaml +type RouterSpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + ListenHost string `yaml:"listen_host,omitempty"` + AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + // Use Name to get the name with a default value if it's empty. 
+ Name string `yaml:"name,omitempty"` + Port int `yaml:"port" default:"3379"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Source string `yaml:"source,omitempty" validate:"source:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Status queries current status of the instance +func (s *RouterSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + addr := utils.JoinHostPort(s.GetManageHost(), s.Port) + tc := api.NewRouterClient(ctx, []string{addr}, timeout, tlsCfg) + pc := api.NewPDClient(ctx, pdList, timeout, tlsCfg) + + // check health + err := tc.CheckHealth() + if err != nil { + return "Down" + } + + primary, err := pc.GetServicePrimary(routerService) + if err != nil { + return "ERR" + } + res := "Up" + enableTLS := false + if tlsCfg != nil { + enableTLS = true + } + if s.GetAdvertiseListenURL(enableTLS) == primary { + res += "|P" + } + + return res +} + +// Role returns the component role of the instance +func (s *RouterSpec) Role() string { + return ComponentRouter +} + +// SSH returns the host and SSH port of the instance +func (s *RouterSpec) SSH() (string, int) { + host := s.Host + if s.ManageHost != "" { + host = s.ManageHost + } + return host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *RouterSpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *RouterSpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *RouterSpec) IsImported() bool { + return 
false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *RouterSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// GetAdvertiseListenURL returns AdvertiseListenURL +func (s *RouterSpec) GetAdvertiseListenURL(enableTLS bool) string { + if s.AdvertiseListenAddr != "" { + return s.AdvertiseListenAddr + } + scheme := utils.Ternary(enableTLS, "https", "http").(string) + return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.Port)) +} + +// RouterComponent represents router component. +type RouterComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *RouterComponent) Name() string { + return ComponentRouter +} + +// Role implements Component interface. +func (c *RouterComponent) Role() string { + return ComponentRouter +} + +// Source implements Component interface. +func (c *RouterComponent) Source() string { + source := c.Topology.ComponentSources.PD + if source != "" { + return source + } + return ComponentPD +} + +// CalculateVersion implements the Component interface +func (c *RouterComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Router + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *RouterComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Router = version +} + +// Instances implements Component interface. 
+func (c *RouterComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.RouterServers)) + for _, s := range c.Topology.RouterServers { + ins = append(ins, &RouterInstance{ + BaseInstance: BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), + Port: s.Port, + SSHP: s.SSHPort, + Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", + + Ports: []int{ + s.Port, + }, + Dirs: []string{ + s.DeployDir, + s.DataDir, + }, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) + }, + Component: c, + }, + topo: c.Topology, + }) + } + return ins +} + +// RouterInstance represent the router instance +type RouterInstance struct { + BaseInstance + topo Topology +} + +// InitConfig implement Instance interface +func (i *RouterInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*RouterSpec) + scheme := utils.Ternary(enableTLS, "https", "http").(string) + version := i.CalculateVersion(clusterVersion) + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg := &scripts.RouterScript{ + Name: spec.Name, + ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)), + AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS), + BackendEndpoints: strings.Join(pds, ","), + DeployDir: paths.Deploy, + DataDir: paths.Data[0], + LogDir: paths.Log, + 
NumaNode: spec.NumaNode, + } + if !tidbver.PDSupportMicroservicesWithName(version) { + cfg.Name = "" + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_router_%s_%d.sh", i.GetHost(), i.GetPort())) + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_router.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + _, _, err := e.Execute(ctx, "chmod +x "+dst, false) + if err != nil { + return err + } + + globalConfig := topo.ServerConfigs.Router + // set TLS configs + spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths) + if err != nil { + return err + } + + if err := i.MergeServerConfig(ctx, e, globalConfig, spec.Config, paths); err != nil { + return err + } + + return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), version, i.OS(), i.Arch(), i.ComponentName()+".toml", paths) +} + +// setTLSConfig set TLS Config to support enable/disable TLS +func (i *RouterInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + // set TLS configs + if enableTLS { + if configs == nil { + configs = make(map[string]any) + } + configs["security.cacert-path"] = fmt.Sprintf( + "%s/tls/%s", + paths.Deploy, + TLSCACert, + ) + configs["security.cert-path"] = fmt.Sprintf( + "%s/tls/%s.crt", + paths.Deploy, + i.Role()) + configs["security.key-path"] = fmt.Sprintf( + "%s/tls/%s.pem", + paths.Deploy, + i.Role()) + } else { + // drainer tls config list + tlsConfigs := []string{ + "security.cacert-path", + "security.cert-path", + "security.key-path", + } + // delete TLS configs + if configs != nil { + for _, config := range tlsConfigs { + delete(configs, config) + } + } + } + + return configs, nil +} + +// IsPrimary checks if the instance is primary +func (i *RouterInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) { + tidbTopo, ok := 
topo.(*Specification) + if !ok { + panic("topo should be type of tidb topology") + } + pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg) + primary, err := pdClient.GetServicePrimary(routerService) + if err != nil { + return false, errors.Annotatef(err, "failed to get router primary %s", i.GetHost()) + } + + spec := i.InstanceSpec.(*RouterSpec) + enableTLS := false + if tlsCfg != nil { + enableTLS = true + } + + return primary == spec.GetAdvertiseListenURL(enableTLS), nil +} + +// ScaleConfig deploy temporary config on scaling +func (i *RouterInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) +} + +var _ RollingUpdateInstance = &RouterInstance{} + +// PreRestart implements RollingUpdateInstance interface. +func (i *RouterInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config, updcfg *UpdateConfig) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. 
+func (i *RouterInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config, updcfg *UpdateConfig) error { + return nil +} diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 019dfc76eb..ce01a4581b 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -124,6 +124,7 @@ type ( PD map[string]any `yaml:"pd"` TSO map[string]any `yaml:"tso"` Scheduling map[string]any `yaml:"scheduling"` + Router map[string]any `yaml:"router"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` TiProxy map[string]any `yaml:"tiproxy"` @@ -143,6 +144,7 @@ type ( PD string `yaml:"pd,omitempty"` TSO string `yaml:"tso,omitempty"` Scheduling string `yaml:"scheduling,omitempty"` + Router string `yaml:"router,omitempty"` Dashboard string `yaml:"tidb_dashboard,omitempty"` Pump string `yaml:"pump,omitempty"` Drainer string `yaml:"drainer,omitempty"` @@ -184,6 +186,7 @@ type ( PDServers []*PDSpec `yaml:"pd_servers"` TSOServers []*TSOSpec `yaml:"tso_servers,omitempty"` SchedulingServers []*SchedulingSpec `yaml:"scheduling_servers,omitempty"` + RouterServers []*RouterSpec `yaml:"router_servers,omitempty"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` @@ -571,6 +574,7 @@ func (s *Specification) Merge(that Topology) Topology { TiProxyServers: append(s.TiProxyServers, spec.TiProxyServers...), TSOServers: append(s.TSOServers, spec.TSOServers...), SchedulingServers: append(s.SchedulingServers, spec.SchedulingServers...), + RouterServers: append(s.RouterServers, spec.RouterServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -591,6 +595,7 @@ func (v *ComponentVersions) Merge(that ComponentVersions) ComponentVersions { PD: utils.Ternary(that.PD != "", 
that.PD, v.PD).(string), TSO: utils.Ternary(that.TSO != "", that.TSO, v.TSO).(string), Scheduling: utils.Ternary(that.Scheduling != "", that.Scheduling, v.Scheduling).(string), + Router: utils.Ternary(that.Router != "", that.Router, v.Router).(string), Dashboard: utils.Ternary(that.Dashboard != "", that.Dashboard, v.Dashboard).(string), TiFlash: utils.Ternary(that.TiFlash != "", that.TiFlash, v.TiFlash).(string), TiProxy: utils.Ternary(that.TiProxy != "", that.TiProxy, v.TiProxy).(string), @@ -685,7 +690,7 @@ func setCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) error continue } host := reflect.Indirect(field).FieldByName("Host").String() - // `TSO` and `Scheduling` components use `Port` filed + // `TSO`, `Scheduling`, "Router" components use `Port` filed if reflect.Indirect(field).FieldByName("Port").IsValid() { port := reflect.Indirect(field).FieldByName("Port").Int() // field.String() is @@ -815,6 +820,7 @@ func (s *Specification) ComponentsByStartOrder() (comps []Component) { comps = append(comps, &PDComponent{s}) comps = append(comps, &TSOComponent{s}) comps = append(comps, &SchedulingComponent{s}) + comps = append(comps, &RouterComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) @@ -837,7 +843,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen // Ref: https://github.com/pingcap/tiup/issues/2166 cdcUpgradeBeforePDTiKVTiDB := tidbver.TiCDCUpgradeBeforePDTiKVTiDB(curVer) - // "tiflash", <"cdc">, "pd", "tso", "scheduling", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" + // "tiflash", <"cdc">, "pd", "tso", "scheduling", "router", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" comps = append(comps, &TiFlashComponent{s}) if cdcUpgradeBeforePDTiKVTiDB { comps = append(comps, &CDCComponent{s}) @@ 
-845,6 +851,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen comps = append(comps, &PDComponent{s}) comps = append(comps, &TSOComponent{s}) comps = append(comps, &SchedulingComponent{s}) + comps = append(comps, &RouterComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) comps = append(comps, &TiKVComponent{s}) diff --git a/pkg/cluster/spec/validate.go b/pkg/cluster/spec/validate.go index 562a72ecd1..59f9143c5b 100644 --- a/pkg/cluster/spec/validate.go +++ b/pkg/cluster/spec/validate.go @@ -937,6 +937,7 @@ func (s *Specification) validateTLSEnabled() error { case ComponentPD, ComponentTSO, ComponentScheduling, + ComponentRouter, ComponentTiDB, ComponentTiKV, ComponentTiFlash, @@ -1016,6 +1017,21 @@ func (s *Specification) validateSchedulingNames() error { return nil } +func (s *Specification) validateRouterName() error { + routerNames := set.NewStringSet() + for _, router := range s.RouterServers { + if router.Name == "" { + continue + } + + if routerNames.Exist(router.Name) { + return errors.Errorf("component router_servers.name is not supported duplicated, the name %s is duplicated", router.Name) + } + routerNames.Insert(router.Name) + } + return nil +} + func (s *Specification) validateTiFlashConfigs() error { c := FindComponent(s, ComponentTiFlash) for _, ins := range c.Instances() { @@ -1097,6 +1113,7 @@ func (s *Specification) Validate() error { s.validatePDNames, s.validateTSONames, s.validateSchedulingNames, + s.validateRouterName, s.validateTiSparkSpec, s.validateTiFlashConfigs, s.validateMonitorAgent, diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index aaa6eeb4cf..0909b8e9a2 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -90,6 +90,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.SchedulingServers = schedulingServers + routerServers := make([]*spec.RouterSpec, 0) + 
for i, instance := range (&spec.RouterComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + routerServers = append(routerServers, topo.RouterServers[i]) + } + newMeta.Topology.RouterServers = routerServers + tiproxyServers := make([]*spec.TiProxySpec, 0) for i, instance := range (&spec.TiProxyComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index d2fd5ccf5a..5f58042227 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -37,6 +37,7 @@ type PrometheusConfig struct { PDAddrs []string TSOAddrs []string SchedulingAddrs []string + RouterAddrs []string TiFlashStatusAddrs []string TiFlashLearnerStatusAddrs []string PumpAddrs []string @@ -113,6 +114,12 @@ func (c *PrometheusConfig) AddScheduling(ip string, port uint64) *PrometheusConf return c } +// AddRouter add a router address +func (c *PrometheusConfig) AddRouter(ip string, port uint64) *PrometheusConfig { + c.RouterAddrs = append(c.RouterAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddTiFlashLearner add a TiFlash learner address func (c *PrometheusConfig) AddTiFlashLearner(ip string, port uint64) *PrometheusConfig { c.TiFlashLearnerStatusAddrs = append(c.TiFlashLearnerStatusAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/pdms_test.go b/pkg/cluster/template/scripts/pdms_test.go index 414f693f1f..b1e41b5a5a 100644 --- a/pkg/cluster/template/scripts/pdms_test.go +++ b/pkg/cluster/template/scripts/pdms_test.go @@ -78,3 +78,32 @@ func TestTSO(t *testing.T) { assert.Nil(err) assert.False(strings.Contains(string(content), "--name")) } + +func TestRouter(t *testing.T) { + assert := require.New(t) + conf, err := os.CreateTemp("", "router.conf") + assert.Nil(err) + defer os.Remove(conf.Name()) + + cfg := &RouterScript{ + Name: "router-0", + ListenURL: 
"127.0.0.1", + AdvertiseListenURL: "127.0.0.2", + BackendEndpoints: "127.0.0.3", + DeployDir: "/deploy", + DataDir: "/data", + LogDir: "/log", + } + err = cfg.ConfigToFile(conf.Name()) + assert.Nil(err) + content, err := os.ReadFile(conf.Name()) + assert.Nil(err) + assert.True(strings.Contains(string(content), "--name")) + + cfg.Name = "" + err = cfg.ConfigToFile(conf.Name()) + assert.Nil(err) + content, err = os.ReadFile(conf.Name()) + assert.Nil(err) + assert.False(strings.Contains(string(content), "--name")) +} diff --git a/pkg/cluster/template/scripts/router.go b/pkg/cluster/template/scripts/router.go new file mode 100644 index 0000000000..1cfb95e100 --- /dev/null +++ b/pkg/cluster/template/scripts/router.go @@ -0,0 +1,58 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// RouterScript represent the data to generate router config +type RouterScript struct { + Name string + ListenURL string + AdvertiseListenURL string + BackendEndpoints string + + DeployDir string + DataDir string + LogDir string + + NumaNode string +} + +// ConfigToFile write config content to specific path +func (c *RouterScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_router.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + + tmpl, err := template.New("router").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +} From 1ab235514b7f3d6d95367d6a14d8d93a8efc1bb9 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Mon, 22 Dec 2025 21:30:34 +0800 Subject: [PATCH 2/4] make Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/cluster/api/pdapi.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index 33f4b1d3db..eb613ad148 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -1262,7 +1262,7 @@ type RouterClient struct { ctx context.Context } -// RouterClient returns a new RouterClient, the context must have +// NewRouterClient returns a new RouterClient, the context must have // a *logprinter.Logger as value of "logger" func NewRouterClient( ctx context.Context, From d123c6d418c3a4e1d0d12fc7a3795d06c8cc39a6 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 23 Dec 2025 09:59:43 +0800 Subject: [PATCH 3/4] remove primary check in router service Signed-off-by: bufferflies <1045931706@qq.com> --- components/playground/instance/instance.go | 2 +- 
embed/templates/config/prometheus.yml.tpl | 2 +- pkg/cluster/spec/router.go | 34 ++-------------------- 3 files changed, 4 insertions(+), 34 deletions(-) diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 01ae1c3678..84601b662f 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -209,7 +209,7 @@ func AdvertiseHost(listen string) string { func pdEndpoints(pds []*PDInstance, isHTTP bool) []string { var endpoints []string for _, pd := range pds { - if pd.Role() == PDRoleTSO || pd.Role() == PDRoleScheduling || pd.Role() == PDRoleRouter { + if pd.Role() == PDRoleTSO || pd.Role() == PDRoleScheduling || pd.Role() == PDRoleRouter || pd.Role() == PDRoleRouter { continue } if isHTTP { diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index b711b5ca65..cde4e8d0d6 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -203,7 +203,7 @@ scrape_configs: {{- end}} static_configs: - targets: -{{- range RouterAddrs}} +{{- range .RouterAddrs}} - '{{.}}' {{- end}} {{- if .TiFlashStatusAddrs}} diff --git a/pkg/cluster/spec/router.go b/pkg/cluster/spec/router.go index a2119172d0..3f17d6b812 100644 --- a/pkg/cluster/spec/router.go +++ b/pkg/cluster/spec/router.go @@ -21,7 +21,6 @@ import ( "strings" "time" - "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/ctxt" "github.com/pingcap/tiup/pkg/cluster/template/scripts" @@ -61,27 +60,13 @@ func (s *RouterSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg * addr := utils.JoinHostPort(s.GetManageHost(), s.Port) tc := api.NewRouterClient(ctx, []string{addr}, timeout, tlsCfg) - pc := api.NewPDClient(ctx, pdList, timeout, tlsCfg) // check health err := tc.CheckHealth() if err != nil { return "Down" } - - primary, err := pc.GetServicePrimary(routerService) - if err != 
nil { - return "ERR" - } res := "Up" - enableTLS := false - if tlsCfg != nil { - enableTLS = true - } - if s.GetAdvertiseListenURL(enableTLS) == primary { - res += "|P" - } - return res } @@ -312,24 +297,9 @@ func (i *RouterInstance) setTLSConfig(ctx context.Context, enableTLS bool, confi } // IsPrimary checks if the instance is primary +// for router, all instances are equal for currently. func (i *RouterInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) { - tidbTopo, ok := topo.(*Specification) - if !ok { - panic("topo should be type of tidb topology") - } - pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg) - primary, err := pdClient.GetServicePrimary(routerService) - if err != nil { - return false, errors.Annotatef(err, "failed to get router primary %s", i.GetHost()) - } - - spec := i.InstanceSpec.(*RouterSpec) - enableTLS := false - if tlsCfg != nil { - enableTLS = true - } - - return primary == spec.GetAdvertiseListenURL(enableTLS), nil + return false, nil } // ScaleConfig deploy temporary config on scaling From 3bb2b9b2cddcd8daaa97c258127e02599e30376c Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 6 Jan 2026 16:26:16 +0800 Subject: [PATCH 4/4] run Signed-off-by: bufferflies <1045931706@qq.com> --- components/playground/playground.go | 4 ---- embed/templates/config/prometheus.yml.tpl | 1 + pkg/cluster/spec/router.go | 4 +--- pkg/cluster/task/update_meta.go | 1 + pkg/cluster/template/scripts/router.go | 2 +- 5 files changed, 4 insertions(+), 8 deletions(-) diff --git a/components/playground/playground.go b/components/playground/playground.go index 75b42d437a..8dc2f1ba74 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -1422,10 +1422,6 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme resourceManagerAddr = append(resourceManagerAddr, resourceManager.Addr()) } - for _, router := 
range p.routers { - routerAddr = append(routerAddr, router.Addr()) - } - fmt.Printf("PD API Endpoints: ") colorCmd.Printf("%s\n", strings.Join(apiAddr, ",")) fmt.Printf("PD TSO Endpoints: ") diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl index 1d33effbe2..ef76764f49 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -206,6 +206,7 @@ scrape_configs: - targets: {{- range .ResourceManagerAddrs}} - '{{.}}' +{{- end}} {{- end}} - job_name: "router" honor_labels: true # don't overwrite job & instance labels diff --git a/pkg/cluster/spec/router.go b/pkg/cluster/spec/router.go index 3f17d6b812..f5b680e45b 100644 --- a/pkg/cluster/spec/router.go +++ b/pkg/cluster/spec/router.go @@ -1,4 +1,4 @@ -// Copyright 2025 PingCAP, Inc. +// Copyright 2026 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -29,8 +29,6 @@ import ( "github.com/pingcap/tiup/pkg/utils" ) -var routerService = "router" - // RouterSpec represents the router router specification in topology.yaml type RouterSpec struct { Host string `yaml:"host"` diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index 1e613376f3..e5b0fc105e 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -100,6 +100,7 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { routerServers = append(routerServers, topo.RouterServers[i]) } newMeta.Topology.RouterServers = routerServers + resourceManagers := make([]*spec.ResourceManagerSpec, 0) for i, instance := range (&spec.ResourceManagerComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/scripts/router.go b/pkg/cluster/template/scripts/router.go index 1cfb95e100..fd7d1321b2 100644 --- a/pkg/cluster/template/scripts/router.go +++ b/pkg/cluster/template/scripts/router.go @@ -1,4 +1,4 @@ -// Copyright 2025 PingCAP, Inc. +// Copyright 2026 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.