Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: owning a schema gives privilege to drop tables in schema #52740

Merged
merged 1 commit into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/sql/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,41 @@ func (p *planner) checkCanAlterToNewOwner(

return nil
}

// HasOwnershipOnSchema checks if the current user has ownership on the schema.
// For schemas, we cannot always use HasOwnership as not every schema has a
// descriptor.
func (p *planner) HasOwnershipOnSchema(ctx context.Context, schemaID descpb.ID) (bool, error) {
resolvedSchema, err := p.Descriptors().ResolveSchemaByID(
ctx, p.Txn(), schemaID,
)
if err != nil {
return false, err
}

hasOwnership := false
switch resolvedSchema.Kind {
case catalog.SchemaPublic:
// admin is the owner of the public schema.
hasOwnership, err = p.UserHasAdminRole(ctx, p.User())
if err != nil {
return false, err
}
case catalog.SchemaVirtual:
// Cannot drop on virtual schemas.
case catalog.SchemaTemporary:
// The user only owns the temporary schema that corresponds to the
// TemporarySchemaID in the sessionData.
hasOwnership = p.SessionData() != nil &&
p.SessionData().TemporarySchemaID == uint32(resolvedSchema.ID)
case catalog.SchemaUserDefined:
hasOwnership, err = p.HasOwnership(ctx, resolvedSchema.Desc)
if err != nil {
return false, err
}
default:
panic(errors.AssertionFailedf("unknown schema kind %d", resolvedSchema.Kind))
}

return hasOwnership, nil
}
1 change: 1 addition & 0 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ func (tc *Collection) ResolveSchemaByID(

// If this collection is attached to a session and the session has created
// a temporary schema, then check if the schema ID matches.
// This check doesn't work after switching databases. Issue #53163.
if tc.sessionData != nil && tc.sessionData.TemporarySchemaID == uint32(schemaID) {
return catalog.ResolvedSchema{
Kind: catalog.SchemaTemporary,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (d *dropCascadeState) resolveCollectedObjects(
" dropped or made public before dropping database %s",
objName.FQString(), tree.AsString((*tree.Name)(&dbName)))
}
if err := p.prepareDropWithTableDesc(ctx, tbDesc); err != nil {
if err := p.canDropTable(ctx, tbDesc); err != nil {
return err
}
// Recursively check permissions on all dependent views, since some may
Expand Down
29 changes: 20 additions & 9 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,31 @@ func (p *planner) prepareDrop(
if tableDesc == nil {
return nil, err
}
if err := p.prepareDropWithTableDesc(ctx, tableDesc); err != nil {
if err := p.canDropTable(ctx, tableDesc); err != nil {
return nil, err
}
return tableDesc, nil
}

// prepareDropWithTableDesc behaves as prepareDrop, except it assumes the
// table descriptor is already fetched. This is useful for DropDatabase,
// as prepareDrop requires resolving a TableName when DropDatabase already
// has it resolved.
func (p *planner) prepareDropWithTableDesc(
ctx context.Context, tableDesc *tabledesc.Mutable,
) error {
return p.CheckPrivilege(ctx, tableDesc, privilege.DROP)
// canDropTable returns an error if the user cannot drop the table.
func (p *planner) canDropTable(ctx context.Context, tableDesc *tabledesc.Mutable) error {
// Don't check for ownership if the table is a temp table.
// ResolveSchema currently doesn't work for temp schemas.
// Hack until #53163 is fixed.
hasOwnership := false
var err error
if !tableDesc.Temporary {
// If the user owns the schema the table is part of, they can drop the table.
hasOwnership, err = p.HasOwnershipOnSchema(ctx, tableDesc.GetParentSchemaID())
if err != nil {
return err
}
}
if !hasOwnership {
return p.CheckPrivilege(ctx, tableDesc, privilege.DROP)
}

return nil
}

// canRemoveFKBackReference returns an error if the input backreference isn't
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_table
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,22 @@ CREATE TABLE a (id INT PRIMARY KEY)
query I
SELECT * FROM a
----

user testuser

statement ok
SET experimental_enable_user_defined_schemas = true

statement ok
CREATE SCHEMA s

user root

statement ok
CREATE TABLE s.t()

user testuser

# Being the owner of schema s should allow testuser to drop table s.t.
statement ok
DROP TABLE s.t