-
Notifications
You must be signed in to change notification settings - Fork 322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(warehouse): added support for warehouse column count limit #2723
Changes from 7 commits
81c70f4
8b40519
ba0bb46
5014cd9
557fda5
c76286a
629e3eb
c7bf70c
d210084
9fc6530
b5a3345
00c611b
2f9c4c8
983f443
a3ae675
ca99359
d62da7a
4b34825
eea720c
7a7d4da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,6 +119,7 @@ type UploadJobT struct { | |
hasAllTablesSkipped bool | ||
tableUploadStatuses []*TableUploadStatusT | ||
destinationValidator validations.DestinationValidator | ||
stats stats.Stats | ||
} | ||
|
||
type UploadColumnT struct { | ||
|
@@ -145,8 +146,8 @@ var ( | |
) | ||
|
||
var ( | ||
maxParallelLoads map[string]int | ||
columnCountThresholds map[string]int | ||
maxParallelLoads map[string]int | ||
columnCountLimitMap map[string]int | ||
) | ||
|
||
func Init() { | ||
|
@@ -167,15 +168,15 @@ func setMaxParallelLoads() { | |
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3), | ||
warehouseutils.DELTALAKE: config.GetInt("Warehouse.deltalake.maxParallelLoads", 3), | ||
} | ||
columnCountThresholds = map[string]int{ | ||
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800), | ||
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000), | ||
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800), | ||
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountThreshold", 800), | ||
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountThreshold", 1200), | ||
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountThreshold", 1200), | ||
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not planning to add this for SNOWFLAKE because, in the case of SNOWFLAKE, we have a 16MB limit on the row size, and this limit will most likely not be reached because of the max payload size we have. |
||
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000), | ||
columnCountLimitMap = map[string]int{ | ||
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024), | ||
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountLimit", 10000), | ||
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000), | ||
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountLimit", 1024), | ||
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountLimit", 1600), | ||
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountLimit", 1600), | ||
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountLimit", 5000), | ||
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000), | ||
} | ||
} | ||
|
||
|
@@ -204,7 +205,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} { | |
case <-time.After(longRunningUploadStatThresholdInMin): | ||
pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier) | ||
|
||
stats.Default.NewTaggedStat( | ||
job.stats.NewTaggedStat( | ||
"warehouse.long_running_upload", | ||
stats.CountType, | ||
stats.Tags{ | ||
|
@@ -1067,11 +1068,31 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) { | |
job.recordTableLoad(tName, numEvents) | ||
} | ||
|
||
if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok { | ||
columnCount := len(job.schemaHandle.schemaInWarehouse[tName]) | ||
if columnCount > columnThreshold { | ||
job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount) | ||
job.columnCountStat(tName) | ||
|
||
return | ||
} | ||
|
||
func (job *UploadJobT) columnCountStat(tableName string) { | ||
var ( | ||
columnCountLimit int | ||
ok bool | ||
) | ||
if columnCountLimit, ok = columnCountLimitMap[job.warehouse.Type]; !ok { | ||
return | ||
} | ||
currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName]) | ||
if currentColumnsCount > int(float64(columnCountLimit)*columnCountLimitThreshold) { | ||
tags := []tag{ | ||
{ | ||
name: "tableName", value: strings.ToLower(tableName), | ||
}, | ||
{ | ||
name: "columnCountLimit", value: strconv.Itoa(columnCountLimit), | ||
}, | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to report two metrics?
And instead of computing the threshold here, we compute it on the alert manager, by doing a similar operation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add other identifiers as well, like workspaceId and destinationId? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you elaborate on the motivation behind computing it at the alert manager rather than here? @lvrach There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We are already adding these inside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since the threshold dictates whether an alert should be sent, it is more natural for me to have it on the alert manager. It makes it easier to change it. Also, the fact that we add
If you want to visualise the limit as a line in Grafana, it is easier if it's a metric. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good point. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I still feel this information is not very useful. Recording it in a separate stat and adding tags like warehouseId and tableName for an almost constant value (it changes only with the destination type) would be tedious. It also adds computational cost to Kapacitor or the Prometheus executor (in the future). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better for debugging and visualization. |
||
job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount) | ||
} | ||
return | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed these 2 tests because it does not test any behavior.