feat(warehouse): added support for warehouse column count limit #2723
Merged
Commits (20), all by achettyiitr:

81c70f4  fix(warehouse): increase async job timeout to 180 seconds
8b40519  feat(warehouse): added support for warehouse column count limit
ba0bb46  Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
5014cd9  Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
557fda5  review comments
c76286a  review comments
629e3eb  master pull
c7bf70c  some more changes
d210084  For data lakes, we should skip column count alert.
9fc6530  Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
b5a3345  some more changes
00c611b  Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
2f9c4c8  master pull
983f443  Merge branch 'master' into chore.warehouse-column-count
a3ae675  Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
ca99359  added memstats for column count limit changes.
d62da7a  make fmt changes
4b34825  Merge branch 'master' into chore.warehouse-column-count
eea720c  code cleanup
7a7d4da  Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
@@ -119,6 +119,7 @@ type UploadJobT struct {
 	hasAllTablesSkipped  bool
 	tableUploadStatuses  []*TableUploadStatusT
 	destinationValidator validations.DestinationValidator
+	stats                stats.Stats
 }

 type UploadColumnT struct {
@@ -145,8 +146,8 @@ var (
 )

 var (
-	maxParallelLoads      map[string]int
-	columnCountThresholds map[string]int
+	maxParallelLoads    map[string]int
+	columnCountLimitMap map[string]int
 )

 func Init() {
@@ -167,15 +168,14 @@ func setMaxParallelLoads() {
 		warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3),
 		warehouseutils.DELTALAKE:  config.GetInt("Warehouse.deltalake.maxParallelLoads", 3),
 	}
-	columnCountThresholds = map[string]int{
-		warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800),
-		warehouseutils.BQ:            config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000),
-		warehouseutils.CLICKHOUSE:    config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800),
-		warehouseutils.MSSQL:         config.GetInt("Warehouse.mssql.columnCountThreshold", 800),
-		warehouseutils.POSTGRES:      config.GetInt("Warehouse.postgres.columnCountThreshold", 1200),
-		warehouseutils.RS:            config.GetInt("Warehouse.redshift.columnCountThreshold", 1200),
-		warehouseutils.SNOWFLAKE:     config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600),
-		warehouseutils.S3_DATALAKE:   config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000),
+	columnCountLimitMap = map[string]int{
+		warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
+		warehouseutils.BQ:            config.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
+		warehouseutils.CLICKHOUSE:    config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
+		warehouseutils.MSSQL:         config.GetInt("Warehouse.mssql.columnCountLimit", 1024),
+		warehouseutils.POSTGRES:      config.GetInt("Warehouse.postgres.columnCountLimit", 1600),
+		warehouseutils.RS:            config.GetInt("Warehouse.redshift.columnCountLimit", 1600),
+		warehouseutils.S3_DATALAKE:   config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
 	}
 }

Review comment (on the dropped SNOWFLAKE entry): We are not planning to add this for SNOWFLAKE because SNOWFLAKE limits row size to 16MB, and that limit will most likely never be reached given the maximum payload size we enforce.
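The values above are only defaults read through config.GetInt, so each destination's limit can be overridden at deployment time. A minimal, self-contained sketch of the fallback semantics (the config key is taken from the diff; the getInt helper and the environment-variable spelling are illustrative stand-ins, not rudder-server's actual config API):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// getInt mimics config.GetInt("Warehouse.postgres.columnCountLimit", 1600):
// read an override from the environment, else fall back to the default.
func getInt(envKey string, defaultValue int) int {
	if v, err := strconv.Atoi(os.Getenv(envKey)); err == nil {
		return v
	}
	return defaultValue
}

func main() {
	// Env-var name is an assumed mapping for illustration only.
	limit := getInt("RSERVER_WAREHOUSE_POSTGRES_COLUMN_COUNT_LIMIT", 1600)
	fmt.Println("postgres column count limit:", limit)
}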
@@ -204,7 +204,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} {
 		case <-time.After(longRunningUploadStatThresholdInMin):
 			pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier)

-			stats.Default.NewTaggedStat(
+			job.stats.NewTaggedStat(
 				"warehouse.long_running_upload",
 				stats.CountType,
 				stats.Tags{
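Replacing the package-global stats.Default with the injected job.stats field (added to UploadJobT above) lets tests swap in an in-memory stats store, which is what the "added memstats for column count limit changes" commit exercises. A minimal sketch of the pattern using hypothetical, narrowed-down types; the real stats.Stats interface is considerably larger:

package main

import "fmt"

// counter stands in for the real stats measurement type.
type counter interface{ Count(n int) }

// statsStore stands in for the real stats.Stats interface.
type statsStore interface {
	NewTaggedStat(name string, tags map[string]string) counter
}

// memCounter records counts in memory so a test can assert on them.
type memCounter struct{ total int }

func (c *memCounter) Count(n int) { c.total += n }

// memStats is a toy in-memory statsStore, in the spirit of memstats.
type memStats struct{ counters map[string]*memCounter }

func (m *memStats) NewTaggedStat(name string, tags map[string]string) counter {
	if m.counters == nil {
		m.counters = map[string]*memCounter{}
	}
	if _, ok := m.counters[name]; !ok {
		m.counters[name] = &memCounter{}
	}
	return m.counters[name]
}

// uploadJob mirrors the injection: it emits through its stats field,
// never through a package-global.
type uploadJob struct{ stats statsStore }

func (j *uploadJob) reportColumnCount(table string, n int) {
	j.stats.NewTaggedStat("warehouse_load_table_column_count",
		map[string]string{"tableName": table}).Count(n)
}

func main() {
	store := &memStats{}
	job := &uploadJob{stats: store}
	job.reportColumnCount("tracks", 42)
	fmt.Println(store.counters["warehouse_load_table_column_count"].total) // 42
}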
@@ -1067,15 +1067,37 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) {
 		job.recordTableLoad(tName, numEvents)
 	}

-	if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok {
-		columnCount := len(job.schemaHandle.schemaInWarehouse[tName])
-		if columnCount > columnThreshold {
-			job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount)
-		}
-	}
+	job.columnCountStat(tName)

 	return
 }

+// columnCountStat sends the column count for a table to statsd.
+// Skips sending for S3_DATALAKE, GCS_DATALAKE, AZURE_DATALAKE.
+func (job *UploadJobT) columnCountStat(tableName string) {
+	var (
+		columnCountLimit int
+		ok               bool
+	)
+
+	switch job.warehouse.Type {
+	case warehouseutils.S3_DATALAKE, warehouseutils.GCS_DATALAKE, warehouseutils.AZURE_DATALAKE:
+		return
+	}
+
+	if columnCountLimit, ok = columnCountLimitMap[job.warehouse.Type]; !ok {
+		return
+	}
+
+	tags := []tag{
+		{name: "tableName", value: strings.ToLower(tableName)},
+	}
+	currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])
+
+	job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
+	job.counterStat(`warehouse_load_table_column_limit`, tags...).Count(columnCountLimit)
+}
+
 func (job *UploadJobT) loadUserTables(loadFilesTableMap map[tableNameT]bool) ([]error, error) {
 	var hasLoadFiles bool
 	userTables := []string{job.identifiesTableName(), job.usersTableName()}
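Because columnCountStat now emits the configured limit alongside the current count, a dashboard or alert can key off their ratio instead of a per-destination hard-coded threshold. A small illustrative check in that spirit (nearColumnLimit and the 0.8 fraction are assumptions for illustration, not part of this PR):

package main

import "fmt"

// nearColumnLimit reports whether a table's column count has crossed
// the given fraction of its destination's configured limit.
func nearColumnLimit(count, limit int, frac float64) bool {
	if limit <= 0 {
		return false // no limit configured for this destination type
	}
	return float64(count) >= frac*float64(limit)
}

func main() {
	// Values as they would arrive from the two counters emitted above:
	// warehouse_load_table_column_count and warehouse_load_table_column_limit.
	count, limit := 1400, 1600 // e.g. a wide table on POSTGRES
	if nearColumnLimit(count, limit, 0.8) {
		fmt.Printf("table is at %d/%d columns; consider pruning the schema\n", count, limit)
	}
}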
@@ -1333,9 +1355,6 @@ func (job *UploadJobT) setUploadStatus(statusOpts UploadStatusOpts) (err error)
 		return err
 	}
 	return job.setUploadColumns(uploadColumnOpts)
-	// return job.setUploadColumns(
-	// 	additionalFields...,
-	// )
 }

 // SetUploadSchema
@@ -1345,9 +1364,6 @@ func (job *UploadJobT) setUploadSchema(consolidatedSchema warehouseutils.SchemaT
 		panic(err)
 	}
 	job.upload.UploadSchema = consolidatedSchema
-	// return job.setUploadColumns(
-	// 	UploadColumnT{Column: UploadSchemaField, Value: marshalledSchema},
-	// )
 	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadSchemaField, Value: marshalledSchema}}})
 }
@@ -1365,10 +1381,6 @@ func (job *UploadJobT) setLoadFileIDs(startLoadFileID, endLoadFileID int64) erro
 	job.upload.StartLoadFileID = startLoadFileID
 	job.upload.EndLoadFileID = endLoadFileID

-	// return job.setUploadColumns(
-	// 	UploadColumnT{Column: UploadStartLoadFileIDField, Value: startLoadFileID},
-	// 	UploadColumnT{Column: UploadEndLoadFileIDField, Value: endLoadFileID},
-	// )
 	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadStartLoadFileIDField, Value: startLoadFileID}, {Column: UploadEndLoadFileIDField, Value: endLoadFileID}}})
 }
Review comment: Removed these two tests because they do not test any behavior.