Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add objstore support for Swift using thanos.io/objstore #11672

Merged
merged 35 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0a946a2
add TLS support for swift
btaani Jan 12, 2024
88df54d
fix typo
btaani Jan 12, 2024
5f821dd
fix error from unit test
btaani Jan 12, 2024
749f64b
Merge branch 'main' into swift-tls
btaani Jan 12, 2024
f5bcc9c
Merge branch 'main' into swift-tls
btaani Jan 16, 2024
06ef15b
Merge branch 'main' into swift-tls
btaani Jan 18, 2024
fb562c4
remove condition on transport type
btaani Jan 18, 2024
b1b203b
remove duplicate initialization
btaani Jan 18, 2024
7b23e51
modify swift config in object client
btaani Feb 5, 2024
d5ad3e7
use bucket_http config in swift bucket client
btaani Feb 5, 2024
6b70293
Merge branch 'main' into swift-tls
btaani Feb 5, 2024
6a266e0
add swift thanos object client implementation
btaani Feb 9, 2024
fd98f34
undo docs change
btaani Feb 13, 2024
959aa77
change all objectClient implementations
btaani Feb 13, 2024
05acfd9
add hedged client to swift thanos client
btaani Feb 13, 2024
ce0c7e6
add hedged client to swift thanos client
btaani Feb 13, 2024
3659f19
Merge branch 'main' into swift-tls
btaani Feb 14, 2024
7fe24b8
update thanos-io/objstore
btaani Feb 14, 2024
53fd4ee
add TLS support
btaani Feb 14, 2024
b02afa8
Merge branch 'main' into swift-tls
btaani Feb 15, 2024
0c6526e
Merge branch 'main' into swift-tls
btaani Mar 4, 2024
5a511b9
Merge branch 'main' into swift-tls
JoaoBraveCoding Oct 30, 2024
bc2eeb8
chore: Move Swift client to new ObjectClientAdapter
JoaoBraveCoding Oct 30, 2024
e64f09e
docs: fixed docs
JoaoBraveCoding Oct 30, 2024
11970b9
Merge branch 'main' into swift-tls
JoaoBraveCoding Oct 30, 2024
380c0b8
fix: fix tests
JoaoBraveCoding Oct 30, 2024
61c50f3
address review comments
JoaoBraveCoding Nov 11, 2024
776c5c4
make s3 use http config
JoaoBraveCoding Nov 11, 2024
99a0cf3
Merge branch 'main' into swift-tls
JoaoBraveCoding Nov 15, 2024
32f92fe
address review comments and fix merge result
JoaoBraveCoding Nov 15, 2024
ad09648
Duplicate Swift config to old client
JoaoBraveCoding Nov 15, 2024
bc652bd
Merge branch 'main' into swift-tls
ashwanthgoli Nov 25, 2024
340b953
fix build
ashwanthgoli Nov 25, 2024
fb3f953
make format
ashwanthgoli Nov 25, 2024
dd26273
ensure cli flags are same as existing ones
ashwanthgoli Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5174,6 +5174,16 @@ The `swift_storage_config` block configures the connection to OpenStack Object S
# is received on a request.
# CLI flag: -<prefix>.swift.request-timeout
[request_timeout: <duration> | default = 5s]

# Set to false to skip verifying the certificate chain and hostname.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you edit this file directly? Or is this change automatically generated? The configuration reference should be updated in docs/sources/configure/index.template.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did so directly.. thanks for pointing this out!

# Set to true to skip verifying the certificate chain and hostname.
# CLI flag: -<prefix>.swift.insecure-skip-verify
[insecure_skip_verify: <boolean> | default = false]

# Path to the trusted CA file that signed the SSL certificate of the swift
# endpoint.
# CLI flag: -<prefix>.swift.ca-file
[ca_file: <string> | default = ""]
```

### cos_storage_config
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/bucket/swift/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ package swift

import (
"flag"
"net/http"
"time"

bucket_http "github.com/grafana/loki/pkg/storage/bucket/http"
)

// HTTPConfig stores the http.Transport configuration for the swift minio client.
type HTTPConfig struct {
bucket_http.Config `yaml:",inline"`
Transport http.RoundTripper `yaml:"-"`
}

// Config holds the config options for Swift backend
type Config struct {
AuthVersion int `yaml:"auth_version"`
Expand All @@ -26,6 +35,12 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`
HTTP HTTPConfig `yaml:"http"`
}

// RegisterFlagsWithPrefix registers the flags for swift storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix+"swift.", f)
}

// RegisterFlags registers the flags for Swift storage
Expand Down Expand Up @@ -54,6 +69,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *Config) Validate() error {
Expand Down
31 changes: 30 additions & 1 deletion pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package openstack
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/ncw/swift"
Expand All @@ -26,6 +29,15 @@ var defaultTransport http.RoundTripper = &http.Transport{
ExpectContinueTimeout: 5 * time.Second,
}

// HTTPConfig stores the http.Transport configuration
type HTTPConfig struct {
Timeout time.Duration `yaml:"timeout"`
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
CAFile string `yaml:"ca_file"`
}

type SwiftObjectClient struct {
conn *swift.Connection
hedgingConn *swift.Connection
Expand All @@ -35,6 +47,7 @@ type SwiftObjectClient struct {
// SwiftConfig is config for the Swift Chunk Client.
type SwiftConfig struct {
bucket_swift.Config `yaml:",inline"`
HTTPConfig HTTPConfig `yaml:"http_config"`
}

// RegisterFlags registers flags.
Expand All @@ -50,6 +63,8 @@ func (cfg *SwiftConfig) Validate() error {
// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix, f)
f.DurationVar(&cfg.HTTPConfig.Timeout, prefix+"swift.http.timeout", 0, "Timeout specifies a time limit for requests made by swift Client.")
f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"swift.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the Swift endpoint.")
}

// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
Expand All @@ -76,7 +91,19 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj
}

func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) {
// Create a connection
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
}
if cfg.HTTPConfig.CAFile != "" {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
defaultTransport := defaultTransport.(*http.Transport)
defaultTransport.TLSClientConfig = tlsConfig
}
c := &swift.Connection{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Expand All @@ -97,6 +124,8 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
Transport: defaultTransport,
}

// Create a connection

switch {
JoaoBraveCoding marked this conversation as resolved.
Show resolved Hide resolved
case cfg.UserDomainName != "":
c.Domain = cfg.UserDomainName
Expand Down
110 changes: 110 additions & 0 deletions pkg/storage/chunk/client/openstack/swift_thanos_object_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package openstack

import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/pkg/storage/bucket"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)

type SwiftThanosObjectClient struct {
client objstore.Bucket
}

func NewSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) {

Check warning on line 22 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

unused-parameter: parameter 'hedgingCfg' seems to be unused, consider removing or renaming it as _ (revive)
// TODO Add Hedging client once we are able to configure HTTP on Swift provider
return newSwiftThanosObjectClient(ctx, cfg, component, logger, reg)
}

func newSwiftThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, reg prometheus.Registerer) (*SwiftThanosObjectClient, error) {
bucket, err := bucket.NewClient(ctx, cfg, component, logger, reg)
if err != nil {
return nil, err
}
return &SwiftThanosObjectClient{
client: bucket,
}, nil
}

func (s *SwiftThanosObjectClient) Stop() {}

// ObjectExists checks if a given objectKey exists in the Swift bucket
func (s *SwiftThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return s.client.Exists(ctx, objectKey)
}

// PutObject puts the specified bytes into the configured Swift bucket at the provided key
func (s *SwiftThanosObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return s.client.Upload(ctx, objectKey, object)
}

// GetObject returns a reader and the size for the specified object key from the configured Swift bucket.
func (s *SwiftThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := s.client.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

attr, err := s.client.Attributes(ctx, objectKey)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

return reader, attr.Size, err
}

// List objects with given prefix.
func (s *SwiftThanosObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var iterParams []objstore.IterOption

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
}

s.client.Iter(ctx, prefix, func(objectKey string) error {

Check failure on line 75 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

Error return value of `s.client.Iter` is not checked (errcheck)
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}
attr, err := s.client.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
})

return nil

}, iterParams...)

return storageObjects, commonPrefixes, nil
}

// DeleteObject deletes the specified object key from the configured Swift bucket.
func (s *SwiftThanosObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return s.client.Delete(ctx, objectKey)
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (s *SwiftThanosObjectClient) IsObjectNotFoundErr(err error) bool {
return s.client.IsObjNotFoundErr(err)
}

// IsRetryableErr returns true if the request failed due to some retryable server-side scenario
func (s *SwiftThanosObjectClient) IsRetryableErr(err error) bool { return false }

Check warning on line 110 in pkg/storage/chunk/client/openstack/swift_thanos_object_client.go

View workflow job for this annotation

GitHub Actions / checks

unused-parameter: parameter 'err' seems to be unused, consider removing or renaming it as _ (revive)
122 changes: 122 additions & 0 deletions pkg/storage/chunk/client/openstack/swift_thanos_object_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package openstack

import (
"bytes"
"context"
"sort"
"testing"

"github.com/grafana/loki/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/stretchr/testify/require"
)

func TestSwiftThanosObjStore_List(t *testing.T) {
tests := []struct {
name string
prefix string
delimiter string
storageObjKeys []string
storageCommonPref []client.StorageCommonPrefix
wantErr error
}{
{
"list_top_level_only",
"",
"/",
[]string{"top-level-file-1", "top-level-file-2"},
[]client.StorageCommonPrefix{"dir-1/", "dir-2/", "depply/"},
nil,
},
{
"list_all_dir_1",
"dir-1",
"",
[]string{"dir-1/file-1", "dir-1/file-2"},
nil,
nil,
},
{
"list_recursive",
"",
"",
[]string{
"top-level-file-1",
"top-level-file-2",
"dir-1/file-1",
"dir-1/file-2",
"dir-2/file-3",
"dir-2/file-4",
"dir-2/file-5",
"depply/nested/folder/a",
"depply/nested/folder/b",
"depply/nested/folder/c",
},
nil,
nil,
},
{
"unknown_prefix",
"test",
"",
[]string{},
nil,
nil,
},
{
"only_storage_common_prefix",
"depply/",
"/",
[]string{},
[]client.StorageCommonPrefix{
"depply/nested/",
},
nil,
},
}

for _, tt := range tests {
config := filesystem.Config{
Directory: t.TempDir(),
}
newBucket, err := filesystem.NewBucketClient(config)
require.NoError(t, err)

buff := bytes.NewBufferString("foo")
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-1", buff))
require.NoError(t, newBucket.Upload(context.Background(), "top-level-file-2", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-1", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-1/file-2", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-3", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-4", buff))
require.NoError(t, newBucket.Upload(context.Background(), "dir-2/file-5", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/a", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

openstackClient := &SwiftThanosObjectClient{}
openstackClient.client = newBucket

storageObj, storageCommonPref, err := openstackClient.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
require.Equal(t, tt.wantErr.Error(), err.Error())
continue
}

keys := []string{}
for _, key := range storageObj {
keys = append(keys, key.Key)
}

sort.Slice(tt.storageObjKeys, func(i, j int) bool {
return tt.storageObjKeys[i] < tt.storageObjKeys[j]
})
sort.Slice(tt.storageCommonPref, func(i, j int) bool {
return tt.storageCommonPref[i] < tt.storageCommonPref[j]
})

require.NoError(t, err)
require.Equal(t, tt.storageObjKeys, keys)
require.Equal(t, tt.storageCommonPref, storageCommonPref)
}
}
Loading