feat(warehouse): added support for warehouse column count limit #2723

Merged: 20 commits, merged on Nov 30, 2022

Changes from all commits

Commits (20)
81c70f4
fix(warehouse): increase async job timeout to 180 seconds
achettyiitr Nov 22, 2022
8b40519
feat(warehouse): added support for warehouse column count limit
achettyiitr Nov 23, 2022
ba0bb46
Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
achettyiitr Nov 24, 2022
5014cd9
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 24, 2022
557fda5
review comments
achettyiitr Nov 24, 2022
c76286a
review comments
achettyiitr Nov 24, 2022
629e3eb
master pull
achettyiitr Nov 24, 2022
c7bf70c
some more changes
achettyiitr Nov 24, 2022
d210084
For data lakes, we should skip column count alert.
achettyiitr Nov 24, 2022
9fc6530
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 24, 2022
b5a3345
some more changes
achettyiitr Nov 25, 2022
00c611b
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 27, 2022
2f9c4c8
master pull
achettyiitr Nov 27, 2022
983f443
Merge branch 'master' into chore.warehouse-column-count
achettyiitr Nov 28, 2022
a3ae675
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
achettyiitr Nov 29, 2022
ca99359
added memstats for column count limit changes.
achettyiitr Nov 29, 2022
d62da7a
make fmt changes
achettyiitr Nov 29, 2022
4b34825
Merge branch 'master' into chore.warehouse-column-count
achettyiitr Nov 30, 2022
eea720c
code cleanup
achettyiitr Nov 30, 2022
7a7d4da
Merge branch 'chore.warehouse-column-count' of github.com:rudderlabs/…
achettyiitr Nov 30, 2022
1 change: 1 addition & 0 deletions warehouse/identities.go
@@ -427,6 +427,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
dbHandle: wh.dbHandle,
pgNotifier: &wh.notifier,
destinationValidator: validations.NewDestinationValidator(),
stats: stats.Default,
}

tableUploadsCreated := areTableUploadsCreated(job.upload.ID)
6 changes: 5 additions & 1 deletion warehouse/slave.go
@@ -14,6 +14,8 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/services/stats"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-server/config"
@@ -48,6 +50,7 @@ type JobRunT struct {
tableEventCountMap map[string]int
stagingFileReader *gzip.Reader
whIdentifier string
stats stats.Stats
}

func (jobRun *JobRunT) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) {
@@ -387,6 +390,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
jobRun := JobRunT{
job: job,
whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stats.Default,
}

defer jobRun.counterStat("staging_files_processed", tag{name: "worker_id", value: strconv.Itoa(workerIndex)}).Count(1)
@@ -455,7 +459,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
tableName := batchRouterEvent.Metadata.Table
columnData := batchRouterEvent.Data

if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountThresholds[warehouseutils.S3_DATALAKE] {
if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountLimitMap[warehouseutils.S3_DATALAKE] {
pkgLogger.Errorf("[WH]: Huge staging file columns : columns in upload schema: %v for StagingFileID: %v", len(sortedTableColumnMap[tableName]), job.StagingFileID)
return nil, fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", job.StagingFileID, len(sortedTableColumnMap[tableName]))
}
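
The slave.go change above renames the map consulted by the staging-file guard from columnCountThresholds to columnCountLimitMap and injects stats.Default into JobRunT. A minimal, self-contained Go sketch of the guard pattern follows; the helper name and standalone map are illustrative assumptions, while the limit values and error wording mirror the diff.

package main

import "fmt"

// columnCountLimits mirrors the per-destination limits that upload.go reads from config
// (these are the new defaults from the PR; real deployments may override them).
var columnCountLimits = map[string]int{
	"S3_DATALAKE": 10000,
	"POSTGRES":    1600,
}

// checkStagingColumns is an illustrative stand-in for the guard inside processStagingFile:
// it rejects a staging file whose table schema already exceeds the destination's column limit.
func checkStagingColumns(destType string, columnsInSchema int, stagingFileID int64) error {
	limit, ok := columnCountLimits[destType]
	if !ok {
		return nil // no limit configured for this destination type
	}
	if columnsInSchema > limit {
		return fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d",
			stagingFileID, columnsInSchema)
	}
	return nil
}

func main() {
	// A table that has grown past the S3_DATALAKE limit of 10000 columns is rejected.
	fmt.Println(checkStagingColumns("S3_DATALAKE", 10500, 42))
}
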
10 changes: 5 additions & 5 deletions warehouse/stats.go
@@ -44,7 +44,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return job.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -59,7 +59,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurem
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return job.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement {
@@ -75,7 +75,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.GaugeType, tags)
return job.stats.NewTaggedStat(name, stats.GaugeType, tags)
}

func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement {
@@ -90,7 +90,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measuremen
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
return jobRun.stats.NewTaggedStat(name, stats.TimerType, tags)
}

func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -105,7 +105,7 @@ func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurem
for _, extraTag := range extraTags {
tags[extraTag.name] = extraTag.value
}
return stats.Default.NewTaggedStat(name, stats.CountType, tags)
return jobRun.stats.NewTaggedStat(name, stats.CountType, tags)
}

func (job *UploadJobT) generateUploadSuccessMetrics() {
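
Each helper in stats.go now resolves its stats client from the job or job-run struct instead of the package-level stats.Default, which is what allows the tests below to inject a mock without mutating the global. A small self-contained sketch of that dependency-injection pattern, with illustrative names that are assumptions rather than the repository's exact types:

package main

import "fmt"

// Stats is a tiny stand-in for the stats.Stats interface carried on UploadJobT and JobRunT.
type Stats interface {
	Count(name string, n int)
}

// stdoutStats is a trivial implementation; tests would substitute a mock that records calls.
type stdoutStats struct{}

func (stdoutStats) Count(name string, n int) { fmt.Printf("%s += %d\n", name, n) }

// uploadJob carries its stats dependency explicitly instead of reaching for a package-level default.
type uploadJob struct {
	stats Stats
}

func (j *uploadJob) counterStat(name string, n int) {
	j.stats.Count(name, n)
}

func main() {
	job := &uploadJob{stats: stdoutStats{}} // production code would pass stats.Default here
	job.counterStat("rows_processed", 4)
}
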
60 changes: 8 additions & 52 deletions warehouse/stats_test.go
@@ -11,8 +11,6 @@ import (
. "github.com/onsi/gomega"

"github.com/ory/dockertest/v3"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
"github.com/rudderlabs/rudder-server/utils/logger"
@@ -22,17 +20,11 @@

var _ = Describe("Stats", Ordered, func() {
var (
sourceID = "test-sourceID"
destinationID = "test-destinationID"
destinationType = "test-desinationType"
destinationName = "test-destinationName"
sourceName = "test-sourceName"
statName = "test-statName"
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
g = GinkgoT()
pgResource *destination.PostgresResource
err error
uploadID = int64(1)
cleanup = &testhelper.Cleanup{}
)

BeforeAll(func() {
@@ -59,39 +51,6 @@ var _ = Describe("Stats", Ordered, func() {
cleanup.Run()
})

BeforeEach(func() {
defaultStats := stats.Default

DeferCleanup(func() {
stats.Default = defaultStats
})
})

Describe("Jobs stats", func() {
BeforeEach(func() {
mockStats, mockMeasurement := getMockStats(g)
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().Count(gomock.Any()).AnyTimes()

stats.Default = mockStats
})

It("Upload status stat", func() {
getUploadStatusStat(statName, warehouseutils.Warehouse{
Review comment from achettyiitr (Member, Author): Removed these 2 tests because they do not test any behavior.

WorkspaceID: "workspaceID",
Source: backendconfig.SourceT{ID: sourceID, Name: sourceName},
Destination: backendconfig.DestinationT{ID: destinationID, Name: destinationName},
Namespace: "",
Type: destinationType,
Identifier: "",
})
})

It("Persist ssl file error stat", func() {
persistSSLFileErrorStat("workspaceID", destinationType, destinationName, destinationID, sourceName, sourceID, "")
})
})

Describe("Generate upload success/aborted metrics", func() {
var job *UploadJobT

@@ -101,8 +60,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Count(1).Times(1)

stats.Default = mockStats

job = &UploadJobT{
upload: &Upload{
ID: uploadID,
@@ -112,6 +69,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
})

@@ -130,8 +88,6 @@ var _ = Describe("Stats", Ordered, func() {
mockMeasurement.EXPECT().Count(4).Times(2)
mockMeasurement.EXPECT().Since(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
WorkspaceID: "workspaceID",
Expand All @@ -142,6 +98,7 @@ var _ = Describe("Stats", Ordered, func() {
warehouse: warehouseutils.Warehouse{
Type: "POSTGRES",
},
stats: mockStats,
}
job.recordTableLoad("tracks", 4)
})
@@ -151,8 +108,6 @@ var _ = Describe("Stats", Ordered, func() {
mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
mockMeasurement.EXPECT().SendTiming(gomock.Any()).Times(1)

stats.Default = mockStats

job := &UploadJobT{
upload: &Upload{
ID: uploadID,
@@ -163,6 +118,7 @@ var _ = Describe("Stats", Ordered, func() {
Type: "POSTGRES",
},
dbHandle: pgResource.DB,
stats: mockStats,
}

err = job.recordLoadFileGenerationTimeStat(1, 4)
68 changes: 40 additions & 28 deletions warehouse/upload.go
@@ -119,6 +119,7 @@ type UploadJobT struct {
hasAllTablesSkipped bool
tableUploadStatuses []*TableUploadStatusT
destinationValidator validations.DestinationValidator
stats stats.Stats
}

type UploadColumnT struct {
@@ -145,8 +146,8 @@ var (
)

var (
maxParallelLoads map[string]int
columnCountThresholds map[string]int
maxParallelLoads map[string]int
columnCountLimitMap map[string]int
)

func Init() {
@@ -167,15 +168,14 @@ func setMaxParallelLoads() {
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3),
warehouseutils.DELTALAKE: config.GetInt("Warehouse.deltalake.maxParallelLoads", 3),
}
columnCountThresholds = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountThreshold", 800),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountThreshold", 1200),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountThreshold", 1200),
warehouseutils.SNOWFLAKE: config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600),
Review comment from achettyiitr (Member, Author): We are not planning to add this for SNOWFLAKE because SNOWFLAKE has a 16 MB row-size limit, which will most likely not be reached given the maximum payload size we enforce.

warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000),
columnCountLimitMap = map[string]int{
warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
warehouseutils.BQ: config.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
warehouseutils.MSSQL: config.GetInt("Warehouse.mssql.columnCountLimit", 1024),
warehouseutils.POSTGRES: config.GetInt("Warehouse.postgres.columnCountLimit", 1600),
warehouseutils.RS: config.GetInt("Warehouse.redshift.columnCountLimit", 1600),
warehouseutils.S3_DATALAKE: config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
}
}

@@ -204,7 +204,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} {
case <-time.After(longRunningUploadStatThresholdInMin):
pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier)

stats.Default.NewTaggedStat(
job.stats.NewTaggedStat(
"warehouse.long_running_upload",
stats.CountType,
stats.Tags{
@@ -1067,15 +1067,37 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) {
job.recordTableLoad(tName, numEvents)
}

if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok {
columnCount := len(job.schemaHandle.schemaInWarehouse[tName])
if columnCount > columnThreshold {
job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount)
}
}
job.columnCountStat(tName)

return
}

// columnCountStat sends the column count for a table to statsd.
// It skips sending for S3_DATALAKE, GCS_DATALAKE, AZURE_DATALAKE.
func (job *UploadJobT) columnCountStat(tableName string) {
var (
columnCountLimit int
ok bool
)

switch job.warehouse.Type {
case warehouseutils.S3_DATALAKE, warehouseutils.GCS_DATALAKE, warehouseutils.AZURE_DATALAKE:
return
}

if columnCountLimit, ok = columnCountLimitMap[job.warehouse.Type]; !ok {
return
}

tags := []tag{
{name: "tableName", value: strings.ToLower(tableName)},
}
currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])

job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
job.counterStat(`warehouse_load_table_column_limit`, tags...).Count(columnCountLimit)
}

func (job *UploadJobT) loadUserTables(loadFilesTableMap map[tableNameT]bool) ([]error, error) {
var hasLoadFiles bool
userTables := []string{job.identifiesTableName(), job.usersTableName()}
@@ -1333,9 +1355,6 @@ func (job *UploadJobT) setUploadStatus(statusOpts UploadStatusOpts) (err error)
return err
}
return job.setUploadColumns(uploadColumnOpts)
// return job.setUploadColumns(
// additionalFields...,
// )
}

// SetUploadSchema
@@ -1345,9 +1364,6 @@ func (job *UploadJobT) setUploadSchema(consolidatedSchema warehouseutils.SchemaT
panic(err)
}
job.upload.UploadSchema = consolidatedSchema
// return job.setUploadColumns(
// UploadColumnT{Column: UploadSchemaField, Value: marshalledSchema},
// )
return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadSchemaField, Value: marshalledSchema}}})
}

@@ -1365,10 +1381,6 @@ func (job *UploadJobT) setLoadFileIDs(startLoadFileID, endLoadFileID int64) erro
job.upload.StartLoadFileID = startLoadFileID
job.upload.EndLoadFileID = endLoadFileID

// return job.setUploadColumns(
// UploadColumnT{Column: UploadStartLoadFileIDField, Value: startLoadFileID},
// UploadColumnT{Column: UploadEndLoadFileIDField, Value: endLoadFileID},
// )
return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadStartLoadFileIDField, Value: startLoadFileID}, {Column: UploadEndLoadFileIDField, Value: endLoadFileID}}})
}

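
The columnCountStat method added to upload.go emits the table's current column count and the destination's configured limit as two separate counters, so a dashboard or alert can track how close each table is to its ceiling. A short illustrative helper for that comparison; the 80% threshold and names are assumptions, not part of the PR:

package main

import "fmt"

// columnUtilization reports the ratio of current columns to the destination's column limit
// and whether the table has crossed an (assumed) alerting threshold of 80% of that limit.
func columnUtilization(columnCount, columnLimit int) (ratio float64, nearLimit bool) {
	if columnLimit <= 0 {
		return 0, false
	}
	ratio = float64(columnCount) / float64(columnLimit)
	return ratio, ratio >= 0.8
}

func main() {
	// Example: a POSTGRES table with 1450 columns against the new default limit of 1600.
	ratio, near := columnUtilization(1450, 1600)
	fmt.Printf("utilization=%.2f nearLimit=%v\n", ratio, near)
}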