From 35fd7df7939a6f52839d77b3d14e966a08e967d8 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Sat, 18 Apr 2020 23:54:33 +0800 Subject: [PATCH 1/8] add plugin --- dm/config/subtask.go | 2 ++ syncer/plugin.go | 36 ++++++++++++++++++++++++++ syncer/syncer.go | 60 +++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 syncer/plugin.go diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 12cde3a473..68250cf0e6 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -181,6 +181,8 @@ type SubTaskConfig struct { PprofAddr string `toml:"pprof-addr" json:"pprof-addr"` StatusAddr string `toml:"status-addr" json:"status-addr"` + PluginPath string `toml:"plugin-path" json:"plugin-path"` + ConfigFile string `toml:"-" json:"config-file"` // still needed by Syncer / Loader bin diff --git a/syncer/plugin.go b/syncer/plugin.go new file mode 100644 index 0000000000..ca89f1b16e --- /dev/null +++ b/syncer/plugin.go @@ -0,0 +1,36 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "github.com/siddontang/go-mysql/replication" +) + +// Plugin is a struct of plugin used in syncer unit +type Plugin struct { + // the plugin's .so file path + path string + + // conn used to creating connection for double write or other usage + conn interface{} + + // Init do some init job + Init func() error + + // HandleDDLJobResult handles the result of ddl job + HandleDDLJobResult func(ev *replication.QueryEvent, ec eventContext, err error) error + + // HandleDMLJobResult handles the result of dml job + HandleDMLJobResult func(ev *replication.RowsEvent, ec eventContext, err error) error +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 1f32c075ac..5e9d0669a5 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,6 +16,7 @@ package syncer import ( "context" "fmt" + "plugin" "reflect" "strconv" "strings" @@ -178,6 +179,8 @@ type Syncer struct { } addJobFunc func(*job) error + + plugin *Plugin } // NewSyncer creates a new Syncer. @@ -222,6 +225,10 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.tctx.L().DPanic("cannot create schema tracker", zap.Error(err)) } + syncer.plugin = &Plugin{ + path: cfg.PluginPath, + } + return syncer } @@ -374,10 +381,53 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) + // init plugin + err = s.initPlugin() + if err != nil { + return err + } + s.reset() return nil } +// initPlugin initializes the plugin +func (s *Syncer) initPlugin() error { + if len(s.plugin.path) == 0 { + return nil + } + + p, err := plugin.Open(s.plugin.path) + if err != nil { + // TODO: use terror + return err + } + + initFunc, err := p.Lookup("Init") + if err != nil { + // TODO: use terror + return err + } + + handleDDLJobResultFunc, err := p.Lookup("HandleDDLJobResult") + if err != nil { + // TODO: use terror + return err + } + + handleDMLJobResultFunc, err := p.Lookup("HandleDMLJobResult") + if err != nil { + // TODO: use terror + return err + } + + s.plugin.Init = initFunc.(func() error) + s.plugin.HandleDDLJobResult = handleDDLJobResultFunc.(func(*replication.QueryEvent, eventContext, error) error) + s.plugin.HandleDMLJobResult = handleDMLJobResultFunc.(func(*replication.RowsEvent, eventContext, error) error) + + return s.plugin.Init() +} + // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules // NOTE: now we don't support modify router rules after task has started func (s *Syncer) initShardingGroups() error { @@ -1285,13 +1335,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case *replication.RowsEvent: err = s.handleRowsEvent(ev, ec) - if err != nil { - return terror.Annotatef(err, "current location %s", currentLocation) + err1 := s.plugin.HandleDMLJobResult(ev, ec, err) + if err1 != nil { + return terror.Annotatef(err1, "current location %s", currentLocation) } case *replication.QueryEvent: err = s.handleQueryEvent(ev, ec) - if err != nil { - return terror.Annotatef(err, "current location %s", currentLocation) + err1 := s.plugin.HandleDDLJobResult(ev, ec, err) + if err1 != nil { + return terror.Annotatef(err1, "current location %s", currentLocation) } case *replication.XIDEvent: if shardingReSync != nil { From 6295163f030b8ad904b05eff1797bfd29a4faa7f Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 20 Apr 2020 19:35:43 +0800 Subject: [PATCH 2/8] refine plugin --- syncer/{ => plugin}/plugin.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename syncer/{ => plugin}/plugin.go (100%) diff --git a/syncer/plugin.go b/syncer/plugin/plugin.go similarity index 100% rename from syncer/plugin.go rename to syncer/plugin/plugin.go From e8da36e0275e0274e12149f9e23505f95923f6af Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 20 Apr 2020 21:08:53 +0800 Subject: [PATCH 3/8] add demo --- syncer/plugin/demo/demo.go | 131 +++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 syncer/plugin/demo/demo.go diff --git a/syncer/plugin/demo/demo.go b/syncer/plugin/demo/demo.go new file mode 100644 index 0000000000..d628c3d297 --- /dev/null +++ b/syncer/plugin/demo/demo.go @@ -0,0 +1,131 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/log" + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/format" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" +) + +// DemoPlugin is a demo to show how to use plugin +type DemoPlugin struct { + db *sql.DB +} + +// NewPlugin creates a new DemoPlugin +func NewPlugin() *DemoPlugin { + return &DemoPlugin{} +} + +// Init implements Plugin's Init +func (dp *DemoPlugin) Init(cfg *config.SubTaskConfig) error { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true", + cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port) + db, err := sql.Open("mysql", dsn) + if err != nil { + return err + } + dp.db = db + + return nil +} + +// HandleDDLJobResult implements Plugin's HandleDDLJobResult +func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error { + if err == nil { + return nil + } + + if !strings.Contains(err.Error(), "unsupported modify column length") { + return nil + } + + stmt, err := parser.New().ParseOneStmt(string(ev.Query), "", "") + if err != nil { + return err + } + + switch st := stmt.(type) { + case *ast.AlterTableStmt: + + switch st.Specs[0].Tp { + case ast.AlterTableModifyColumn: + originColName := st.Specs[0].NewColumns[0].Name.Name.O + tmpColName := fmt.Sprintf("%s_tmp", st.Specs[0].NewColumns[0].Name) + + var sb strings.Builder + st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)) + originCol := sb.String() + + st.Specs[0].NewColumns[0].Name.Name = model.NewCIStr(tmpColName) + var sb2 strings.Builder + st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb2)) + tmpCol := sb2.String() + log.Info("after translate", zap.String("new col", tmpCol), zap.String("origin col", originColName)) + + ctx := context.Background() + tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, st.Table.Schema.O, st.Table.Name.O, "") + if err != nil { + return err + } + keys, _ := dbutil.SelectUniqueOrderKey(tableInfo) + keysList := strings.Join(keys, ", ") + + addColSQL := fmt.Sprintf("alter table `%s`.`%s` add column %s after %s", st.Table.Schema.O, st.Table.Name.O, tmpCol, originColName) + _, err = dp.db.ExecContext(ctx, addColSQL) + if err != nil { + return err + } + + insertSQL := fmt.Sprintf("replace into `%s`.`%s`(%s, %s) SELECT %s, %s AS %s FROM `%s`.`%s`;", st.Table.Schema.O, st.Table.Name.O, keysList, tmpColName, keysList, originColName, tmpColName, st.Table.Schema.O, st.Table.Name.O) + _, err = dp.db.ExecContext(ctx, insertSQL) + if err != nil { + return err + } + + dropColSQL := fmt.Sprintf("alter table `%s`.`%s` drop column %s", st.Table.Schema.O, st.Table.Name.O, originColName) + _, err = dp.db.ExecContext(ctx, dropColSQL) + if err != nil { + return err + } + + changeColSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN %s %s;", st.Table.Schema.O, st.Table.Name.O, tmpColName, originCol) + _, err = dp.db.ExecContext(ctx, changeColSQL) + if err != nil { + return err + } + } + default: + return nil + } + + return nil +} + +// HandleDMLJobResult implements Plugin's HandleDMLJobResult +func (dp *DemoPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { + return nil +} From f00e4b325728fadf6af17f1db185daad3def6ea7 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 21 Apr 2020 11:14:46 +0800 Subject: [PATCH 4/8] refine code --- syncer/plugin/plugin.go | 72 ++++++++++++++++++++++++++++++++++++---- syncer/syncer.go | 73 +++++++++++++++++++++++------------------ 2 files changed, 106 insertions(+), 39 deletions(-) diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go index ca89f1b16e..58dace1390 100644 --- a/syncer/plugin/plugin.go +++ b/syncer/plugin/plugin.go @@ -11,26 +11,84 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syncer +package plugin import ( + "plugin" + //"fmt" + + //"github.com/pingcap/errors" + "github.com/pingcap/dm/dm/config" "github.com/siddontang/go-mysql/replication" ) +var ( + createPluginFunc = "NewPlugin" +) + +// LoadPlugin loads plugin by plugin's file path +func LoadPlugin(filepath string) (Plugin, error) { + p, err := plugin.Open(filepath) + if err != nil { + // TODO: use terror + return nil, err + } + + pluginSymbol, err := p.Lookup(createPluginFunc) + if err != nil { + return nil, err + } + + newPlugin, ok := pluginSymbol.(func() interface{}) + if !ok { + // TODO: use terror + return nil, nil + } + + plg := newPlugin() + plg2, ok := plg.(Plugin) + if !ok { + return nil, nil + } + + return plg2, nil + /* + syncerPlugin.Init = initFunc + + + plg := newPlugin() + handleDDLJobResultFunc, ok := plg.(HandleDDLJobResult) + if !ok { + return nil, nil + } + syncerPlugin.HandleDDLJobResult = handleDDLJobResultFunc + + + plg := newPlugin() + HandleDMLJobResultFunc, ok := plg.(HandleDMLJobResult) + if !ok { + return nil, nil + } + syncerPlugin.HandleDMLJobResult = HandleDMLJobResultFunc + + return syncerPlugin + */ +} + // Plugin is a struct of plugin used in syncer unit -type Plugin struct { +type Plugin interface { // the plugin's .so file path - path string + //path string // conn used to creating connection for double write or other usage - conn interface{} + //conn interface{} // Init do some init job - Init func() error + Init(cfg *config.SubTaskConfig) error // HandleDDLJobResult handles the result of ddl job - HandleDDLJobResult func(ev *replication.QueryEvent, ec eventContext, err error) error + HandleDDLJobResult(ev *replication.QueryEvent, err error) error // HandleDMLJobResult handles the result of dml job - HandleDMLJobResult func(ev *replication.RowsEvent, ec eventContext, err error) error + HandleDMLJobResult(ev *replication.RowsEvent, err error) error } diff --git a/syncer/syncer.go b/syncer/syncer.go index 5e9d0669a5..27cf74c796 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,7 +16,7 @@ package syncer import ( "context" "fmt" - "plugin" + //"plugin" "reflect" "strconv" "strings" @@ -56,6 +56,7 @@ import ( "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/tracing" "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/dm/syncer/plugin" sm "github.com/pingcap/dm/syncer/safe-mode" "github.com/pingcap/dm/syncer/shardddl" operator "github.com/pingcap/dm/syncer/sql-operator" @@ -180,7 +181,7 @@ type Syncer struct { addJobFunc func(*job) error - plugin *Plugin + plugin plugin.Plugin } // NewSyncer creates a new Syncer. @@ -225,9 +226,9 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.tctx.L().DPanic("cannot create schema tracker", zap.Error(err)) } - syncer.plugin = &Plugin{ - path: cfg.PluginPath, - } + //syncer.plugin = &Plugin{ + // path: cfg.PluginPath, + //} return syncer } @@ -393,39 +394,47 @@ func (s *Syncer) Init(ctx context.Context) (err error) { // initPlugin initializes the plugin func (s *Syncer) initPlugin() error { - if len(s.plugin.path) == 0 { - return nil - } - - p, err := plugin.Open(s.plugin.path) + plugin, err := plugin.LoadPlugin(s.cfg.PluginPath) if err != nil { - // TODO: use terror return err } + s.plugin = plugin + return s.plugin.Init(s.cfg) + /* + if len(s.plugin.path) == 0 { + return nil + } - initFunc, err := p.Lookup("Init") - if err != nil { - // TODO: use terror - return err - } + p, err := plugin.Open(s.plugin.path) + if err != nil { + // TODO: use terror + return err + } - handleDDLJobResultFunc, err := p.Lookup("HandleDDLJobResult") - if err != nil { - // TODO: use terror - return err - } + initFunc, err := p.Lookup("Init") + if err != nil { + // TODO: use terror + return err + } - handleDMLJobResultFunc, err := p.Lookup("HandleDMLJobResult") - if err != nil { - // TODO: use terror - return err - } + handleDDLJobResultFunc, err := p.Lookup("HandleDDLJobResult") + if err != nil { + // TODO: use terror + return err + } - s.plugin.Init = initFunc.(func() error) - s.plugin.HandleDDLJobResult = handleDDLJobResultFunc.(func(*replication.QueryEvent, eventContext, error) error) - s.plugin.HandleDMLJobResult = handleDMLJobResultFunc.(func(*replication.RowsEvent, eventContext, error) error) + handleDMLJobResultFunc, err := p.Lookup("HandleDMLJobResult") + if err != nil { + // TODO: use terror + return err + } + + s.plugin.Init = initFunc.(func() error) + s.plugin.HandleDDLJobResult = handleDDLJobResultFunc.(func(*replication.QueryEvent, eventContext, error) error) + s.plugin.HandleDMLJobResult = handleDMLJobResultFunc.(func(*replication.RowsEvent, eventContext, error) error) - return s.plugin.Init() + return s.plugin.Init() + */ } // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules @@ -1335,13 +1344,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } case *replication.RowsEvent: err = s.handleRowsEvent(ev, ec) - err1 := s.plugin.HandleDMLJobResult(ev, ec, err) + err1 := s.plugin.HandleDMLJobResult(ev, err) if err1 != nil { return terror.Annotatef(err1, "current location %s", currentLocation) } case *replication.QueryEvent: err = s.handleQueryEvent(ev, ec) - err1 := s.plugin.HandleDDLJobResult(ev, ec, err) + err1 := s.plugin.HandleDDLJobResult(ev, err) if err1 != nil { return terror.Annotatef(err1, "current location %s", currentLocation) } From 5f19e079d2cb10750b76a81cdee7ded5998ec4f4 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 21 Apr 2020 20:02:01 +0800 Subject: [PATCH 5/8] update error --- syncer/optimist.go | 4 ++-- syncer/plugin/plugin.go | 27 --------------------------- syncer/syncer.go | 20 +++++++++++--------- syncer/util.go | 20 ++++++++++++++++++++ 4 files changed, 33 insertions(+), 38 deletions(-) diff --git a/syncer/optimist.go b/syncer/optimist.go index 7f098ab5e2..02cf44c5ea 100644 --- a/syncer/optimist.go +++ b/syncer/optimist.go @@ -168,8 +168,8 @@ func (s *Syncer) handleQueryEventOptimistic( return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } for _, table := range onlineDDLTableNames { diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go index 58dace1390..af9e4a09ef 100644 --- a/syncer/plugin/plugin.go +++ b/syncer/plugin/plugin.go @@ -52,37 +52,10 @@ func LoadPlugin(filepath string) (Plugin, error) { } return plg2, nil - /* - syncerPlugin.Init = initFunc - - - plg := newPlugin() - handleDDLJobResultFunc, ok := plg.(HandleDDLJobResult) - if !ok { - return nil, nil - } - syncerPlugin.HandleDDLJobResult = handleDDLJobResultFunc - - - plg := newPlugin() - HandleDMLJobResultFunc, ok := plg.(HandleDMLJobResult) - if !ok { - return nil, nil - } - syncerPlugin.HandleDMLJobResult = HandleDMLJobResultFunc - - return syncerPlugin - */ } // Plugin is a struct of plugin used in syncer unit type Plugin interface { - // the plugin's .so file path - //path string - - // conn used to creating connection for double write or other usage - //conn interface{} - // Init do some init job Init(cfg *config.SubTaskConfig) error diff --git a/syncer/syncer.go b/syncer/syncer.go index 27cf74c796..63a58ee78c 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -158,7 +158,9 @@ type Syncer struct { // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError // record whether error occurred when execute SQLs - execErrorDetected sync2.AtomicBool + //execErrorDetected sync2.AtomicBool + + execError executeError execErrors struct { sync.Mutex @@ -502,7 +504,7 @@ func (s *Syncer) reset() { // create new job chans s.newJobChans(s.cfg.WorkerCount + 1) - s.execErrorDetected.Set(false) + s.execError.Set(nil) s.resetExecErrors() switch s.cfg.ShardMode { @@ -858,7 +860,7 @@ func (s *Syncer) resetShardingGroup(schema, table string) { // // we may need to refactor the concurrency model to make the work-flow more clearer later func (s *Syncer) flushCheckPoints() error { - if s.execErrorDetected.Get() { + if detected, _ := s.execError.Detected(); detected { s.tctx.L().Warn("error detected when executing SQL job, skip flush checkpoint", zap.Stringer("checkpoint", s.checkpoint)) return nil } @@ -968,7 +970,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, } s.jobWg.Done() if err != nil { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -1000,7 +1002,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo } fatalF := func(err error) { - s.execErrorDetected.Set(true) + s.execError.Set(err) if !utils.IsContextCanceledError(err) { s.runFatalChan <- unit.NewProcessError(err) } @@ -1785,8 +1787,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // when add ddl job, will execute ddl and then flush checkpoint. // if execute ddl failed, the execErrorDetected will be true. - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("location", ec.currentLocation)) @@ -1976,8 +1978,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return err } - if s.execErrorDetected.Get() { - return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + if detected, err := s.execError.Detected(); detected { + return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } if len(onlineDDLTableNames) > 0 { diff --git a/syncer/util.go b/syncer/util.go index 2d95673964..00d260e35b 100644 --- a/syncer/util.go +++ b/syncer/util.go @@ -17,6 +17,7 @@ import ( "fmt" "os" "strconv" + "sync" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/terror" @@ -100,3 +101,22 @@ func getDBConfigFromEnv() config.DBConfig { Port: port, } } + +type executeError struct { + sync.RWMutex + + err error +} + +func (e *executeError) Detected() (bool, error) { + e.RLock() + defer e.RUnlock() + + return e.err != nil, e.err +} + +func (e *executeError) Set(err error) { + e.Lock() + e.err = err + e.Unlock() +} From e85c9c2324e7e8b2bafe0d1241d5d7a0d9a20ad8 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 21 Apr 2020 20:14:37 +0800 Subject: [PATCH 6/8] minor update --- syncer/plugin/demo/demo.go | 2 +- syncer/plugin/plugin.go | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/syncer/plugin/demo/demo.go b/syncer/plugin/demo/demo.go index d628c3d297..678595b25d 100644 --- a/syncer/plugin/demo/demo.go +++ b/syncer/plugin/demo/demo.go @@ -127,5 +127,5 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) // HandleDMLJobResult implements Plugin's HandleDMLJobResult func (dp *DemoPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { - return nil + return err } diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go index af9e4a09ef..cbbb4f9159 100644 --- a/syncer/plugin/plugin.go +++ b/syncer/plugin/plugin.go @@ -28,6 +28,10 @@ var ( // LoadPlugin loads plugin by plugin's file path func LoadPlugin(filepath string) (Plugin, error) { + if len(filepath) == 0 { + return new(NilPlugin), nil + } + p, err := plugin.Open(filepath) if err != nil { // TODO: use terror @@ -65,3 +69,21 @@ type Plugin interface { // HandleDMLJobResult handles the result of dml job HandleDMLJobResult(ev *replication.RowsEvent, err error) error } + +// NilPlugin is a plugin which do nothing +type NilPlugin struct {} + +// Init implements Plugin's Init +func (n *NilPlugin) Init(cfg *config.SubTaskConfig) error { + return nil +} + +// HandleDDLJobResult implements Plugin's HandleDDLJobResult +func (n *NilPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error { + return err +} + +// HandleDMLJobResult implements Plugin's HandleDMLJobResult +func (n *NilPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { + return err +} \ No newline at end of file From 01d66aef8cc6b72efdfb83cb6f51849a78f859f2 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 21 Apr 2020 22:33:22 +0800 Subject: [PATCH 7/8] dm-syncer support plugin && add test --- Makefile | 11 ++++---- cmd/dm-syncer/config.go | 7 +++++ pkg/terror/error_list.go | 3 ++ syncer/plugin/demo/demo.go | 35 ++++++++++++++++++------ syncer/plugin/plugin.go | 13 +++++---- syncer/syncer.go | 38 -------------------------- tests/_utils/run_dm_syncer | 5 ++-- tests/_utils/test_prepare | 4 +-- tests/dm_syncer/conf/dm-syncer-1.toml | 3 ++ tests/dm_syncer/data/db1.increment.sql | 6 ++++ 10 files changed, 63 insertions(+), 62 deletions(-) diff --git a/Makefile b/Makefile index a14f97e13a..9d6827327a 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,9 @@ dm-portal: dm-syncer: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dm-syncer ./cmd/dm-syncer +plugin-demo: + CGO_ENABLED=1 $(GOBUILD) -ldflags '$(LDFLAGS)' -o /tmp/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go + debug-tools: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole @@ -157,10 +160,8 @@ dm_integration_test_build: retool_setup -coverpkg=github.com/pingcap/dm/... \ -o bin/dm-tracer.test github.com/pingcap/dm/cmd/dm-tracer \ || { $(FAILPOINT_DISABLE); exit 1; } - $(GOTEST) -c $(TEST_RACE_FLAG) -cover -covermode=atomic \ - -coverpkg=github.com/pingcap/dm/... \ - -o bin/dm-syncer.test github.com/pingcap/dm/cmd/dm-syncer \ - || { $(FAILPOINT_DISABLE); exit 1; } + CGO_ENABLED=1 GO111MODULE=on go build -o bin/dm-syncer ./cmd/dm-syncer + CGO_ENABLED=1 GO111MODULE=on go build -o /tmp/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go $(FAILPOINT_DISABLE) tests/prepare_tools.sh @@ -173,7 +174,7 @@ integration_test: check_third_party_binary @which bin/dm-master.test @which bin/dm-worker.test @which bin/dm-tracer.test - @which bin/dm-syncer.test + @which bin/dm-syncer tests/run.sh $(CASE) compatibility_test: check_third_party_binary diff --git a/cmd/dm-syncer/config.go b/cmd/dm-syncer/config.go index 548b88483f..96f38ef5b8 100644 --- a/cmd/dm-syncer/config.go +++ b/cmd/dm-syncer/config.go @@ -56,6 +56,8 @@ type commonConfig struct { EnableANSIQuotes bool TimezoneStr string + PluginPath string + SyncerConfigFormat bool } @@ -77,6 +79,7 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask MaxRetry: c.MaxRetry, EnableANSIQuotes: c.EnableANSIQuotes, TimezoneStr: c.TimezoneStr, + PluginPath: c.PluginPath, } cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError) @@ -271,6 +274,8 @@ type syncerConfig struct { TimezoneStr string `toml:"timezone" json:"timezone"` Timezone *time.Location `json:"-"` + PluginPath string `toml:"plugin-path" json:"plugin-path"` + printVersion bool } @@ -349,6 +354,8 @@ func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) { Timezone: oc.TimezoneStr, From: oc.From, To: oc.To, + + PluginPath: oc.PluginPath, } for _, rule := range oc.RouteRules { diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 0621398ade..797add249b 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -365,6 +365,7 @@ const ( codeSyncerUnitGenBWList codeSyncerUnitHandleDDLFailed codeSyncerShardDDLConflict + codeSyncerLoadPlugin ) // DM-master error code @@ -878,6 +879,8 @@ var ( ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s") ErrSyncerShardDDLConflict = New(codeSyncerShardDDLConflict, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle shard ddl %v in optimistic mode, because schema conflict detected") + ErrSyncerLoadPlugin = New(codeSyncerLoadPlugin, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to load plugin from %s") + // DM-master error ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") ErrMasterSQLOpNotSupport = New(codeMasterSQLOpNotSupport, ClassDMMaster, ScopeInternal, LevelMedium, "op %s not supported") diff --git a/syncer/plugin/demo/demo.go b/syncer/plugin/demo/demo.go index 678595b25d..9f7a8c318b 100644 --- a/syncer/plugin/demo/demo.go +++ b/syncer/plugin/demo/demo.go @@ -36,7 +36,7 @@ type DemoPlugin struct { } // NewPlugin creates a new DemoPlugin -func NewPlugin() *DemoPlugin { +func NewPlugin() interface{} { return &DemoPlugin{} } @@ -59,20 +59,26 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) return nil } + log.Info("demo plugin HandleDDLJobResult", zap.String("query", string(ev.Query)), zap.Error(err)) if !strings.Contains(err.Error(), "unsupported modify column length") { + log.Info("don't contain error message \"unsupported modify column length\"") return nil } stmt, err := parser.New().ParseOneStmt(string(ev.Query), "", "") if err != nil { + log.Info("parser failed", zap.Error(err)) return err } + schema := string(ev.Schema) + switch st := stmt.(type) { case *ast.AlterTableStmt: - switch st.Specs[0].Tp { case ast.AlterTableModifyColumn: + log.Info("handle AlterTableModifyColumn") + originColName := st.Specs[0].NewColumns[0].Name.Name.O tmpColName := fmt.Sprintf("%s_tmp", st.Specs[0].NewColumns[0].Name) @@ -84,44 +90,57 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) var sb2 strings.Builder st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb2)) tmpCol := sb2.String() - log.Info("after translate", zap.String("new col", tmpCol), zap.String("origin col", originColName)) + //log.Info("after translate", zap.String("new col", tmpCol), zap.String("origin col", originColName)) ctx := context.Background() - tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, st.Table.Schema.O, st.Table.Name.O, "") + tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, schema, st.Table.Name.O, "") if err != nil { + log.Info("GetTableInfo failed", zap.Error(err)) return err } keys, _ := dbutil.SelectUniqueOrderKey(tableInfo) keysList := strings.Join(keys, ", ") - addColSQL := fmt.Sprintf("alter table `%s`.`%s` add column %s after %s", st.Table.Schema.O, st.Table.Name.O, tmpCol, originColName) + addColSQL := fmt.Sprintf("alter table `%s`.`%s` add column %s after %s", schema, st.Table.Name.O, tmpCol, originColName) + log.Info("execute", zap.String("sql", addColSQL)) _, err = dp.db.ExecContext(ctx, addColSQL) if err != nil { + log.Info("GetTableInfo failed", zap.Error(err)) return err } - insertSQL := fmt.Sprintf("replace into `%s`.`%s`(%s, %s) SELECT %s, %s AS %s FROM `%s`.`%s`;", st.Table.Schema.O, st.Table.Name.O, keysList, tmpColName, keysList, originColName, tmpColName, st.Table.Schema.O, st.Table.Name.O) + insertSQL := fmt.Sprintf("replace into `%s`.`%s`(%s, %s) SELECT %s, %s AS %s FROM `%s`.`%s`;", schema, st.Table.Name.O, keysList, tmpColName, keysList, originColName, tmpColName, schema, st.Table.Name.O) + log.Info("execute", zap.String("sql", insertSQL)) _, err = dp.db.ExecContext(ctx, insertSQL) if err != nil { + log.Info("GetTableInfo failed", zap.Error(err)) return err } - dropColSQL := fmt.Sprintf("alter table `%s`.`%s` drop column %s", st.Table.Schema.O, st.Table.Name.O, originColName) + dropColSQL := fmt.Sprintf("alter table `%s`.`%s` drop column %s", schema, st.Table.Name.O, originColName) + log.Info("execute", zap.String("sql", dropColSQL)) _, err = dp.db.ExecContext(ctx, dropColSQL) if err != nil { + log.Info("GetTableInfo failed", zap.Error(err)) return err } - changeColSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN %s %s;", st.Table.Schema.O, st.Table.Name.O, tmpColName, originCol) + changeColSQL := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN %s %s;", schema, st.Table.Name.O, tmpColName, originCol) + log.Info("execute", zap.String("sql", changeColSQL)) _, err = dp.db.ExecContext(ctx, changeColSQL) if err != nil { return err } + default: + log.Info("ignore") + return nil } default: + log.Info("ignore") return nil } + log.Info("ignore") return nil } diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go index cbbb4f9159..0acaf38cad 100644 --- a/syncer/plugin/plugin.go +++ b/syncer/plugin/plugin.go @@ -19,6 +19,7 @@ import ( //"github.com/pingcap/errors" "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/terror" "github.com/siddontang/go-mysql/replication" ) @@ -35,24 +36,24 @@ func LoadPlugin(filepath string) (Plugin, error) { p, err := plugin.Open(filepath) if err != nil { // TODO: use terror - return nil, err + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } pluginSymbol, err := p.Lookup(createPluginFunc) if err != nil { - return nil, err + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } newPlugin, ok := pluginSymbol.(func() interface{}) if !ok { // TODO: use terror - return nil, nil + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } plg := newPlugin() plg2, ok := plg.(Plugin) if !ok { - return nil, nil + return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } return plg2, nil @@ -71,7 +72,7 @@ type Plugin interface { } // NilPlugin is a plugin which do nothing -type NilPlugin struct {} +type NilPlugin struct{} // Init implements Plugin's Init func (n *NilPlugin) Init(cfg *config.SubTaskConfig) error { @@ -86,4 +87,4 @@ func (n *NilPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) er // HandleDMLJobResult implements Plugin's HandleDMLJobResult func (n *NilPlugin) HandleDMLJobResult(ev *replication.RowsEvent, err error) error { return err -} \ No newline at end of file +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 63a58ee78c..c9dfc1fa91 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -402,41 +402,6 @@ func (s *Syncer) initPlugin() error { } s.plugin = plugin return s.plugin.Init(s.cfg) - /* - if len(s.plugin.path) == 0 { - return nil - } - - p, err := plugin.Open(s.plugin.path) - if err != nil { - // TODO: use terror - return err - } - - initFunc, err := p.Lookup("Init") - if err != nil { - // TODO: use terror - return err - } - - handleDDLJobResultFunc, err := p.Lookup("HandleDDLJobResult") - if err != nil { - // TODO: use terror - return err - } - - handleDMLJobResultFunc, err := p.Lookup("HandleDMLJobResult") - if err != nil { - // TODO: use terror - return err - } - - s.plugin.Init = initFunc.(func() error) - s.plugin.HandleDDLJobResult = handleDDLJobResultFunc.(func(*replication.QueryEvent, eventContext, error) error) - s.plugin.HandleDMLJobResult = handleDMLJobResultFunc.(func(*replication.RowsEvent, eventContext, error) error) - - return s.plugin.Init() - */ } // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules @@ -971,9 +936,6 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, s.jobWg.Done() if err != nil { s.execError.Set(err) - if !utils.IsContextCanceledError(err) { - s.runFatalChan <- unit.NewProcessError(err) - } continue } s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls))) diff --git a/tests/_utils/run_dm_syncer b/tests/_utils/run_dm_syncer index 9b4c81f3bd..bee13db761 100755 --- a/tests/_utils/run_dm_syncer +++ b/tests/_utils/run_dm_syncer @@ -5,7 +5,7 @@ set -eu workdir=$1 config=$2 -binary=$PWD/bin/dm-syncer.test +binary=$PWD/bin/dm-syncer PWD=$(pwd) @@ -23,7 +23,6 @@ fi cd $workdir echo "$binary --log-file="$workdir/log/dm-syncer.log" --config="$config" $meta $format >> $workdir/log/stdout.log 2>&1 &" -$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.syncer.out" DEVEL \ --L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $name $meta $format >> $workdir/log/stdout.log 2>&1 & +$binary -L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $name $meta $format >> $workdir/log/stdout.log 2>&1 & cd $PWD diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 1a5c291bec..a52c830bd5 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -14,12 +14,12 @@ function cleanup_process() { pkill -hup dm-worker.test 2>/dev/null || true pkill -hup dm-master.test 2>/dev/null || true pkill -hup dm-tracer.test 2>/dev/null || true - pkill -hup dm-syncer.test 2>/dev/null || true + pkill -hup dm-syncer 2>/dev/null || true wait_process_exit dm-master.test wait_process_exit dm-worker.test wait_process_exit dm-tracer.test - wait_process_exit dm-syncer.test + wait_process_exit dm-syncer } if [ "$RESET_MASTER" = true ]; then diff --git a/tests/dm_syncer/conf/dm-syncer-1.toml b/tests/dm_syncer/conf/dm-syncer-1.toml index 3d9b8bef54..f5005ca72b 100644 --- a/tests/dm_syncer/conf/dm-syncer-1.toml +++ b/tests/dm_syncer/conf/dm-syncer-1.toml @@ -23,6 +23,9 @@ disable-heartbeat = true # replicate from relay log or remote binlog binlog-type = "remote" +# plugin configuration +plugin-path = "/tmp/demo.so" + # Mydumper configuration # -t, --threads diff --git a/tests/dm_syncer/data/db1.increment.sql b/tests/dm_syncer/data/db1.increment.sql index e8f75e3ea2..94f449d9a3 100644 --- a/tests/dm_syncer/data/db1.increment.sql +++ b/tests/dm_syncer/data/db1.increment.sql @@ -23,3 +23,9 @@ insert into t1 (id, name, info) values (7, 'gentest', '{"id": 126}'); update t1 set name = 'gentestxxxxxx' where gen_id = 124; -- delete with unique key delete from t1 where gen_id > 124; + +create table t11(id int primary key, name varchar(100)); +insert into t11 values(1, "a"),(2, "b"); +/* alter table will failed when execute in TiDB, will be handled by plugin */ +ALTER TABLE t11 MODIFY COLUMN name varchar(50); +insert into t11 values(3, "c"),(4, "d"); From 2a9135bcaee506ca347dd5dafd8045ec79e69653 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Tue, 21 Apr 2020 23:12:20 +0800 Subject: [PATCH 8/8] clean code --- Makefile | 5 +---- _utils/terror_gen/errors_release.txt | 1 + syncer/plugin/demo/demo.go | 28 ++++++++++++++++----------- syncer/plugin/plugin.go | 4 ---- syncer/syncer.go | 9 +-------- tests/dm_syncer/conf/dm-syncer-1.toml | 1 - tests/dm_syncer/run.sh | 4 ++++ 7 files changed, 24 insertions(+), 28 deletions(-) diff --git a/Makefile b/Makefile index 9d6827327a..9f9cdf00d6 100644 --- a/Makefile +++ b/Makefile @@ -69,9 +69,6 @@ dm-portal: dm-syncer: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dm-syncer ./cmd/dm-syncer -plugin-demo: - CGO_ENABLED=1 $(GOBUILD) -ldflags '$(LDFLAGS)' -o /tmp/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go - debug-tools: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole @@ -161,7 +158,7 @@ dm_integration_test_build: retool_setup -o bin/dm-tracer.test github.com/pingcap/dm/cmd/dm-tracer \ || { $(FAILPOINT_DISABLE); exit 1; } CGO_ENABLED=1 GO111MODULE=on go build -o bin/dm-syncer ./cmd/dm-syncer - CGO_ENABLED=1 GO111MODULE=on go build -o /tmp/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go + CGO_ENABLED=1 GO111MODULE=on go build -o bin/demo.so -buildmode=plugin ./syncer/plugin/demo/demo.go $(FAILPOINT_DISABLE) tests/prepare_tools.sh diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index de802c0261..53f5d03d30 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -292,6 +292,7 @@ ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:le ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list" ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s" ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high],"fail to handle shard ddl %v in optimistic mode, because schema conflict detected" +ErrSyncerLoadPlugin,[code=36063:class=sync-unit:scope=internal:level=high],"fail to load plugin from %s" ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid" ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported" ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid" diff --git a/syncer/plugin/demo/demo.go b/syncer/plugin/demo/demo.go index 9f7a8c318b..a733ae7cba 100644 --- a/syncer/plugin/demo/demo.go +++ b/syncer/plugin/demo/demo.go @@ -42,6 +42,8 @@ func NewPlugin() interface{} { // Init implements Plugin's Init func (dp *DemoPlugin) Init(cfg *config.SubTaskConfig) error { + log.Info("demo plugin initialize") + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true", cfg.To.User, cfg.To.Password, cfg.To.Host, cfg.To.Port) db, err := sql.Open("mysql", dsn) @@ -54,12 +56,16 @@ func (dp *DemoPlugin) Init(cfg *config.SubTaskConfig) error { } // HandleDDLJobResult implements Plugin's HandleDDLJobResult +// for example: +// ev.Query is `ALTER TABLE test.t1 MODIFY COLUMN name varchar(50);` +// error is `unsupported modify column length 50 is less than origin 100` func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) error { if err == nil { return nil } - log.Info("demo plugin HandleDDLJobResult", zap.String("query", string(ev.Query)), zap.Error(err)) + log.Info("demo plugin handle ddl job result", zap.String("query", string(ev.Query)), zap.Error(err)) + if !strings.Contains(err.Error(), "unsupported modify column length") { log.Info("don't contain error message \"unsupported modify column length\"") return nil @@ -77,25 +83,25 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) case *ast.AlterTableStmt: switch st.Specs[0].Tp { case ast.AlterTableModifyColumn: - log.Info("handle AlterTableModifyColumn") - originColName := st.Specs[0].NewColumns[0].Name.Name.O tmpColName := fmt.Sprintf("%s_tmp", st.Specs[0].NewColumns[0].Name) + // get origin column from ast, originCol is `name varchar(50)` var sb strings.Builder st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb)) originCol := sb.String() + // generate tmp column, tmpCol is `name_tmp varchar(50)` st.Specs[0].NewColumns[0].Name.Name = model.NewCIStr(tmpColName) var sb2 strings.Builder st.Specs[0].NewColumns[0].Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, &sb2)) tmpCol := sb2.String() - //log.Info("after translate", zap.String("new col", tmpCol), zap.String("origin col", originColName)) + // get table infomation, used to get primary key and unique key ctx := context.Background() tableInfo, err := dbutil.GetTableInfo(ctx, dp.db, schema, st.Table.Name.O, "") if err != nil { - log.Info("GetTableInfo failed", zap.Error(err)) + log.Info("get table information failed", zap.Error(err)) return err } keys, _ := dbutil.SelectUniqueOrderKey(tableInfo) @@ -105,7 +111,7 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) log.Info("execute", zap.String("sql", addColSQL)) _, err = dp.db.ExecContext(ctx, addColSQL) if err != nil { - log.Info("GetTableInfo failed", zap.Error(err)) + log.Info("execute sql failed", zap.String("sql", addColSQL), zap.Error(err)) return err } @@ -113,7 +119,7 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) log.Info("execute", zap.String("sql", insertSQL)) _, err = dp.db.ExecContext(ctx, insertSQL) if err != nil { - log.Info("GetTableInfo failed", zap.Error(err)) + log.Info("execute sql failed", zap.String("sql", insertSQL), zap.Error(err)) return err } @@ -121,7 +127,7 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) log.Info("execute", zap.String("sql", dropColSQL)) _, err = dp.db.ExecContext(ctx, dropColSQL) if err != nil { - log.Info("GetTableInfo failed", zap.Error(err)) + log.Info("execute sql failed", zap.String("sql", dropColSQL), zap.Error(err)) return err } @@ -129,18 +135,18 @@ func (dp *DemoPlugin) HandleDDLJobResult(ev *replication.QueryEvent, err error) log.Info("execute", zap.String("sql", changeColSQL)) _, err = dp.db.ExecContext(ctx, changeColSQL) if err != nil { + log.Info("execute sql failed", zap.String("sql", changeColSQL), zap.Error(err)) return err } default: - log.Info("ignore") + log.Info("unhandle ddl type") return nil } default: - log.Info("ignore") + log.Info("unhandle ddl type") return nil } - log.Info("ignore") return nil } diff --git a/syncer/plugin/plugin.go b/syncer/plugin/plugin.go index 0acaf38cad..973895481f 100644 --- a/syncer/plugin/plugin.go +++ b/syncer/plugin/plugin.go @@ -15,9 +15,7 @@ package plugin import ( "plugin" - //"fmt" - //"github.com/pingcap/errors" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/terror" "github.com/siddontang/go-mysql/replication" @@ -35,7 +33,6 @@ func LoadPlugin(filepath string) (Plugin, error) { p, err := plugin.Open(filepath) if err != nil { - // TODO: use terror return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } @@ -46,7 +43,6 @@ func LoadPlugin(filepath string) (Plugin, error) { newPlugin, ok := pluginSymbol.(func() interface{}) if !ok { - // TODO: use terror return nil, terror.ErrSyncerLoadPlugin.Delegate(err, filepath) } diff --git a/syncer/syncer.go b/syncer/syncer.go index c9dfc1fa91..460dbb46cf 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,7 +16,6 @@ package syncer import ( "context" "fmt" - //"plugin" "reflect" "strconv" "strings" @@ -158,8 +157,6 @@ type Syncer struct { // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError // record whether error occurred when execute SQLs - //execErrorDetected sync2.AtomicBool - execError executeError execErrors struct { @@ -228,10 +225,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.tctx.L().DPanic("cannot create schema tracker", zap.Error(err)) } - //syncer.plugin = &Plugin{ - // path: cfg.PluginPath, - //} - return syncer } @@ -1748,7 +1741,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } // when add ddl job, will execute ddl and then flush checkpoint. - // if execute ddl failed, the execErrorDetected will be true. + // if execute ddl failed, the detected will be true. if detected, err := s.execError.Detected(); detected { return terror.ErrSyncerUnitHandleDDLFailed.Delegate(err, ev.Query) } diff --git a/tests/dm_syncer/conf/dm-syncer-1.toml b/tests/dm_syncer/conf/dm-syncer-1.toml index f5005ca72b..b97e3529e7 100644 --- a/tests/dm_syncer/conf/dm-syncer-1.toml +++ b/tests/dm_syncer/conf/dm-syncer-1.toml @@ -24,7 +24,6 @@ disable-heartbeat = true binlog-type = "remote" # plugin configuration -plugin-path = "/tmp/demo.so" # Mydumper configuration diff --git a/tests/dm_syncer/run.sh b/tests/dm_syncer/run.sh index ce2e803cb4..87b54c57ed 100755 --- a/tests/dm_syncer/run.sh +++ b/tests/dm_syncer/run.sh @@ -63,10 +63,14 @@ function run() { name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ') pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ') fi + sed -i "s/binlog-name-placeholder-1/\"$name1\"/g" $WORK_DIR/dm-syncer-1.toml sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-syncer-1.toml sed -i "s/binlog-name-placeholder-2/\"$name2\"/g" $WORK_DIR/old_meta_file sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/old_meta_file + plugin_so=$PWD/bin/demo.so + sed -i "/plugin/i\plugin-path = \"$plugin_so\"" $WORK_DIR/dm-syncer-1.toml + run_dm_syncer $WORK_DIR/syncer1 $WORK_DIR/dm-syncer-1.toml meta_file=$WORK_DIR/old_meta_file run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --syncer-config-format syncer2