diff --git a/CHANGELOG.md b/CHANGELOG.md index a895e73e6d41a..2ce591769d90d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [6435](https://github.com/grafana/loki/pull/6435) **MichelHollands**: Remove the `whole-stream-deletion` mode. * [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling. * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target * [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index e5032981b9297..46750788441b1 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2093,11 +2093,6 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period [delete_request_cancel_period: | default = 24h] -# Which deletion mode to use. Supported values are: disabled, -# whole-stream-deletion, filter-only, filter-and-delete -# CLI flag: -boltdb.shipper.compactor.deletion-mode -[deletion_mode: | default = "whole-stream-deletion"] - # Maximum number of tables to compact in parallel. # While increasing this value, please make sure compactor has enough disk space # allocated to be able to store and compact as many tables. @@ -2105,11 +2100,11 @@ compacts index shards to more performant forms. [max_compaction_parallelism: | default = 1] # Deletion mode. -# Can be one of "disabled", "whole-stream-deletion", "filter-only", or "filter-and-delete". -# When set to the default value of "whole-stream-deletion", and if +# Can be one of "disabled", "filter-only", or "filter-and-delete". +# When set to "filter-only" or "filter-and-delete", and if # retention_enabled is true, then the log entry deletion API endpoints are available. # CLI flag: -boltdb.shipper.compactor.deletion-mode -[deletion_mode: | default = "whole-stream-deletion"] +[deletion_mode: | default = "disabled"] # The hash ring configuration used by compactors to elect a single instance for running compactions # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring diff --git a/docs/sources/operations/storage/logs-deletion.md b/docs/sources/operations/storage/logs-deletion.md index acc75a390d500..95675e185d79c 100644 --- a/docs/sources/operations/storage/logs-deletion.md +++ b/docs/sources/operations/storage/logs-deletion.md @@ -9,7 +9,6 @@ Log entry deletion is supported _only_ for the BoltDB Shipper index store. Grafana Loki supports the deletion of log entries from a specified stream. Log entries that fall within a specified time window and match an optional line filter are those that will be deleted. - The Compactor component exposes REST endpoints that process delete requests. Hitting the endpoint specifies the streams and the time window. The deletion of the log entries takes place after a configurable cancellation time period expires. @@ -18,9 +17,8 @@ Log entry deletion relies on configuration of the custom logs retention workflow ## Configuration -Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `whole-stream-deletion`, `filter-only`, or `filter-and-delete` in the compactor's configuration. See the example in [Retention configuration](../retention#retention-configuration). +Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `filter-only` or `filter-and-delete` in the compactor's configuration. -With `whole-stream-deletion`, all the log entries matching the query given in the delete request are removed. With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks. With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks. diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 1949d617ba4e1..6af2f1034b5ec 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -52,12 +52,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) { "-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host, "-frontend.default-validity=0s", "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host, + "-common.compactor-address="+tCompactor.HTTPURL().String(), ) _ = clu.AddComponent( "querier", "-target=querier", "-querier.scheduler-address="+tQueryScheduler.GRPCURL().Host, "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host, + "-common.compactor-address="+tCompactor.HTTPURL().String(), ) ) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 44abfb3daffd0..47d07eeca6f97 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -568,7 +568,7 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { } func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) { - filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + filteringEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode) if err != nil { return nil, err } @@ -864,16 +864,11 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) - if t.Cfg.CompactorConfig.RetentionEnabled { - switch t.compactor.DeleteMode() { - case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) - default: - break - } + if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode().DeleteEnabled() { + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) + t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) } return t.compactor, nil @@ -971,12 +966,12 @@ func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) { return deletion.NewNoOpDeleteRequestsStore(), nil } - filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode) + deleteEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode) if err != nil { return nil, err } - if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled { + if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !deleteEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index aa55ba11d7e67..e523acc330252 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -87,7 +87,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") - f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) + f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "disabled", fmt.Sprintf("Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|"))) cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)") } @@ -231,12 +231,11 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem return err } - switch c.deleteMode { - case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete: + if c.deleteMode.DeleteEnabled() { if err := c.initDeletes(r, limits); err != nil { return err } - default: + } else { c.expirationChecker = newExpirationChecker( retention.NewExpirationChecker(limits), // This is a dummy deletion ExpirationChecker that never expires anything diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go index 2712084210d27..36e9e67a53af6 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go @@ -41,7 +41,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }{ { name: "no delete requests", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, expectedResp: resp{ isExpired: false, nonDeletedIntervals: nil, @@ -49,7 +49,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "no relevant delete requests", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: "different-user", @@ -65,7 +65,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "whole chunk deleted by single request", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -81,7 +81,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "deleted interval out of range", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -97,7 +97,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple delete requests with one deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -119,7 +119,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple delete requests causing multiple holes", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -172,7 +172,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple overlapping requests deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, @@ -194,7 +194,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, { name: "multiple non-overlapping requests deleting the whole chunk", - deletionMode: WholeStreamDeletion, + deletionMode: FilterAndDelete, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode.go b/pkg/storage/stores/shipper/compactor/deletion/mode.go index ca0dd624f8b9a..733ce27fad1ed 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode.go @@ -11,8 +11,7 @@ var ( ) const ( - Disabled Mode = iota - WholeStreamDeletion // The existing log deletion that removes whole streams. + Disabled Mode = iota FilterOnly FilterAndDelete ) @@ -21,8 +20,6 @@ func (m Mode) String() string { switch m { case Disabled: return "disabled" - case WholeStreamDeletion: - return "whole-stream-deletion" case FilterOnly: return "filter-only" case FilterAndDelete: @@ -31,16 +28,18 @@ func (m Mode) String() string { return "unknown" } +func (m Mode) DeleteEnabled() bool { + return m == FilterOnly || m == FilterAndDelete +} + func AllModes() []string { - return []string{Disabled.String(), WholeStreamDeletion.String(), FilterOnly.String(), FilterAndDelete.String()} + return []string{Disabled.String(), FilterOnly.String(), FilterAndDelete.String()} } func ParseMode(in string) (Mode, error) { switch in { case "disabled": return Disabled, nil - case "whole-stream-deletion": - return WholeStreamDeletion, nil case "filter-only": return FilterOnly, nil case "filter-and-delete": @@ -49,11 +48,11 @@ func ParseMode(in string) (Mode, error) { return 0, errUnknownMode } -func FilteringEnabled(in string) (bool, error) { +func DeleteEnabled(in string) (bool, error) { deleteMode, err := ParseMode(in) if err != nil { return false, err } - return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil + return deleteMode.DeleteEnabled(), nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode_test.go b/pkg/storage/stores/shipper/compactor/deletion/mode_test.go index f8f2fb4899907..de9145d9c284b 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/mode_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/mode_test.go @@ -8,7 +8,7 @@ import ( func TestAllModes(t *testing.T) { modes := AllModes() - require.ElementsMatch(t, []string{"disabled", "whole-stream-deletion", "filter-only", "filter-and-delete"}, modes) + require.ElementsMatch(t, []string{"disabled", "filter-only", "filter-and-delete"}, modes) } func TestParseMode(t *testing.T) { @@ -16,10 +16,6 @@ func TestParseMode(t *testing.T) { require.NoError(t, err) require.Equal(t, Disabled, mode) - mode, err = ParseMode("whole-stream-deletion") - require.NoError(t, err) - require.Equal(t, WholeStreamDeletion, mode) - mode, err = ParseMode("filter-only") require.NoError(t, err) require.Equal(t, FilterOnly, mode) @@ -31,3 +27,21 @@ func TestParseMode(t *testing.T) { _, err = ParseMode("something-else") require.ErrorIs(t, errUnknownMode, err) } + +func TestDeleteEnabled(t *testing.T) { + enabled, err := DeleteEnabled("disabled") + require.NoError(t, err) + require.False(t, enabled) + + enabled, err = DeleteEnabled("filter-only") + require.NoError(t, err) + require.True(t, enabled) + + enabled, err = DeleteEnabled("filter-and-delete") + require.NoError(t, err) + require.True(t, enabled) + + enabled, err = DeleteEnabled("some other value") + require.Error(t, err) + require.False(t, enabled) +}