Skip to content

Commit

Permalink
chore(warehouse): handle schema change (#2654)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Nov 15, 2022
1 parent 5d466d3 commit d01f98b
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 240 deletions.
10 changes: 9 additions & 1 deletion warehouse/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package warehouse

import "fmt"
import (
"errors"
"fmt"
)

// Sentinel errors describing why a value could not be adapted to the data
// type a column already has. Compare against them with errors.Is.
var (
	// ErrIncompatibleSchemaConversion signals that a conversion rule exists
	// for the requested type pair, but the value itself could not be
	// converted under that rule.
	ErrIncompatibleSchemaConversion = errors.New("incompatible schema conversion")

	// ErrSchemaConversionNotSupported signals that no conversion rule exists
	// for the requested type pair.
	ErrSchemaConversionNotSupported = errors.New("schema conversion not supported")
)

type InvalidDestinationCredErr struct {
Base error
Expand Down
13 changes: 13 additions & 0 deletions warehouse/model/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package model

// SchemaType is the name of a column data type, stored as its string label.
type SchemaType string

// Data type labels declared for SchemaType. Each constant's value is the
// lowercase label shown on the right-hand side.
const (
	StringDataType  SchemaType = "string"
	BooleanDataType SchemaType = "boolean"
	IntDataType     SchemaType = "int"
	BigIntDataType  SchemaType = "bigint"
	FloatDataType   SchemaType = "float"
	JSONDataType    SchemaType = "json"
	TextDataType    SchemaType = "text"
)
47 changes: 27 additions & 20 deletions warehouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"reflect"

"github.com/rudderlabs/rudder-server/warehouse/model"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/manager"
Expand All @@ -22,42 +24,47 @@ type SchemaHandleT struct {
uploadSchema warehouseutils.SchemaT
}

// NOTE(review): this span is a rendered diff with the +/- markers stripped.
// The lines using plain string types, columnType/columnVal and the
// (value, ok bool) result appear to be the removed version; the lines using
// model.SchemaType, value and the (any, error) result appear to be the added
// version. Confirm against the repository before relying on either side.
func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool) {
if existingDataType == "string" || existingDataType == "text" {
// HandleSchemaChange adapts value, whose detected type is currentDataType, to
// the column type existingDataType and returns the adapted value.
//
// Conversion rules, checked in this order:
//   - existing string/text: every current type except string, text and json
//     is formatted with %v; those three are passed through unchanged.
//   - current int/bigint, existing float: value must hold a Go int and is
//     widened to float64.
//   - current float, existing int/bigint: value must hold a float64 and is
//     converted with int(), which discards the fractional part.
//   - existing json: int, float and boolean are formatted with %v; a []any is
//     passed through; anything else is formatted with %v inside double quotes.
//
// It returns ErrIncompatibleSchemaConversion when a numeric rule applies but
// value does not hold the required Go type, and ErrSchemaConversionNotSupported
// when no rule matches. On either error the returned value is nil.
func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) {
var (
newColumnVal any
err error
)

if existingDataType == model.StringDataType || existingDataType == model.TextDataType {
// only stringify if the previous type is non-string/text/json
if columnType != "string" && columnType != "text" && columnType != "json" {
newColumnVal = fmt.Sprintf("%v", columnVal)
if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType {
newColumnVal = fmt.Sprintf("%v", value)
} else {
newColumnVal = columnVal
newColumnVal = value
}
} else if (columnType == "int" || columnType == "bigint") && existingDataType == "float" {
intVal, ok := columnVal.(int)
} else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType {
// The assertion matches only a dynamic type of exactly int; any other
// numeric type (int64, float64, ...) takes the error path below.
intVal, ok := value.(int)
if !ok {
newColumnVal = nil
err = ErrIncompatibleSchemaConversion
} else {
newColumnVal = float64(intVal)
}
} else if columnType == "float" && (existingDataType == "int" || existingDataType == "bigint") {
floatVal, ok := columnVal.(float64)
} else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) {
floatVal, ok := value.(float64)
if !ok {
newColumnVal = nil
err = ErrIncompatibleSchemaConversion
} else {
// int() drops the fractional part, so 1.9 becomes 1.
newColumnVal = int(floatVal)
}
} else if existingDataType == "json" {
var interfaceSliceSample []interface{}
if columnType == "int" || columnType == "float" || columnType == "boolean" {
newColumnVal = fmt.Sprintf("%v", columnVal)
} else if reflect.TypeOf(columnVal) == reflect.TypeOf(interfaceSliceSample) {
newColumnVal = columnVal
} else if existingDataType == model.JSONDataType {
// interfaceSliceSample is never assigned; it exists only so that
// reflect.TypeOf can produce the []any type for the comparison below.
var interfaceSliceSample []any
// NOTE(review): BigIntDataType is not in this list, so a bigint value falls
// through to the branches below — confirm that is intended.
if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType {
newColumnVal = fmt.Sprintf("%v", value)
} else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) {
newColumnVal = value
} else {
newColumnVal = fmt.Sprintf(`"%v"`, columnVal)
// The value is wrapped in double quotes as-is; the format string performs
// no escaping of quotes or backslashes already inside the value.
newColumnVal = fmt.Sprintf(`"%v"`, value)
}
} else {
return nil, false
err = ErrSchemaConversionNotSupported
}

return newColumnVal, true
return newColumnVal, err
}

func (sh *SchemaHandleT) getLocalSchema() (currentSchema warehouseutils.SchemaT) {
Expand Down
Loading

0 comments on commit d01f98b

Please sign in to comment.