Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(warehouse): unrecognized schema in warehouse #2638

Merged
merged 10 commits into from
Nov 3, 2022
11 changes: 9 additions & 2 deletions warehouse/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (as *HandleT) dropDanglingStagingTables() bool {
}

// FetchSchema queries SYNAPSE and returns the schema associated with provided namespace
func (as *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (as *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
as.Warehouse = warehouse
as.Namespace = warehouse.Namespace
dbHandle, err := connect(as.getConnectionCredentials())
Expand All @@ -785,6 +785,8 @@ func (as *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
defer dbHandle.Close()

schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

sqlStatement := fmt.Sprintf(`
SELECT
table_name,
Expand All @@ -807,7 +809,7 @@ func (as *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
}
if err == io.EOF {
pkgLogger.Infof("AZ: No rows, while fetching schema from destination:%v, query: %v", as.Warehouse.Identifier, sqlStatement)
return schema, nil
return schema, unRecognizedSchema, nil
}
defer rows.Close()
for rows.Next() {
Expand All @@ -823,6 +825,11 @@ func (as *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if datatype, ok := mssqlDataTypesMapToRudder[cType]; ok {
schema[tName][cName] = datatype
} else {
if _, ok := unRecognizedSchema[tName]; !ok {
unRecognizedSchema[tName] = make(map[string]string)
}
unRecognizedSchema[tName][cName] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &as.Warehouse, warehouseutils.Tag{Name: "datatype", Value: cType}).Count(1)
}
}
Expand Down
15 changes: 11 additions & 4 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func (*HandleT) AlterColumn(_, _, _ string) (err error) {
}

// FetchSchema queries bigquery and returns the schema associated with provided namespace
func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
bq.warehouse = warehouse
bq.namespace = warehouse.Namespace
bq.projectID = strings.TrimSpace(warehouseutils.GetConfigValue(GCPProjectID, bq.warehouse))
Expand All @@ -860,6 +860,8 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
defer dbClient.Close()

schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

sqlStatement := fmt.Sprintf(`
SELECT
t.table_name,
Expand All @@ -885,10 +887,10 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
// if dataset resource is not found, return empty schema
if e.Code == 404 {
pkgLogger.Infof("BQ: No rows, while fetching schema from destination:%v, query: %v", bq.warehouse.Identifier, query)
return schema, nil
return schema, unRecognizedSchema, nil
}
pkgLogger.Errorf("BQ: Error in fetching schema from bigquery destination:%v, query: %v", bq.warehouse.Destination.ID, query)
return schema, e
return schema, unRecognizedSchema, e
}
pkgLogger.Errorf("BQ: Error in fetching schema from bigquery destination:%v, query: %v", bq.warehouse.Destination.ID, query)
return
Expand All @@ -902,7 +904,7 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
}
if err != nil {
pkgLogger.Errorf("BQ: Error in processing fetched schema from redshift destination:%v, error: %v", bq.warehouse.Destination.ID, err)
return nil, err
return nil, nil, err
}
var tName, cName, cType string
tName, _ = values[0].(string)
Expand All @@ -915,6 +917,11 @@ func (bq *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
// lower case all column names from bigquery
schema[tName][strings.ToLower(cName)] = datatype
} else {
if _, ok := unRecognizedSchema[tName]; !ok {
unRecognizedSchema[tName] = make(map[string]string)
}
unRecognizedSchema[tName][strings.ToLower(cName)] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &bq.warehouse, warehouseutils.Tag{Name: "datatype", Value: cType}).Count(1)
}
}
Expand Down
13 changes: 10 additions & 3 deletions warehouse/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ func (*HandleT) CrashRecover(_ warehouseutils.Warehouse) (err error) {
}

// FetchSchema queries clickhouse and returns the schema associated with provided namespace
func (ch *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (ch *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
ch.Warehouse = warehouse
ch.Namespace = warehouse.Namespace
dbHandle, err := Connect(ch.getConnectionCredentials(), true)
Expand All @@ -937,6 +937,8 @@ func (ch *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
defer dbHandle.Close()

schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

sqlStatement := fmt.Sprintf(`select table, name, type
from system.columns
where database = '%s'`, ch.Namespace)
Expand All @@ -945,14 +947,14 @@ func (ch *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if err != nil && err != sql.ErrNoRows {
if exception, ok := err.(*clickhouse.Exception); ok && exception.Code == 81 {
pkgLogger.Infof("CH: No database found while fetching schema: %s from destination:%v, query: %v", ch.Namespace, ch.Warehouse.Destination.Name, sqlStatement)
return schema, nil
return schema, unRecognizedSchema, nil
}
pkgLogger.Errorf("CH: Error in fetching schema from clickhouse destination:%v, query: %v", ch.Warehouse.Destination.ID, sqlStatement)
return
}
if err == sql.ErrNoRows {
pkgLogger.Infof("CH: No rows, while fetching schema: %s from destination:%v, query: %v", ch.Namespace, ch.Warehouse.Destination.Name, sqlStatement)
return schema, nil
return schema, unRecognizedSchema, nil
}
defer rows.Close()
for rows.Next() {
Expand All @@ -968,6 +970,11 @@ func (ch *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if datatype, ok := clickhouseDataTypesMapToRudder[cType]; ok {
schema[tName][cName] = datatype
} else {
if _, ok := unRecognizedSchema[tName]; !ok {
unRecognizedSchema[tName] = make(map[string]string)
}
unRecognizedSchema[tName][cName] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &ch.Warehouse, warehouseutils.Tag{Name: "datatype", Value: cType}).Count(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/datalake/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (*HandleT) CrashRecover(_ warehouseutils.Warehouse) (err error) {
return nil
}

func (wh *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error) {
func (wh *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error) {
return wh.SchemaRepository.FetchSchema(warehouse)
}

Expand Down
14 changes: 10 additions & 4 deletions warehouse/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func NewGlueSchemaRepository(wh warehouseutils.Warehouse) (*GlueSchemaRepository
return &gl, nil
}

func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error) {
func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error) {
schema := warehouseutils.SchemaT{}
unRecognizedSchema := warehouseutils.SchemaT{}
var err error

var getTablesOutput *glue.GetTablesOutput
Expand All @@ -64,7 +65,7 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
pkgLogger.Debugf("FetchSchema: database %s not found in glue. returning empty schema", warehouse.Namespace)
err = nil
}
return schema, err
return schema, unRecognizedSchema, err
}

for _, table := range getTablesOutput.TableList {
Expand All @@ -78,6 +79,11 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
if _, ok := dataTypesMapToRudder[*col.Type]; ok {
schema[tableName][*col.Name] = dataTypesMapToRudder[*col.Type]
} else {
if _, ok := unRecognizedSchema[tableName]; !ok {
unRecognizedSchema[tableName] = make(map[string]string)
}
unRecognizedSchema[tableName][*col.Name] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &warehouse, warehouseutils.Tag{Name: "datatype", Value: *col.Type}).Count(1)
}
}
Expand All @@ -90,7 +96,7 @@ func (gl *GlueSchemaRepository) FetchSchema(warehouse warehouseutils.Warehouse)
}
}

return schema, err
return schema, unRecognizedSchema, err
}

func (gl *GlueSchemaRepository) CreateSchema() (err error) {
Expand Down Expand Up @@ -137,7 +143,7 @@ func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []wareh
}

// fetch schema from glue
schema, err := gl.FetchSchema(gl.Warehouse)
schema, _, err := gl.FetchSchema(gl.Warehouse)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions warehouse/datalake/schema-repository/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ func NewLocalSchemaRepository(wh warehouseutils.Warehouse, uploader warehouseuti
return &ls, nil
}

func (ls *LocalSchemaRepository) FetchSchema(_ warehouseutils.Warehouse) (warehouseutils.SchemaT, error) {
func (ls *LocalSchemaRepository) FetchSchema(_ warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error) {
schema := ls.uploader.GetLocalSchema()
if schema == nil {
schema = warehouseutils.SchemaT{}
}
return schema, nil
return schema, warehouseutils.SchemaT{}, nil
}

func (*LocalSchemaRepository) CreateSchema() (err error) {
Expand All @@ -34,7 +34,7 @@ func (*LocalSchemaRepository) CreateSchema() (err error) {

func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[string]string) (err error) {
// fetch schema from local db
schema, err := ls.FetchSchema(ls.warehouse)
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
Expand All @@ -52,7 +52,7 @@ func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[str

func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
// fetch schema from local db
schema, err := ls.FetchSchema(ls.warehouse)
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
Expand All @@ -72,7 +72,7 @@ func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []ware

func (ls *LocalSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
// fetch schema from local db
schema, err := ls.FetchSchema(ls.warehouse)
schema, _, err := ls.FetchSchema(ls.warehouse)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/datalake/schema-repository/schema-repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (
)

type SchemaRepository interface {
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error)
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error)
CreateSchema() (err error)
CreateTable(tableName string, columnMap map[string]string) (err error)
AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
Expand Down
12 changes: 9 additions & 3 deletions warehouse/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ func (*HandleT) AlterColumn(_, _, _ string) (err error) {
}

// FetchSchema queries delta lake and returns the schema associated with provided namespace
func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
dl.Warehouse = warehouse
dl.Namespace = warehouse.Namespace
dbHandle, err := dl.connectToWarehouse()
Expand All @@ -932,6 +932,7 @@ func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh

// Schema Initialization
schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

// Fetching the tables
tableNames, err := dl.fetchTables(dbHandle, dl.Namespace)
Expand Down Expand Up @@ -970,10 +971,10 @@ func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
Table: tableName,
})
if err != nil {
return schema, fmt.Errorf("%s Error while fetching table attributes: %v", dl.GetLogIdentifier(), err)
return schema, unRecognizedSchema, fmt.Errorf("%s Error while fetching table attributes: %v", dl.GetLogIdentifier(), err)
}
if !checkAndIgnoreAlreadyExistError(fetchTableAttributesResponse.GetErrorCode(), tableOrViewNotFound) {
return schema, fmt.Errorf("%s Error while fetching table attributes with response: %v", dl.GetLogIdentifier(), fetchTableAttributesResponse.GetErrorMessage())
return schema, unRecognizedSchema, fmt.Errorf("%s Error while fetching table attributes with response: %v", dl.GetLogIdentifier(), fetchTableAttributesResponse.GetErrorMessage())
}

// Populating the schema for the table
Expand All @@ -988,6 +989,11 @@ func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if datatype, ok := dataTypesMapToRudder[item.GetDataType()]; ok {
schema[tableName][item.GetColName()] = datatype
} else {
if _, ok := unRecognizedSchema[tableName]; !ok {
unRecognizedSchema[tableName] = make(map[string]string)
}
unRecognizedSchema[tableName][item.GetColName()] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &dl.Warehouse, warehouseutils.Tag{Name: "datatype", Value: item.GetDataType()}).Count(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion warehouse/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
}
job.schemaHandle = &schemaHandle

job.schemaHandle.schemaInWarehouse, err = whManager.FetchSchema(job.warehouse)
job.schemaHandle.schemaInWarehouse, job.schemaHandle.unRecognizedSchemaInWarehouse, err = whManager.FetchSchema(job.warehouse)
if err != nil {
pkgLogger.Errorf(`[WH]: Failed fetching schema from warehouse: %v`, err)
job.setUploadError(err, Aborted)
Expand Down
2 changes: 1 addition & 1 deletion warehouse/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
type ManagerI interface {
Setup(warehouse warehouseutils.Warehouse, uploader warehouseutils.UploaderI) error
CrashRecover(warehouse warehouseutils.Warehouse) (err error)
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error)
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, warehouseutils.SchemaT, error)
CreateSchema() (err error)
CreateTable(tableName string, columnMap map[string]string) (err error)
AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
Expand Down
11 changes: 9 additions & 2 deletions warehouse/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func (ms *HandleT) dropDanglingStagingTables() bool {
}

// FetchSchema queries mssql and returns the schema associated with provided namespace
func (ms *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (ms *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
ms.Warehouse = warehouse
ms.Namespace = warehouse.Namespace
dbHandle, err := Connect(ms.getConnectionCredentials())
Expand All @@ -826,6 +826,8 @@ func (ms *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
defer dbHandle.Close()

schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

sqlStatement := fmt.Sprintf(`
SELECT
table_name,
Expand All @@ -847,7 +849,7 @@ func (ms *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
}
if err == io.EOF {
pkgLogger.Infof("MS: No rows, while fetching schema from destination:%v, query: %v", ms.Warehouse.Identifier, sqlStatement)
return schema, nil
return schema, unRecognizedSchema, nil
}
defer rows.Close()
for rows.Next() {
Expand All @@ -863,6 +865,11 @@ func (ms *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if datatype, ok := mssqlDataTypesMapToRudder[cType]; ok {
schema[tName][cName] = datatype
} else {
if _, ok := unRecognizedSchema[tName]; !ok {
unRecognizedSchema[tName] = make(map[string]string)
}
unRecognizedSchema[tName][cName] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &ms.Warehouse, warehouseutils.Tag{Name: "datatype", Value: cType}).Count(1)
}
}
Expand Down
11 changes: 9 additions & 2 deletions warehouse/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func (pg *HandleT) dropDanglingStagingTables() bool {
}

// FetchSchema queries postgres and returns the schema associated with provided namespace
func (pg *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema warehouseutils.SchemaT, err error) {
func (pg *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema, unRecognizedSchema warehouseutils.SchemaT, err error) {
pg.Warehouse = warehouse
pg.Namespace = warehouse.Namespace
dbHandle, err := Connect(pg.getConnectionCredentials())
Expand All @@ -793,6 +793,8 @@ func (pg *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
defer dbHandle.Close()

schema = make(warehouseutils.SchemaT)
unRecognizedSchema = make(warehouseutils.SchemaT)

sqlStatement := `
SELECT
table_name,
Expand All @@ -815,7 +817,7 @@ func (pg *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
}
if err == sql.ErrNoRows {
pkgLogger.Infof("PG: No rows, while fetching schema from destination:%v, query: %v", pg.Warehouse.Identifier, sqlStatement)
return schema, nil
return schema, unRecognizedSchema, nil
}
defer rows.Close()
for rows.Next() {
Expand All @@ -832,6 +834,11 @@ func (pg *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
if datatype, ok := postgresDataTypesMapToRudder[cType.String]; ok {
schema[tName.String][cName.String] = datatype
} else {
if _, ok := unRecognizedSchema[tName.String]; !ok {
unRecognizedSchema[tName.String] = make(map[string]string)
}
unRecognizedSchema[tName.String][cType.String] = warehouseutils.MISSING_DATATYPE

warehouseutils.WHCounterStat(warehouseutils.RUDDER_MISSING_DATATYPE, &pg.Warehouse, warehouseutils.Tag{Name: "datatype", Value: cType.String}).Count(1)
}
}
Expand Down
Loading