diff --git a/warehouse/identities.go b/warehouse/identities.go
index a97cf03061..a2826eea2f 100644
--- a/warehouse/identities.go
+++ b/warehouse/identities.go
@@ -427,6 +427,7 @@ func (wh *HandleT) populateHistoricIdentities(warehouse warehouseutils.Warehouse
 		dbHandle:             wh.dbHandle,
 		pgNotifier:           &wh.notifier,
 		destinationValidator: validations.NewDestinationValidator(),
+		stats:                stats.Default,
 	}

 	tableUploadsCreated := areTableUploadsCreated(job.upload.ID)
diff --git a/warehouse/slave.go b/warehouse/slave.go
index 2c5b35498f..5f78687baa 100644
--- a/warehouse/slave.go
+++ b/warehouse/slave.go
@@ -14,6 +14,8 @@ import (
 	"strings"
 	"time"

+	"github.com/rudderlabs/rudder-server/services/stats"
+
 	"github.com/rudderlabs/rudder-server/warehouse/internal/model"

 	"github.com/rudderlabs/rudder-server/config"
@@ -48,6 +50,7 @@ type JobRunT struct {
 	tableEventCountMap map[string]int
 	stagingFileReader  *gzip.Reader
 	whIdentifier       string
+	stats              stats.Stats
 }

 func (jobRun *JobRunT) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) {
@@ -387,6 +390,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
 	jobRun := JobRunT{
 		job:          job,
 		whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
+		stats:        stats.Default,
 	}

 	defer jobRun.counterStat("staging_files_processed", tag{name: "worker_id", value: strconv.Itoa(workerIndex)}).Count(1)
@@ -455,7 +459,7 @@ func processStagingFile(job Payload, workerIndex int) (loadFileUploadOutputs []l
 		tableName := batchRouterEvent.Metadata.Table
 		columnData := batchRouterEvent.Data

-		if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountThresholds[warehouseutils.S3_DATALAKE] {
+		if job.DestinationType == warehouseutils.S3_DATALAKE && len(sortedTableColumnMap[tableName]) > columnCountLimitMap[warehouseutils.S3_DATALAKE] {
 			pkgLogger.Errorf("[WH]: Huge staging file columns : columns in upload schema: %v for StagingFileID: %v", len(sortedTableColumnMap[tableName]), job.StagingFileID)
 			return nil, fmt.Errorf("staging file schema limit exceeded for stagingFileID: %d, actualCount: %d", job.StagingFileID, len(sortedTableColumnMap[tableName]))
 		}
diff --git a/warehouse/stats.go b/warehouse/stats.go
index cd412a128f..6bd17837fe 100644
--- a/warehouse/stats.go
+++ b/warehouse/stats.go
@@ -44,7 +44,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measuremen
 	for _, extraTag := range extraTags {
 		tags[extraTag.name] = extraTag.value
 	}
-	return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
+	return job.stats.NewTaggedStat(name, stats.TimerType, tags)
 }

 func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -59,7 +59,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurem
 	for _, extraTag := range extraTags {
 		tags[extraTag.name] = extraTag.value
 	}
-	return stats.Default.NewTaggedStat(name, stats.CountType, tags)
+	return job.stats.NewTaggedStat(name, stats.CountType, tags)
 }

 func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement {
@@ -75,7 +75,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measuremen
 	for _, extraTag := range extraTags {
 		tags[extraTag.name] = extraTag.value
 	}
-	return stats.Default.NewTaggedStat(name, stats.GaugeType, tags)
+	return job.stats.NewTaggedStat(name, stats.GaugeType, tags)
 }

 func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement {
@@ -90,7 +90,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measuremen
 	for _, extraTag := range extraTags {
 		tags[extraTag.name] = extraTag.value
 	}
-	return stats.Default.NewTaggedStat(name, stats.TimerType, tags)
+	return jobRun.stats.NewTaggedStat(name, stats.TimerType, tags)
 }

 func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement {
@@ -105,7 +105,7 @@ func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurem
 	for _, extraTag := range extraTags {
 		tags[extraTag.name] = extraTag.value
 	}
-	return stats.Default.NewTaggedStat(name, stats.CountType, tags)
+	return jobRun.stats.NewTaggedStat(name, stats.CountType, tags)
 }

 func (job *UploadJobT) generateUploadSuccessMetrics() {
diff --git a/warehouse/stats_test.go b/warehouse/stats_test.go
index f7bf941bb4..8eec36eb9d 100644
--- a/warehouse/stats_test.go
+++ b/warehouse/stats_test.go
@@ -11,8 +11,6 @@ import (
 	. "github.com/onsi/gomega"
 	"github.com/ory/dockertest/v3"

-	backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
-	"github.com/rudderlabs/rudder-server/services/stats"
 	"github.com/rudderlabs/rudder-server/testhelper"
 	"github.com/rudderlabs/rudder-server/testhelper/destination"
 	"github.com/rudderlabs/rudder-server/utils/logger"
@@ -22,17 +20,11 @@ import (

 var _ = Describe("Stats", Ordered, func() {
 	var (
-		sourceID        = "test-sourceID"
-		destinationID   = "test-destinationID"
-		destinationType = "test-desinationType"
-		destinationName = "test-destinationName"
-		sourceName      = "test-sourceName"
-		statName        = "test-statName"
-		g               = GinkgoT()
-		pgResource      *destination.PostgresResource
-		err             error
-		uploadID        = int64(1)
-		cleanup         = &testhelper.Cleanup{}
+		g          = GinkgoT()
+		pgResource *destination.PostgresResource
+		err        error
+		uploadID   = int64(1)
+		cleanup    = &testhelper.Cleanup{}
 	)

 	BeforeAll(func() {
@@ -59,39 +51,6 @@ var _ = Describe("Stats", Ordered, func() {
 		cleanup.Run()
 	})

-	BeforeEach(func() {
-		defaultStats := stats.Default
-
-		DeferCleanup(func() {
-			stats.Default = defaultStats
-		})
-	})
-
-	Describe("Jobs stats", func() {
-		BeforeEach(func() {
-			mockStats, mockMeasurement := getMockStats(g)
-			mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
-			mockMeasurement.EXPECT().Count(gomock.Any()).AnyTimes()
-
-			stats.Default = mockStats
-		})
-
-		It("Upload status stat", func() {
-			getUploadStatusStat(statName, warehouseutils.Warehouse{
-				WorkspaceID: "workspaceID",
-				Source:      backendconfig.SourceT{ID: sourceID, Name: sourceName},
-				Destination: backendconfig.DestinationT{ID: destinationID, Name: destinationName},
-				Namespace:   "",
-				Type:        destinationType,
-				Identifier:  "",
-			})
-		})
-
-		It("Persist ssl file error stat", func() {
-			persistSSLFileErrorStat("workspaceID", destinationType, destinationName, destinationID, sourceName, sourceID, "")
-		})
-	})
-
 	Describe("Generate upload success/aborted metrics", func() {
 		var job *UploadJobT

@@ -101,8 +60,6 @@ var _ = Describe("Stats", Ordered, func() {
 			mockMeasurement.EXPECT().Count(4).Times(2)
 			mockMeasurement.EXPECT().Count(1).Times(1)

-			stats.Default = mockStats
-
 			job = &UploadJobT{
 				upload: &Upload{
 					ID: uploadID,
@@ -112,6 +69,7 @@
 				warehouse: warehouseutils.Warehouse{
 					Type: "POSTGRES",
 				},
+				stats: mockStats,
 			}
 		})

@@ -130,8 +88,6 @@ var _ = Describe("Stats", Ordered, func() {
 			mockMeasurement.EXPECT().Count(4).Times(2)
 			mockMeasurement.EXPECT().Since(gomock.Any()).Times(1)

-			stats.Default = mockStats
-
 			job := &UploadJobT{
 				upload: &Upload{
 					WorkspaceID: "workspaceID",
@@ -142,6 +98,7 @@
 				warehouse: warehouseutils.Warehouse{
 					Type: "POSTGRES",
 				},
+				stats: mockStats,
 			}
 			job.recordTableLoad("tracks", 4)
 		})
@@ -151,8 +108,6 @@
 			mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
 			mockMeasurement.EXPECT().SendTiming(gomock.Any()).Times(1)

-			stats.Default = mockStats
-
 			job := &UploadJobT{
 				upload: &Upload{
 					ID: uploadID,
@@ -163,6 +118,7 @@
 					Type: "POSTGRES",
 				},
 				dbHandle: pgResource.DB,
+				stats:    mockStats,
 			}

 			err = job.recordLoadFileGenerationTimeStat(1, 4)
diff --git a/warehouse/upload.go b/warehouse/upload.go
index be10cf5d2e..46020da9fc 100644
--- a/warehouse/upload.go
+++ b/warehouse/upload.go
@@ -119,6 +119,7 @@ type UploadJobT struct {
 	hasAllTablesSkipped  bool
 	tableUploadStatuses  []*TableUploadStatusT
 	destinationValidator validations.DestinationValidator
+	stats                stats.Stats
 }

 type UploadColumnT struct {
@@ -145,8 +146,8 @@ var (
 )

 var (
-	maxParallelLoads      map[string]int
-	columnCountThresholds map[string]int
+	maxParallelLoads    map[string]int
+	columnCountLimitMap map[string]int
 )

 func Init() {
@@ -167,15 +168,14 @@ func setMaxParallelLoads() {
 		warehouseutils.CLICKHOUSE: config.GetInt("Warehouse.clickhouse.maxParallelLoads", 3),
 		warehouseutils.DELTALAKE:  config.GetInt("Warehouse.deltalake.maxParallelLoads", 3),
 	}
-	columnCountThresholds = map[string]int{
-		warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountThreshold", 800),
-		warehouseutils.BQ:            config.GetInt("Warehouse.bigquery.columnCountThreshold", 8000),
-		warehouseutils.CLICKHOUSE:    config.GetInt("Warehouse.clickhouse.columnCountThreshold", 800),
-		warehouseutils.MSSQL:         config.GetInt("Warehouse.mssql.columnCountThreshold", 800),
-		warehouseutils.POSTGRES:      config.GetInt("Warehouse.postgres.columnCountThreshold", 1200),
-		warehouseutils.RS:            config.GetInt("Warehouse.redshift.columnCountThreshold", 1200),
-		warehouseutils.SNOWFLAKE:     config.GetInt("Warehouse.snowflake.columnCountThreshold", 1600),
-		warehouseutils.S3_DATALAKE:   config.GetInt("Warehouse.s3_datalake.columnCountThreshold", 10000),
+	columnCountLimitMap = map[string]int{
+		warehouseutils.AZURE_SYNAPSE: config.GetInt("Warehouse.azure_synapse.columnCountLimit", 1024),
+		warehouseutils.BQ:            config.GetInt("Warehouse.bigquery.columnCountLimit", 10000),
+		warehouseutils.CLICKHOUSE:    config.GetInt("Warehouse.clickhouse.columnCountLimit", 1000),
+		warehouseutils.MSSQL:         config.GetInt("Warehouse.mssql.columnCountLimit", 1024),
+		warehouseutils.POSTGRES:      config.GetInt("Warehouse.postgres.columnCountLimit", 1600),
+		warehouseutils.RS:            config.GetInt("Warehouse.redshift.columnCountLimit", 1600),
+		warehouseutils.S3_DATALAKE:   config.GetInt("Warehouse.s3_datalake.columnCountLimit", 10000),
 	}
 }

@@ -204,7 +204,7 @@ func (job *UploadJobT) trackLongRunningUpload() chan struct{} {
 			case <-time.After(longRunningUploadStatThresholdInMin):
 				pkgLogger.Infof("[WH]: Registering stat for long running upload: %d, dest: %s", job.upload.ID, job.warehouse.Identifier)

-				stats.Default.NewTaggedStat(
+				job.stats.NewTaggedStat(
 					"warehouse.long_running_upload",
 					stats.CountType,
 					stats.Tags{
@@ -1067,15 +1067,37 @@ func (job *UploadJobT) loadTable(tName string) (alteredSchema bool, err error) {
 		job.recordTableLoad(tName, numEvents)
 	}

-	if columnThreshold, ok := columnCountThresholds[job.warehouse.Type]; ok {
-		columnCount := len(job.schemaHandle.schemaInWarehouse[tName])
-		if columnCount > columnThreshold {
-			job.counterStat(`warehouse_load_table_column_count`, tag{name: "tableName", value: strings.ToLower(tName)}).Count(columnCount)
-		}
-	}
+	job.columnCountStat(tName)
+
 	return
 }

+// columnCountStat sent the column count for a table to statsd
+// skip sending for S3_DATALAKE, GCS_DATALAKE, AZURE_DATALAKE
+func (job *UploadJobT) columnCountStat(tableName string) {
+	var (
+		columnCountLimit int
+		ok               bool
+	)
+
+	switch job.warehouse.Type {
+	case warehouseutils.S3_DATALAKE, warehouseutils.GCS_DATALAKE, warehouseutils.AZURE_DATALAKE:
+		return
+	}
+
+	if columnCountLimit, ok = columnCountLimitMap[job.warehouse.Type]; !ok {
+		return
+	}
+
+	tags := []tag{
+		{name: "tableName", value: strings.ToLower(tableName)},
+	}
+	currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])
+
+	job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
+	job.counterStat(`warehouse_load_table_column_limit`, tags...).Count(columnCountLimit)
+}
+
 func (job *UploadJobT) loadUserTables(loadFilesTableMap map[tableNameT]bool) ([]error, error) {
 	var hasLoadFiles bool
 	userTables := []string{job.identifiesTableName(), job.usersTableName()}
@@ -1333,9 +1355,6 @@ func (job *UploadJobT) setUploadStatus(statusOpts UploadStatusOpts) (err error)
 		return err
 	}
 	return job.setUploadColumns(uploadColumnOpts)
-	// return job.setUploadColumns(
-	// 	additionalFields...,
-	// )
 }

 // SetUploadSchema
@@ -1345,9 +1364,6 @@ func (job *UploadJobT) setUploadSchema(consolidatedSchema warehouseutils.SchemaT
 		panic(err)
 	}
 	job.upload.UploadSchema = consolidatedSchema
-	// return job.setUploadColumns(
-	// 	UploadColumnT{Column: UploadSchemaField, Value: marshalledSchema},
-	// )
 	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadSchemaField, Value: marshalledSchema}}})
 }

@@ -1365,10 +1381,6 @@ func (job *UploadJobT) setLoadFileIDs(startLoadFileID, endLoadFileID int64) erro
 	job.upload.StartLoadFileID = startLoadFileID
 	job.upload.EndLoadFileID = endLoadFileID

-	// return job.setUploadColumns(
-	// 	UploadColumnT{Column: UploadStartLoadFileIDField, Value: startLoadFileID},
-	// 	UploadColumnT{Column: UploadEndLoadFileIDField, Value: endLoadFileID},
-	// )
 	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumnT{{Column: UploadStartLoadFileIDField, Value: startLoadFileID}, {Column: UploadEndLoadFileIDField, Value: endLoadFileID}}})
 }

diff --git a/warehouse/upload_test.go b/warehouse/upload_test.go
index 032c4aa862..46cb683df5 100644
--- a/warehouse/upload_test.go
+++ b/warehouse/upload_test.go
@@ -9,6 +9,8 @@ import (
 	"testing"

 	"github.com/rudderlabs/rudder-server/services/stats"
+	"github.com/rudderlabs/rudder-server/services/stats/memstats"
+	"github.com/stretchr/testify/require"

 	"github.com/golang/mock/gomock"
 	. "github.com/onsi/ginkgo/v2"
@@ -76,6 +78,113 @@ func TestExtractUploadErrorsByState(t *testing.T) {
 	}
 }

+func TestColumnCountStat(t *testing.T) {
+	Init()
+	Init4()
+
+	var (
+		workspaceID     = "test-workspaceID"
+		destinationID   = "test-desinationID"
+		destinationName = "test-desinationName"
+		sourceID        = "test-sourceID"
+		sourceName      = "test-sourceName"
+		tableName       = "test-table"
+	)
+
+	inputs := []struct {
+		name             string
+		columnCountLimit int
+		destinationType  string
+		statExpected     bool
+	}{
+		{
+			name:             "Datalakes destination",
+			destinationType:  warehouseutils.S3_DATALAKE,
+			columnCountLimit: 1,
+		},
+		{
+			name:            "Unknown destination",
+			destinationType: "unknown-destination",
+		},
+		{
+			name:             "Greater than threshold",
+			destinationType:  "test-destination",
+			columnCountLimit: 1,
+			statExpected:     true,
+		},
+		{
+			name:             "Lesser than threshold",
+			destinationType:  "test-destination",
+			columnCountLimit: 10,
+			statExpected:     true,
+		},
+	}
+
+	store := memstats.New()
+
+	for _, tc := range inputs {
+		tc := tc
+
+		t.Run(tc.name, func(t *testing.T) {
+			columnCountLimitMap = map[string]int{
+				"test-destination": tc.columnCountLimit,
+			}
+
+			j := UploadJobT{
+				upload: &Upload{
+					WorkspaceID:   workspaceID,
+					DestinationID: destinationID,
+					SourceID:      sourceID,
+				},
+				warehouse: warehouseutils.Warehouse{
+					Type: tc.destinationType,
+					Destination: backendconfig.DestinationT{
+						ID:   destinationID,
+						Name: destinationName,
+					},
+					Source: backendconfig.SourceT{
+						ID:   sourceID,
+						Name: sourceName,
+					},
+				},
+				stats: store,
+				schemaHandle: &SchemaHandleT{
+					schemaInWarehouse: warehouseutils.SchemaT{
+						tableName: map[string]string{
+							"test-column-1": "string",
+							"test-column-2": "string",
+							"test-column-3": "string",
+						},
+					},
+				},
+			}
+
+			tags := stats.Tags{
+				"module":      moduleName,
+				"destType":    tc.destinationType,
+				"warehouseID": j.warehouseID(),
+				"workspaceId": workspaceID,
+				"destID":      destinationID,
+				"sourceID":    sourceID,
+				"tableName":   tableName,
+			}
+
+			j.columnCountStat(tableName)
+
+			m1 := store.Get("warehouse_load_table_column_count", tags)
+			m2 := store.Get("warehouse_load_table_column_limit", tags)
+
+			if tc.statExpected {
+				require.EqualValues(t, m1.LastValue(), len(j.schemaHandle.schemaInWarehouse[tableName]))
+				require.EqualValues(t, m2.LastValue(), tc.columnCountLimit)
+			} else {
+				require.Nil(t, m1)
+				require.Nil(t, m2)
+			}
+		})
+	}
+}
+
 var _ = Describe("Upload", Ordered, func() {
 	var (
 		sourceID      = "test-sourceID"
@@ -208,14 +317,6 @@ var _ = Describe("Upload", Ordered, func() {
 	})

 	Describe("Staging files and load files events match", func() {
-		BeforeEach(func() {
-			defaultStats := stats.Default
-
-			DeferCleanup(func() {
-				stats.Default = defaultStats
-			})
-		})
-
 		When("Matched", func() {
 			It("Should not send stats", func() {
 				job.matchRowsInStagingAndLoadFiles()
@@ -228,8 +329,7 @@ var _ = Describe("Upload", Ordered, func() {
 				mockStats.EXPECT().NewTaggedStat(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(mockMeasurement)
 				mockMeasurement.EXPECT().Gauge(gomock.Any()).Times(1)

-				stats.Default = mockStats
-
+				job.stats = mockStats
 				job.stagingFileIDs = []int64{1, 2}
 				job.matchRowsInStagingAndLoadFiles()
 			})
diff --git a/warehouse/warehouse.go b/warehouse/warehouse.go
index 439b4e83db..66653acb2d 100644
--- a/warehouse/warehouse.go
+++ b/warehouse/warehouse.go
@@ -923,6 +923,7 @@ func (wh *HandleT) getUploadsToProcess(ctx context.Context, availableWorkers int
 			uploadJob := UploadJobT{
 				upload:   &upload,
 				dbHandle: wh.dbHandle,
+				stats:    stats.Default,
 			}
 			err := fmt.Errorf("unable to find source : %s or destination : %s, both or the connection between them", upload.SourceID, upload.DestinationID)
 			_, _ = uploadJob.setUploadError(err, model.Aborted)
@@ -965,6 +966,7 @@ func (wh *HandleT) getUploadsToProcess(ctx context.Context, availableWorkers int
 			dbHandle:             wh.dbHandle,
 			pgNotifier:           &wh.notifier,
 			destinationValidator: validations.NewDestinationValidator(),
+			stats:                stats.Default,
 		}

 		uploadJobs = append(uploadJobs, &uploadJob)
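
Reviewer note (not part of the patch): the recurring change above swaps direct calls to the package-level stats.Default for a stats.Stats field injected into UploadJobT and JobRunT, which is what lets tests assert on emitted metrics via memstats instead of mutating the global. Below is a minimal sketch of that pattern, using only APIs already exercised in this diff (memstats.New, NewTaggedStat, Count, Get, LastValue); the emitter type, metric name, and test are illustrative assumptions, not code from the repository.

package example

import (
	"testing"

	"github.com/rudderlabs/rudder-server/services/stats"
	"github.com/rudderlabs/rudder-server/services/stats/memstats"
	"github.com/stretchr/testify/require"
)

// emitter is a hypothetical consumer that records a counter through an
// injected stats.Stats rather than reaching for stats.Default directly.
type emitter struct {
	stats stats.Stats
}

func (e *emitter) record(table string, columns int) {
	e.stats.NewTaggedStat("example_column_count", stats.CountType, stats.Tags{
		"tableName": table,
	}).Count(columns)
}

// Production code would wire the field with stats.Default; a test injects an
// in-memory store so the emitted value can be asserted without a gomock setup.
func TestEmitter(t *testing.T) {
	store := memstats.New()
	e := emitter{stats: store}

	e.record("tracks", 3)

	m := store.Get("example_column_count", stats.Tags{"tableName": "tracks"})
	require.NotNil(t, m)
	require.EqualValues(t, 3, m.LastValue())
}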