Skip to content

Commit

Permalink
Merge pull request #1086 from oom-ai/jinghan/online_create_table
Browse files Browse the repository at this point in the history
Jinghan/implement online method `CreateTable`
  • Loading branch information
jinghancc authored Jan 26, 2022
2 parents b6f2c3a + 1f54a6f commit 118cc7d
Show file tree
Hide file tree
Showing 18 changed files with 121 additions and 2 deletions.
6 changes: 6 additions & 0 deletions internal/database/dbutil/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,9 @@ func QuoteFn(backendType types.BackendType) func(...string) string {
return strings.Join(rs, ",")
}
}

func DropTable(ctx context.Context, db *sqlx.DB, tableName string) error {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName)
_, err := db.ExecContext(ctx, query)
return errdefs.WithStack(err)
}
5 changes: 5 additions & 0 deletions internal/database/online/cassandra/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ func Open(option *types.CassandraOpt) (*DB, error) {
}
return &DB{Session: session}, nil
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
//TODO implement me
panic("implement me")
}
2 changes: 1 addition & 1 deletion internal/database/online/dynamodb/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (db *DB) Import(ctx context.Context, opt online.ImportOpt) error {
}
// Step 1: create table
entity := opt.Group.Entity
_, err := db.CreateTable(ctx, &dynamodb.CreateTableInput{
_, err := db.Client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: aws.String(tableName),
KeySchema: []types.KeySchemaElement{
{
Expand Down
5 changes: 5 additions & 0 deletions internal/database/online/dynamodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ func (db *DB) Ping(ctx context.Context) error {
func (db *DB) Close() error {
return nil
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
//TODO implement me
panic("implement me")
}
2 changes: 1 addition & 1 deletion internal/database/online/dynamodb/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTa

tableName := sqlutil.OnlineStreamTableName(opt.GroupID)

_, err := db.CreateTable(ctx, &dynamodb.CreateTableInput{
_, err := db.Client.CreateTable(ctx, &dynamodb.CreateTableInput{
TableName: aws.String(tableName),
KeySchema: []types.KeySchemaElement{
{
Expand Down
14 changes: 14 additions & 0 deletions internal/database/online/mock_online/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/database/online/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error {
func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error {
return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend)
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
return sqlutil.CreateTable(ctx, db.DB, opt, Backend)
}
4 changes: 4 additions & 0 deletions internal/database/online/mysql/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ func TestPush(t *testing.T) {
func TestPing(t *testing.T) {
test_impl.TestPing(t, prepareStore, runtime_mysql.DestroyStore(DATABASE))
}

func TestCreateTable(t *testing.T) {
test_impl.TestCreateTable(t, prepareStore, runtime_mysql.DestroyStore(DATABASE))
}
4 changes: 4 additions & 0 deletions internal/database/online/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error {
func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error {
return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend)
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
return sqlutil.CreateTable(ctx, db.DB, opt, Backend)
}
4 changes: 4 additions & 0 deletions internal/database/online/postgres/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ func TestPush(t *testing.T) {
func TestPing(t *testing.T) {
test_impl.TestPing(t, prepareStore, runtime_pg.DestroyStore(DATABASE))
}

func TestCreateTable(t *testing.T) {
test_impl.TestCreateTable(t, prepareStore, runtime_pg.DestroyStore(DATABASE))
}
4 changes: 4 additions & 0 deletions internal/database/online/redis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ func Open(opt *types.RedisOpt) *DB {
func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error {
return nil
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
return nil
}
4 changes: 4 additions & 0 deletions internal/database/online/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error {
func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error {
return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend)
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
return sqlutil.CreateTable(ctx, db.DB, opt, Backend)
}
4 changes: 4 additions & 0 deletions internal/database/online/sqlite/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func TestPush(t *testing.T) {
func TestPing(t *testing.T) {
test_impl.TestPing(t, prepareStore, destroyStore)
}

func TestCreateTable(t *testing.T) {
test_impl.TestCreateTable(t, prepareStore, destroyStore)
}
26 changes: 26 additions & 0 deletions internal/database/online/sqlutil/create_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sqlutil

import (
"context"

"github.com/oom-ai/oomstore/internal/database/dbutil"
"github.com/oom-ai/oomstore/pkg/errdefs"

"github.com/jmoiron/sqlx"
"github.com/oom-ai/oomstore/internal/database/online"
"github.com/oom-ai/oomstore/pkg/oomstore/types"
)

func CreateTable(ctx context.Context, db *sqlx.DB, opt online.CreateTableOpt, backend types.BackendType) error {
// Step 1: drop existing table
if err := dbutil.DropTable(ctx, db, opt.TableName); err != nil {
return err
}
// Step 2: create new table
schema := dbutil.BuildTableSchema(opt.TableName, opt.EntityName, false, opt.Features, []string{opt.EntityName}, backend)
_, err := db.ExecContext(ctx, schema)
if err != nil {
return errdefs.WithStack(err)
}
return nil
}
1 change: 1 addition & 0 deletions internal/database/online/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Store interface {
Get(ctx context.Context, opt GetOpt) (dbutil.RowMap, error)
MultiGet(ctx context.Context, opt MultiGetOpt) (map[string]dbutil.RowMap, error)
Purge(ctx context.Context, revisionID int) error
CreateTable(ctx context.Context, opt CreateTableOpt) error

// Import batch / streaming features to online store
Import(ctx context.Context, opt ImportOpt) error
Expand Down
24 changes: 24 additions & 0 deletions internal/database/online/test_impl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ func TestPrepareStreamTable(t *testing.T, prepareStore PrepareStoreFn, destroySt
})
}
}

func TestCreateTable(t *testing.T, prepareStore PrepareStoreFn, destroyStore DestroyStoreFn) {
t.Cleanup(destroyStore)

ctx, store := prepareStore(t)
defer store.Close()

t.Run("create stream table", func(t *testing.T) {
err := store.CreateTable(ctx, online.CreateTableOpt{
EntityName: SampleStream.Entity.Name,
TableName: "stream_online",
Features: SampleStream.Features,
})
assert.NoError(t, err, "create stream table failed: %v", err)
})
t.Run("create batch table", func(t *testing.T) {
err := store.CreateTable(ctx, online.CreateTableOpt{
EntityName: SampleSmall.Entity.Name,
TableName: "batch_online",
Features: SampleSmall.Features,
})
assert.NoError(t, err, "create batch table failed: %v", err)
})
}
4 changes: 4 additions & 0 deletions internal/database/online/tikv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (db *DB) Ping(ctx context.Context) error {
func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error {
return nil
}

func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error {
return nil
}
6 changes: 6 additions & 0 deletions internal/database/online/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ type PrepareStreamTableOpt struct {
// otherwise it means the stream table will be created.
Feature *types.Feature
}

type CreateTableOpt struct {
EntityName string
TableName string
Features types.FeatureList
}

0 comments on commit 118cc7d

Please sign in to comment.