Skip to content

Commit

Permalink
feat(metastore): update mock metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
lianxmfor committed Nov 16, 2021
1 parent ea88174 commit 1fa9653
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 22 deletions.
126 changes: 125 additions & 1 deletion internal/database/metadata/mock_metadata/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions internal/database/metadata/postgres/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/oom-ai/oomstore/internal/database/metadata"
)

func createEntity(ctx context.Context, exec metadata.ExecContext, opt metadata.CreateEntityOpt) (int16, error) {
func createEntity(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.CreateEntityOpt) (int16, error) {
var entityId int16
query := "insert into feature_entity(name, length, description) values($1, $2, $3) returning id"
err := exec.GetContext(ctx, &entityId, query, opt.Name, opt.Length, opt.Description)
err := sqlxCtx.GetContext(ctx, &entityId, query, opt.Name, opt.Length, opt.Description)
if er, ok := err.(*pq.Error); ok {
if er.Code == pgerrcode.UniqueViolation {
return 0, fmt.Errorf("entity %s already exists", opt.Name)
Expand All @@ -21,9 +21,9 @@ func createEntity(ctx context.Context, exec metadata.ExecContext, opt metadata.C
return entityId, err
}

func updateEntity(ctx context.Context, exec metadata.ExecContext, opt metadata.UpdateEntityOpt) error {
func updateEntity(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.UpdateEntityOpt) error {
query := "UPDATE feature_entity SET description = $1 WHERE id = $2"
result, err := exec.ExecContext(ctx, query, opt.NewDescription, opt.EntityID)
result, err := sqlxCtx.ExecContext(ctx, query, opt.NewDescription, opt.EntityID)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions internal/database/metadata/postgres/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"github.com/oom-ai/oomstore/internal/database/metadata"
)

func createFeature(ctx context.Context, exec metadata.ExecContext, opt metadata.CreateFeatureOpt) (int16, error) {
if err := validateDataType(ctx, exec, opt.DBValueType); err != nil {
func createFeature(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.CreateFeatureOpt) (int16, error) {
if err := validateDataType(ctx, sqlxCtx, opt.DBValueType); err != nil {
return 0, fmt.Errorf("err when validating value_type input, details: %s", err.Error())
}
var featureId int16
query := "INSERT INTO feature(name, group_id, db_value_type, value_type, description) VALUES ($1, $2, $3, $4, $5) RETURNING id"
err := exec.GetContext(ctx, &featureId, query, opt.Name, opt.GroupID, opt.DBValueType, opt.ValueType, opt.Description)
err := sqlxCtx.GetContext(ctx, &featureId, query, opt.Name, opt.GroupID, opt.DBValueType, opt.ValueType, opt.Description)
if err != nil {
if e2, ok := err.(*pq.Error); ok {
if e2.Code == pgerrcode.UniqueViolation {
Expand All @@ -27,9 +27,9 @@ func createFeature(ctx context.Context, exec metadata.ExecContext, opt metadata.
return featureId, err
}

func updateFeature(ctx context.Context, exec metadata.ExecContext, opt metadata.UpdateFeatureOpt) error {
func updateFeature(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.UpdateFeatureOpt) error {
query := "UPDATE feature SET description = $1 WHERE id = $2"
result, err := exec.ExecContext(ctx, query, opt.NewDescription, opt.FeatureID)
result, err := sqlxCtx.ExecContext(ctx, query, opt.NewDescription, opt.FeatureID)
if err != nil {
return err
}
Expand All @@ -43,9 +43,9 @@ func updateFeature(ctx context.Context, exec metadata.ExecContext, opt metadata.
return nil
}

func validateDataType(ctx context.Context, exec metadata.ExecContext, dataType string) error {
func validateDataType(ctx context.Context, sqlxCtx metadata.SqlxContext, dataType string) error {
tmpTable := dbutil.TempTable("validate_data_type")
stmt := fmt.Sprintf("CREATE TEMPORARY TABLE %s (a %s) ON COMMIT DROP", tmpTable, dataType)
_, err := exec.ExecContext(ctx, stmt)
_, err := sqlxCtx.ExecContext(ctx, stmt)
return err
}
8 changes: 4 additions & 4 deletions internal/database/metadata/postgres/feature_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/oom-ai/oomstore/pkg/oomstore/types"
)

func createFeatureGroup(ctx context.Context, exec metadata.ExecContext, opt metadata.CreateFeatureGroupOpt) (int16, error) {
func createFeatureGroup(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.CreateFeatureGroupOpt) (int16, error) {
if opt.Category != types.BatchFeatureCategory && opt.Category != types.StreamFeatureCategory {
return 0, fmt.Errorf("illegal category '%s', should be either 'stream' or 'batch'", opt.Category)
}
var featureGroupId int16
query := "insert into feature_group(name, entity_id, category, description) values($1, $2, $3, $4) returning id"
err := exec.GetContext(ctx, &featureGroupId, query, opt.Name, opt.EntityID, opt.Category, opt.Description)
err := sqlxCtx.GetContext(ctx, &featureGroupId, query, opt.Name, opt.EntityID, opt.Category, opt.Description)
if err != nil {
if e2, ok := err.(*pq.Error); ok {
if e2.Code == pgerrcode.UniqueViolation {
Expand All @@ -29,7 +29,7 @@ func createFeatureGroup(ctx context.Context, exec metadata.ExecContext, opt meta
return featureGroupId, err
}

func updateFeatureGroup(ctx context.Context, exec metadata.ExecContext, opt metadata.UpdateFeatureGroupOpt) error {
func updateFeatureGroup(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.UpdateFeatureGroupOpt) error {
and := make(map[string]interface{})
if opt.NewDescription != nil {
and["description"] = *opt.NewDescription
Expand All @@ -48,7 +48,7 @@ func updateFeatureGroup(ctx context.Context, exec metadata.ExecContext, opt meta
}

query := fmt.Sprintf("UPDATE feature_group SET %s WHERE id = ?", strings.Join(cond, ","))
result, err := exec.ExecContext(ctx, exec.Rebind(query), args...)
result, err := sqlxCtx.ExecContext(ctx, sqlxCtx.Rebind(query), args...)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/database/metadata/postgres/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"github.com/oom-ai/oomstore/internal/database/metadata"
)

func createRevision(ctx context.Context, exec metadata.ExecContext, opt metadata.CreateRevisionOpt) (int32, string, error) {
func createRevision(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.CreateRevisionOpt) (int32, string, error) {
var dataTable string
if opt.DataTable != nil {
dataTable = *opt.DataTable
}

var revisionId int32
insertQuery := "INSERT INTO feature_group_revision(group_id, revision, data_table, anchored, description) VALUES ($1, $2, $3, $4, $5) RETURNING id"
if err := exec.GetContext(ctx, &revisionId, insertQuery, opt.GroupID, opt.Revision, dataTable, opt.Anchored, opt.Description); err != nil {
if err := sqlxCtx.GetContext(ctx, &revisionId, insertQuery, opt.GroupID, opt.Revision, dataTable, opt.Anchored, opt.Description); err != nil {
if e2, ok := err.(*pq.Error); ok {
if e2.Code == pgerrcode.UniqueViolation {
return 0, "", fmt.Errorf("revision already exists: groupId=%d, revision=%d", opt.GroupID, opt.Revision)
Expand All @@ -30,7 +30,7 @@ func createRevision(ctx context.Context, exec metadata.ExecContext, opt metadata
if opt.DataTable == nil {
updateQuery := "UPDATE feature_group_revision SET data_table = $1 WHERE id = $2"
dataTable = fmt.Sprintf("data_%d_%d", opt.GroupID, revisionId)
result, err := exec.ExecContext(ctx, updateQuery, dataTable, revisionId)
result, err := sqlxCtx.ExecContext(ctx, updateQuery, dataTable, revisionId)
if err != nil {
return 0, "", err
}
Expand All @@ -48,7 +48,7 @@ func createRevision(ctx context.Context, exec metadata.ExecContext, opt metadata

// UpdateRevision = MustUpdateRevision
// If fail to update any row or update more than one row, return error
func updateRevision(ctx context.Context, exec metadata.ExecContext, opt metadata.UpdateRevisionOpt) error {
func updateRevision(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metadata.UpdateRevisionOpt) error {
and := make(map[string]interface{})
if opt.NewRevision != nil {
and["revision"] = *opt.NewRevision
Expand All @@ -66,7 +66,7 @@ func updateRevision(ctx context.Context, exec metadata.ExecContext, opt metadata
args = append(args, opt.RevisionID)

query := fmt.Sprintf("UPDATE feature_group_revision SET %s WHERE id = ?", strings.Join(cond, ","))
result, err := exec.ExecContext(ctx, exec.Rebind(query), args...)
result, err := sqlxCtx.ExecContext(ctx, sqlxCtx.Rebind(query), args...)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/database/metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Store interface {
io.Closer
}

type ExecContext interface {
type SqlxContext interface {
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
Expand Down

0 comments on commit 1fa9653

Please sign in to comment.