Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backup support for new multitenant system #2549

Merged
merged 39 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7c98f6b
chore: backup support for new multitenant system
BonapartePC Oct 8, 2022
fa21b13
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 11, 2022
bbfb96d
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 13, 2022
a3cc925
add workers for creating and uploading backup files
BonapartePC Oct 17, 2022
830da1b
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 18, 2022
2f896da
check preferences before upload
BonapartePC Oct 18, 2022
19962be
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 19, 2022
105ddd6
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 19, 2022
ce530cf
overwrite storage setting config
BonapartePC Oct 20, 2022
448b8b9
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 21, 2022
b8a95f5
add max retries
BonapartePC Oct 21, 2022
9748693
limit workers
BonapartePC Oct 25, 2022
15e452e
fix comments and deepsource issues
BonapartePC Oct 25, 2022
ae187df
add backup_test.go in services
BonapartePC Oct 26, 2022
c94b8a9
Merge branch 'master' of github.com:rudderlabs/rudder-server into cho…
BonapartePC Oct 27, 2022
c9369f6
resolve conflicts
BonapartePC Oct 27, 2022
94f6657
add workspaceId to file name
BonapartePC Oct 27, 2022
cdb2aba
add mt tests
BonapartePC Oct 27, 2022
45d2e8c
Merge branch 'master' into chore.dataRetention
BonapartePC Oct 27, 2022
367715e
make fmt
BonapartePC Oct 27, 2022
0c33eed
Merge branch 'chore.dataRetention' of github.com:rudderlabs/rudder-se…
BonapartePC Oct 27, 2022
6417324
add multiple file handlers instead of multiple postgres connection wo…
BonapartePC Nov 1, 2022
79e14b7
fix tests
BonapartePC Nov 1, 2022
3b688fb
Merge branch 'master' into chore.dataRetention
BonapartePC Nov 1, 2022
154588a
add backup support for proc_error, rt, batch_rt
BonapartePC Nov 2, 2022
80ae4bc
Merge branch 'chore.dataRetention' of github.com:rudderlabs/rudder-se…
BonapartePC Nov 2, 2022
5b319ea
Merge branch 'master' into chore.dataRetention
BonapartePC Nov 2, 2022
41fc657
peer programming
atzoum Nov 2, 2022
a1b4223
made changes according to review comments
BonapartePC Nov 2, 2022
2707c96
add tests
BonapartePC Nov 3, 2022
d8aa5b9
chore: minor renaming types, variables, functions
atzoum Nov 3, 2022
48a3fb9
chore: simplify stash code
atzoum Nov 3, 2022
db1b1a7
Merge branch 'master' into chore.dataRetention
BonapartePC Nov 3, 2022
9cde50c
fix tests
BonapartePC Nov 3, 2022
ea720ae
add tests for gzwriter
BonapartePC Nov 3, 2022
5fe9bc9
Merge branch 'master' into chore.dataRetention
BonapartePC Nov 3, 2022
ca36259
slight changes in gzwriter tests
BonapartePC Nov 4, 2022
04c269f
Merge branch 'chore.dataRetention' of github.com:rudderlabs/rudder-se…
BonapartePC Nov 4, 2022
774836b
address review comments
BonapartePC Nov 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -77,6 +78,9 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
prebackupHandlers := []prebackup.Handler{
prebackup.DropSourceIds(transientSources.SourceIdsSupplier()),
}

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand All @@ -90,6 +94,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -98,6 +103,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -106,6 +112,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
Expand All @@ -114,6 +121,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)

var tenantRouterDB jobsdb.MultiTenantJobsDB
Expand Down Expand Up @@ -150,7 +158,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
return fmt.Errorf("unsupported deployment type: %q", deploymentType)
}

proc := processor.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, rsourcesService)
proc := processor.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, fileUploaderProvider, rsourcesService)
rtFactory := &router.Factory{
Reporting: reportingI,
Multitenant: multitenantStats,
Expand Down Expand Up @@ -223,7 +231,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
var replayDB jobsdb.HandleT
err := replayDB.Setup(
jobsdb.ReadWrite, options.ClearDB, "replay",
true, prebackupHandlers,
true, prebackupHandlers, fileUploaderProvider,
)
if err != nil {
return fmt.Errorf("could not setup replayDB: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
ratelimiter "github.com/rudderlabs/rudder-server/rate-limiter"
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
Expand Down Expand Up @@ -48,11 +49,14 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.

sourcedebugger.Setup(backendconfig.DefaultBackendConfig)

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gatewayDB.Close()
if err := gatewayDB.Start(); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rudderlabs/rudder-server/services/db"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -110,6 +111,9 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
prebackupHandlers := []prebackup.Handler{
prebackup.DropSourceIds(transientSources.SourceIdsSupplier()),
}

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand All @@ -121,6 +125,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gwDBForProcessor.Close()
gatewayDB = gwDBForProcessor
Expand All @@ -130,6 +135,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -138,6 +144,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer batchRouterDB.Close()
errDB := jobsdb.NewForReadWrite(
Expand All @@ -146,6 +153,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
var tenantRouterDB jobsdb.MultiTenantJobsDB
var multitenantStats multitenant.MultiTenantI
Expand Down Expand Up @@ -181,7 +189,7 @@ func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app
return fmt.Errorf("unsupported deployment type: %q", deploymentType)
}

p := proc.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, rsourcesService)
p := proc.New(ctx, &options.ClearDB, gwDBForProcessor, routerDB, batchRouterDB, errDB, multitenantStats, reportingI, transientSources, fileUploaderProvider, rsourcesService)

rtFactory := &router.Factory{
Reporting: reportingI,
Expand Down
3 changes: 2 additions & 1 deletion app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/enterprise/reporting"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestDynamicClusterManager(t *testing.T) {
"batch_rt": &jobsdb.MultiTenantLegacy{HandleT: brtDB},
})

processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), rsources.NewNoOpService())
processor := processor.New(ctx, &clearDb, gwDB, rtDB, brtDB, errDB, mockMTI, &reporting.NOOP{}, transientsource.NewEmptyService(), fileuploader.NewDefaultProvider(), rsources.NewNoOpService())
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
mockBackendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
Expand Down
39 changes: 39 additions & 0 deletions config/backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,45 @@ type ConfigT struct {
Sources []SourceT `json:"sources"`
Libraries LibrariesT `json:"libraries"`
ConnectionFlags ConnectionFlags `json:"flags"`
Settings Settings `json:"settings"`
}

type Settings struct {
DataRetention DataRetention `json:"dataRetention"`
}

type DataRetention struct {
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
}

type StorageBucket struct {
Type string `json:"type"`
Config map[string]interface{}
}

type StoragePreferences struct {
ProcErrors bool `json:"procErrors"`
GatewayDumps bool `json:"gatewayDumps"`
ProcErrorDumps bool `json:"procErrorDumps"`
RouterDumps bool `json:"routerDumps"`
BatchRouterDumps bool `json:"batchRouterDumps"`
}

func (sp StoragePreferences) Backup(tableprefix string) bool {
switch tableprefix {
case "gw":
return sp.GatewayDumps
case "rt":
return sp.RouterDumps
case "batch_rt":
return sp.BatchRouterDumps
case "proc_error":
return sp.ProcErrorDumps
default:
return false
}
}

type ConnectionFlags struct {
Expand Down
6 changes: 6 additions & 0 deletions enterprise/reporting/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ func TestFeatureSetup(t *testing.T) {
require.Equal(t, instanceA, instanceB)
require.Equal(t, instanceB, instanceC)
require.Equal(t, instanceC, instanceD)

f = &Factory{}
instanceE := f.Setup(&backendconfig.NOOP{})
instanceF := f.GetReportingInstance()
require.Equal(t, instanceE, instanceF)
require.NotEqual(t, instanceE, backendconfig.NOOP{})
}
Loading