Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore/performance] Batch migration queries #3798

Merged
merged 4 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 104 additions & 93 deletions internal/db/bundb/migrations/20241121121623_enum_strings_to_ints.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,107 +30,118 @@ import (

func init() {
up := func(ctx context.Context, db *bun.DB) error {
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {

// Status visibility type indices.
var statusVisIndices = []struct {
name string
cols []string
order string
}{
{
name: "statuses_visibility_idx",
cols: []string{"visibility"},
order: "",
},
{
name: "statuses_profile_web_view_idx",
cols: []string{"account_id", "visibility"},
order: "id DESC",
},
{
name: "statuses_public_timeline_idx",
cols: []string{"visibility"},
order: "id DESC",
},
}

// Tables with visibility types.
var visTables = []struct {
Table string
Column string
Default *new_gtsmodel.Visibility
IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error
}{
{
Table: "statuses",
Column: "visibility",
IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error {
// After new column has been created and
// populated, drop indices relying on old column.
for _, index := range statusVisIndices {
log.Infof(ctx, "dropping old index %s...", index.name)
if _, err := tx.NewDropIndex().
Index(index.name).
Exec(ctx); err != nil {
return err
}
// Status visibility type indices.
var statusVisIndices = []struct {
name string
cols []string
order string
}{
{
name: "statuses_visibility_idx",
cols: []string{"visibility"},
order: "",
},
{
name: "statuses_profile_web_view_idx",
cols: []string{"account_id", "visibility"},
order: "id DESC",
},
{
name: "statuses_public_timeline_idx",
cols: []string{"visibility"},
order: "id DESC",
},
}

// Tables with visibility types.
var visTables = []struct {
Table string
Column string
Default *new_gtsmodel.Visibility
IndexCleanupCallback func(ctx context.Context, tx bun.Tx) error
BatchByColumn string
}{
{
Table: "statuses",
Column: "visibility",
IndexCleanupCallback: func(ctx context.Context, tx bun.Tx) error {
// After new column has been created and
// populated, drop indices relying on old column.
for _, index := range statusVisIndices {
log.Infof(ctx, "dropping old index %s...", index.name)
if _, err := tx.NewDropIndex().
Index(index.name).
Exec(ctx); err != nil {
return err
}
return nil
},
}
return nil
},
{
Table: "sin_bin_statuses",
Column: "visibility",
},
{
Table: "account_settings",
Column: "privacy",
Default: util.Ptr(new_gtsmodel.VisibilityDefault)},
{
Table: "account_settings",
Column: "web_visibility",
Default: util.Ptr(new_gtsmodel.VisibilityDefault)},
BatchByColumn: "id",
},
{
Table: "sin_bin_statuses",
Column: "visibility",
BatchByColumn: "id",
},
{
Table: "account_settings",
Column: "privacy",
Default: util.Ptr(new_gtsmodel.VisibilityDefault),
BatchByColumn: "account_id",
},

{
Table: "account_settings",
Column: "web_visibility",
Default: util.Ptr(new_gtsmodel.VisibilityDefault),
BatchByColumn: "account_id",
},
}

// Get the mapping of old enum string values to new integer values.
visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]()

// Convert all visibility tables.
for _, table := range visTables {

// Perform each enum table conversion within its own transaction.
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return convertEnums(ctx, tx, table.Table, table.Column,
visibilityMapping, table.Default, table.IndexCleanupCallback, table.BatchByColumn)
}); err != nil {
return err
}

// Get the mapping of old enum string values to new integer values.
visibilityMapping := visibilityEnumMapping[old_gtsmodel.Visibility]()

// Convert all visibility tables.
for _, table := range visTables {
if err := convertEnums(ctx, tx, table.Table, table.Column,
visibilityMapping, table.Default, table.IndexCleanupCallback); err != nil {
return err
}
}

// Recreate the visibility indices.
log.Info(ctx, "creating new visibility indexes...")
for _, index := range statusVisIndices {
log.Infof(ctx, "creating new index %s...", index.name)
q := db.NewCreateIndex().
Table("statuses").
Index(index.name).
Column(index.cols...)
if index.order != "" {
q = q.ColumnExpr(index.order)
}

// Recreate the visibility indices.
log.Info(ctx, "creating new visibility indexes...")
for _, index := range statusVisIndices {
log.Infof(ctx, "creating new index %s...", index.name)
q := tx.NewCreateIndex().
Table("statuses").
Index(index.name).
Column(index.cols...)
if index.order != "" {
q = q.ColumnExpr(index.order)
}
if _, err := q.Exec(ctx); err != nil {
return err
}
if _, err := q.Exec(ctx); err != nil {
return err
}
}

// Get the mapping of old enum string values to the new integer value types.
notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]()
// Get the mapping of old enum string values to the new integer value types.
notificationMapping := notificationEnumMapping[old_gtsmodel.NotificationType]()

// Migrate over old notifications table column over to new column type.
if err := convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive
notificationMapping, nil, nil); err != nil {
return err
}
// Migrate over old notifications table column to new type in tx.
if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
return convertEnums(ctx, tx, "notifications", "notification_type", //nolint:revive
notificationMapping, nil, nil, "id")
}); err != nil {
return err
}

return nil
})
return nil
}

down := func(ctx context.Context, db *bun.DB) error {
Expand Down
85 changes: 77 additions & 8 deletions internal/db/bundb/migrations/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"errors"
"fmt"
"reflect"
"slices"
"strconv"
"strings"

"codeberg.org/gruf/go-byteutil"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect"
Expand All @@ -46,6 +48,7 @@ func convertEnums[OldType ~string, NewType ~int16](
mapping map[OldType]NewType,
defaultValue *NewType,
indexCleanupCallback func(context.Context, bun.Tx) error,
batchByColumn string,
) error {
if len(mapping) == 0 {
return errors.New("empty mapping")
Expand Down Expand Up @@ -87,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16](
var qbuf byteutil.Buffer

// Prepare a singular UPDATE statement using
// SET $newColumn = (CASE $column WHEN $old THEN $new ... END)
// SET $newColumn = (CASE $column WHEN $old THEN $new ... END).
qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...)
args = append(args, bun.Ident(table))
args = append(args, bun.Ident(newColumn))
Expand All @@ -99,16 +102,82 @@ func convertEnums[OldType ~string, NewType ~int16](
qbuf.B = append(qbuf.B, "ELSE ? END)"...)
args = append(args, *defaultValue)

// Execute the prepared raw query with arguments.
res, err := tx.NewRaw(qbuf.String(), args...).Exec(ctx)
if err != nil {
return gtserror.Newf("error updating old column values: %w", err)
// Serialize it here to be
// used as the base for each
// set of batch queries below.
baseQStr := string(qbuf.B)
baseArgs := args

// Query batch size
// in number of rows.
const batchsz = 5000

// Stores highest batch value
// used in iterate queries,
// starting at highest possible.
highest := id.Highest

// Total updated rows.
var updated int

for {
// Limit to batchsz
// items at once.
batchQ := tx.
NewSelect().
Table(table).
Column(batchByColumn).
Where("? < ?", bun.Ident(batchByColumn), highest).
OrderExpr("? DESC", bun.Ident(batchByColumn)).
Limit(batchsz)

// Finalize UPDATE to operate on this batch only.
qStr := baseQStr + " WHERE ? IN (?)"
args := append(
slices.Clone(baseArgs),
bun.Ident(batchByColumn),
batchQ,
)

// Execute the prepared raw query with arguments.
res, err := tx.NewRaw(qStr, args...).Exec(ctx)
if err != nil {
return gtserror.Newf("error updating old column values: %w", err)
}

// Check how many items we updated.
thisUpdated, err := res.RowsAffected()
if err != nil {
return gtserror.Newf("error counting affected rows: %w", err)
}

if thisUpdated == 0 {
// Nothing updated
// means we're done.
break
}

// Update the overall count.
updated += int(thisUpdated)

// Log helpful message to admin.
log.Infof(ctx, "migrated %d of %d %s (up to %s)",
updated, total, table, highest)

// Get next highest
// id for next batch.
if err := tx.
NewSelect().
With("batch_query", batchQ).
ColumnExpr("min(?) FROM ?", bun.Ident(batchByColumn), bun.Ident("batch_query")).
Scan(ctx, &highest); err != nil {
return gtserror.Newf("error selecting next highest: %w", err)
}
}

// Count number items updated.
updated, _ := res.RowsAffected()
if total != int(updated) {
log.Warnf(ctx, "total=%d does not match updated=%d", total, updated)
// Return error here in order to rollback the whole transaction.
return fmt.Errorf("total=%d does not match updated=%d", total, updated)
}

// Run index cleanup callback if set.
Expand Down