
feat(warehouse): added support for warehouse column count limit #2723

Merged: 20 commits into master from chore.warehouse-column-count · Nov 30, 2022
Changes shown from 7 of the 20 commits.

Commits
81c70f4
fix(warehouse): increase async job timeout to 180 seconds
achettyiitr Nov 22, 2022
8b40519
feat(warehouse): added support for warehouse column count limit
achettyiitr Nov 23, 2022
ba0bb46
Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
achettyiitr Nov 24, 2022
5014cd9
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 24, 2022
557fda5
review comments
achettyiitr Nov 24, 2022
c76286a
review comments
achettyiitr Nov 24, 2022
629e3eb
master pull
achettyiitr Nov 24, 2022
c7bf70c
some more changes
achettyiitr Nov 24, 2022
d210084
For data lakes, we should skip column count alert.
achettyiitr Nov 24, 2022
9fc6530
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 24, 2022
b5a3345
some more changes
achettyiitr Nov 25, 2022
00c611b
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 27, 2022
2f9c4c8
master pull
achettyiitr Nov 27, 2022
983f443
Merge branch 'master' into chore.warehouse-column-count
achettyiitr Nov 28, 2022
a3ae675
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 29, 2022
ca99359
added memstats for column count limit changes.
achettyiitr Nov 29, 2022
d62da7a
make fmt changes
achettyiitr Nov 29, 2022
4b34825
Merge branch 'master' into chore.warehouse-column-count
achettyiitr Nov 30, 2022
eea720c
code cleanup
achettyiitr Nov 30, 2022
7a7d4da
Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
achettyiitr Nov 30, 2022
1 change: 1 addition & 0 deletions warehouse/identities.go
@@ -427,6 +427,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
dbHandle: wh.dbHandle,
pgNotifier: &wh.notifier,
destinationValidator: validations.NewDestinationValidator(),
stats: stats.Default,
}

tableUploadsCreated := areTableUploadsCreated(job.upload.ID)
6 changes: 5 additions & 1 deletion warehouse/slave.go
@@ -14,6 +14,8 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/services/stats"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-server/config"
@@ -48,6 +50,7 @@ type JobRunT struct {
tableEventCountMap map[string]int
stagingFileReader *gzip.Reader
whIdentifier string
stats stats.Stats
}

func (jobRun *JobRunT) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) {
@@ -387,6 +390,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
jobRun := JobRunT{
job: job,
whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stats.Default,
}

defer jobRun.counterStat("staging_files_processed", tag{name: "worker_id", value: strconv.Itoa(workerIndex)}).Count(1)
@@ -455,7 +459,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
tableName := batchRouterEvent.Metadata.Table
columnData := batchRouterEvent.Data

if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountThresholds[warehouseutils.S3_DATALAKE] {
if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountLimitMap[warehouseutils.S3_DATALAKE] {
pkgLogger.Errorf("[WH]: Huge staging file columns : columns in upload schema: %v for StagingFileID: %v", len(sortedTableColumnMap[tableName]), job.StagingFileID)
return nil, fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", job.StagingFileID, len(sortedTableColumnMap[tableName]))
}
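The hunk above is the only place the limit is enforced as a hard failure: for S3 data lake destinations the slave aborts processing of the staging file once a table's upload schema crosses the configured limit, whereas regular warehouses only emit a metric at load time (see upload.go below). A minimal, self-contained sketch of that guard — `checkStagingColumns` and the simplified types are hypothetical stand-ins for the real package state, and the 10000 default mirrors the config default in upload.go:

```go
// Sketch of the staging-time guard, simplified from warehouse/slave.go.
// checkStagingColumns and columnCountLimitMap here are stand-ins for the
// real package-level state, not code from this PR.
package main

import "fmt"

var columnCountLimitMap = map[string]int{
	"S3_DATALAKE": 10000, // mirrors Warehouse.s3_datalake.columnCountLimit default
}

func checkStagingColumns(destType, tableName string, sortedTableColumnMap map[string][]string, stagingFileID int64) error {
	// Only data-lake destinations fail hard at staging time; other
	// destinations are handled by a load-time metric instead.
	if destType != "S3_DATALAKE" {
		return nil
	}
	if count := len(sortedTableColumnMap[tableName]); count > columnCountLimitMap[destType] {
		return fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", stagingFileID, count)
	}
	return nil
}

func main() {
	cols := make([]string, 10001)
	err := checkStagingColumns("S3_DATALAKE", "tracks", map[string][]string{"tracks": cols}, 42)
	fmt.Println(err)
}
```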
10 changes: 5 additions & 5 deletions warehouse/stats.go
@@ -44,7 +44,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measurement
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return job.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -59,7 +59,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return job.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement {
@@ -75,7 +75,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.GaugeType, tags)
return job.stats.NewTaggedStat(name, stats.GaugeType, tags)
}

func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement {
@@ -90,7 +90,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return jobRun.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -105,7 +105,7 @@ func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return jobRun.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) generateUploadSuccessMetrics() {
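The mechanical change in stats.go is a dependency-injection refactor: every measurement now goes through the `stats.Stats` handle stored on the job struct instead of the package-global `stats.Default`, which is what lets the tests below swap in a gomock double without mutating global state. A minimal sketch of the pattern — `Stats` and `Measurement` are reduced to the methods the sketch needs, and `UploadJob`/`prodStats` are hypothetical stand-ins for the real types in services/stats:

```go
// Sketch of the injectable-stats pattern this PR adopts. The interfaces
// are simplified stand-ins for rudder-server's services/stats package.
package main

import "fmt"

type Measurement interface{ Count(n int) }

type Stats interface {
	NewTaggedStat(name, statType string, tags map[string]string) Measurement
}

type printMeasurement struct{ name string }

func (m printMeasurement) Count(n int) { fmt.Printf("%s += %d\n", m.name, n) }

// prodStats stands in for stats.Default in production wiring.
type prodStats struct{}

func (prodStats) NewTaggedStat(name, _ string, _ map[string]string) Measurement {
	return printMeasurement{name: name}
}

// UploadJob carries its own Stats handle, so tests can inject a mock
// instead of overwriting a package-global.
type UploadJob struct{ stats Stats }

func (j *UploadJob) counterStat(name string) Measurement {
	return j.stats.NewTaggedStat(name, "count", nil)
}

func main() {
	job := &UploadJob{stats: prodStats{}} // a test would pass a gomock double here
	job.counterStat("warehouse_load_table_column_count").Count(1601)
}
```

The payoff is visible in stats_test.go below: the `BeforeEach` that saved and restored `stats.Default` disappears, and each test simply sets `stats: mockStats` on the job under test.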
60 changes: 8 additions & 52 deletions warehouse/stats_test.go
@@ -11,8 +11,6 @@ import (
. "github.com/onsi/gomega"

"github.com/ory/dockertest/v3"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/utils/logger"
@@ -22,17 +20,11 @@

var _ = Describe("Stats", Ordered, func() {
var (
sourceID = "test-sourceID"
destinationID = "test-destinationID"
destinationType = "test-desinationType"
destinationName = "test-destinationName"
sourceName = "test-sourceName"
statName = "test-statName"
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
)

BeforeAll(func() {
@@ -59,39 +51,6 @@ var _ = Describe("Stats", Ordered, func() {
cleanup.Run()
})

BeforeEach(func() {
defaultStats := stats.Default

DeferCleanup(func() {
stats.Default = defaultStats
})
})

Describe("Jobs stats", func() {
BeforeEach(func() {
mockStats, mockMeasurement := getMockStats(g)
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().Count(gomock.Any()).AnyTimes()

stats.Default = mockStats
})

It("Upload status stat", func() {
getUploadStatusStat(statName, warehouseutils.Warehouse{
Review comment — achettyiitr (Member, Author): Removed these two tests because they don't test any behavior.

WorkspaceID: "workspaceID",
Source: backendconfig.SourceT{ID: sourceID, Name: sourceName},
Destination: backendconfig.DestinationT{ID: destinationID, Name: destinationName},
Namespace: "",
Type: destinationType,
Identifier: "",
})
})

It("Persist ssl file error stat", func() {
persistSSLFileErrorStat("workspaceID", destinationType, destinationName, destinationID, sourceName, sourceID, "")
})
})

Describe("Generate upload success/aborted metrics", func() {
var job *UploadJobT

@@ -101,8 +60,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Count(1).Times(1)

stats.Default = mockStats

job = &UploadJobT{
upload: &Upload{
ID: uploadID,
@@ -112,6 +69,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
})

@@ -130,8 +88,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Since(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
WorkspaceID: "workspaceID",
Expand All @@ -142,6 +98,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
job.recordTableLoad("tracks", 4)
})
@@ -151,8 +108,6 @@ var _ = Describe("Stats", Ordered, func() {
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().SendTiming(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
ID: uploadID,
@@ -163,6 +118,7 @@ var _ = Describe("Stats", Ordered, func() {
Type: "POSTGRES",
},
dbHandle: pgResource.DB,
stats: mockStats,
}

err = job.recordLoadFileGenerationTimeStat(1, 4)
53 changes: 37 additions & 16 deletions warehouse/upload.go
@@ -119,6 +119,7 @@ type UploadJobT struct {
hasAllTablesSkipped bool
tableUploadStatuses []*TableUploadStatusT
destinationValidator validations.DestinationValidator
stats stats.Stats
}

type UploadColumnT struct {
@@ -145,8 +146,8 @@
)

var (
maxParallelLoads map[string]int
columnCountThresholds map[string]int
maxParallelLoads map[string]int
columnCountLimitMap map[string]int
)

func Init() {
Expand All @@ -167,15 +168,15 @@ func setMaxParallelLoads() {
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3),
warehouseutils.DELTALAKE: config.GetInt("Warehouse.deltalake.maxParallelLoads", 3),
}
columnCountThresholds = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountThreshold", 800),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountThreshold", 1200),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountThreshold", 1200),
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600),
Review comment — achettyiitr (Member, Author): We are not planning to add this for SNOWFLAKE because Snowflake limits row size to 16MB, and given our payload max size that limit will most likely never be hit.

warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000),
columnCountLimitMap = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountLimit", 1024),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountLimit", 1600),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountLimit", 1600),
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountLimit", 5000),
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
}
}
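The rename from `columnCountThresholds` to `columnCountLimitMap` comes with tightened defaults that track the documented vendor maximums (Postgres and Redshift cap a table at 1,600 columns, SQL Server and Azure Synapse at 1,024, BigQuery at 10,000), and every entry remains overridable through config. A hedged sketch of the lookup-with-default behaviour — `getInt` below mimics `config.GetInt` for illustration only, and the `RSERVER_`-prefixed env-var mapping shown is the usual rudder-server convention, stated here as an assumption rather than confirmed by this diff:

```go
// Sketch of the config lookup used above: the configured value wins when
// present, otherwise the per-destination default applies.
// Assumption: rudder-server maps dotted config keys to RSERVER_-prefixed
// environment variables, e.g.
// Warehouse.postgres.columnCountLimit -> RSERVER_WAREHOUSE_POSTGRES_COLUMN_COUNT_LIMIT.
package main

import (
	"fmt"
	"os"
	"strconv"
)

// getInt mimics config.GetInt for this sketch only.
func getInt(envKey string, defaultValue int) int {
	if raw, ok := os.LookupEnv(envKey); ok {
		if v, err := strconv.Atoi(raw); err == nil {
			return v
		}
	}
	return defaultValue
}

func main() {
	// With no override set, the Postgres limit falls back to 1600,
	// matching Postgres' hard cap on columns per table.
	limit := getInt("RSERVER_WAREHOUSE_POSTGRES_COLUMN_COUNT_LIMIT", 1600)
	fmt.Println("postgres columnCountLimit:", limit)
}
```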

@@ -204,7 +205,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} {
case <-time.After(longRunningUploadStatThresholdInMin):
pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier)

stats.Default.NewTaggedStat(
job.stats.NewTaggedStat(
"warehouse.long_running_upload",
stats.CountType,
stats.Tags{
@@ -1067,11 +1068,31 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) {
job.recordTableLoad(tName, numEvents)
}

if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok {
columnCount := len(job.schemaHandle.schemaInWarehouse[tName])
if columnCount > columnThreshold {
job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount)
job.columnCountStat(tName)

return
}

func (job *UploadJobT) columnCountStat(tableName string) {
var (
columnCountLimit int
ok bool
)
if columnCountLimit, ok = columnCountLimitMap[job.warehouse.Type]; !ok {
return
}
currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])
if currentColumnsCount > int(float64(columnCountLimit)*columnCountLimitThreshold) {
tags := []tag{
{
name: "tableName", value: strings.ToLower(tableName),
},
{
name: "columnCountLimit", value: strconv.Itoa(columnCountLimit),
},
}

Review thread on the tags block above:

lvrach (Member): Would it make sense to report two metrics — warehouse_load_table_column_count and warehouse_load_table_column_limit — and, instead of computing the threshold here, compute it on the alert manager with a similar operation?

Member: Should we add other identifiers as well, like workspaceId and destinationId?

Contributor: Can you elaborate on the motivation behind computing it at the alert manager rather than here? @lvrach

achettyiitr (Member, Author) — Nov 24, 2022:
> Should we add other identifiers as well, like workspaceId and destinationId?

We are already adding these inside job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount).

achettyiitr (Member, Author): warehouse_load_table_column_limit is basically a constant. We should probably not send it as a stat.

lvrach (Member) — Nov 28, 2022:
> Can you elaborate on the motivation behind computing it at the alert manager rather than here? @lvrach

Since the threshold dictates whether an alert should be sent, it is more natural for me to have it on the alert manager; that also makes it easier to change. And the fact that we add tableName & columnCountLimit only when the threshold passes prevents us from doing back-testing (going to a Grafana dashboard and checking when the alert would be triggered with another threshold).

> warehouse_load_table_column_limit is a constant. We should probably not send it as a stat.

If you want to visualise the limit as a line in Grafana, it is easier if it's a metric.

achettyiitr (Member, Author):
> The fact that we add tableName & columnCountLimit only when the threshold passes prevents us from doing back-testing.

Good point.

Contributor:
> If you want to visualise the limit as a line in Grafana, it is easier if it's a metric.

I still feel this information is not very useful. Recording a separate stat with warehouseId and tableName tags for an almost-constant value (it changes only with the destination type) would be tedious, and it adds computational cost to Kapacitor or a future Prometheus executor too. wdyt?

achettyiitr (Member, Author): Better for debugging and visualization.
job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
}
return
}
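Note that the new helper does not fire on the limit itself: the counter is emitted once the table's warehouse schema crosses `columnCountLimitThreshold`, a fraction of the per-destination limit defined elsewhere in this PR (its exact value is not visible in this hunk). A minimal sketch of the condition, with 0.9 as an assumed placeholder for that fraction:

```go
// Sketch of the emit condition in columnCountStat. The 0.9 threshold
// fraction is an assumption for illustration; the real value comes from
// the columnCountLimitThreshold package variable, not shown in this diff.
package main

import "fmt"

const columnCountLimitThreshold = 0.9 // assumed placeholder

func shouldEmitColumnCountStat(currentColumns, columnCountLimit int) bool {
	return currentColumns > int(float64(columnCountLimit)*columnCountLimitThreshold)
}

func main() {
	// Postgres: limit 1600, so the stat starts firing above 1440 columns.
	fmt.Println(shouldEmitColumnCountStat(1400, 1600)) // false
	fmt.Println(shouldEmitColumnCountStat(1441, 1600)) // true
}
```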
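For contrast, the alternative lvrach floated in the thread above — emit the count and the limit unconditionally and let the alert manager compute the ratio — would look roughly like the sketch below. This is not code from the PR; `gauge`, `gaugeFor`, and `reportColumnCount` are hypothetical names invented for illustration:

```go
// Hypothetical sketch of the two-metric alternative discussed in review:
// always emit both values and move the threshold comparison into the
// alerting layer. None of these identifiers exist in rudder-server.
package main

import "fmt"

type gauge struct{ name string }

func (g gauge) Gauge(v int) { fmt.Printf("%s = %d\n", g.name, v) }

func gaugeFor(name, tableName string) gauge {
	return gauge{name: name + "{tableName=" + tableName + "}"}
}

func reportColumnCount(tableName string, currentColumns, columnCountLimit int) {
	// Emitting unconditionally keeps the full history, so the alert
	// threshold can be tuned (and back-tested) in the alert manager.
	gaugeFor("warehouse_load_table_column_count", tableName).Gauge(currentColumns)
	gaugeFor("warehouse_load_table_column_limit", tableName).Gauge(columnCountLimit)
}

func main() {
	reportColumnCount("tracks", 1500, 1600)
}
```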