Skip to content

Commit

Permalink
Merge pull request cockroachdb#14292 from danhhz/placeholder
Browse files Browse the repository at this point in the history
sqlccl: make backup/restore work with sql placeholders
  • Loading branch information
danhhz authored Mar 21, 2017
2 parents 69521f9 + 275560c commit 58aab9e
Show file tree
Hide file tree
Showing 9 changed files with 4,395 additions and 4,299 deletions.
17 changes: 14 additions & 3 deletions pkg/ccl/sqlccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ func backupPlanHook(
return nil, nil, err
}

toFn, err := p.TypeAsString(&backup.To)
if err != nil {
return nil, nil, err
}
incrementalFromFn, err := p.TypeAsStringArray(&backup.IncrementalFrom)
if err != nil {
return nil, nil, err
}

header := sql.ResultColumns{
{Name: "to", Typ: parser.TypeString},
{Name: "startTs", Typ: parser.TypeString},
Expand All @@ -307,7 +316,7 @@ func backupPlanHook(
var startTime hlc.Timestamp
if backup.IncrementalFrom != nil {
var err error
startTime, err = ValidatePreviousBackups(ctx, backup.IncrementalFrom)
startTime, err = ValidatePreviousBackups(ctx, incrementalFromFn())
if err != nil {
return nil, err
}
Expand All @@ -319,9 +328,11 @@ func backupPlanHook(
return nil, err
}
}

to := toFn()
desc, err := Backup(ctx,
p,
backup.To,
to,
backup.Targets,
startTime, endTime,
backup.Options,
Expand All @@ -330,7 +341,7 @@ func backupPlanHook(
return nil, err
}
ret := []parser.Datums{{
parser.NewDString(backup.To),
parser.NewDString(to),
parser.NewDString(startTime.String()),
parser.NewDString(endTime.String()),
parser.NewDInt(parser.DInt(desc.DataSize)),
Expand Down
107 changes: 52 additions & 55 deletions pkg/ccl/sqlccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func backupAndRestore(

var unused string
var dataSize int64
sqlDB.QueryRow(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dest)).Scan(
sqlDB.QueryRow(`BACKUP DATABASE bench TO $1`, dest).Scan(
&unused, &unused, &unused, &dataSize,
)
approxDataSize := int64(backupRestoreRowPayloadSize) * numAccounts
Expand All @@ -188,7 +188,7 @@ func backupAndRestore(
// Force the ID of the restored bank table to be different.
sqlDBRestore.Exec(`CREATE TABLE bench.empty (a INT PRIMARY KEY)`)

sqlDBRestore.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`, dest))
sqlDBRestore.Exec(`RESTORE bench.* FROM $1`, dest)

var rowCount int64
sqlDBRestore.QueryRow(`SELECT COUNT(*) FROM bench.bank`).Scan(&rowCount)
Expand Down Expand Up @@ -224,19 +224,19 @@ func TestBackupRestoreInterleaved(t *testing.T) {
// The bank table has numAccounts accounts, put 2x that in i0, 3x in i0_0,
// and 4x in i1.
for i := 0; i < numAccounts; i++ {
_ = sqlDB.Exec(fmt.Sprintf(`INSERT INTO i0 VALUES (%d, 1), (%d, 2)`, i, i))
_ = sqlDB.Exec(fmt.Sprintf(`INSERT INTO i0_0 VALUES (%d, 1, 1), (%d, 2, 2), (%d, 3, 3)`, i, i, i))
_ = sqlDB.Exec(fmt.Sprintf(`INSERT INTO i1 VALUES (%d, 1), (%d, 2), (%d, 3), (%d, 4)`, i, i, i, i))
_ = sqlDB.Exec(`INSERT INTO i0 VALUES ($1, 1), ($1, 2)`, i)
_ = sqlDB.Exec(`INSERT INTO i0_0 VALUES ($1, 1, 1), ($1, 2, 2), ($1, 3, 3)`, i)
_ = sqlDB.Exec(`INSERT INTO i1 VALUES ($1, 1), ($1, 2), ($1, 3), ($1, 4)`, i)
}
_ = sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dir))
_ = sqlDB.Exec(`BACKUP DATABASE bench TO $1`, dir)

t.Run("all tables in interleave hierarchy", func(t *testing.T) {
tcRestore := testcluster.StartTestCluster(t, singleNode, base.TestClusterArgs{})
defer tcRestore.Stopper().Stop()
sqlDBRestore := sqlutils.MakeSQLRunner(t, tcRestore.Conns[0])
sqlDBRestore.Exec(bankCreateDatabase)

sqlDBRestore.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`, dir))
sqlDBRestore.Exec(`RESTORE bench.* FROM $1`, dir)

var rowCount int64
sqlDBRestore.QueryRow(`SELECT COUNT(*) FROM bench.bank`).Scan(&rowCount)
Expand All @@ -263,7 +263,7 @@ func TestBackupRestoreInterleaved(t *testing.T) {
sqlDBRestore := sqlutils.MakeSQLRunner(t, tcRestore.Conns[0])
sqlDBRestore.Exec(bankCreateDatabase)

_, err := sqlDBRestore.DB.Exec(fmt.Sprintf(`RESTORE TABLE bench.i0 FROM '%s'`, dir))
_, err := sqlDBRestore.DB.Exec(`RESTORE TABLE bench.i0 FROM $1`, dir)
if !testutils.IsError(err, "without interleave parent") {
t.Fatalf("expected 'without interleave parent' error but got: %+v", err)
}
Expand All @@ -275,7 +275,7 @@ func TestBackupRestoreInterleaved(t *testing.T) {
sqlDBRestore := sqlutils.MakeSQLRunner(t, tcRestore.Conns[0])
sqlDBRestore.Exec(bankCreateDatabase)

_, err := sqlDBRestore.DB.Exec(fmt.Sprintf(`RESTORE TABLE bench.bank FROM '%s'`, dir))
_, err := sqlDBRestore.DB.Exec(`RESTORE TABLE bench.bank FROM $1`, dir)
if !testutils.IsError(err, "without interleave child") {
t.Fatalf("expected 'without interleave child' error but got: %+v", err)
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestBackupRestoreFKs(t *testing.T) {
)`)

for i := 0; i < numAccounts; i++ {
origDB.Exec(fmt.Sprintf(`INSERT INTO store.customers VALUES (%d, '%d')`, i, i))
origDB.Exec(`INSERT INTO store.customers VALUES ($1, $1::string)`, i)
}
// Each even customerID gets 3 orders, with predictable order and receipt IDs.
for cID := 0; cID < numAccounts; cID += 2 {
Expand All @@ -339,7 +339,7 @@ func TestBackupRestoreFKs(t *testing.T) {
}
}
}
_ = origDB.Exec(fmt.Sprintf(`BACKUP DATABASE store TO '%s'`, dir))
_ = origDB.Exec(`BACKUP DATABASE store TO $1`, dir)
}

origCustomers := origDB.QueryStr(`SHOW CONSTRAINTS FROM store.customers`)
Expand All @@ -351,7 +351,7 @@ func TestBackupRestoreFKs(t *testing.T) {
defer tc.Stopper().Stop()
db := sqlutils.MakeSQLRunner(t, tc.Conns[0])
db.Exec(createStore)
db.Exec(fmt.Sprintf(`RESTORE store.* FROM '%s'`, dir))
db.Exec(`RESTORE store.* FROM $1`, dir)
// Restore's Validate checks all the tables point to each other correctly.

db.CheckQueryResults(`SHOW CONSTRAINTS FROM store.customers`, origCustomers)
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestBackupRestoreFKs(t *testing.T) {
defer tc.Stopper().Stop()
db := sqlutils.MakeSQLRunner(t, tc.Conns[0])
db.Exec(createStore)
db.Exec(fmt.Sprintf(`RESTORE store.customers, store.orders FROM '%s'`, dir))
db.Exec(`RESTORE store.customers, store.orders FROM $1`, dir)
// Restore's Validate checks all the tables point to each other correctly.

// FK validation on customers from orders is preserved.
Expand All @@ -415,15 +415,14 @@ func TestBackupRestoreFKs(t *testing.T) {

// FK validation of self-FK is preserved.
if _, err := db.DB.Exec(
fmt.Sprintf(`RESTORE store.orders FROM '%s'`, dir),
`RESTORE store.orders FROM $1`, dir,
); !testutils.IsError(
err, "cannot restore table \"orders\" without referenced table 53 \\(or \"skip_missing_foreign_keys\" option\\)",
) {
t.Fatal(err)
}

db.Exec(
fmt.Sprintf(`RESTORE store.orders FROM '%s' WITH OPTIONS ('skip_missing_foreign_keys')`, dir))
db.Exec(`RESTORE store.orders FROM $1 WITH OPTIONS ('skip_missing_foreign_keys')`, dir)
// Restore's Validate checks all the tables point to each other correctly.

// FK validation is gone.
Expand All @@ -436,8 +435,7 @@ func TestBackupRestoreFKs(t *testing.T) {
defer tc.Stopper().Stop()
db := sqlutils.MakeSQLRunner(t, tc.Conns[0])
db.Exec(createStore)
db.Exec(fmt.Sprintf(
`RESTORE store.receipts FROM '%s' WITH OPTIONS ('skip_missing_foreign_keys')`, dir))
db.Exec(`RESTORE store.receipts FROM $1 WITH OPTIONS ('skip_missing_foreign_keys')`, dir)
// Restore's Validate checks all the tables point to each other correctly.

// FK validation of orders and customer is gone.
Expand All @@ -456,8 +454,7 @@ func TestBackupRestoreFKs(t *testing.T) {
defer tc.Stopper().Stop()
db := sqlutils.MakeSQLRunner(t, tc.Conns[0])
db.Exec(createStore)
db.Exec(fmt.Sprintf(
`RESTORE store.receipts, store.customers FROM '%s' WITH OPTIONS ('skip_missing_foreign_keys')`, dir))
db.Exec(`RESTORE store.receipts, store.customers FROM $1 WITH OPTIONS ('skip_missing_foreign_keys')`, dir)
// Restore's Validate checks all the tables point to each other correctly.

// FK validation of orders is gone.
Expand Down Expand Up @@ -678,12 +675,12 @@ func TestBackupRestoreWithConcurrentWrites(t *testing.T) {
<-bgActivity

// Backup DB while concurrent writes continue.
sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, baseDir))
sqlDB.Exec(`BACKUP DATABASE bench TO $1`, baseDir)

// Drop the table and restore from backup and check our invariant.
atomic.StoreInt32(&allowErrors, 1)
sqlDB.Exec(`DROP TABLE bench.bank`)
sqlDB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`, baseDir))
sqlDB.Exec(`RESTORE bench.* FROM $1`, baseDir)
atomic.StoreInt32(&allowErrors, 0)

bad := sqlDB.QueryStr(`SELECT id, balance, payload FROM bench.bank WHERE id != balance`)
Expand Down Expand Up @@ -718,7 +715,7 @@ func TestBackupAsOfSystemTime(t *testing.T) {

sqlDB.Exec(`DROP TABLE bench.bank`)

sqlDB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`, dir))
sqlDB.Exec(`RESTORE bench.* FROM $1`, dir)

sqlDB.QueryRow(`SELECT COUNT(*) FROM bench.bank`).Scan(&rowCount)
if rowCount != numAccounts {
Expand All @@ -736,7 +733,7 @@ func TestBackupRestoreChecksum(t *testing.T) {
_, dir, _, sqlDB, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts)
defer cleanupFn()

sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dir))
sqlDB.Exec(`BACKUP DATABASE bench TO $1`, dir)

var backupDesc BackupDescriptor
{
Expand Down Expand Up @@ -768,7 +765,7 @@ func TestBackupRestoreChecksum(t *testing.T) {
}

sqlDB.Exec(`DROP TABLE bench.bank`)
_, err = sqlDB.DB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`, dir))
_, err = sqlDB.DB.Exec(`RESTORE bench.* FROM $1`, dir)
if !testutils.IsError(err, "checksum mismatch") {
t.Fatalf("expected 'checksum mismatch' error got: %+v", err)
}
Expand All @@ -788,40 +785,40 @@ func TestTimestampMismatch(t *testing.T) {
incrementalT2FromT1 := filepath.Join(dir, "2")
incrementalT3FromT1OneTable := filepath.Join(dir, "3")

sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`,
fullBackup))
sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s'`,
incrementalT1FromFull, fullBackup))
sqlDB.Exec(fmt.Sprintf(`BACKUP TABLE bench.bank TO '%s' INCREMENTAL FROM '%s'`,
incrementalT3FromT1OneTable, fullBackup))
sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s', '%s'`,
incrementalT2FromT1, fullBackup, incrementalT1FromFull))
sqlDB.Exec(`BACKUP DATABASE bench TO $1`,
fullBackup)
sqlDB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2`,
incrementalT1FromFull, fullBackup)
sqlDB.Exec(`BACKUP TABLE bench.bank TO $1 INCREMENTAL FROM $2`,
incrementalT3FromT1OneTable, fullBackup)
sqlDB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2, $3`,
incrementalT2FromT1, fullBackup, incrementalT1FromFull)

t.Run("Backup", func(t *testing.T) {
// Missing the initial full backup.
_, err := sqlDB.DB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s'`,
dir, incrementalT1FromFull))
_, err := sqlDB.DB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2`,
dir, incrementalT1FromFull)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}

// Missing an intermediate incremental backup.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s', '%s'`,
dir, fullBackup, incrementalT2FromT1))
_, err = sqlDB.DB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2, $3`,
dir, fullBackup, incrementalT2FromT1)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}

// Backups specified out of order.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s', '%s'`,
dir, incrementalT1FromFull, fullBackup))
_, err = sqlDB.DB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2, $3`,
dir, incrementalT1FromFull, fullBackup)
if !testutils.IsError(err, "out of order") {
t.Errorf("expected 'out of order' error got: %+v", err)
}

// Missing data for one table in the most recent backup.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s' INCREMENTAL FROM '%s', '%s'`,
dir, fullBackup, incrementalT3FromT1OneTable))
_, err = sqlDB.DB.Exec(`BACKUP DATABASE bench TO $1 INCREMENTAL FROM $2, $3`,
dir, fullBackup, incrementalT3FromT1OneTable)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}
Expand All @@ -831,29 +828,29 @@ func TestTimestampMismatch(t *testing.T) {
sqlDB.Exec(`DROP TABLE bench.t2`)
t.Run("Restore", func(t *testing.T) {
// Missing the initial full backup.
_, err := sqlDB.DB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s'`,
incrementalT1FromFull))
_, err := sqlDB.DB.Exec(`RESTORE bench.* FROM $1`,
incrementalT1FromFull)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}

// Missing an intermediate incremental backup.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s', '%s'`,
fullBackup, incrementalT2FromT1))
_, err = sqlDB.DB.Exec(`RESTORE bench.* FROM $1, $2`,
fullBackup, incrementalT2FromT1)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}

// Backups specified out of order.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s', '%s'`,
incrementalT1FromFull, fullBackup))
_, err = sqlDB.DB.Exec(`RESTORE bench.* FROM $1, $2`,
incrementalT1FromFull, fullBackup)
if !testutils.IsError(err, "out of order") {
t.Errorf("expected 'out of order' error got: %+v", err)
}

// Missing data for one table in the most recent backup.
_, err = sqlDB.DB.Exec(fmt.Sprintf(`RESTORE bench.* FROM '%s', '%s'`,
fullBackup, incrementalT3FromT1OneTable))
_, err = sqlDB.DB.Exec(`RESTORE bench.* FROM $1, $2`,
fullBackup, incrementalT3FromT1OneTable)
if !testutils.IsError(err, "no backup covers time") {
t.Errorf("expected 'no backup covers time' error got: %+v", err)
}
Expand Down Expand Up @@ -924,7 +921,7 @@ func TestBackupLevelDB(t *testing.T) {
_, dir, _, sqlDB, cleanupFn := backupRestoreTestSetup(t, singleNode, 0)
defer cleanupFn()

_ = sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dir))
_ = sqlDB.Exec(`BACKUP DATABASE bench TO $1`, dir)

// Verify that the sstables are in LevelDB format by checking the trailer
// magic.
Expand Down Expand Up @@ -967,15 +964,15 @@ func TestRestoredPrivileges(t *testing.T) {

withGrants := sqlDB.QueryStr(`SHOW GRANTS ON bench.bank`)

sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dir))
sqlDB.Exec(`BACKUP DATABASE bench TO $1`, dir)
sqlDB.Exec(`DROP TABLE bench.bank`)

t.Run("into fresh db", func(t *testing.T) {
tc := testcluster.StartTestCluster(t, singleNode, base.TestClusterArgs{})
defer tc.Stopper().Stop()
sqlDBRestore := sqlutils.MakeSQLRunner(t, tc.Conns[0])
sqlDBRestore.Exec(`CREATE DATABASE bench`)
sqlDBRestore.Exec(fmt.Sprintf(`RESTORE bench.bank FROM '%s'`, dir))
sqlDBRestore.Exec(`RESTORE bench.bank FROM $1`, dir)
sqlDBRestore.CheckQueryResults(`SHOW GRANTS ON bench.bank`, rootOnly)
})

Expand All @@ -986,7 +983,7 @@ func TestRestoredPrivileges(t *testing.T) {
sqlDBRestore.Exec(`CREATE DATABASE bench`)
sqlDBRestore.Exec(`CREATE USER someone`)
sqlDBRestore.Exec(`GRANT SELECT, INSERT, UPDATE, DELETE ON DATABASE bench TO someone`)
sqlDBRestore.Exec(fmt.Sprintf(`RESTORE bench.bank FROM '%s'`, dir))
sqlDBRestore.Exec(`RESTORE bench.bank FROM $1`, dir)
sqlDBRestore.CheckQueryResults(`SHOW GRANTS ON bench.bank`, withGrants)
})
}
Expand All @@ -1001,7 +998,7 @@ func TestRestoreInto(t *testing.T) {
_, dir, _, sqlDB, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts)
defer cleanupFn()

sqlDB.Exec(fmt.Sprintf(`BACKUP DATABASE bench TO '%s'`, dir))
sqlDB.Exec(`BACKUP DATABASE bench TO $1`, dir)

restoreStmt := fmt.Sprintf(`RESTORE bench.bank FROM '%s' WITH OPTIONS ('into_db'='bench2')`, dir)

Expand Down Expand Up @@ -1058,7 +1055,7 @@ func TestBackupRestorePermissions(t *testing.T) {
// Root doesn't have CREATE on `system` DB, so that should fail. Still need
// a valid `dir` though, since descriptors are always loaded first.
if _, err := sqlDB.DB.Exec(
fmt.Sprintf(`RESTORE bench.bank FROM '%s' WITH OPTIONS ('into_db'='system')`, dir),
`RESTORE bench.bank FROM $1 WITH OPTIONS ('into_db'='system')`, dir,
); !testutils.IsError(err, "user root does not have CREATE privilege") {
t.Fatal(err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/sqlccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,17 @@ func restorePlanHook(
return nil, nil, err
}

fromFn, err := p.TypeAsStringArray(&restore.From)
if err != nil {
return nil, nil, err
}

fn := func() ([]parser.Datums, error) {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(baseCtx, stmt.StatementTag())
defer tracing.FinishSpan(span)

err := Restore(ctx, p, restore.From, restore.Targets, restore.Options)
err := Restore(ctx, p, fromFn(), restore.Targets, restore.Options)
return nil, err
}
return fn, nil, nil
Expand Down
Loading

0 comments on commit 58aab9e

Please sign in to comment.