Skip to content

Commit

Permalink
*,sqlbase,descpb: pull protobufs out of sqlbase
Browse files Browse the repository at this point in the history
This commit is part of the process to better abstract descriptors. In this
step the protocol buffers which used to live in sqlbase have been extracted
to a new package `pkg/sql/catalog/descpb`.

Part of this change has required adopting wrappers around descriptors.
This change has proven to be a rather large change and I appologize to the
reviewers but am at a loss on how to split this one up.

Release note: None
  • Loading branch information
ajwerner committed Aug 5, 2020
1 parent 3b193fe commit ab161ab
Show file tree
Hide file tree
Showing 453 changed files with 10,515 additions and 9,620 deletions.
293 changes: 147 additions & 146 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import "build/info.proto";
import "roachpb/api.proto";
import "roachpb/data.proto";
import "sql/stats/table_statistic.proto";
import "sql/sqlbase/structured.proto";
import "sql/catalog/descpb/structured.proto";
import "util/hlc/timestamp.proto";
import "gogoproto/gogo.proto";

Expand Down Expand Up @@ -57,7 +57,7 @@ message BackupManifest {

message DescriptorRevision {
util.hlc.Timestamp time = 1 [(gogoproto.nullable) = false];
uint32 ID = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"];
uint32 ID = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
sql.sqlbase.Descriptor desc = 3;
}

Expand Down Expand Up @@ -98,7 +98,7 @@ message BackupManifest {
repeated Tenant tenants = 24 [(gogoproto.nullable) = false];
// databases in descriptors that have all tables also in descriptors.
repeated uint32 complete_dbs = 14 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"];
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
reserved 6;
RowCount entry_counts = 12 [(gogoproto.nullable) = false];

Expand All @@ -124,7 +124,7 @@ message BackupManifest {
// indicated in the table_statistic_files field.
repeated sql.stats.TableStatisticProto deprecated_statistics = 21;
map<uint32, string> statistics_filenames = 23 [
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/sqlbase.ID"
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];
int32 descriptor_coverage = 22 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.DescriptorCoverage"];
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
Expand Down Expand Up @@ -271,8 +272,8 @@ func backup(
})

pkIDs := make(map[uint64]bool)
for _, desc := range backupManifest.Descriptors {
if t := desc.Table(hlc.Timestamp{}); t != nil {
for i := range backupManifest.Descriptors {
if t := sqlbase.TableFromDescriptor(&backupManifest.Descriptors[i], hlc.Timestamp{}); t != nil {
pkIDs[roachpb.BulkOpSummaryID(uint64(t.ID), uint64(t.PrimaryIndex.ID))] = true
}
}
Expand Down Expand Up @@ -344,8 +345,8 @@ func backup(
return RowCount{}, err
}
var tableStatistics []*stats.TableStatisticProto
for _, desc := range backupManifest.Descriptors {
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
for i := range backupManifest.Descriptors {
if tableDesc := sqlbase.TableFromDescriptor(&backupManifest.Descriptors[i], hlc.Timestamp{}); tableDesc != nil {
// Collect all the table stats for this table.
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc.GetID())
if err != nil {
Expand Down
132 changes: 68 additions & 64 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -89,8 +90,8 @@ var useTBI = settings.RegisterBoolSetting(
)

type tableAndIndex struct {
tableID sqlbase.ID
indexID sqlbase.IndexID
tableID descpb.ID
indexID descpb.IndexID
}

type backupKMSEnv struct {
Expand Down Expand Up @@ -162,17 +163,14 @@ func (e *encryptedDataKeyMap) rangeOverMap(fn func(masterKeyID hashedMasterKeyID
// spansForAllTableIndexes returns non-overlapping spans for every index and
// table passed in. They would normally overlap if any of them are interleaved.
func spansForAllTableIndexes(
codec keys.SQLCodec,
tables []sqlbase.TableDescriptorInterface,
revs []BackupManifest_DescriptorRevision,
codec keys.SQLCodec, tables []sqlbase.TableDescriptor, revs []BackupManifest_DescriptorRevision,
) []roachpb.Span {

added := make(map[tableAndIndex]bool, len(tables))
sstIntervalTree := interval.NewTree(interval.ExclusiveOverlapper)
for _, table := range tables {
tableDesc := table.TableDesc()
for _, index := range tableDesc.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(tableDesc.IndexSpan(codec, index.ID)), false); err != nil {
for _, index := range table.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.ID)), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[tableAndIndex{tableID: table.GetID(), indexID: index.ID}] = true
Expand All @@ -188,7 +186,9 @@ func spansForAllTableIndexes(
// entire interval. DROPPED tables should never later become PUBLIC.
// TODO(pbardea): Consider and test the interaction between revision_history
// backups and OFFLINE tables.
if tbl := rev.Desc.Table(hlc.Timestamp{}); tbl != nil && tbl.State != sqlbase.TableDescriptor_DROP {
rawTbl := sqlbase.TableFromDescriptor(rev.Desc, hlc.Timestamp{})
if rawTbl != nil && rawTbl.State != descpb.TableDescriptor_DROP {
tbl := sqlbase.NewImmutableTableDescriptor(*rawTbl)
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
if !added[key] {
Expand Down Expand Up @@ -519,27 +519,23 @@ func backupPlanHook(
return err
}

var tables []sqlbase.TableDescriptorInterface
statsFiles := make(map[sqlbase.ID]string)
var tables []sqlbase.TableDescriptor
statsFiles := make(map[descpb.ID]string)
for _, desc := range targetDescs {
if dbDesc := desc.GetDatabase(); dbDesc != nil {
db := sqlbase.NewImmutableDatabaseDescriptor(*dbDesc)
if err := p.CheckPrivilege(ctx, db, privilege.SELECT); err != nil {
switch desc := desc.(type) {
case sqlbase.DatabaseDescriptor:
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
}
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
// TODO(ajwerner): This construction of a wrapper is unfortunate and should
// go away in this PR.
table := sqlbase.NewImmutableTableDescriptor(*tableDesc)
if err := p.CheckPrivilege(ctx, table, privilege.SELECT); err != nil {
case sqlbase.TableDescriptor:
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
tables = append(tables, table)
tables = append(tables, desc)

// TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file,
// vs having each object in a separate file, or somewhere in between.
statsFiles[tableDesc.GetID()] = BackupStatisticsFileName
statsFiles[desc.GetID()] = BackupStatisticsFileName
}
}

Expand Down Expand Up @@ -742,11 +738,11 @@ func backupPlanHook(
startTime = prevBackups[len(prevBackups)-1].EndTime
}

var priorIDs map[sqlbase.ID]sqlbase.ID
var priorIDs map[descpb.ID]descpb.ID

var revs []BackupManifest_DescriptorRevision
if mvccFilter == MVCCFilter_All {
priorIDs = make(map[sqlbase.ID]sqlbase.ID)
priorIDs = make(map[descpb.ID]descpb.ID)
revs, err = getRelevantDescChanges(ctx, p.ExecCfg().DB, startTime, endTime, targetDescs, completeDBs, priorIDs)
if err != nil {
return err
Expand Down Expand Up @@ -787,10 +783,11 @@ func backupPlanHook(
}

if len(prevBackups) > 0 {
tablesInPrev := make(map[sqlbase.ID]struct{})
dbsInPrev := make(map[sqlbase.ID]struct{})
for _, d := range prevBackups[len(prevBackups)-1].Descriptors {
if t := d.Table(hlc.Timestamp{}); t != nil {
tablesInPrev := make(map[descpb.ID]struct{})
dbsInPrev := make(map[descpb.ID]struct{})
rawDescs := prevBackups[len(prevBackups)-1].Descriptors
for i := range rawDescs {
if t := sqlbase.TableFromDescriptor(&rawDescs[i], hlc.Timestamp{}); t != nil {
tablesInPrev[t.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -838,12 +835,16 @@ func backupPlanHook(
// mis-handled, but is disallowed above. IntroducedSpans may also be lost by
// a 1.x node, meaning that if 1.1 nodes may resume a backup, the limitation
// of requiring full backups after schema changes remains.
descriptorProtos := make([]descpb.Descriptor, 0, len(targetDescs))
for _, desc := range targetDescs {
descriptorProtos = append(descriptorProtos, *desc.DescriptorProto())
}

backupManifest := BackupManifest{
StartTime: startTime,
EndTime: endTime,
MVCCFilter: mvccFilter,
Descriptors: targetDescs,
Descriptors: descriptorProtos,
Tenants: tenants,
DescriptorChanges: revs,
CompleteDbs: completeDBs,
Expand Down Expand Up @@ -999,9 +1000,10 @@ func backupPlanHook(
jr := jobs.Record{
Description: description,
Username: p.User(),
DescriptorIDs: func() (sqlDescIDs []sqlbase.ID) {
for _, sqlDesc := range backupManifest.Descriptors {
sqlDescIDs = append(sqlDescIDs, sqlDesc.GetID())
DescriptorIDs: func() (sqlDescIDs []descpb.ID) {
for i := range backupManifest.Descriptors {
sqlDescIDs = append(sqlDescIDs,
sqlbase.GetDescriptorID(&backupManifest.Descriptors[i]))
}
return sqlDescIDs
}(),
Expand Down Expand Up @@ -1131,46 +1133,48 @@ func checkForNewTables(
ctx context.Context,
db *kv.DB,
targetDescs []sqlbase.Descriptor,
tablesInPrev map[sqlbase.ID]struct{},
dbsInPrev map[sqlbase.ID]struct{},
priorIDs map[sqlbase.ID]sqlbase.ID,
tablesInPrev map[descpb.ID]struct{},
dbsInPrev map[descpb.ID]struct{},
priorIDs map[descpb.ID]descpb.ID,
startTime hlc.Timestamp,
endTime hlc.Timestamp,
) error {
for _, d := range targetDescs {
if t := d.Table(hlc.Timestamp{}); t != nil {
// If we're trying to use a previous backup for this table, ideally it
// actually contains this table.
if _, ok := tablesInPrev[t.ID]; ok {
continue
t, ok := d.(sqlbase.TableDescriptor)
if !ok {
continue
}
// If we're trying to use a previous backup for this table, ideally it
// actually contains this table.
if _, ok := tablesInPrev[t.GetID()]; ok {
continue
}
// This table isn't in the previous backup... maybe was added to a
// DB that the previous backup captured?
if _, ok := dbsInPrev[t.GetParentID()]; ok {
continue
}
// Maybe this table is missing from the previous backup because it was
// truncated?
if replacement := t.GetReplacementOf(); replacement.ID != descpb.InvalidID {
// Check if we need to lazy-load the priorIDs (i.e. if this is the first
// truncate we've encountered in non-MVCC backup).
if priorIDs == nil {
priorIDs = make(map[descpb.ID]descpb.ID)
_, err := getAllDescChanges(ctx, db, startTime, endTime, priorIDs)
if err != nil {
return err
}
}
// This table isn't in the previous backup... maybe was added to a
// DB that the previous backup captured?
if _, ok := dbsInPrev[t.ParentID]; ok {
continue
found := false
for was := replacement.ID; was != descpb.InvalidID && !found; was = priorIDs[was] {
_, found = tablesInPrev[was]
}
// Maybe this table is missing from the previous backup because it was
// truncated?
if t.ReplacementOf.ID != sqlbase.InvalidID {
// Check if we need to lazy-load the priorIDs (i.e. if this is the first
// truncate we've encountered in non-MVCC backup).
if priorIDs == nil {
priorIDs = make(map[sqlbase.ID]sqlbase.ID)
_, err := getAllDescChanges(ctx, db, startTime, endTime, priorIDs)
if err != nil {
return err
}
}
found := false
for was := t.ReplacementOf.ID; was != sqlbase.InvalidID && !found; was = priorIDs[was] {
_, found = tablesInPrev[was]
}
if found {
continue
}
if found {
continue
}
return errors.Errorf("previous backup does not contain table %q", t.Name)
}
return errors.Errorf("previous backup does not contain table %q", t.GetName())
}
return nil
}
Expand Down
Loading

0 comments on commit ab161ab

Please sign in to comment.