Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(warehouse): handle schema change #2654

Merged
merged 12 commits into from
Nov 15, 2022
10 changes: 9 additions & 1 deletion warehouse/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package warehouse

import "fmt"
import (
"errors"
"fmt"
)

var (
ErrIncompatibleSchemaConv = errors.New("incompatible schema conversion")
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
ErrSchemaConvNotSupported = errors.New("schema conversion not supported")
)

type InvalidDestinationCredErr struct {
Base error
Expand Down
13 changes: 13 additions & 0 deletions warehouse/model/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package model

type SchemaType string

const (
StringDataType SchemaType = "string"
BooleanDataType SchemaType = "boolean"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only the first constant in this group has an explicit type

https://stackoverflow.com/questions/55282615/golangci-lint-constant-explicit-type

IntDataType SchemaType = "int"
BigIntDataType SchemaType = "bigint"
FloatDataType SchemaType = "float"
JSONDataType SchemaType = "json"
TextDataType SchemaType = "text"
)
47 changes: 27 additions & 20 deletions warehouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"reflect"

"github.com/rudderlabs/rudder-server/warehouse/model"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/manager"
Expand All @@ -22,42 +24,47 @@ type SchemaHandleT struct {
uploadSchema warehouseutils.SchemaT
}

func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool) {
if existingDataType == "string" || existingDataType == "text" {
func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) {
var (
newColumnVal any
err error
)

if existingDataType == model.StringDataType || existingDataType == model.TextDataType {
// only stringify if the previous type is non-string/text/json
if columnType != "string" && columnType != "text" && columnType != "json" {
newColumnVal = fmt.Sprintf("%v", columnVal)
if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType {
newColumnVal = fmt.Sprintf("%v", value)
} else {
newColumnVal = columnVal
newColumnVal = value
}
} else if (columnType == "int" || columnType == "bigint") && existingDataType == "float" {
intVal, ok := columnVal.(int)
} else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType {
intVal, ok := value.(int)
if !ok {
newColumnVal = nil
err = ErrIncompatibleSchemaConv
} else {
newColumnVal = float64(intVal)
}
} else if columnType == "float" && (existingDataType == "int" || existingDataType == "bigint") {
floatVal, ok := columnVal.(float64)
} else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) {
floatVal, ok := value.(float64)
if !ok {
newColumnVal = nil
err = ErrIncompatibleSchemaConv
} else {
newColumnVal = int(floatVal)
}
} else if existingDataType == "json" {
var interfaceSliceSample []interface{}
if columnType == "int" || columnType == "float" || columnType == "boolean" {
newColumnVal = fmt.Sprintf("%v", columnVal)
} else if reflect.TypeOf(columnVal) == reflect.TypeOf(interfaceSliceSample) {
newColumnVal = columnVal
} else if existingDataType == model.JSONDataType {
var interfaceSliceSample []any
if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType {
newColumnVal = fmt.Sprintf("%v", value)
} else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) {
newColumnVal = value
} else {
newColumnVal = fmt.Sprintf(`"%v"`, columnVal)
newColumnVal = fmt.Sprintf(`"%v"`, value)
}
} else {
return nil, false
err = ErrSchemaConvNotSupported
}

return newColumnVal, true
return newColumnVal, err
}

func (sh *SchemaHandleT) getLocalSchema() (currentSchema warehouseutils.SchemaT) {
Expand Down
Loading