diff --git a/warehouse/errors.go b/warehouse/errors.go index 167f643e21..b5365a4e8d 100644 --- a/warehouse/errors.go +++ b/warehouse/errors.go @@ -1,6 +1,14 @@ package warehouse -import "fmt" +import ( + "errors" + "fmt" +) + +var ( + ErrIncompatibleSchemaConversion = errors.New("incompatible schema conversion") + ErrSchemaConversionNotSupported = errors.New("schema conversion not supported") +) type InvalidDestinationCredErr struct { Base error diff --git a/warehouse/model/schema.go b/warehouse/model/schema.go new file mode 100644 index 0000000000..6b5187e9bb --- /dev/null +++ b/warehouse/model/schema.go @@ -0,0 +1,13 @@ +package model + +type SchemaType string + +const ( + StringDataType SchemaType = "string" + BooleanDataType SchemaType = "boolean" + IntDataType SchemaType = "int" + BigIntDataType SchemaType = "bigint" + FloatDataType SchemaType = "float" + JSONDataType SchemaType = "json" + TextDataType SchemaType = "text" +) diff --git a/warehouse/schema.go b/warehouse/schema.go index 05b40346ae..5e7f84ae0f 100644 --- a/warehouse/schema.go +++ b/warehouse/schema.go @@ -6,6 +6,8 @@ import ( "fmt" "reflect" + "github.com/rudderlabs/rudder-server/warehouse/model" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/timeutil" "github.com/rudderlabs/rudder-server/warehouse/manager" @@ -22,42 +24,47 @@ type SchemaHandleT struct { uploadSchema warehouseutils.SchemaT } -func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool) { - if existingDataType == "string" || existingDataType == "text" { +func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) { + var ( + newColumnVal any + err error + ) + + if existingDataType == model.StringDataType || existingDataType == model.TextDataType { // only stringify if the previous type is non-string/text/json - if columnType != "string" && columnType != "text" && columnType != "json" { - newColumnVal = fmt.Sprintf("%v", columnVal) + if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType { + newColumnVal = fmt.Sprintf("%v", value) } else { - newColumnVal = columnVal + newColumnVal = value } - } else if (columnType == "int" || columnType == "bigint") && existingDataType == "float" { - intVal, ok := columnVal.(int) + } else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType { + intVal, ok := value.(int) if !ok { - newColumnVal = nil + err = ErrIncompatibleSchemaConversion } else { newColumnVal = float64(intVal) } - } else if columnType == "float" && (existingDataType == "int" || existingDataType == "bigint") { - floatVal, ok := columnVal.(float64) + } else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) { + floatVal, ok := value.(float64) if !ok { - newColumnVal = nil + err = ErrIncompatibleSchemaConversion } else { newColumnVal = int(floatVal) } - } else if existingDataType == "json" { - var interfaceSliceSample []interface{} - if columnType == "int" || columnType == "float" || columnType == "boolean" { - newColumnVal = fmt.Sprintf("%v", columnVal) - } else if reflect.TypeOf(columnVal) == reflect.TypeOf(interfaceSliceSample) { - newColumnVal = columnVal + } else if existingDataType == model.JSONDataType { + var interfaceSliceSample []any + if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType { + newColumnVal = fmt.Sprintf("%v", value) + } else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) { + newColumnVal = value } else { - newColumnVal = fmt.Sprintf(`"%v"`, columnVal) + newColumnVal = fmt.Sprintf(`"%v"`, value) } } else { - return nil, false + err = ErrSchemaConversionNotSupported } - return newColumnVal, true + return newColumnVal, err } func (sh *SchemaHandleT) getLocalSchema() (currentSchema warehouseutils.SchemaT) { diff --git a/warehouse/schema_test.go b/warehouse/schema_test.go index 3a1b123605..95de9b2916 100644 --- a/warehouse/schema_test.go +++ b/warehouse/schema_test.go @@ -3,225 +3,276 @@ package warehouse import ( + "testing" + + "github.com/rudderlabs/rudder-server/warehouse/model" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + "github.com/stretchr/testify/require" ) -var _ = Describe("Schema", func() { - Describe("Handle schema change", func() { - Context("No discards", func() { - It("should send int values if existing datatype is int", func() { - var newColumnVal, columnVal, convertedVal interface{} - var ok bool - - columnVal = 1.501 - convertedVal = 1 - newColumnVal, ok = HandleSchemaChange("int", "float", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - }) - - It("should send float values if existing datatype is float", func() { - var newColumnVal, columnVal, convertedVal interface{} - var ok bool - - columnVal = 1 - convertedVal = 1.0 - newColumnVal, ok = HandleSchemaChange("float", "int", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - }) - - It("should send string values if existing datatype is string", func() { - var newColumnVal, columnVal, convertedVal interface{} - var ok bool - - columnVal = false - convertedVal = "false" - newColumnVal, ok = HandleSchemaChange("string", "boolean", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = 1 - convertedVal = "1" - newColumnVal, ok = HandleSchemaChange("string", "int", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = 1.501 - convertedVal = "1.501" - newColumnVal, ok = HandleSchemaChange("string", "float", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = "2022-05-05T00:00:00.000Z" - convertedVal = "2022-05-05T00:00:00.000Z" - newColumnVal, ok = HandleSchemaChange("string", "datetime", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = `{"json":true}` - convertedVal = `{"json":true}` - newColumnVal, ok = HandleSchemaChange("string", "json", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - }) - - It("should send json string values if existing datatype is json", func() { - var newColumnVal, columnVal, convertedVal interface{} - var ok bool - - columnVal = false - convertedVal = "false" - newColumnVal, ok = HandleSchemaChange("json", "boolean", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = 1 - convertedVal = "1" - newColumnVal, ok = HandleSchemaChange("json", "int", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = 1.501 - convertedVal = "1.501" - newColumnVal, ok = HandleSchemaChange("json", "float", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = "2022-05-05T00:00:00.000Z" - convertedVal = `"2022-05-05T00:00:00.000Z"` - newColumnVal, ok = HandleSchemaChange("json", "datetime", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - columnVal = "string value" - convertedVal = `"string value"` - newColumnVal, ok = HandleSchemaChange("json", "string", columnVal) - Expect(newColumnVal).To(Equal(convertedVal)) - Expect(ok).To(BeTrue()) - - var columnArrVal []interface{} - columnArrVal = append(columnArrVal, false, 1, "string value") - newColumnVal, ok = HandleSchemaChange("json", "string", columnArrVal) - Expect(newColumnVal).To(Equal(columnArrVal)) - Expect(ok).To(BeTrue()) - }) - }) - - Context("Discards", func() { - It("existing datatype is boolean", func() { - var newColumnVal, columnVal interface{} - var ok bool - - columnVal = 1 - newColumnVal, ok = HandleSchemaChange("boolean", "int", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = 1.501 - newColumnVal, ok = HandleSchemaChange("boolean", "float", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "string value" - newColumnVal, ok = HandleSchemaChange("boolean", "string", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "2022-05-05T00:00:00.000Z" - newColumnVal, ok = HandleSchemaChange("boolean", "datetime", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = `{"json":true}` - newColumnVal, ok = HandleSchemaChange("boolean", "json", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - }) - - It("existing datatype is int", func() { - var newColumnVal, columnVal interface{} - var ok bool - - columnVal = false - newColumnVal, ok = HandleSchemaChange("int", "boolean", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "string value" - newColumnVal, ok = HandleSchemaChange("int", "string", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "2022-05-05T00:00:00.000Z" - newColumnVal, ok = HandleSchemaChange("int", "datetime", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = `{"json":true}` - newColumnVal, ok = HandleSchemaChange("int", "json", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - }) - - It("existing datatype is float", func() { - var newColumnVal, columnVal interface{} - var ok bool - - columnVal = false - newColumnVal, ok = HandleSchemaChange("float", "boolean", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "string value" - newColumnVal, ok = HandleSchemaChange("float", "string", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "2022-05-05T00:00:00.000Z" - newColumnVal, ok = HandleSchemaChange("float", "datetime", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = `{"json":true}` - newColumnVal, ok = HandleSchemaChange("float", "json", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - }) - - It("existing datatype is datetime", func() { - var newColumnVal, columnVal interface{} - var ok bool - columnVal = false - newColumnVal, ok = HandleSchemaChange("datetime", "boolean", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = "string value" - newColumnVal, ok = HandleSchemaChange("datetime", "string", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = 1 - newColumnVal, ok = HandleSchemaChange("datetime", "int", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = 1.501 - newColumnVal, ok = HandleSchemaChange("datetime", "float", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - - columnVal = `{"json":true}` - newColumnVal, ok = HandleSchemaChange("datetime", "json", columnVal) - Expect(newColumnVal).To(BeNil()) - Expect(ok).To(BeFalse()) - }) +func TestHandleSchemaChange(t *testing.T) { + inputs := []struct { + name string + existingDatatype string + currentDataType string + value any + + newColumnVal any + convError error + }{ + { + name: "should send int values if existing datatype is int, new datatype is float", + existingDatatype: "int", + currentDataType: "float", + value: 1.501, + newColumnVal: 1, + }, + { + name: "should send float values if existing datatype is float, new datatype is int", + existingDatatype: "float", + currentDataType: "int", + value: 1, + newColumnVal: 1.0, + }, + { + name: "should send string values if existing datatype is string, new datatype is boolean", + existingDatatype: "string", + currentDataType: "boolean", + value: false, + newColumnVal: "false", + }, + { + name: "should send string values if existing datatype is string, new datatype is int", + existingDatatype: "string", + currentDataType: "int", + value: 1, + newColumnVal: "1", + }, + { + name: "should send string values if existing datatype is string, new datatype is float", + existingDatatype: "string", + currentDataType: "float", + value: 1.501, + newColumnVal: "1.501", + }, + { + name: "should send string values if existing datatype is string, new datatype is datetime", + existingDatatype: "string", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + newColumnVal: "2022-05-05T00:00:00.000Z", + }, + { + name: "should send string values if existing datatype is string, new datatype is string", + existingDatatype: "string", + currentDataType: "json", + value: `{"json":true}`, + newColumnVal: `{"json":true}`, + }, + { + name: "should send json string values if existing datatype is json, new datatype is boolean", + existingDatatype: "json", + currentDataType: "boolean", + value: false, + newColumnVal: "false", + }, + { + name: "should send json string values if existing datatype is jso, new datatype is int", + existingDatatype: "json", + currentDataType: "int", + value: 1, + newColumnVal: "1", + }, + { + name: "should send json string values if existing datatype is json, new datatype is float", + existingDatatype: "json", + currentDataType: "float", + value: 1.501, + newColumnVal: "1.501", + }, + { + name: "should send json string values if existing datatype is json, new datatype is json", + existingDatatype: "json", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + newColumnVal: `"2022-05-05T00:00:00.000Z"`, + }, + { + name: "should send json string values if existing datatype is json, new datatype is string", + existingDatatype: "json", + currentDataType: "string", + value: "string value", + newColumnVal: `"string value"`, + }, + { + name: "should send json string values if existing datatype is json, new datatype is array", + existingDatatype: "json", + currentDataType: "array", + value: []any{false, 1, "string value"}, + newColumnVal: []any{false, 1, "string value"}, + }, + { + name: "existing datatype is boolean, new datatype is int", + existingDatatype: "boolean", + currentDataType: "int", + value: 1, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is float", + existingDatatype: "boolean", + currentDataType: "float", + value: 1.501, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is string", + existingDatatype: "boolean", + currentDataType: "string", + value: "string value", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is datetime", + existingDatatype: "boolean", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is json", + existingDatatype: "boolean", + currentDataType: "json", + value: `{"json":true}`, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is boolean", + existingDatatype: "int", + currentDataType: "boolean", + value: false, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is string", + existingDatatype: "int", + currentDataType: "string", + value: "string value", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is datetime", + existingDatatype: "int", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is json", + existingDatatype: "int", + currentDataType: "json", + value: `{"json":true}`, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is float", + existingDatatype: "int", + currentDataType: "float", + value: 1, + convError: ErrIncompatibleSchemaConversion, + }, + { + name: "existing datatype is float, new datatype is boolean", + existingDatatype: "float", + currentDataType: "boolean", + value: false, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is int", + existingDatatype: "float", + currentDataType: "int", + value: 1.0, + convError: ErrIncompatibleSchemaConversion, + }, + { + name: "existing datatype is float, new datatype is string", + existingDatatype: "float", + currentDataType: "string", + value: "string value", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is datetime", + existingDatatype: "float", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is json", + existingDatatype: "float", + currentDataType: "json", + value: `{"json":true}`, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is boolean", + existingDatatype: "datetime", + currentDataType: "boolean", + value: false, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is string", + existingDatatype: "datetime", + currentDataType: "string", + value: "string value", + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is int", + existingDatatype: "datetime", + currentDataType: "int", + value: 1, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is float", + existingDatatype: "datetime", + currentDataType: "float", + value: 1.501, + convError: ErrSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is json", + existingDatatype: "datetime", + currentDataType: "json", + value: `{"json":true}`, + convError: ErrSchemaConversionNotSupported, + }, + } + for _, ip := range inputs { + tc := ip + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + newColumnVal, convError := HandleSchemaChange( + model.SchemaType(tc.existingDatatype), + model.SchemaType(tc.currentDataType), + tc.value, + ) + require.Equal(t, newColumnVal, tc.newColumnVal) + require.Equal(t, convError, tc.convError) }) - }) + } +} +var _ = Describe("Schema", func() { DescribeTable("Get table schema diff", func(tableName string, currentSchema, uploadSchema warehouseutils.SchemaT, expected warehouseutils.TableSchemaDiffT) { Expect(getTableSchemaDiff(tableName, currentSchema, uploadSchema)).To(Equal(expected)) }, diff --git a/warehouse/slave.go b/warehouse/slave.go index 05f840dcff..9620368fe2 100644 --- a/warehouse/slave.go +++ b/warehouse/slave.go @@ -14,6 +14,8 @@ import ( "strings" "time" + "github.com/rudderlabs/rudder-server/warehouse/model" + "github.com/rudderlabs/rudder-server/config" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/services/pgnotifier" @@ -479,7 +481,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l columnType := columnInfo.Type columnVal := columnInfo.Value - if columnType == "int" || columnType == "bigint" { + if model.SchemaType(columnType) == model.IntDataType || model.SchemaType(columnType) == model.BigIntDataType { floatVal, ok := columnVal.(float64) if !ok { eventLoader.AddEmptyColumn(columnName) @@ -491,8 +493,12 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l dataTypeInSchema, ok := job.UploadSchema[tableName][columnName] violatedConstraints := ViolatedConstraints(job.DestinationType, &batchRouterEvent, columnName) if ok && ((columnType != dataTypeInSchema) || (violatedConstraints.IsViolated)) { - newColumnVal, ok := HandleSchemaChange(dataTypeInSchema, columnType, columnVal) - if !ok || violatedConstraints.IsViolated { + newColumnVal, convError := HandleSchemaChange( + model.SchemaType(dataTypeInSchema), + model.SchemaType(columnType), + columnVal, + ) + if convError != nil || violatedConstraints.IsViolated { if violatedConstraints.IsViolated { eventLoader.AddColumn(columnName, job.UploadSchema[tableName][columnName], violatedConstraints.ViolatedIdentifier) } else { @@ -514,10 +520,6 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l jobRun.tableEventCountMap[discardsTable]++ continue } - if newColumnVal == nil { - eventLoader.AddEmptyColumn(columnName) - continue - } columnVal = newColumnVal }