diff --git a/internal/database/database.go b/internal/database/database.go
index 0a28b614e..d4e3336c1 100644
--- a/internal/database/database.go
+++ b/internal/database/database.go
@@ -11,6 +11,8 @@ import (
 	_ "github.com/lib/pq"
 )
 
+type RowMap = map[string]interface{}
+
 type DB struct {
 	*sqlx.DB
 }
diff --git a/internal/database/export.go b/internal/database/export.go
deleted file mode 100644
index 1ba17d426..000000000
--- a/internal/database/export.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package database
-
-import (
-	"context"
-	"fmt"
-	"math/rand"
-	"strings"
-
-	"github.com/onestore-ai/onestore/pkg/onestore/types"
-)
-
-func (db *DB) createTableEntityDfWithFeatures(ctx context.Context, features []*types.RichFeature, entity *types.Entity) (string, error) {
-	tableName := fmt.Sprintf("entity_df_with_features_%d", rand.Int())
-	schema := `
-	CREATE TABLE %s (
-		unique_key VARCHAR(%d) NOT NULL,
-		entity_key VARCHAR(%d) NOT NULL,
-		unix_time BIGINT NOT NULL,
-		%s,
-		PRIMARY KEY (entity_key, unix_time)
-	);
-	`
-
-	var columnDefs []string
-	for _, f := range features {
-		columnDefs = append(columnDefs, fmt.Sprintf(`"%s" %s`, f.Name, f.ValueType))
-	}
-	// unique_key = entity_key,unix_time; length of unique_key = entity.Length + 9
-	schema = fmt.Sprintf(schema, tableName, entity.Length+10, entity.Length, strings.Join(columnDefs, ",\n"))
-	_, err := db.ExecContext(ctx, schema)
-	return tableName, err
-}
-
-func (db *DB) createAndImportTableEntityDf(ctx context.Context, entityRows []types.EntityRow, entity *types.Entity) (string, error) {
-	tableName := fmt.Sprintf("entity_df_%d", rand.Int())
-	schema := fmt.Sprintf(`
-	CREATE TABLE %s (
-		entity_key VARCHAR(%d) NOT NULL,
-		unix_time BIGINT NOT NULL,
-		PRIMARY KEY (entity_key, unix_time)
-	);
-	`, tableName, entity.Length)
-	if _, err := db.ExecContext(ctx, schema); err != nil {
-		return tableName, err
-	}
-
-	insertQuery := fmt.Sprintf(`INSERT INTO %s(entity_key, unix_time) VALUES (:entity_key, :unix_time)`, tableName)
-	_, err := db.NamedExec(insertQuery, entityRows)
-	return tableName, err
-}
-
-func (db *DB) dropTable(ctx context.Context, tableName string) error {
-	query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, tableName)
-	_, err := db.ExecContext(ctx, query)
-	return err
-}
diff --git a/internal/database/feature_value.go b/internal/database/feature_value.go
deleted file mode 100644
index 08a6e727f..000000000
--- a/internal/database/feature_value.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package database
-
-import (
-	"context"
-	"fmt"
-	"strings"
-
-	"github.com/jmoiron/sqlx"
-	"github.com/onestore-ai/onestore/pkg/onestore/types"
-	"github.com/spf13/cast"
-)
-
-type RowMap = map[string]interface{}
-
-func getFeatureValueMapFromRows(rows *sqlx.Rows, entityName string) (map[string]RowMap, error) {
-	featureValueMap := make(map[string]RowMap)
-	for rows.Next() {
-		rowMap := make(RowMap)
-		if err := rows.MapScan(rowMap); err != nil {
-			return nil, err
-		}
-		entityKey, ok := rowMap[entityName]
-		if !ok {
-			return nil, fmt.Errorf("missing column %s", entityName)
-		}
-		delete(rowMap, entityName)
-		featureValueMap[cast.ToString(entityKey)] = rowMap
-	}
-	return featureValueMap, nil
-}
-
-func (db *DB) GetPointInTimeFeatureValues(ctx context.Context, entity *types.Entity, revisionRanges []*types.RevisionRange, features []*types.RichFeature, entityRows []types.EntityRow) (dataMap map[string]RowMap, err error) {
-	if len(features) == 0 {
-		return make(map[string]RowMap), nil
-	}
-
-	// Step 0: prepare temporary tables
-	entityDfWithFeatureName, tmpErr := db.createTableEntityDfWithFeatures(ctx, features, entity)
-	if tmpErr != nil {
-		return nil, tmpErr
-	}
-	defer func() {
-		if tmpErr := db.dropTable(ctx, entityDfWithFeatureName); tmpErr != nil {
-			err = tmpErr
-		}
-	}()
-
-	entityDfName, tmpErr := db.createAndImportTableEntityDf(ctx, entityRows, entity)
-	if tmpErr != nil {
-		return nil, tmpErr
-	}
-	defer func() {
-		if tmpErr := db.dropTable(ctx, entityDfName); tmpErr != nil {
-			err = tmpErr
-		}
-	}()
-
-	// Step 1: iterate each table range, get result
-	joinQuery := `
-	INSERT INTO %s(unique_key, entity_key, unix_time, %s)
-	SELECT
-		CONCAT(l.entity_key, ',', l.unix_time) AS unique_key,
-		l.entity_key AS entity_key,
-		l.unix_time AS unix_time,
-		%s
-	FROM %s AS l
-	LEFT JOIN %s AS r
-	ON l.entity_key = r.%s
-	WHERE l.unix_time >= $1 AND l.unix_time < $2;
-	`
-	featureNamesStr := buildFeatureNameStr(features)
-	for _, r := range revisionRanges {
-		_, tmpErr := db.ExecContext(ctx, fmt.Sprintf(joinQuery, entityDfWithFeatureName, featureNamesStr, featureNamesStr, entityDfName, r.DataTable, entity.Name), r.MinRevision, r.MaxRevision)
-		if tmpErr != nil {
-			return nil, tmpErr
-		}
-	}
-
-	// Step 2: get rows from entity_df_with_features table
-	resultQuery := fmt.Sprintf(`SELECT * FROM %s`, entityDfWithFeatureName)
-	rows, tmpErr := db.QueryxContext(ctx, resultQuery)
-	if tmpErr != nil {
-		return nil, tmpErr
-	}
-	defer rows.Close()
-
-	dataMap, err = getFeatureValueMapFromRows(rows, "unique_key")
-	return dataMap, err
-}
-
-func buildFeatureNameStr(features []*types.RichFeature) string {
-	featureNames := make([]string, 0, len(features))
-	for _, f := range features {
-		featureNames = append(featureNames, f.Name)
-	}
-	return strings.Join(featureNames, " ,")
-}
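
Note (not part of the diff above): the deleted feature_value.go previously declared the RowMap alias, and this change keeps the alias alive by re-declaring it in database.go. As a minimal sketch of how remaining database code might still pair RowMap with sqlx's MapScan, assuming a hypothetical helper name (scanRowMaps is not defined in this change; only DB, RowMap, QueryxContext, and MapScan come from the code shown):

package database

import "context"

// scanRowMaps is a hypothetical helper illustrating the retained RowMap alias:
// it runs a query and turns every row into a column-name -> value map,
// mirroring the per-row MapScan loop removed with feature_value.go.
func (db *DB) scanRowMaps(ctx context.Context, query string, args ...interface{}) ([]RowMap, error) {
	rows, err := db.QueryxContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []RowMap
	for rows.Next() {
		rowMap := make(RowMap) // one map per row, keyed by column name
		if err := rows.MapScan(rowMap); err != nil {
			return nil, err
		}
		result = append(result, rowMap)
	}
	return result, rows.Err()
}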