Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: handle dropping interleaved tables and indexes #8043

Merged
merged 1 commit into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,21 +392,15 @@ func (sc *SchemaChanger) truncateIndexes(
return nil
}

indexPrefix := sqlbase.MakeIndexKeyPrefix(tableDesc, desc.ID)

// Delete the index.
indexStartKey := roachpb.Key(indexPrefix)
indexEndKey := indexStartKey.PrefixEnd()
if log.V(2) {
log.Infof(context.TODO(), "DelRange %s - %s", indexStartKey, indexEndKey)
rd, err := makeRowDeleter(txn, tableDesc, nil, nil, false)
if err != nil {
return err
}
b := &client.Batch{}
b.DelRange(indexStartKey, indexEndKey, false)

if err := txn.Run(b); err != nil {
td := tableDeleter{rd: rd}
if err := td.init(txn); err != nil {
return err
}
return nil
return td.deleteIndex(&desc)
}); err != nil {
return err
}
Expand Down
12 changes: 9 additions & 3 deletions sql/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,15 @@ func (p *planner) finalizeInterleave(
}
// Only the last ancestor needs the backreference.
ancestor := index.Interleave.Ancestors[len(index.Interleave.Ancestors)-1]
ancestorTable, err := sqlbase.GetTableDescFromID(p.txn, ancestor.TableID)
if err != nil {
return err
var ancestorTable *sqlbase.TableDescriptor
if ancestor.TableID == desc.ID {
ancestorTable = desc
} else {
var err error
ancestorTable, err = sqlbase.GetTableDescFromID(p.txn, ancestor.TableID)
if err != nil {
return err
}
}
ancestorIndex, err := ancestorTable.FindIndexByID(ancestor.IndexID)
if err != nil {
Expand Down
87 changes: 86 additions & 1 deletion sql/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/privilege"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -222,6 +223,11 @@ func (n *dropIndexNode) Start() error {
return err
}
}
if len(idx.Interleave.Ancestors) > 0 {
if err := n.p.removeInterleaveBackReference(tableDesc, idx); err != nil {
return err
}
}

for _, ref := range idx.ReferencedBy {
fetched, err := n.p.canRemoveFK(idx.Name, ref, n.n.DropBehavior)
Expand All @@ -232,6 +238,11 @@ func (n *dropIndexNode) Start() error {
return err
}
}
for _, ref := range idx.InterleavedBy {
if err := n.p.removeInterleave(ref); err != nil {
return err
}
}

tableDesc.AddIndexMutation(tableDesc.Indexes[i], sqlbase.DescriptorMutation_DROP)
tableDesc.Indexes = append(tableDesc.Indexes[:i], tableDesc.Indexes[i+1:]...)
Expand Down Expand Up @@ -320,6 +331,11 @@ func (p *planner) DropTable(n *parser.DropTable) (planNode, error) {
return nil, err
}
}
for _, ref := range idx.InterleavedBy {
if err := p.canRemoveInterleave(droppedDesc.Name, ref, n.DropBehavior); err != nil {
return nil, err
}
}
}
td = append(td, droppedDesc)
}
Expand Down Expand Up @@ -350,6 +366,29 @@ func (p *planner) canRemoveFK(
return table, nil
}

func (p *planner) canRemoveInterleave(
from string, ref sqlbase.ForeignKeyReference, behavior parser.DropBehavior,
) error {
table, err := sqlbase.GetTableDescFromID(p.txn, ref.Table)
if err != nil {
return err
}
// TODO(dan): It's possible to DROP a table that has a child interleave, but
// some loose ends would have to be addresssed. The zone would have to be
// kept and deleted when the last table in it is removed. Also, the dropped
// table's descriptor would have to be kept around in some Dropped but
// non-public state for referential integrity of the `InterleaveDescriptor`
// pointers.
if behavior != parser.DropCascade {
return util.UnimplementedWithIssueErrorf(
8036, "%q is interleaved by table %q", from, table.Name)
}
if err := p.checkPrivilege(table, privilege.CREATE); err != nil {
return err
}
return nil
}

func (p *planner) removeFK(ref *sqlbase.ForeignKeyReference, table *sqlbase.TableDescriptor) error {
if table == nil {
var err error
Expand All @@ -366,6 +405,19 @@ func (p *planner) removeFK(ref *sqlbase.ForeignKeyReference, table *sqlbase.Tabl
return p.saveNonmutationAndNotify(table)
}

func (p *planner) removeInterleave(ref sqlbase.ForeignKeyReference) error {
table, err := sqlbase.GetTableDescFromID(p.txn, ref.Table)
if err != nil {
return err
}
idx, err := table.FindIndexByID(ref.Index)
if err != nil {
return err
}
idx.Interleave.Ancestors = nil
return p.saveNonmutationAndNotify(table)
}

func (n *dropTableNode) Start() error {
for _, droppedDesc := range n.td {
if droppedDesc == nil {
Expand Down Expand Up @@ -443,19 +495,29 @@ func (p *planner) dropTableImpl(tableDesc *sqlbase.TableDescriptor) error {
}
p.notifySchemaChange(tableDesc.ID, sqlbase.InvalidMutationID)

// Remove FK relationships.
// Remove FK and interleave relationships.
for _, idx := range tableDesc.AllNonDropIndexes() {
if idx.ForeignKey != nil {
if err := p.removeFKBackReference(tableDesc, idx); err != nil {
return err
}
}
if len(idx.Interleave.Ancestors) > 0 {
if err := p.removeInterleaveBackReference(tableDesc, idx); err != nil {
return err
}
}
for _, ref := range idx.ReferencedBy {
// Nil forces re-fetching tables, since they may have been modified.
if err := p.removeFK(ref, nil); err != nil {
return err
}
}
for _, ref := range idx.InterleavedBy {
if err := p.removeInterleave(ref); err != nil {
return err
}
}
}

verifyMetadataCallback := func(systemConfig config.SystemConfig, tableID sqlbase.ID) error {
Expand Down Expand Up @@ -497,6 +559,29 @@ func (p *planner) removeFKBackReference(
return p.saveNonmutationAndNotify(t)
}

func (p *planner) removeInterleaveBackReference(
tableDesc *sqlbase.TableDescriptor, idx sqlbase.IndexDescriptor,
) error {
if len(idx.Interleave.Ancestors) == 0 {
return nil
}
ancestor := idx.Interleave.Ancestors[len(idx.Interleave.Ancestors)-1]
t, err := sqlbase.GetTableDescFromID(p.txn, ancestor.TableID)
if err != nil {
return errors.Errorf("error resolving referenced table ID %d: %v", ancestor.TableID, err)
}
targetIdx, err := t.FindIndexByID(ancestor.IndexID)
if err != nil {
return err
}
for k, ref := range targetIdx.InterleavedBy {
if ref.Table == tableDesc.ID && ref.Index == idx.ID {
targetIdx.InterleavedBy = append(targetIdx.InterleavedBy[:k], targetIdx.InterleavedBy[k+1:]...)
}
}
return p.saveNonmutationAndNotify(t)
}

// truncateAndDropTable batches all the commands required for truncating and deleting the
// table descriptor.
// It is called from a mutation, async wrt the DROP statement.
Expand Down
20 changes: 20 additions & 0 deletions sql/rowwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,26 @@ func (rd *rowDeleter) deleteRow(b *client.Batch, values []parser.Datum) error {
return nil
}

// deleteIndexRow adds to the batch the kv operations necessary to delete a
// table row from the given index.
func (rd *rowDeleter) deleteIndexRow(
b *client.Batch, idx *sqlbase.IndexDescriptor, values []parser.Datum,
) error {
if err := rd.fks.checkAll(values); err != nil {
return err
}
secondaryIndexEntry, err := sqlbase.EncodeSecondaryIndex(
rd.helper.tableDesc, idx, rd.fetchColIDtoRowIndex, values)
if err != nil {
return err
}
if log.V(2) {
log.Infof(context.TODO(), "Del %s", secondaryIndexEntry.Key)
}
b.Del(secondaryIndexEntry.Key)
return nil
}

func colIDtoRowIndexFromCols(cols []sqlbase.ColumnDescriptor) map[sqlbase.ColumnID]int {
colIDtoRowIndex := make(map[sqlbase.ColumnID]int, len(cols))
for i, col := range cols {
Expand Down
131 changes: 131 additions & 0 deletions sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/log"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -578,3 +579,133 @@ func (td *tableDeleter) fastDelete(
td.b = nil
return rowCount, nil
}

// deleteAllRows runs the kv operations necessary to delete all sql rows in the
// table passed at construction. This may require a scan.
func (td *tableDeleter) deleteAllRows() error {
if td.rd.helper.tableDesc.IsInterleaved() {
if log.V(2) {
log.Info(context.TODO(), "delete forced to scan: table is interleaved")
}
return td.deleteAllRowsScan()
}
return td.deleteAllRowsFast()
}

func (td *tableDeleter) deleteAllRowsFast() error {
var tablePrefix []byte
// TODO(dan): This should be moved into keys.MakeTablePrefix, but updating
// all the uses of that will be a pain.
if interleave := td.rd.helper.tableDesc.PrimaryIndex.Interleave; len(interleave.Ancestors) > 0 {
tablePrefix = encoding.EncodeUvarintAscending(nil, uint64(interleave.Ancestors[0].TableID))
}
tablePrefix = encoding.EncodeUvarintAscending(nil, uint64(td.rd.helper.tableDesc.ID))

// Delete rows and indexes starting with the table's prefix.
tableStartKey := roachpb.Key(tablePrefix)
tableEndKey := tableStartKey.PrefixEnd()
if log.V(2) {
log.Infof(context.TODO(), "DelRange %s - %s", tableStartKey, tableEndKey)
}
td.b.DelRange(tableStartKey, tableEndKey, false)
return td.finalize()
}

func (td *tableDeleter) deleteAllRowsScan() error {
tablePrefix := sqlbase.MakeIndexKeyPrefix(
td.rd.helper.tableDesc, td.rd.helper.tableDesc.PrimaryIndex.ID)
span := sqlbase.Span{Start: roachpb.Key(tablePrefix), End: roachpb.Key(tablePrefix).PrefixEnd()}

valNeededForCol := make([]bool, len(td.rd.helper.tableDesc.Columns))
for _, idx := range td.rd.fetchColIDtoRowIndex {
valNeededForCol[idx] = true
}

var rf sqlbase.RowFetcher
err := rf.Init(
td.rd.helper.tableDesc, td.rd.fetchColIDtoRowIndex, &td.rd.helper.tableDesc.PrimaryIndex,
false, false, td.rd.fetchCols, valNeededForCol)
if err != nil {
return err
}
if err := rf.StartScan(td.txn, sqlbase.Spans{span}, 0); err != nil {
return err
}

for {
row, err := rf.NextRow()
if err != nil {
return err
}
if row == nil {
// Done deleting rows.
break
}
_, err = td.row(row)
if err != nil {
return err
}
}
return td.finalize()
}

// deleteIndex runs the kv operations necessary to delete all kv entries in the
// given index. This may require a scan.
func (td *tableDeleter) deleteIndex(idx *sqlbase.IndexDescriptor) error {
if len(idx.Interleave.Ancestors) > 0 || len(idx.InterleavedBy) > 0 {
if log.V(2) {
log.Info(context.TODO(), "delete forced to scan: table is interleaved")
}
return td.deleteIndexScan(idx)
}
return td.deleteIndexFast(idx)
}

func (td *tableDeleter) deleteIndexFast(idx *sqlbase.IndexDescriptor) error {
indexPrefix := sqlbase.MakeIndexKeyPrefix(td.rd.helper.tableDesc, idx.ID)
indexStartKey := roachpb.Key(indexPrefix)
indexEndKey := indexStartKey.PrefixEnd()

if log.V(2) {
log.Infof(context.TODO(), "DelRange %s - %s", indexStartKey, indexEndKey)
}
td.b.DelRange(indexStartKey, indexEndKey, false)
return td.finalize()
}

func (td *tableDeleter) deleteIndexScan(idx *sqlbase.IndexDescriptor) error {
tablePrefix := sqlbase.MakeIndexKeyPrefix(
td.rd.helper.tableDesc, td.rd.helper.tableDesc.PrimaryIndex.ID)
span := sqlbase.Span{Start: roachpb.Key(tablePrefix), End: roachpb.Key(tablePrefix).PrefixEnd()}

valNeededForCol := make([]bool, len(td.rd.helper.tableDesc.Columns))
for _, idx := range td.rd.fetchColIDtoRowIndex {
valNeededForCol[idx] = true
}

var rf sqlbase.RowFetcher
err := rf.Init(
td.rd.helper.tableDesc, td.rd.fetchColIDtoRowIndex, &td.rd.helper.tableDesc.PrimaryIndex,
false, false, td.rd.fetchCols, valNeededForCol)
if err != nil {
return err
}
if err := rf.StartScan(td.txn, sqlbase.Spans{span}, 0); err != nil {
return err
}

for {
row, err := rf.NextRow()
if err != nil {
return err
}
if row == nil {
// Done deleting rows.
break
}
if err := td.rd.deleteIndexRow(td.b, idx, row); err != nil {
return err
}
}
return td.finalize()
}
Loading