diff --git a/components/cluster/command/upgrade.go b/components/cluster/command/upgrade.go index bff84dc4b9..f8db275f64 100644 --- a/components/cluster/command/upgrade.go +++ b/components/cluster/command/upgrade.go @@ -24,7 +24,7 @@ import ( func newUpgradeCmd() *cobra.Command { offlineMode := false ignoreVersionCheck := false - var tidbVer, tikvVer, pdVer, tsoVer, schedulingVer, resourceManagerVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string + var tidbVer, tikvVer, pdVer, tsoVer, schedulingVer, resourceManagerVer, routerVer, tiflashVer, kvcdcVer, dashboardVer, cdcVer, alertmanagerVer, nodeExporterVer, blackboxExporterVer, tiproxyVer string var restartTimeout time.Duration cmd := &cobra.Command{ @@ -49,6 +49,7 @@ func newUpgradeCmd() *cobra.Command { spec.ComponentPD: pdVer, spec.ComponentTSO: tsoVer, spec.ComponentScheduling: schedulingVer, + spec.ComponentRouter: routerVer, spec.ComponentResourceManager: resourceManagerVer, spec.ComponentTiFlash: tiflashVer, spec.ComponentTiKVCDC: kvcdcVer, diff --git a/components/playground/playground.go b/components/playground/playground.go index 61fb7e6f8d..8dc2f1ba74 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -1594,12 +1594,12 @@ func (p *Playground) terminate(sig syscall.Signal) { } for _, inst := range p.routers { if inst.Process() != nil && inst.Process().Cmd() != nil && inst.Process().Cmd().Process != nil { - kill(inst.Component(), inst.Process().Pid(), inst.Wait) + kill(inst.Name(), inst.Process().Pid(), inst.Wait) } } for _, inst := range p.resourceManagers { if inst.Process() != nil && inst.Process().Cmd() != nil && inst.Process().Cmd().Process != nil { - kill(inst.Component(), inst.Process().Pid(), inst.Wait) + kill(inst.Name(), inst.Process().Pid(), inst.Wait) } } for _, inst := range p.tiproxys { diff --git a/embed/templates/config/prometheus.yml.tpl b/embed/templates/config/prometheus.yml.tpl 
index 123e9287c6..ef76764f49 100644 --- a/embed/templates/config/prometheus.yml.tpl +++ b/embed/templates/config/prometheus.yml.tpl @@ -207,6 +207,21 @@ scrape_configs: {{- range .ResourceManagerAddrs}} - '{{.}}' {{- end}} +{{- end}} + - job_name: "router" + honor_labels: true # don't overwrite job & instance labels +{{- if .TLSEnabled}} + scheme: https + tls_config: + insecure_skip_verify: false + ca_file: ../tls/ca.crt + cert_file: ../tls/prometheus.crt + key_file: ../tls/prometheus.pem +{{- end}} + static_configs: + - targets: +{{- range .RouterAddrs}} + - '{{.}}' {{- end}} {{- if .TiFlashStatusAddrs}} - job_name: "tiflash" diff --git a/embed/templates/scripts/run_router.sh.tpl b/embed/templates/scripts/run_router.sh.tpl new file mode 100644 index 0000000000..4ea0595fbb --- /dev/null +++ b/embed/templates/scripts/run_router.sh.tpl @@ -0,0 +1,22 @@ +#!/bin/bash +set -e + +# WARNING: This file was auto-generated. Do not edit! +# All your edit might be overwritten! +DEPLOY_DIR={{.DeployDir}} + +cd "${DEPLOY_DIR}" || exit 1 + +{{- if .NumaNode}} +exec numactl --cpunodebind={{.NumaNode}} --membind={{.NumaNode}} env GODEBUG=madvdontneed=1 bin/pd-server services router\ +{{- else}} +exec env GODEBUG=madvdontneed=1 bin/pd-server services router \ +{{- end}} +{{- if .Name}} + --name="{{.Name}}" \ +{{- end}} + --backend-endpoints="{{.BackendEndpoints}}" \ + --listen-addr="{{.ListenURL}}" \ + --advertise-listen-addr="{{.AdvertiseListenURL}}" \ + --config=conf/router.toml \ + --log-file="{{.LogDir}}/router.log" 2>> "{{.LogDir}}/router_stderr.log" diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index c8254ab6bb..04ef60d634 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -1051,6 +1051,7 @@ const ( tsoStatusURI = "status" schedulingStatusURI = "status" resourceManagerStatusURI = "status" + routerStatusURI = "status" ) // TSOClient is an HTTP client of the TSO server @@ -1199,10 +1200,6 @@ func NewSchedulingClient( return cli } -// func 
(tc *SchedulingClient) l() *logprinter.Logger { -// return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger) -// } - func (tc *SchedulingClient) tryIdentifyVersion() { endpoints := tc.getEndpoints(schedulingStatusURI) response := map[string]string{} @@ -1338,3 +1335,94 @@ func (tc *ResourceManagerClient) CheckHealth() error { }) return err } + +// RouterClient is an HTTP client of the router server +type RouterClient struct { + version string + addrs []string + tlsEnabled bool + httpClient *utils.HTTPClient + ctx context.Context +} + +// NewRouterClient returns a new RouterClient, the context must have +// a *logprinter.Logger as value of "logger" +func NewRouterClient( + ctx context.Context, + addrs []string, + timeout time.Duration, + tlsConfig *tls.Config, +) *RouterClient { + enableTLS := false + if tlsConfig != nil { + enableTLS = true + } + + if _, ok := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger); !ok { + panic("the context must have logger inside") + } + + cli := &RouterClient{ + addrs: addrs, + tlsEnabled: enableTLS, + httpClient: utils.NewHTTPClient(timeout, tlsConfig), + ctx: ctx, + } + + cli.tryIdentifyVersion() + return cli +} + +func (tc *RouterClient) tryIdentifyVersion() { + endpoints := tc.getEndpoints(routerStatusURI) + response := map[string]string{} + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, json.Unmarshal(body, &response) + }) + if err == nil { + tc.version = response["version"] + } +} + +// GetURL builds the client URL of RouterClient +func (tc *RouterClient) GetURL(addr string) string { + httpPrefix := "http" + if tc.tlsEnabled { + httpPrefix = "https" + } + return fmt.Sprintf("%s://%s", httpPrefix, addr) +} + +func (tc *RouterClient) getEndpoints(uri string) (endpoints []string) { + for _, addr := range tc.addrs { + endpoint := fmt.Sprintf("%s/%s", tc.GetURL(addr), uri) + endpoints 
= append(endpoints, endpoint) + } + + return +} + +// CheckHealth checks the health of router node. +func (tc *RouterClient) CheckHealth() error { + endpoints := tc.getEndpoints(routerStatusURI) + + _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { + body, err := tc.httpClient.Get(tc.ctx, endpoint) + if err != nil { + return body, err + } + + return body, nil + }) + + if err != nil { + return err + } + + return nil +} diff --git a/pkg/cluster/manager/manager_test.go b/pkg/cluster/manager/manager_test.go index 91130cbe71..60d4e63a9e 100644 --- a/pkg/cluster/manager/manager_test.go +++ b/pkg/cluster/manager/manager_test.go @@ -90,6 +90,8 @@ tso_servers: - host: 172.16.5.53 scheduling_servers: - host: 172.16.5.54 +router_servers: + - host: 172.16.5.55 `), &topo) assert.Nil(err) } diff --git a/pkg/cluster/manager/transfer_test.go b/pkg/cluster/manager/transfer_test.go index e252ca294c..e097a66a2d 100644 --- a/pkg/cluster/manager/transfer_test.go +++ b/pkg/cluster/manager/transfer_test.go @@ -79,4 +79,18 @@ func TestRenderSpec(t *testing.T) { dir, err = renderSpec("{{.DataDir}}", s, "test-scheduling") assert.Nil(t, err) assert.NotEmpty(t, dir) + + s = &spec.RouterInstance{BaseInstance: spec.BaseInstance{ + InstanceSpec: &spec.RouterSpec{ + Host: "172.16.5.140", + SSHPort: 22, + Name: "router-1", + DeployDir: "/home/test/deploy/router-3379", + DataDir: "/home/test/deploy/router-3379/data", + }, + }} + // s.BaseInstance.InstanceSpec + dir, err = renderSpec("{{.DataDir}}", s, "test-scheduling") + assert.Nil(t, err) + assert.NotEmpty(t, dir) } diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go index 74f6528478..9b612edcf4 100644 --- a/pkg/cluster/operation/upgrade.go +++ b/pkg/cluster/operation/upgrade.go @@ -143,7 +143,7 @@ func Upgrade( // Usage within the switch statement switch component.Name() { - case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling, spec.ComponentResourceManager: + case spec.ComponentPD, 
spec.ComponentTSO, spec.ComponentScheduling, spec.ComponentResourceManager, spec.ComponentRouter: // defer PD related leader/primary to be upgraded after others isLeader, err := checkAndDeferPDLeader(ctx, topo, int(options.APITimeout), tlsCfg, instance) if err != nil { @@ -242,6 +242,8 @@ func checkAndDeferPDLeader(ctx context.Context, topo spec.Topology, apiTimeout i isLeader, err = instance.(*spec.SchedulingInstance).IsPrimary(ctx, topo, tlsCfg) case spec.ComponentTSO: isLeader, err = instance.(*spec.TSOInstance).IsPrimary(ctx, topo, tlsCfg) + case spec.ComponentRouter: + isLeader, err = instance.(*spec.RouterInstance).IsPrimary(ctx, topo, tlsCfg) case spec.ComponentResourceManager: isLeader, err = instance.(*spec.ResourceManagerInstance).IsPrimary(ctx, topo, tlsCfg) } diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index c3722d7eb2..dbc0a720b3 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -339,6 +339,13 @@ func (i *MonitorInstance) InitConfig( cfig.AddScheduling(scheduling.Host, uint64(scheduling.Port)) } } + if servers, found := topoHasField("RouterServers"); found { + for i := 0; i < servers.Len(); i++ { + router := servers.Index(i).Interface().(*RouterSpec) + uniqueHosts.Insert(router.Host) + cfig.AddRouter(router.Host, uint64(router.Port)) + } + } if servers, found := topoHasField("ResourceManagerServers"); found { for i := 0; i < servers.Len(); i++ { rm := servers.Index(i).Interface().(*ResourceManagerSpec) diff --git a/pkg/cluster/spec/router.go b/pkg/cluster/spec/router.go new file mode 100644 index 0000000000..f5b680e45b --- /dev/null +++ b/pkg/cluster/spec/router.go @@ -0,0 +1,331 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "context" + "crypto/tls" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/pingcap/tiup/pkg/cluster/api" + "github.com/pingcap/tiup/pkg/cluster/ctxt" + "github.com/pingcap/tiup/pkg/cluster/template/scripts" + "github.com/pingcap/tiup/pkg/meta" + "github.com/pingcap/tiup/pkg/tidbver" + "github.com/pingcap/tiup/pkg/utils" +) + +// RouterSpec represents the router specification in topology.yaml +type RouterSpec struct { + Host string `yaml:"host"` + ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` + ListenHost string `yaml:"listen_host,omitempty"` + AdvertiseListenAddr string `yaml:"advertise_listen_addr,omitempty"` + SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` + IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` + // Use Name to get the name with a default value if it's empty. 
+ Name string `yaml:"name,omitempty"` + Port int `yaml:"port" default:"3379"` + DeployDir string `yaml:"deploy_dir,omitempty"` + DataDir string `yaml:"data_dir,omitempty"` + LogDir string `yaml:"log_dir,omitempty"` + Source string `yaml:"source,omitempty" validate:"source:editable"` + NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"` + Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` + Arch string `yaml:"arch,omitempty"` + OS string `yaml:"os,omitempty"` +} + +// Status queries current status of the instance +func (s *RouterSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { + if timeout < time.Second { + timeout = statusQueryTimeout + } + + addr := utils.JoinHostPort(s.GetManageHost(), s.Port) + tc := api.NewRouterClient(ctx, []string{addr}, timeout, tlsCfg) + + // check health + err := tc.CheckHealth() + if err != nil { + return "Down" + } + res := "Up" + return res +} + +// Role returns the component role of the instance +func (s *RouterSpec) Role() string { + return ComponentRouter +} + +// SSH returns the host and SSH port of the instance +func (s *RouterSpec) SSH() (string, int) { + host := s.Host + if s.ManageHost != "" { + host = s.ManageHost + } + return host, s.SSHPort +} + +// GetMainPort returns the main port of the instance +func (s *RouterSpec) GetMainPort() int { + return s.Port +} + +// GetManageHost returns the manage host of the instance +func (s *RouterSpec) GetManageHost() string { + if s.ManageHost != "" { + return s.ManageHost + } + return s.Host +} + +// IsImported returns if the node is imported from TiDB-Ansible +func (s *RouterSpec) IsImported() bool { + return false +} + +// IgnoreMonitorAgent returns if the node does not have monitor agents available +func (s *RouterSpec) IgnoreMonitorAgent() bool { + return s.IgnoreExporter +} + +// GetAdvertiseListenURL returns AdvertiseListenURL +func (s *RouterSpec) GetAdvertiseListenURL(enableTLS 
bool) string { + if s.AdvertiseListenAddr != "" { + return s.AdvertiseListenAddr + } + scheme := utils.Ternary(enableTLS, "https", "http").(string) + return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.Port)) +} + +// RouterComponent represents router component. +type RouterComponent struct{ Topology *Specification } + +// Name implements Component interface. +func (c *RouterComponent) Name() string { + return ComponentRouter +} + +// Role implements Component interface. +func (c *RouterComponent) Role() string { + return ComponentRouter +} + +// Source implements Component interface. +func (c *RouterComponent) Source() string { + source := c.Topology.ComponentSources.PD + if source != "" { + return source + } + return ComponentPD +} + +// CalculateVersion implements the Component interface +func (c *RouterComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Router + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *RouterComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Router = version +} + +// Instances implements Component interface. 
+func (c *RouterComponent) Instances() []Instance { + ins := make([]Instance, 0, len(c.Topology.RouterServers)) + for _, s := range c.Topology.RouterServers { + ins = append(ins, &RouterInstance{ + BaseInstance: BaseInstance{ + InstanceSpec: s, + Name: c.Name(), + Host: s.Host, + ManageHost: s.ManageHost, + ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), + Port: s.Port, + SSHP: s.SSHPort, + Source: s.Source, + NumaNode: s.NumaNode, + NumaCores: "", + + Ports: []int{ + s.Port, + }, + Dirs: []string{ + s.DeployDir, + s.DataDir, + }, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) + }, + Component: c, + }, + topo: c.Topology, + }) + } + return ins +} + +// RouterInstance represent the router instance +type RouterInstance struct { + BaseInstance + topo Topology +} + +// InitConfig implement Instance interface +func (i *RouterInstance) InitConfig( + ctx context.Context, + e ctxt.Executor, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + topo := i.topo.(*Specification) + if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil { + return err + } + + enableTLS := topo.GlobalOptions.TLSEnabled + spec := i.InstanceSpec.(*RouterSpec) + scheme := utils.Ternary(enableTLS, "https", "http").(string) + version := i.CalculateVersion(clusterVersion) + + pds := []string{} + for _, pdspec := range topo.PDServers { + pds = append(pds, pdspec.GetAdvertiseClientURL(enableTLS)) + } + cfg := &scripts.RouterScript{ + Name: spec.Name, + ListenURL: fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(i.GetListenHost(), spec.Port)), + AdvertiseListenURL: spec.GetAdvertiseListenURL(enableTLS), + BackendEndpoints: strings.Join(pds, ","), + DeployDir: paths.Deploy, + DataDir: paths.Data[0], + LogDir: paths.Log, + 
+ NumaNode: spec.NumaNode, + } + if !tidbver.PDSupportMicroservicesWithName(version) { + cfg.Name = "" + } + + fp := filepath.Join(paths.Cache, fmt.Sprintf("run_router_%s_%d.sh", i.GetHost(), i.GetPort())) + if err := cfg.ConfigToFile(fp); err != nil { + return err + } + dst := filepath.Join(paths.Deploy, "scripts", "run_router.sh") + if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil { + return err + } + _, _, err := e.Execute(ctx, "chmod +x "+dst, false) + if err != nil { + return err + } + + globalConfig := topo.ServerConfigs.Router + // set TLS configs + spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths) + if err != nil { + return err + } + + if err := i.MergeServerConfig(ctx, e, globalConfig, spec.Config, paths); err != nil { + return err + } + + return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), version, i.OS(), i.Arch(), i.ComponentName()+".toml", paths) +} + +// setTLSConfig sets the TLS config to support enabling/disabling TLS +func (i *RouterInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { + // set TLS configs + if enableTLS { + if configs == nil { + configs = make(map[string]any) + } + configs["security.cacert-path"] = fmt.Sprintf( + "%s/tls/%s", + paths.Deploy, + TLSCACert, + ) + configs["security.cert-path"] = fmt.Sprintf( + "%s/tls/%s.crt", + paths.Deploy, + i.Role()) + configs["security.key-path"] = fmt.Sprintf( + "%s/tls/%s.pem", + paths.Deploy, + i.Role()) + } else { + // router tls config list + tlsConfigs := []string{ + "security.cacert-path", + "security.cert-path", + "security.key-path", + } + // delete TLS configs + if configs != nil { + for _, config := range tlsConfigs { + delete(configs, config) + } + } + } + + return configs, nil +} + +// IsPrimary checks if the instance is primary +// for router, all instances are currently equal. 
+func (i *RouterInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) { + return false, nil +} + +// ScaleConfig deploy temporary config on scaling +func (i *RouterInstance) ScaleConfig( + ctx context.Context, + e ctxt.Executor, + topo Topology, + clusterName, + clusterVersion, + deployUser string, + paths meta.DirPaths, +) error { + s := i.topo + defer func() { + i.topo = s + }() + i.topo = mustBeClusterTopo(topo) + return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths) +} + +var _ RollingUpdateInstance = &RouterInstance{} + +// PreRestart implements RollingUpdateInstance interface. +func (i *RouterInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config, updcfg *UpdateConfig) error { + return nil +} + +// PostRestart implements RollingUpdateInstance interface. +func (i *RouterInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg *tls.Config, updcfg *UpdateConfig) error { + return nil +} diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index f2ef0b1025..f20052afe3 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -124,6 +124,7 @@ type ( TSO map[string]any `yaml:"tso"` Scheduling map[string]any `yaml:"scheduling"` ResourceManager map[string]any `yaml:"resource_manager"` + Router map[string]any `yaml:"router"` Dashboard map[string]any `yaml:"tidb_dashboard"` TiFlash map[string]any `yaml:"tiflash"` TiProxy map[string]any `yaml:"tiproxy"` @@ -144,6 +145,7 @@ type ( TSO string `yaml:"tso,omitempty"` Scheduling string `yaml:"scheduling,omitempty"` ResourceManager string `yaml:"resource_manager,omitempty"` + Router string `yaml:"router,omitempty"` Dashboard string `yaml:"tidb_dashboard,omitempty"` Pump string `yaml:"pump,omitempty"` Drainer string `yaml:"drainer,omitempty"` @@ -186,6 +188,7 @@ type ( TSOServers []*TSOSpec `yaml:"tso_servers,omitempty"` SchedulingServers []*SchedulingSpec `yaml:"scheduling_servers,omitempty"` 
ResourceManagerServers []*ResourceManagerSpec `yaml:"resource_manager_servers,omitempty"` + RouterServers []*RouterSpec `yaml:"router_servers,omitempty"` DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` @@ -574,6 +577,7 @@ func (s *Specification) Merge(that Topology) Topology { TSOServers: append(s.TSOServers, spec.TSOServers...), SchedulingServers: append(s.SchedulingServers, spec.SchedulingServers...), ResourceManagerServers: append(s.ResourceManagerServers, spec.ResourceManagerServers...), + RouterServers: append(s.RouterServers, spec.RouterServers...), PumpServers: append(s.PumpServers, spec.PumpServers...), Drainers: append(s.Drainers, spec.Drainers...), CDCServers: append(s.CDCServers, spec.CDCServers...), @@ -595,6 +599,7 @@ func (v *ComponentVersions) Merge(that ComponentVersions) ComponentVersions { TSO: utils.Ternary(that.TSO != "", that.TSO, v.TSO).(string), Scheduling: utils.Ternary(that.Scheduling != "", that.Scheduling, v.Scheduling).(string), ResourceManager: utils.Ternary(that.ResourceManager != "", that.ResourceManager, v.ResourceManager).(string), + Router: utils.Ternary(that.Router != "", that.Router, v.Router).(string), Dashboard: utils.Ternary(that.Dashboard != "", that.Dashboard, v.Dashboard).(string), TiFlash: utils.Ternary(that.TiFlash != "", that.TiFlash, v.TiFlash).(string), TiProxy: utils.Ternary(that.TiProxy != "", that.TiProxy, v.TiProxy).(string), @@ -689,7 +694,7 @@ func setCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) error continue } host := reflect.Indirect(field).FieldByName("Host").String() - // `TSO` and `Scheduling` components use `Port` filed + // `TSO`, `Scheduling` and `Router` components use the `Port` field if reflect.Indirect(field).FieldByName("Port").IsValid() { port := reflect.Indirect(field).FieldByName("Port").Int() // field.String() is @@ -811,6 +816,7 @@ func (s 
*Specification) ComponentsByStartOrder() (comps []Component) { comps = append(comps, &PDComponent{s}) comps = append(comps, &TSOComponent{s}) comps = append(comps, &SchedulingComponent{s}) + comps = append(comps, &RouterComponent{s}) comps = append(comps, &ResourceManagerComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) @@ -834,7 +840,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen // Ref: https://github.com/pingcap/tiup/issues/2166 cdcUpgradeBeforePDTiKVTiDB := tidbver.TiCDCUpgradeBeforePDTiKVTiDB(curVer) - // "tiflash", <"cdc">, "pd", "tso", "scheduling", "resource-manager", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc>", "prometheus", "grafana", "alertmanager" + // "tiflash", <"cdc">, "pd", "tso", "scheduling", "router", "resource-manager", "dashboard", "tiproxy", "tikv", "pump", "tidb", "drainer", <"cdc">, "prometheus", "grafana", "alertmanager" comps = append(comps, &TiFlashComponent{s}) if cdcUpgradeBeforePDTiKVTiDB { comps = append(comps, &CDCComponent{s}) @@ -842,6 +848,7 @@ func (s *Specification) ComponentsByUpdateOrder(curVer string) (comps []Componen comps = append(comps, &PDComponent{s}) comps = append(comps, &TSOComponent{s}) comps = append(comps, &SchedulingComponent{s}) + comps = append(comps, &RouterComponent{s}) comps = append(comps, &ResourceManagerComponent{s}) comps = append(comps, &DashboardComponent{s}) comps = append(comps, &TiProxyComponent{s}) diff --git a/pkg/cluster/spec/validate.go b/pkg/cluster/spec/validate.go index 64ed0f2552..cae8861e06 100644 --- a/pkg/cluster/spec/validate.go +++ b/pkg/cluster/spec/validate.go @@ -915,6 +915,7 @@ func (s *Specification) validateTLSEnabled() error { case ComponentPD, ComponentTSO, ComponentScheduling, + ComponentRouter, ComponentResourceManager, ComponentTiDB, ComponentTiKV, @@ -995,6 +996,21 @@ func (s *Specification) validateSchedulingNames() error { return nil } +func (s *Specification) 
validateRouterName() error { + routerNames := set.NewStringSet() + for _, router := range s.RouterServers { + if router.Name == "" { + continue + } + + if routerNames.Exist(router.Name) { + return errors.Errorf("component router_servers.name is not supported duplicated, the name %s is duplicated", router.Name) + } + routerNames.Insert(router.Name) + } + return nil +} + func (s *Specification) validateResourceManagerNames() error { resourceManagerNames := set.NewStringSet() for _, rm := range s.ResourceManagerServers { @@ -1087,6 +1103,7 @@ func (s *Specification) Validate() error { s.validatePDNames, s.validateTSONames, s.validateSchedulingNames, + s.validateRouterName, s.validateResourceManagerNames, s.validateTiSparkSpec, s.validateTiFlashConfigs, diff --git a/pkg/cluster/task/update_meta.go b/pkg/cluster/task/update_meta.go index 25e6175da1..e5b0fc105e 100644 --- a/pkg/cluster/task/update_meta.go +++ b/pkg/cluster/task/update_meta.go @@ -92,6 +92,15 @@ func (u *UpdateMeta) Execute(ctx context.Context) error { } newMeta.Topology.SchedulingServers = schedulingServers + routerServers := make([]*spec.RouterSpec, 0) + for i, instance := range (&spec.RouterComponent{Topology: topo}).Instances() { + if deleted.Exist(instance.ID()) { + continue + } + routerServers = append(routerServers, topo.RouterServers[i]) + } + newMeta.Topology.RouterServers = routerServers + resourceManagers := make([]*spec.ResourceManagerSpec, 0) for i, instance := range (&spec.ResourceManagerComponent{Topology: topo}).Instances() { if deleted.Exist(instance.ID()) { diff --git a/pkg/cluster/template/config/prometheus.go b/pkg/cluster/template/config/prometheus.go index f58a6e970f..f20b30acce 100644 --- a/pkg/cluster/template/config/prometheus.go +++ b/pkg/cluster/template/config/prometheus.go @@ -37,6 +37,7 @@ type PrometheusConfig struct { PDAddrs []string TSOAddrs []string SchedulingAddrs []string + RouterAddrs []string ResourceManagerAddrs []string TiFlashStatusAddrs []string 
TiFlashLearnerStatusAddrs []string @@ -114,6 +115,12 @@ func (c *PrometheusConfig) AddScheduling(ip string, port uint64) *PrometheusConf return c } +// AddRouter add a router address +func (c *PrometheusConfig) AddRouter(ip string, port uint64) *PrometheusConfig { + c.RouterAddrs = append(c.RouterAddrs, utils.JoinHostPort(ip, int(port))) + return c +} + // AddResourceManager add a resource manager address func (c *PrometheusConfig) AddResourceManager(ip string, port uint64) *PrometheusConfig { c.ResourceManagerAddrs = append(c.ResourceManagerAddrs, utils.JoinHostPort(ip, int(port))) diff --git a/pkg/cluster/template/scripts/pdms_test.go b/pkg/cluster/template/scripts/pdms_test.go index 7939774c87..49d8ec681c 100644 --- a/pkg/cluster/template/scripts/pdms_test.go +++ b/pkg/cluster/template/scripts/pdms_test.go @@ -107,3 +107,32 @@ func TestResourceManager(t *testing.T) { assert.Nil(err) assert.False(strings.Contains(string(content), "--name")) } + +func TestRouter(t *testing.T) { + assert := require.New(t) + conf, err := os.CreateTemp("", "router.conf") + assert.Nil(err) + defer os.Remove(conf.Name()) + + cfg := &RouterScript{ + Name: "router-0", + ListenURL: "127.0.0.1", + AdvertiseListenURL: "127.0.0.2", + BackendEndpoints: "127.0.0.3", + DeployDir: "/deploy", + DataDir: "/data", + LogDir: "/log", + } + err = cfg.ConfigToFile(conf.Name()) + assert.Nil(err) + content, err := os.ReadFile(conf.Name()) + assert.Nil(err) + assert.True(strings.Contains(string(content), "--name")) + + cfg.Name = "" + err = cfg.ConfigToFile(conf.Name()) + assert.Nil(err) + content, err = os.ReadFile(conf.Name()) + assert.Nil(err) + assert.False(strings.Contains(string(content), "--name")) +} diff --git a/pkg/cluster/template/scripts/router.go b/pkg/cluster/template/scripts/router.go new file mode 100644 index 0000000000..fd7d1321b2 --- /dev/null +++ b/pkg/cluster/template/scripts/router.go @@ -0,0 +1,58 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scripts + +import ( + "bytes" + "path" + "text/template" + + "github.com/pingcap/tiup/embed" + "github.com/pingcap/tiup/pkg/utils" +) + +// RouterScript represent the data to generate router config +type RouterScript struct { + Name string + ListenURL string + AdvertiseListenURL string + BackendEndpoints string + + DeployDir string + DataDir string + LogDir string + + NumaNode string +} + +// ConfigToFile write config content to specific path +func (c *RouterScript) ConfigToFile(file string) error { + fp := path.Join("templates", "scripts", "run_router.sh.tpl") + tpl, err := embed.ReadTemplate(fp) + if err != nil { + return err + } + + tmpl, err := template.New("router").Parse(string(tpl)) + if err != nil { + return err + } + + content := bytes.NewBufferString("") + if err := tmpl.Execute(content, c); err != nil { + return err + } + + return utils.WriteFile(file, content.Bytes(), 0755) +}