-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add objstore support for Swift using thanos.io/objstore #11672
Merged
Merged
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
0a946a2
add TLS support for swift
btaani 88df54d
fix typo
btaani 5f821dd
fix error from unit test
btaani 749f64b
Merge branch 'main' into swift-tls
btaani f5bcc9c
Merge branch 'main' into swift-tls
btaani 06ef15b
Merge branch 'main' into swift-tls
btaani fb562c4
remove condition on transport type
btaani b1b203b
remove duplicate initialization
btaani 7b23e51
modify swift config in object client
btaani d5ad3e7
use bucket_http config in swift bucket client
btaani 6b70293
Merge branch 'main' into swift-tls
btaani 6a266e0
add swift thanos object client implementation
btaani fd98f34
undo docs change
btaani 959aa77
change all objectClient implementations
btaani 05acfd9
add hedged client to swift thanos client
btaani ce0c7e6
add hedged client to swift thanos client
btaani 3659f19
Merge branch 'main' into swift-tls
btaani 7fe24b8
update thanos-io/objstore
btaani 53fd4ee
add TLS support
btaani b02afa8
Merge branch 'main' into swift-tls
btaani 0c6526e
Merge branch 'main' into swift-tls
btaani 5a511b9
Merge branch 'main' into swift-tls
JoaoBraveCoding bc2eeb8
chore: Move Swift client to new ObjectClientAdapter
JoaoBraveCoding e64f09e
docs: fixed docs
JoaoBraveCoding 11970b9
Merge branch 'main' into swift-tls
JoaoBraveCoding 380c0b8
fix: fix tests
JoaoBraveCoding 61c50f3
address review comments
JoaoBraveCoding 776c5c4
make s3 use http config
JoaoBraveCoding 99a0cf3
Merge branch 'main' into swift-tls
JoaoBraveCoding 32f92fe
address review comments and fix merge result
JoaoBraveCoding ad09648
Duplicate Swift config to old client
JoaoBraveCoding bc652bd
Merge branch 'main' into swift-tls
ashwanthgoli 340b953
fix build
ashwanthgoli fb3f953
make format
ashwanthgoli dd26273
ensure cli flags are same as existing ones
ashwanthgoli File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 110 additions & 0 deletions
110
pkg/storage/chunk/client/openstack/swift_thanos_object_client.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package openstack | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"strings" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/thanos-io/objstore" | ||
|
||
"github.com/grafana/loki/pkg/storage/bucket" | ||
"github.com/grafana/loki/pkg/storage/chunk/client" | ||
"github.com/grafana/loki/pkg/storage/chunk/client/hedging" | ||
) | ||
|
||
type SwiftThanosObjectClient struct { | ||
client objstore.Bucket | ||
} | ||
|
||
func NewSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) { | ||
// TODO Add Hedging client once we are able to configure HTTP on Swift provider | ||
return newSwiftThanosObjectClient(ctx, cfg, component, logger, reg) | ||
} | ||
|
||
func newSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) { | ||
bucket, err := bucket.NewClient(ctx, cfg, component, logger, reg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &SwiftThanosObjectClient{ | ||
client: bucket, | ||
}, nil | ||
} | ||
|
||
func (s *SwiftThanosObjectClient) Stop() {} | ||
|
||
// ObjectExists checks if a given objectKey exists in the Swift bucket | ||
func (s *SwiftThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { | ||
return s.client.Exists(ctx, objectKey) | ||
} | ||
|
||
// PutObject puts the specified bytes into the configured Swift bucket at the provided key | ||
func (s *SwiftThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { | ||
return s.client.Upload(ctx, objectKey, object) | ||
} | ||
|
||
// GetObject returns a reader and the size for the specified object key from the configured Swift bucket. | ||
func (s *SwiftThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { | ||
reader, err := s.client.Get(ctx, objectKey) | ||
if err != nil { | ||
return nil, 0, err | ||
} | ||
|
||
attr, err := s.client.Attributes(ctx, objectKey) | ||
if err != nil { | ||
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey) | ||
} | ||
|
||
return reader, attr.Size, err | ||
} | ||
|
||
// List objects with given prefix. | ||
func (s *SwiftThanosObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { | ||
var storageObjects []client.StorageObject | ||
var commonPrefixes []client.StorageCommonPrefix | ||
var iterParams []objstore.IterOption | ||
|
||
// If delimiter is empty we want to list all files | ||
if delimiter == "" { | ||
iterParams = append(iterParams, objstore.WithRecursiveIter) | ||
} | ||
|
||
s.client.Iter(ctx, prefix, func(objectKey string) error { | ||
// CommonPrefixes are keys that have the prefix and have the delimiter | ||
// as a suffix | ||
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) { | ||
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey)) | ||
return nil | ||
} | ||
attr, err := s.client.Attributes(ctx, objectKey) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to get attributes for %s", objectKey) | ||
} | ||
|
||
storageObjects = append(storageObjects, client.StorageObject{ | ||
Key: objectKey, | ||
ModifiedAt: attr.LastModified, | ||
}) | ||
|
||
return nil | ||
|
||
}, iterParams...) | ||
|
||
return storageObjects, commonPrefixes, nil | ||
} | ||
|
||
// DeleteObject deletes the specified object key from the configured Swift bucket. | ||
func (s *SwiftThanosObjectClient) DeleteObject(ctx context.Context, objectKey string) error { | ||
return s.client.Delete(ctx, objectKey) | ||
} | ||
|
||
// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. | ||
func (s *SwiftThanosObjectClient) IsObjectNotFoundErr(err error) bool { | ||
return s.client.IsObjNotFoundErr(err) | ||
} | ||
|
||
// IsRetryableErr returns true if the request failed due to some retryable server-side scenario | ||
func (s *SwiftThanosObjectClient) IsRetryableErr(err error) bool { return false } | ||
122 changes: 122 additions & 0 deletions
122
pkg/storage/chunk/client/openstack/swift_thanos_object_client_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package openstack | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"sort" | ||
"testing" | ||
|
||
"github.com/grafana/loki/pkg/storage/bucket/filesystem" | ||
"github.com/grafana/loki/pkg/storage/chunk/client" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSwiftThanosObjStore_List(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
prefix string | ||
delimiter string | ||
storageObjKeys []string | ||
storageCommonPref []client.StorageCommonPrefix | ||
wantErr error | ||
}{ | ||
{ | ||
"list_top_level_only", | ||
"", | ||
"/", | ||
[]string{"top-level-file-1", "top-level-file-2"}, | ||
[]client.StorageCommonPrefix{"dir-1/", "dir-2/", "depply/"}, | ||
nil, | ||
}, | ||
{ | ||
"list_all_dir_1", | ||
"dir-1", | ||
"", | ||
[]string{"dir-1/file-1", "dir-1/file-2"}, | ||
nil, | ||
nil, | ||
}, | ||
{ | ||
"list_recursive", | ||
"", | ||
"", | ||
[]string{ | ||
"top-level-file-1", | ||
"top-level-file-2", | ||
"dir-1/file-1", | ||
"dir-1/file-2", | ||
"dir-2/file-3", | ||
"dir-2/file-4", | ||
"dir-2/file-5", | ||
"depply/nested/folder/a", | ||
"depply/nested/folder/b", | ||
"depply/nested/folder/c", | ||
}, | ||
nil, | ||
nil, | ||
}, | ||
{ | ||
"unknown_prefix", | ||
"test", | ||
"", | ||
[]string{}, | ||
nil, | ||
nil, | ||
}, | ||
{ | ||
"only_storage_common_prefix", | ||
"depply/", | ||
"/", | ||
[]string{}, | ||
[]client.StorageCommonPrefix{ | ||
"depply/nested/", | ||
}, | ||
nil, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
config := filesystem.Config{ | ||
Directory: t.TempDir(), | ||
} | ||
newBucket, err := filesystem.NewBucketClient(config) | ||
require.NoError(t, err) | ||
|
||
buff := bytes.NewBufferString("foo") | ||
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-1", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-2", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-1", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-2", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-3", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-4", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-5", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/a", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff)) | ||
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff)) | ||
|
||
openstackClient := &SwiftThanosObjectClient{} | ||
openstackClient.client = newBucket | ||
|
||
storageObj, storageCommonPref, err := openstackClient.List(context.Background(), tt.prefix, tt.delimiter) | ||
if tt.wantErr != nil { | ||
require.Equal(t, tt.wantErr.Error(), err.Error()) | ||
continue | ||
} | ||
|
||
keys := []string{} | ||
for _, key := range storageObj { | ||
keys = append(keys, key.Key) | ||
} | ||
|
||
sort.Slice(tt.storageObjKeys, func(i, j int) bool { | ||
return tt.storageObjKeys[i] < tt.storageObjKeys[j] | ||
}) | ||
sort.Slice(tt.storageCommonPref, func(i, j int) bool { | ||
return tt.storageCommonPref[i] < tt.storageCommonPref[j] | ||
}) | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, tt.storageObjKeys, keys) | ||
require.Equal(t, tt.storageCommonPref, storageCommonPref) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you edit this file directly? Or is this change automatically generated? The configuration reference should be updated in docs/sources/configure/index.template.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did so directly.. thanks for pointing this out!