Skip to content

Commit

Permalink
Merge pull request #485 from oom-ai/jinghan/rename_data_table
Browse files Browse the repository at this point in the history
Jinghan/rename offline dataTable as `data_<group_id>_<revision_id>`
  • Loading branch information
jinghancc authored Nov 15, 2021
2 parents 84310e1 + 0c32bbe commit 1c45c8f
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 103 deletions.
9 changes: 5 additions & 4 deletions internal/database/metadata/mock_metadata/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/database/metadata/postgres/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/oom-ai/oomstore/internal/database/metadata"
)

func (db *DB) CreateRevision(ctx context.Context, opt metadata.CreateRevisionOpt) (int32, error) {
func (db *DB) CreateRevision(ctx context.Context, opt metadata.CreateRevisionOpt) (int32, string, error) {
var dataTable string
if opt.DataTable != nil {
dataTable = *opt.DataTable
Expand All @@ -31,7 +31,7 @@ func (db *DB) CreateRevision(ctx context.Context, opt metadata.CreateRevisionOpt
}
if opt.DataTable == nil {
updateQuery := "UPDATE feature_group_revision SET data_table = $1 WHERE id = $2"
dataTable := fmt.Sprintf("data_%d_%d", opt.GroupID, revisionId)
dataTable = fmt.Sprintf("data_%d_%d", opt.GroupID, revisionId)
result, err := tx.ExecContext(ctx, updateQuery, dataTable, revisionId)
if err != nil {
return err
Expand All @@ -48,7 +48,7 @@ func (db *DB) CreateRevision(ctx context.Context, opt metadata.CreateRevisionOpt
return nil
})

return revisionId, err
return revisionId, dataTable, err
}

// UpdateRevision = MustUpdateRevision
Expand Down
2 changes: 1 addition & 1 deletion internal/database/metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Store interface {
ListFeatureGroup(ctx context.Context, entityID *int16) types.FeatureGroupList

// revision
CreateRevision(ctx context.Context, opt CreateRevisionOpt) (int32, error)
CreateRevision(ctx context.Context, opt CreateRevisionOpt) (int32, string, error)
UpdateRevision(ctx context.Context, opt UpdateRevisionOpt) error
GetRevision(ctx context.Context, id int32) (*types.Revision, error)
GetRevisionBy(ctx context.Context, groupID int16, revision int64) (*types.Revision, error)
Expand Down
12 changes: 6 additions & 6 deletions internal/database/metadata/test/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestCreateRevision(t *testing.T, prepareStore PrepareStoreRuntimeFunc) {

for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
actual, err := store.CreateRevision(ctx, tc.opt)
actual, _, err := store.CreateRevision(ctx, tc.opt)
require.Equal(t, tc.expected, actual)
if tc.expectedError != nil {
require.EqualError(t, err, tc.expectedError.Error())
Expand All @@ -98,7 +98,7 @@ func TestUpdateRevision(t *testing.T, prepareStore PrepareStoreRuntimeFunc) {
defer store.Close()

_, groupId := prepareEntityAndGroup(t, ctx, store)
revisionId, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
revisionId, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 1000,
GroupID: groupId,
DataTable: stringPtr("device_info_1000"),
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestGetRevision(t *testing.T, prepareStore PrepareStoreRuntimeFunc) {
defer store.Close()

_, groupId := prepareEntityAndGroup(t, ctx, store)
revisionId, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
revisionId, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 1000,
GroupID: groupId,
DataTable: stringPtr("device_info_1000"),
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestGetRevisionBy(t *testing.T, prepareStore PrepareStoreRuntimeFunc) {
defer store.Close()

_, groupId := prepareEntityAndGroup(t, ctx, store)
revisionId, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
revisionId, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 1000,
GroupID: groupId,
DataTable: stringPtr("device_info_1000"),
Expand Down Expand Up @@ -357,15 +357,15 @@ func prepareRevisions(t *testing.T, ctx context.Context, store metadata.Store) (
})
require.NoError(t, err)
require.NoError(t, store.Refresh())
revisionId1, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
revisionId1, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 1000,
GroupID: groupId,
DataTable: stringPtr("device_info_1000"),
Anchored: false,
})
require.NoError(t, err)

revisionId2, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
revisionId2, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 2000,
GroupID: groupId,
DataTable: stringPtr("device_info_2000"),
Expand Down
9 changes: 4 additions & 5 deletions internal/database/offline/mock_offline/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions internal/database/offline/postgres/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/csv"
"fmt"
"io"
"strconv"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -44,9 +43,8 @@ func loadData(tx *sqlx.Tx, ctx context.Context, csvReader *csv.Reader, tableName

}

func (db *DB) Import(ctx context.Context, opt offline.ImportOpt) (int64, string, error) {
func (db *DB) Import(ctx context.Context, opt offline.ImportOpt) (int64, error) {
var revision int64
var finalTableName string
err := dbutil.WithTransaction(db.DB, ctx, func(ctx context.Context, tx *sqlx.Tx) error {
// create the data table
tmpTableName := dbutil.TempTable(opt.GroupName)
Expand All @@ -70,12 +68,9 @@ func (db *DB) Import(ctx context.Context, opt offline.ImportOpt) (int64, string,
revision = time.Now().Unix()
}

// generate final data table name
finalTableName = opt.GroupName + "_" + strconv.FormatInt(revision, 10)

rename := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", tmpTableName, finalTableName)
rename := fmt.Sprintf("ALTER TABLE %s RENAME TO %s", tmpTableName, opt.DataTableName)
_, err = tx.ExecContext(ctx, rename)
return err
})
return revision, finalTableName, err
return revision, err
}
11 changes: 6 additions & 5 deletions internal/database/offline/postgres/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func TestImport(t *testing.T) {
}

opt := offline.ImportOpt{
GroupName: "device",
Entity: &entity,
GroupName: "device",
Entity: &entity,
DataTableName: "device_1",
Features: []*types.Feature{
{
Name: "model",
Expand All @@ -45,15 +46,15 @@ func TestImport(t *testing.T) {
}

t.Run("invalid db value type", func(t *testing.T) {
_, _, err := db.Import(context.Background(), opt)
_, err := db.Import(context.Background(), opt)
assert.NotNil(t, err)
})

t.Run("normal import call", func(t *testing.T) {
revision := int64(1234)
opt.Features[0].DBValueType = "varchar(32)"
opt.Revision = &revision
_, tableName, err := db.Import(context.Background(), opt)
_, err := db.Import(context.Background(), opt)
assert.Nil(t, err)

type T struct {
Expand All @@ -63,7 +64,7 @@ func TestImport(t *testing.T) {
}
records := make([]T, 0)

assert.Nil(t, db.SelectContext(context.Background(), &records, fmt.Sprintf("select * from %s", tableName)))
assert.Nil(t, db.SelectContext(context.Background(), &records, fmt.Sprintf("select * from %s", opt.DataTableName)))
assert.Equal(t, 4, len(records))

sort.Slice(records, func(i, j int) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/database/offline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type Store interface {
Join(ctx context.Context, opt JoinOpt) (*types.JoinResult, error)
Export(ctx context.Context, opt ExportOpt) (<-chan *types.RawFeatureValueRecord, error)
Import(ctx context.Context, opt ImportOpt) (int64, string, error)
Import(ctx context.Context, opt ImportOpt) (int64, error)

TypeTag(dbType string) (string, error)
io.Closer
Expand Down
11 changes: 6 additions & 5 deletions internal/database/offline/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type JoinOneFeatureGroupOpt struct {
}

type ImportOpt struct {
GroupName string
Entity *types.Entity
Features types.FeatureList
Header []string
Revision *int64
GroupName string
Entity *types.Entity
Features types.FeatureList
Header []string
Revision *int64
DataTableName string

// CsvReader must not contain header
CsvReader *csv.Reader
Expand Down
45 changes: 32 additions & 13 deletions pkg/oomstore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,46 @@ func (s *OomStore) ImportBatchFeatures(ctx context.Context, opt types.ImportBatc
return 0, fmt.Errorf("csv header of the data source %v doesn't match the feature group schema %v", header, columnNames)
}

revision, dataTable, err := s.offline.Import(ctx, offline.ImportOpt{
GroupName: group.Name,
Entity: entity,
Features: features,
Header: header,
Revision: opt.Revision,
CsvReader: csvReader,
var revision int64
if opt.Revision != nil {
revision = *opt.Revision
}

newRevisionID, dataTableName, err := s.metadata.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: revision,
GroupID: group.ID,
// TODO: support user-defined DataTable
DataTable: nil,
Description: opt.Description,
Anchored: opt.Revision != nil,
})
if err != nil {
return 0, err
}

newRevisionID, err := s.metadata.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: revision,
GroupID: group.ID,
DataTable: &dataTable,
Description: opt.Description,
Anchored: opt.Revision != nil,
revision, err = s.offline.Import(ctx, offline.ImportOpt{
GroupName: group.Name,
Entity: entity,
Features: features,
Header: header,
Revision: opt.Revision,
CsvReader: csvReader,
DataTableName: dataTableName,
})
if err != nil {
return 0, err
}

if opt.Revision == nil {
if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
RevisionID: newRevisionID,
NewRevision: &revision,
}); err != nil {
return 0, nil
}
}

// TODO: clean up revision and data_table if import failed

return newRevisionID, nil
}
Loading

0 comments on commit 1c45c8f

Please sign in to comment.