diff --git a/internal/database/dbutil/sql.go b/internal/database/dbutil/sql.go index 02bd51b47..4553dbb49 100644 --- a/internal/database/dbutil/sql.go +++ b/internal/database/dbutil/sql.go @@ -89,3 +89,9 @@ func QuoteFn(backendType types.BackendType) func(...string) string { return strings.Join(rs, ",") } } + +func DropTable(ctx context.Context, db *sqlx.DB, tableName string) error { + query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName) + _, err := db.ExecContext(ctx, query) + return errdefs.WithStack(err) +} diff --git a/internal/database/online/cassandra/store.go b/internal/database/online/cassandra/store.go index fa054d0c3..2f15fc3d7 100644 --- a/internal/database/online/cassandra/store.go +++ b/internal/database/online/cassandra/store.go @@ -50,3 +50,8 @@ func Open(option *types.CassandraOpt) (*DB, error) { } return &DB{Session: session}, nil } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + //TODO implement me + panic("implement me") +} diff --git a/internal/database/online/dynamodb/import.go b/internal/database/online/dynamodb/import.go index c6ceb570c..b920ba249 100644 --- a/internal/database/online/dynamodb/import.go +++ b/internal/database/online/dynamodb/import.go @@ -38,7 +38,7 @@ func (db *DB) Import(ctx context.Context, opt online.ImportOpt) error { } // Step 1: create table entity := opt.Group.Entity - _, err := db.CreateTable(ctx, &dynamodb.CreateTableInput{ + _, err := db.Client.CreateTable(ctx, &dynamodb.CreateTableInput{ TableName: aws.String(tableName), KeySchema: []types.KeySchemaElement{ { diff --git a/internal/database/online/dynamodb/store.go b/internal/database/online/dynamodb/store.go index 00374843c..0b641d19e 100644 --- a/internal/database/online/dynamodb/store.go +++ b/internal/database/online/dynamodb/store.go @@ -55,3 +55,8 @@ func (db *DB) Ping(ctx context.Context) error { func (db *DB) Close() error { return nil } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + //TODO implement me + panic("implement me") +} diff --git a/internal/database/online/dynamodb/stream.go b/internal/database/online/dynamodb/stream.go index be2995c82..9afff1713 100644 --- a/internal/database/online/dynamodb/stream.go +++ b/internal/database/online/dynamodb/stream.go @@ -23,7 +23,7 @@ func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTa tableName := sqlutil.OnlineStreamTableName(opt.GroupID) - _, err := db.CreateTable(ctx, &dynamodb.CreateTableInput{ + _, err := db.Client.CreateTable(ctx, &dynamodb.CreateTableInput{ TableName: aws.String(tableName), KeySchema: []types.KeySchemaElement{ { diff --git a/internal/database/online/mock_online/store.go b/internal/database/online/mock_online/store.go index ddc975419..907684955 100644 --- a/internal/database/online/mock_online/store.go +++ b/internal/database/online/mock_online/store.go @@ -50,6 +50,20 @@ func (mr *MockStoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStore)(nil).Close)) } +// CreateTable mocks base method. +func (m *MockStore) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTable", ctx, opt) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateTable indicates an expected call of CreateTable. +func (mr *MockStoreMockRecorder) CreateTable(ctx, opt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTable", reflect.TypeOf((*MockStore)(nil).CreateTable), ctx, opt) +} + // Get mocks base method. func (m *MockStore) Get(ctx context.Context, opt online.GetOpt) (dbutil.RowMap, error) { m.ctrl.T.Helper() diff --git a/internal/database/online/mysql/store.go b/internal/database/online/mysql/store.go index 23a9cab70..32d1e1ed2 100644 --- a/internal/database/online/mysql/store.go +++ b/internal/database/online/mysql/store.go @@ -68,3 +68,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error { func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error { return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend) } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + return sqlutil.CreateTable(ctx, db.DB, opt, Backend) +} diff --git a/internal/database/online/mysql/store_test.go b/internal/database/online/mysql/store_test.go index 447eea4a8..e01647db3 100644 --- a/internal/database/online/mysql/store_test.go +++ b/internal/database/online/mysql/store_test.go @@ -64,3 +64,7 @@ func TestPush(t *testing.T) { func TestPing(t *testing.T) { test_impl.TestPing(t, prepareStore, runtime_mysql.DestroyStore(DATABASE)) } + +func TestCreateTable(t *testing.T) { + test_impl.TestCreateTable(t, prepareStore, runtime_mysql.DestroyStore(DATABASE)) +} diff --git a/internal/database/online/postgres/store.go b/internal/database/online/postgres/store.go index cbeb74909..4a3cacc45 100644 --- a/internal/database/online/postgres/store.go +++ b/internal/database/online/postgres/store.go @@ -69,3 +69,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error { func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error { return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend) } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + return sqlutil.CreateTable(ctx, db.DB, opt, Backend) +} diff --git a/internal/database/online/postgres/store_test.go b/internal/database/online/postgres/store_test.go index e7ffbea00..25a167cf5 100644 --- a/internal/database/online/postgres/store_test.go +++ b/internal/database/online/postgres/store_test.go @@ -64,3 +64,7 @@ func TestPush(t *testing.T) { func TestPing(t *testing.T) { test_impl.TestPing(t, prepareStore, runtime_pg.DestroyStore(DATABASE)) } + +func TestCreateTable(t *testing.T) { + test_impl.TestCreateTable(t, prepareStore, runtime_pg.DestroyStore(DATABASE)) +} diff --git a/internal/database/online/redis/store.go b/internal/database/online/redis/store.go index 9d8439432..b3e78ef55 100644 --- a/internal/database/online/redis/store.go +++ b/internal/database/online/redis/store.go @@ -37,3 +37,7 @@ func Open(opt *types.RedisOpt) *DB { func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error { return nil } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + return nil +} diff --git a/internal/database/online/sqlite/store.go b/internal/database/online/sqlite/store.go index 27b93ca39..efaeea7b4 100644 --- a/internal/database/online/sqlite/store.go +++ b/internal/database/online/sqlite/store.go @@ -66,3 +66,7 @@ func (db *DB) Push(ctx context.Context, opt online.PushOpt) error { func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error { return sqlutil.SqlxPrepareStreamTable(ctx, db.DB, opt, Backend) } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + return sqlutil.CreateTable(ctx, db.DB, opt, Backend) +} diff --git a/internal/database/online/sqlite/store_test.go b/internal/database/online/sqlite/store_test.go index 98613403b..337e041a5 100644 --- a/internal/database/online/sqlite/store_test.go +++ b/internal/database/online/sqlite/store_test.go @@ -63,3 +63,7 @@ func TestPush(t *testing.T) { func TestPing(t *testing.T) { test_impl.TestPing(t, prepareStore, destroyStore) } + +func TestCreateTable(t *testing.T) { + test_impl.TestCreateTable(t, prepareStore, destroyStore) +} diff --git a/internal/database/online/sqlutil/create_table.go b/internal/database/online/sqlutil/create_table.go new file mode 100644 index 000000000..ff80e49f6 --- /dev/null +++ b/internal/database/online/sqlutil/create_table.go @@ -0,0 +1,26 @@ +package sqlutil + +import ( + "context" + + "github.com/oom-ai/oomstore/internal/database/dbutil" + "github.com/oom-ai/oomstore/pkg/errdefs" + + "github.com/jmoiron/sqlx" + "github.com/oom-ai/oomstore/internal/database/online" + "github.com/oom-ai/oomstore/pkg/oomstore/types" +) + +func CreateTable(ctx context.Context, db *sqlx.DB, opt online.CreateTableOpt, backend types.BackendType) error { + // Step 1: drop existing table + if err := dbutil.DropTable(ctx, db, opt.TableName); err != nil { + return err + } + // Step 2: create new table + schema := dbutil.BuildTableSchema(opt.TableName, opt.EntityName, false, opt.Features, []string{opt.EntityName}, backend) + _, err := db.ExecContext(ctx, schema) + if err != nil { + return errdefs.WithStack(err) + } + return nil +} diff --git a/internal/database/online/store.go b/internal/database/online/store.go index 4efdbea4e..5eed85450 100644 --- a/internal/database/online/store.go +++ b/internal/database/online/store.go @@ -11,6 +11,7 @@ type Store interface { Get(ctx context.Context, opt GetOpt) (dbutil.RowMap, error) MultiGet(ctx context.Context, opt MultiGetOpt) (map[string]dbutil.RowMap, error) Purge(ctx context.Context, revisionID int) error + CreateTable(ctx context.Context, opt CreateTableOpt) error // Import batch / streaming features to online store Import(ctx context.Context, opt ImportOpt) error diff --git a/internal/database/online/test_impl/create_table.go b/internal/database/online/test_impl/create_table.go index 5a157ea35..ab5e3796d 100644 --- a/internal/database/online/test_impl/create_table.go +++ b/internal/database/online/test_impl/create_table.go @@ -33,3 +33,27 @@ func TestPrepareStreamTable(t *testing.T, prepareStore PrepareStoreFn, destroySt }) } } + +func TestCreateTable(t *testing.T, prepareStore PrepareStoreFn, destroyStore DestroyStoreFn) { + t.Cleanup(destroyStore) + + ctx, store := prepareStore(t) + defer store.Close() + + t.Run("create stream table", func(t *testing.T) { + err := store.CreateTable(ctx, online.CreateTableOpt{ + EntityName: SampleStream.Entity.Name, + TableName: "stream_online", + Features: SampleStream.Features, + }) + assert.NoError(t, err, "create stream table failed: %v", err) + }) + t.Run("create batch table", func(t *testing.T) { + err := store.CreateTable(ctx, online.CreateTableOpt{ + EntityName: SampleSmall.Entity.Name, + TableName: "batch_online", + Features: SampleSmall.Features, + }) + assert.NoError(t, err, "create batch table failed: %v", err) + }) +} diff --git a/internal/database/online/tikv/store.go b/internal/database/online/tikv/store.go index 1ed9bfc54..cc693d698 100644 --- a/internal/database/online/tikv/store.go +++ b/internal/database/online/tikv/store.go @@ -46,3 +46,7 @@ func (db *DB) Ping(ctx context.Context) error { func (db *DB) PrepareStreamTable(ctx context.Context, opt online.PrepareStreamTableOpt) error { return nil } + +func (db *DB) CreateTable(ctx context.Context, opt online.CreateTableOpt) error { + return nil +} diff --git a/internal/database/online/types.go b/internal/database/online/types.go index 1997136b9..bedef8e0a 100644 --- a/internal/database/online/types.go +++ b/internal/database/online/types.go @@ -42,3 +42,9 @@ type PrepareStreamTableOpt struct { // otherwise it means the stream table will be created. Feature *types.Feature } + +type CreateTableOpt struct { + EntityName string + TableName string + Features types.FeatureList +}