Skip to content

Commit

Permalink
Only switch back to Active state if partition is Inactive.
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany committed Feb 15, 2024
1 parent 1ae1641 commit 48e3174
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
30 changes: 20 additions & 10 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3359,7 +3359,7 @@ func (i *Ingester) unsetPrepareShutdown() {
// INACTIVE state happened.
//
// - DELETE
// Sets partition back to ACTIVE state.
// Sets partition back from INACTIVE to ACTIVE state.
func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) {
logger := log.With(i.logger, "partition", i.ingestPartitionID)

Expand Down Expand Up @@ -3401,17 +3401,27 @@ func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *ht
}

case http.MethodDelete:
// We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency
// in the read path. If the partition is within the lookback period we need to guarantee that partition will be queried.
// Moving back to PENDING will cause us loosing consistency, because PENDING partitions are not queried by design.
// We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than
// "lookback period" ago, but since we delete inactive partitions with no owners that moved to inactive since longer
// than "lookback period" ago, it looks to be an edge case not worth to address.
if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil {
level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
state, _, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context())
if err != nil {
level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// If partition is inactive, make it active. We ignore other states Active and especially Pending.
if state == ring.PartitionInactive {
// We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency
// in the read path. If the partition is within the lookback period we need to guarantee that partition will be queried.
// Moving back to PENDING will cause us loosing consistency, because PENDING partitions are not queried by design.
// We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than
// "lookback period" ago, but since we delete inactive partitions with no owners that moved to inactive since longer
// than "lookback period" ago, it looks to be an edge case not worth to address.
if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil {
level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

state, stateTimestamp, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context())
Expand Down
25 changes: 25 additions & 0 deletions pkg/ingester/ingester_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,31 @@ func TestIngester_PreparePartitionDownscaleHandler(t *testing.T) {
return slices.Equal(watcher.PartitionRing().PendingPartitionIDs(), []int32{0})
}, time.Second, 10*time.Millisecond)
})

t.Run("DELETE is ignored if the partition is in PENDING state", func(t *testing.T) {
t.Parallel()

// To keep the partition in PENDING state we set a minimum number of owners
// higher than the actual number of ingesters we're going to run.
cfg := defaultIngesterTestConfig(t)
cfg.IngesterPartitionRing.MinOwnersCount = 2

ingester, watcher := setup(t, cfg)

// Pre-condition: the partition is PENDING.
require.Eventually(t, func() bool {
return slices.Equal(watcher.PartitionRing().PendingPartitionIDs(), []int32{0})
}, time.Second, 10*time.Millisecond)

res := httptest.NewRecorder()
ingester.PreparePartitionDownscaleHandler(res, httptest.NewRequest(http.MethodDelete, "/ingester/prepare-partition-downscale", nil))
require.Equal(t, http.StatusOK, res.Code)

// We expect the partition to be in PENDING state.
require.Eventually(t, func() bool {
return slices.Equal(watcher.PartitionRing().PendingPartitionIDs(), []int32{0})
}, time.Second, 10*time.Millisecond)
})
}

func TestIngester_ShouldNotCreatePartitionIfThereIsShutdownMarker(t *testing.T) {
Expand Down

0 comments on commit 48e3174

Please sign in to comment.