Skip to content

Commit

Permalink
introduce caveat support in CockroachDB
Browse files Browse the repository at this point in the history
  • Loading branch information
vroldanbet committed Oct 25, 2022
1 parent ef2738c commit 7f6b4d4
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 119 deletions.
19 changes: 19 additions & 0 deletions internal/datastore/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
Expand Down Expand Up @@ -50,3 +52,20 @@ func NewCreateRelationshipExistsError(relationship *core.RelationTuple) error {
relationship,
}
}

// ContextualizedCaveatFrom convenience method that handles creation of a contextualized caveat
// given the possibility of arguments with zero-values.
func ContextualizedCaveatFrom(name string, context map[string]any) (*core.ContextualizedCaveat, error) {
var caveat *core.ContextualizedCaveat
if name != "" {
strct, err := structpb.NewStruct(context)
if err != nil {
return nil, fmt.Errorf("malformed caveat context: %w", err)
}
caveat = &core.ContextualizedCaveat{
CaveatName: name,
Context: strct,
}
}
return caveat, nil
}
168 changes: 164 additions & 4 deletions internal/datastore/crdb/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,184 @@ package crdb

import (
"context"
"errors"
"fmt"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

var (
upsertCaveatSuffix = fmt.Sprintf(
"ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s",
colCaveatName,
colCaveatDefinition,
colCaveatDefinition,
)
writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatDefinition).Suffix(upsertCaveatSuffix)
readCaveat = psql.Select(colCaveatDefinition, colTimestamp).From(tableCaveat)
listCaveat = psql.Select(colCaveatName, colCaveatDefinition).From(tableCaveat).OrderBy(colCaveatName)
deleteCaveat = psql.Delete(tableCaveat)
)

const (
errWriteCaveat = "unable to write new caveat revision: %w"
errReadCaveat = "unable to read new caveat `%s`: %w"
errListCaveats = "unable to list caveat: %w"
errDeleteCaveats = "unable to delete caveats: %w"
)

func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
return nil, datastore.NoRevision, fmt.Errorf("unimplemented caveat support in datastore")
ctx, span := tracer.Start(ctx, "ReadCaveatByName", trace.WithAttributes(attribute.String("name", name)))
defer span.End()

query := readCaveat.Where(sq.Eq{colCaveatName: name})
sql, args, err := query.ToSql()
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err)
}

var definitionBytes []byte
var timestamp time.Time
err = cr.executeWithTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
if err := tx.QueryRow(ctx, sql, args...).Scan(&definitionBytes, &timestamp); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
err = datastore.NewCaveatNameNotFoundErr(name)
}
return err
}
return nil
})
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err)
}
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(definitionBytes); err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err)
}
cr.addOverlapKey(name)
return loaded, revisionFromTimestamp(timestamp), nil
}

func (cr *crdbReader) ListCaveats(ctx context.Context, caveatNames ...string) ([]*core.CaveatDefinition, error) {
return nil, fmt.Errorf("unimplemented caveat support in datastore")
ctx, span := tracer.Start(ctx, "ListCaveats", trace.WithAttributes(attribute.StringSlice("names", caveatNames)))
defer span.End()

caveatsWithNames := listCaveat
if len(caveatNames) > 0 {
caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames})
}

sql, args, err := caveatsWithNames.ToSql()
if err != nil {
return nil, fmt.Errorf(errListCaveats, err)
}
var allDefinitionBytes [][]byte
err = cr.executeWithTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
rows, err := tx.Query(ctx, sql, args...)
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var defBytes []byte
var name string
err = rows.Scan(&name, &defBytes)
if err != nil {
return err
}
allDefinitionBytes = append(allDefinitionBytes, defBytes)
cr.addOverlapKey(name)
}
return nil
})
if err != nil {
return nil, fmt.Errorf(errListCaveats, err)
}

caveats := make([]*core.CaveatDefinition, 0, len(allDefinitionBytes))
for _, defBytes := range allDefinitionBytes {
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(defBytes); err != nil {
return nil, fmt.Errorf(errListCaveats, err)
}
caveats = append(caveats, loaded)
}

return caveats, nil
}

func (rwt *crdbReadWriteTXN) WriteCaveats(caveats []*core.CaveatDefinition) error {
return fmt.Errorf("unimplemented caveat support in datastore")
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "WriteCaveats")
defer span.End()

write := writeCaveat
writtenCaveatNames := make([]string, 0, len(caveats))
for _, caveat := range caveats {
definitionBytes, err := caveat.MarshalVT()
if err != nil {
return fmt.Errorf(errWriteCaveat, err)
}
valuesToWrite := []any{caveat.Name, definitionBytes}
write = write.Values(valuesToWrite...)
writtenCaveatNames = append(writtenCaveatNames, caveat.Name)
}
span.SetAttributes(common.CaveatNameKey.StringSlice(writtenCaveatNames))

// store the new caveat
sql, args, err := write.ToSql()
if err != nil {
return fmt.Errorf(errWriteCaveat, err)
}

for _, val := range writtenCaveatNames {
rwt.addOverlapKey(val)
}
return rwt.executeWithTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf(errWriteCaveat, err)
}
return nil
})
}

func (rwt *crdbReadWriteTXN) DeleteCaveats(names []string) error {
return fmt.Errorf("unimplemented caveat support in datastore")
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(rwt.ctx), "DeleteCaveats",
trace.WithAttributes(attribute.StringSlice("names", names)))
defer span.End()

deleteCaveatClause := deleteCaveat.Where(sq.Eq{colCaveatName: names})
sql, args, err := deleteCaveatClause.ToSql()
if err != nil {
return fmt.Errorf(errDeleteCaveats, err)
}
for _, val := range names {
rwt.addOverlapKey(val)
}
return rwt.executeWithTx(ctx, func(ctx context.Context, tx pgx.Tx) error {
if _, err := tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf(errDeleteCaveats, err)
}
return nil
})
}

func (cr *crdbReader) executeWithTx(ctx context.Context, f func(ctx context.Context, tx pgx.Tx) error) error {
return cr.execute(ctx, func(ctx context.Context) error {
tx, txCleanup, err := cr.txSource(ctx)
if err != nil {
return err
}
defer txCleanup(ctx)

return f(ctx, tx)
})
}
29 changes: 17 additions & 12 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ const (
tableNamespace = "namespace_config"
tableTuple = "relation_tuple"
tableTransactions = "transactions"

colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
tableCaveat = "caveat"

colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"

errUnableToInstantiate = "unable to instantiate datastore: %w"
errRevision = "unable to find revision: %w"
Expand Down Expand Up @@ -213,7 +218,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(createTxFunc, false),
Executor: pgxcommon.NewPGXExecutor(createTxFunc),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand All @@ -234,7 +239,7 @@ func (cds *crdbDatastore) ReadWriteTx(
}

querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(longLivedTx, false),
Executor: pgxcommon.NewPGXExecutor(longLivedTx),
UsersetBatchSize: cds.usersetBatchSize,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

const (
createCaveatTable = `CREATE TABLE caveat (
name VARCHAR NOT NULL,
definition BYTEA NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE DEFAULT now() NOT NULL,
CONSTRAINT pk_caveat_v1 PRIMARY KEY (name)
);`

addRelationshipCaveatContext = `ALTER TABLE relation_tuple
ADD COLUMN caveat_name VARCHAR,
ADD COLUMN caveat_context JSONB;`
)

func init() {
if err := CRDBMigrations.Register("add-caveats", "add-metadata-and-counters", noNonatomicMigration, func(ctx context.Context, tx pgx.Tx) error {
if _, err := tx.Exec(ctx, createCaveatTable); err != nil {
return err
}
if _, err := tx.Exec(ctx, addRelationshipCaveatContext); err != nil {
return err
}
return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
3 changes: 3 additions & 0 deletions internal/datastore/crdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
colCaveatContextName,
colCaveatContext,
).From(tableTuple)

schema = common.SchemaInformation{
Expand All @@ -41,6 +43,7 @@ var (
ColUsersetNamespace: colUsersetNamespace,
ColUsersetObjectID: colUsersetObjectID,
ColUsersetRelation: colUsersetRelation,
ColCaveatName: colCaveatContextName,
}
)

Expand Down
17 changes: 15 additions & 2 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,18 @@ type crdbReadWriteTXN struct {

var (
upsertTupleSuffix = fmt.Sprintf(
"ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now()",
"ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now(), %s = excluded.%s, %s = excluded.%s",
colNamespace,
colObjectID,
colRelation,
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
colTimestamp,
colCaveatContextName,
colCaveatContextName,
colCaveatContext,
colCaveatContext,
)

queryWriteTuple = psql.Insert(tableTuple).Columns(
Expand All @@ -67,6 +71,8 @@ var (
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
colCaveatContextName,
colCaveatContext,
)

queryTouchTuple = queryWriteTuple.Suffix(upsertTupleSuffix)
Expand Down Expand Up @@ -96,8 +102,11 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleU
for _, mutation := range mutations {
rel := mutation.Tuple

var caveatContext map[string]any
var caveatName string
if rel.Caveat != nil {
panic("caveats not currently supported in CRDB datastore")
caveatName = rel.Caveat.CaveatName
caveatContext = rel.Caveat.Context.AsMap()
}

rwt.addOverlapKey(rel.ResourceAndRelation.Namespace)
Expand All @@ -113,6 +122,8 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleU
rel.Subject.Namespace,
rel.Subject.ObjectId,
rel.Subject.Relation,
caveatName,
caveatContext,
)
bulkTouchCount++
case core.RelationTupleUpdate_CREATE:
Expand All @@ -124,6 +135,8 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleU
rel.Subject.Namespace,
rel.Subject.ObjectId,
rel.Subject.Relation,
caveatName,
caveatContext,
)
bulkWriteCount++
case core.RelationTupleUpdate_DELETE:
Expand Down
Loading

0 comments on commit 7f6b4d4

Please sign in to comment.