diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 351a265100..819a6b3cbe 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -374,17 +374,14 @@ var dbFillGapsCmd = &cobra.Command{ } func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error { + var err error + if reingestForce && parallelWorkers > 1 { return errors.New("--force is incompatible with --parallel-workers > 1") } - horizonSession, err := db.Open("postgres", config.DatabaseURL) - if err != nil { - return fmt.Errorf("cannot open Horizon DB: %v", err) - } ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, - HistorySession: horizonSession, HistoryArchiveURL: config.HistoryArchiveURLs[0], CheckpointFrequency: config.CheckpointFrequency, MaxReingestRetries: int(retries), @@ -400,15 +397,18 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, RoundingSlippageFilter: config.RoundingSlippageFilter, } - if !ingestConfig.EnableCaptiveCore { + if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + + if !config.EnableCaptiveCoreIngestion { if config.StellarCoreDatabaseURL == "" { return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName) } - coreSession, dbErr := db.Open("postgres", config.StellarCoreDatabaseURL) - if dbErr != nil { - return fmt.Errorf("cannot open Core DB: %v", dbErr) + if ingestConfig.CoreSession, err = db.Open("postgres", config.StellarCoreDatabaseURL); err != nil { + ingestConfig.HistorySession.Close() + return fmt.Errorf("cannot open Core DB: %v", err) } - ingestConfig.CoreSession = coreSession } if parallelWorkers > 1 { @@ -427,6 +427,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, if systemErr != nil { return systemErr } + defer system.Shutdown() err = system.ReingestRange(ledgerRanges, reingestForce) if err != nil { @@ -479,6 +480,7 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) { if err != nil { return nil, err } + defer horizonSession.Close() q := &history.Q{horizonSession} return q.GetLedgerGaps(context.Background()) } @@ -488,6 +490,7 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history if err != nil { return nil, err } + defer horizonSession.Close() q := &history.Q{horizonSession} return q.GetLedgerGapsInRange(context.Background(), start, end) } diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 2fdd76b3f4..0e05b8dc26 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -268,6 +268,7 @@ type IngestionQ interface { BeginTx(*sql.TxOptions) error Commit() error CloneIngestionQ() IngestionQ + Close() error Rollback() error GetTx() *sqlx.Tx GetIngestVersion(context.Context) (int, error) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 26f66bddb0..02956f74e6 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -697,6 +697,7 @@ func (s *system) Shutdown() { s.cancel() // wait for ingestion state machine to terminate s.wg.Wait() + s.historyQ.Close() if err := s.ledgerBackend.Close(); err != nil { log.WithError(err).Info("could not close ledger backend") } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index bd56ef5bd7..ae54acc0f1 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -265,6 +265,11 @@ func (m *mockDBQ) Commit() error { return args.Error(0) } +func (m *mockDBQ) Close() error { + args := m.Called() + return args.Error(0) +} + func (m *mockDBQ) Rollback() error { args := m.Called() return args.Error(0) diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 547098ed0d..b3c163689d 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -47,6 +47,16 @@ func newParallelSystems(config Config, workerCount uint, systemFactory func(Conf }, nil } +func (ps *ParallelSystems) Shutdown() { + log.Info("Shutting down parallel ingestion system...") + if ps.config.HistorySession != nil { + ps.config.HistorySession.Close() + } + if ps.config.CoreSession != nil { + ps.config.CoreSession.Close() + } +} + func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError { for { @@ -128,6 +138,9 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat // the user needs to start again to prevent the gaps. lowestRangeErr *rangeError ) + + defer ps.Shutdown() + if err := validateRanges(ledgerRanges); err != nil { return err } diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 376af8e607..9da5ed59ad 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -444,21 +444,7 @@ func TestReingestDB(t *testing.T) { itest, reachedLedger := initializeDBIntegrationTest(t) tt := assert.New(t) - // Create a fresh Horizon database - newDB := dbtest.Postgres(t) - // TODO: Unfortunately Horizon's ingestion System leaves open sessions behind,leading to - // a "database is being accessed by other users" error when trying to drop it - // defer newDB.Close() - freshHorizonPostgresURL := newDB.DSN horizonConfig := itest.GetHorizonConfig() - horizonConfig.DatabaseURL = freshHorizonPostgresURL - // Initialize the DB schema - dbConn, err := db.Open("postgres", freshHorizonPostgresURL) - tt.NoError(err) - defer dbConn.Close() - _, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0) - tt.NoError(err) - t.Run("validate parallel range", func(t *testing.T) { horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", @@ -559,7 +545,13 @@ func TestFillGaps(t *testing.T) { horizonConfig.DatabaseURL = freshHorizonPostgresURL // Initialize the DB schema dbConn, err := db.Open("postgres", freshHorizonPostgresURL) - defer dbConn.Close() + tt.NoError(err) + historyQ := history.Q{dbConn} + defer func() { + historyQ.Close() + newDB.Close() + }() + _, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0) tt.NoError(err) @@ -603,7 +595,6 @@ func TestFillGaps(t *testing.T) { // subprocesses to conflict. itest.StopHorizon() - historyQ := history.Q{dbConn} var oldestLedger, latestLedger int64 tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 0c675ac4a1..92e8740d59 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -278,10 +278,8 @@ func (i *Test) StartHorizon() error { if horizonPostgresURL == "" { postgres := dbtest.Postgres(i.t) i.shutdownCalls = append(i.shutdownCalls, func() { - // FIXME: Unfortunately, Horizon leaves open sessions behind, - // leading to a "database is being accessed by other users" - // error when trying to drop it. - // postgres.Close() + i.StopHorizon() + postgres.Close() }) horizonPostgresURL = postgres.DSN } @@ -490,7 +488,11 @@ func (i *Test) Horizon() *horizon.App { // StopHorizon shuts down the running Horizon process func (i *Test) StopHorizon() { - i.app.CloseDB() + if i.app == nil { + // horizon has already been stopped + return + } + i.app.Close() // Wait for Horizon to shut down completely. diff --git a/support/db/dbtest/db.go b/support/db/dbtest/db.go index bda67c49e8..6ab32b6926 100644 --- a/support/db/dbtest/db.go +++ b/support/db/dbtest/db.go @@ -132,6 +132,9 @@ func Postgres(t testing.TB) *DB { result.DSN = fmt.Sprintf("postgres://%s@localhost/%s?sslmode=disable&timezone=UTC", pgUser, result.dbName) result.closer = func() { + // pg_terminate_backend is a best effort, it does not gaurantee that it can close any lingering connections + // it sends a quit signal to each remaining connection in the db + execStatement(t, pgUser, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"+pq.QuoteIdentifier(result.dbName)+"';") execStatement(t, pgUser, "DROP DATABASE "+pq.QuoteIdentifier(result.dbName)) }