Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-2.6.x] [k102] Backport Remove whole stream deletion mode (#6435) #6497

Merged
merged 1 commit into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6435](https://github.com/grafana/loki/pull/6435) **MichelHollands**: Remove the `whole-stream-deletion` mode.
* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling.
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
Expand Down
11 changes: 3 additions & 8 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2093,23 +2093,18 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]

# Which deletion mode to use. Supported values are: disabled,
# whole-stream-deletion, filter-only, filter-and-delete
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]

# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.
# CLI flag: -boltdb.shipper.compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]

# Deletion mode.
# Can be one of "disabled", "whole-stream-deletion", "filter-only", or "filter-and-delete".
# When set to the default value of "whole-stream-deletion", and if
# Can be one of "disabled", "filter-only", or "filter-and-delete".
# When set to "filter-only" or "filter-and-delete", and if
# retention_enabled is true, then the log entry deletion API endpoints are available.
# CLI flag: -boltdb.shipper.compactor.deletion-mode
[deletion_mode: <string> | default = "whole-stream-deletion"]
[deletion_mode: <string> | default = "disabled"]

# The hash ring configuration used by compactors to elect a single instance for running compactions
# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring
Expand Down
4 changes: 1 addition & 3 deletions docs/sources/operations/storage/logs-deletion.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ Log entry deletion is supported _only_ for the BoltDB Shipper index store.
Grafana Loki supports the deletion of log entries from a specified stream.
Log entries that fall within a specified time window and match an optional line filter are those that will be deleted.


The Compactor component exposes REST endpoints that process delete requests.
Hitting the endpoint specifies the streams and the time window.
The deletion of the log entries takes place after a configurable cancellation time period expires.
Expand All @@ -18,9 +17,8 @@ Log entry deletion relies on configuration of the custom logs retention workflow

## Configuration

Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `whole-stream-deletion`, `filter-only`, or `filter-and-delete` in the compactor's configuration. See the example in [Retention configuration](../retention#retention-configuration).
Enable log entry deletion by setting `retention_enabled` to true and `deletion_mode` to `filter-only` or `filter-and-delete` in the compactor's configuration.

With `whole-stream-deletion`, all the log entries matching the query given in the delete request are removed.
With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks.
With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks.

Expand Down
2 changes: 2 additions & 0 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host,
"-frontend.default-validity=0s",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL().Host,
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
)

Expand Down
21 changes: 8 additions & 13 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
}

func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
filteringEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -864,16 +864,11 @@ func (t *Loki) initCompactor() (services.Service, error) {

t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)

if t.Cfg.CompactorConfig.RetentionEnabled {
switch t.compactor.DeleteMode() {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
default:
break
}
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode().DeleteEnabled() {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
}

return t.compactor, nil
Expand Down Expand Up @@ -971,12 +966,12 @@ func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
deleteEnabled, err := deletion.DeleteEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
}

if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !deleteEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "disabled", fmt.Sprintf("Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")
}
Expand Down Expand Up @@ -231,12 +231,11 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig config.Schem
return err
}

switch c.deleteMode {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
if c.deleteMode.DeleteEnabled() {
if err := c.initDeletes(r, limits); err != nil {
return err
}
default:
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}{
{
name: "no delete requests",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
},
},
{
name: "no relevant delete requests",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: "different-user",
Expand All @@ -65,7 +65,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "whole chunk deleted by single request",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -81,7 +81,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "deleted interval out of range",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -97,7 +97,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple delete requests with one deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -119,7 +119,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple delete requests causing multiple holes",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple overlapping requests deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand All @@ -194,7 +194,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
{
name: "multiple non-overlapping requests deleting the whole chunk",
deletionMode: WholeStreamDeletion,
deletionMode: FilterAndDelete,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Expand Down
17 changes: 8 additions & 9 deletions pkg/storage/stores/shipper/compactor/deletion/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ var (
)

const (
Disabled Mode = iota
WholeStreamDeletion // The existing log deletion that removes whole streams.
Disabled Mode = iota
FilterOnly
FilterAndDelete
)
Expand All @@ -21,8 +20,6 @@ func (m Mode) String() string {
switch m {
case Disabled:
return "disabled"
case WholeStreamDeletion:
return "whole-stream-deletion"
case FilterOnly:
return "filter-only"
case FilterAndDelete:
Expand All @@ -31,16 +28,18 @@ func (m Mode) String() string {
return "unknown"
}

func (m Mode) DeleteEnabled() bool {
return m == FilterOnly || m == FilterAndDelete
}

func AllModes() []string {
return []string{Disabled.String(), WholeStreamDeletion.String(), FilterOnly.String(), FilterAndDelete.String()}
return []string{Disabled.String(), FilterOnly.String(), FilterAndDelete.String()}
}

func ParseMode(in string) (Mode, error) {
switch in {
case "disabled":
return Disabled, nil
case "whole-stream-deletion":
return WholeStreamDeletion, nil
case "filter-only":
return FilterOnly, nil
case "filter-and-delete":
Expand All @@ -49,11 +48,11 @@ func ParseMode(in string) (Mode, error) {
return 0, errUnknownMode
}

func FilteringEnabled(in string) (bool, error) {
func DeleteEnabled(in string) (bool, error) {
deleteMode, err := ParseMode(in)
if err != nil {
return false, err
}

return deleteMode == FilterOnly || deleteMode == FilterAndDelete, nil
return deleteMode.DeleteEnabled(), nil
}
24 changes: 19 additions & 5 deletions pkg/storage/stores/shipper/compactor/deletion/mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ import (

func TestAllModes(t *testing.T) {
modes := AllModes()
require.ElementsMatch(t, []string{"disabled", "whole-stream-deletion", "filter-only", "filter-and-delete"}, modes)
require.ElementsMatch(t, []string{"disabled", "filter-only", "filter-and-delete"}, modes)
}

func TestParseMode(t *testing.T) {
mode, err := ParseMode("disabled")
require.NoError(t, err)
require.Equal(t, Disabled, mode)

mode, err = ParseMode("whole-stream-deletion")
require.NoError(t, err)
require.Equal(t, WholeStreamDeletion, mode)

mode, err = ParseMode("filter-only")
require.NoError(t, err)
require.Equal(t, FilterOnly, mode)
Expand All @@ -31,3 +27,21 @@ func TestParseMode(t *testing.T) {
_, err = ParseMode("something-else")
require.ErrorIs(t, errUnknownMode, err)
}

func TestDeleteEnabled(t *testing.T) {
enabled, err := DeleteEnabled("disabled")
require.NoError(t, err)
require.False(t, enabled)

enabled, err = DeleteEnabled("filter-only")
require.NoError(t, err)
require.True(t, enabled)

enabled, err = DeleteEnabled("filter-and-delete")
require.NoError(t, err)
require.True(t, enabled)

enabled, err = DeleteEnabled("some other value")
require.Error(t, err)
require.False(t, enabled)
}