diff --git a/cdc/sink/mysql/mysql_syncpoint_store.go b/cdc/sink/mysql/mysql_syncpoint_store.go index 714ace4af42..26cf286cde4 100644 --- a/cdc/sink/mysql/mysql_syncpoint_store.go +++ b/cdc/sink/mysql/mysql_syncpoint_store.go @@ -156,7 +156,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("create sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "create sync table: begin Tx fail;")) } _, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database) if err != nil { @@ -164,7 +164,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } _, err = tx.Exec("USE " + database) if err != nil { @@ -172,7 +172,7 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } query := `CREATE TABLE IF NOT EXISTS %s ( @@ -191,10 +191,10 @@ func (s *mysqlSyncPointStore) CreateSyncTable(ctx context.Context) error { if err2 != nil { log.Error("failed to create syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to create syncpoint table;")) } func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, @@ -204,7 +204,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, tx, err := s.db.BeginTx(ctx, nil) if err != nil { log.Error("sync table: begin Tx fail", zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "sync table: begin Tx fail;")) } row := tx.QueryRow("select @@tidb_current_ts") var secondaryTs string @@ -215,7 +215,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // insert ts map query := "insert ignore into " + filter.TiCDCSystemSchema + "." + filter.SyncPointTable + @@ -226,7 +226,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } // set global tidb_external_ts to secondary ts @@ -242,7 +242,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err2)) } - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } } @@ -267,7 +267,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, } err = tx.Commit() - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write syncpoint table;")) } func (s *mysqlSyncPointStore) Close() error { diff --git a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go index ac8072611f4..778d31682cd 100644 --- a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go @@ -16,10 +16,11 @@ package mysql import ( "context" "database/sql" + "fmt" "net/url" "time" - "github.com/pingcap/errors" + cerrors "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/parser/model" @@ -189,7 +190,7 @@ func (m *mysqlDDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error zap.Duration("duration", time.Since(start)), zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, err) + return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) } log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query), @@ -217,7 +218,7 @@ func (m *mysqlDDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model // Close closes the database connection. func (m *mysqlDDLSink) Close() error { if err := m.db.Close(); err != nil { - return errors.Trace(err) + return cerrors.Trace(err) } if m.statistics != nil { m.statistics.Close() diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index 157a3576e83..3389a62edd4 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -772,7 +772,7 @@ func logDMLTxnErr( zap.String("query", query), zap.Int("count", count), zap.String("changefeed", changefeed)) } - return err + return errors.WithMessage(err, fmt.Sprintf("Failed query info: %s; ", query)) } func isRetryableDMLError(err error) bool {