helpers.go
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package azure

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"regexp"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	blob "github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/azure/auth"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

// errorCodeRegex extracts the Azure storage error code (e.g. "BlobNotFound") from a raw error string.
var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)

func init() {
	// Disable `ForceLog` in the Azure storage module.
	// At the time of this patch, the logging function in the storage module does not correctly
	// detect expected REST errors like 404, and so outputs them to syslog along with a stacktrace.
	// https://github.com/Azure/azure-storage-blob-go/issues/214
	//
	// This needs to be done at startup because the underlying variable is not thread safe.
	// https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
	pipeline.SetForceLogEnabled(false)
}

// getAzureStorageCredentials returns an MSI token credential when conf.MSIResource is set,
// and a shared key credential otherwise.
func getAzureStorageCredentials(logger log.Logger, conf Config) (blob.Credential, error) {
	if conf.MSIResource != "" {
		msiConfig := auth.NewMSIConfig()
		msiConfig.Resource = conf.MSIResource
		spt, err := msiConfig.ServicePrincipalToken()
		if err != nil {
			return nil, err
		}
		if err := spt.Refresh(); err != nil {
			return nil, err
		}

		return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
			err := spt.Refresh()
			if err != nil {
				level.Error(logger).Log("msg", "could not refresh MSI token", "err", err)
				// Retry later as the error can be related to API throttling.
				return 30 * time.Second
			}
			tc.SetToken(spt.Token().AccessToken)

			// Schedule the next refresh for two minutes before the token expires.
			return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
		}), nil
	}

	credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
	if err != nil {
		return nil, err
	}
	return credential, nil
}

// getContainerURL builds an authenticated ContainerURL for the configured storage account and container.
func getContainerURL(ctx context.Context, logger log.Logger, conf Config) (blob.ContainerURL, error) {
	credentials, err := getAzureStorageCredentials(logger, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	retryOptions := blob.RetryOptions{
		MaxTries:      conf.PipelineConfig.MaxTries,
		TryTimeout:    time.Duration(conf.PipelineConfig.TryTimeout),
		RetryDelay:    time.Duration(conf.PipelineConfig.RetryDelay),
		MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
	}
	if deadline, ok := ctx.Deadline(); ok {
		retryOptions.TryTimeout = time.Until(deadline)
	}

	p := blob.NewPipeline(credentials, blob.PipelineOptions{
		Retry:     retryOptions,
		Telemetry: blob.TelemetryOptions{Value: "Thanos"},
		RequestLog: blob.RequestLogOptions{
			// Log a warning if an operation takes longer than the specified duration.
			// (-1 = no logging; 0 = default 3s threshold)
			LogWarningIfTryOverThreshold: -1,
		},
		Log: pipeline.LogOptions{
			ShouldLog: nil,
		},
		HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
				client := http.Client{
					Transport: DefaultTransport(conf),
				}
				resp, err := client.Do(request.WithContext(ctx))
				return pipeline.NewHTTPResponse(resp), err
			}
		}),
	})

	u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
	if err != nil {
		return blob.ContainerURL{}, err
	}
	service := blob.NewServiceURL(*u, p)

	return service.NewContainerURL(conf.ContainerName), nil
}

// DefaultTransport returns an *http.Transport tuned from the HTTP settings in the given Config.
func DefaultTransport(config Config) *http.Transport {
	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			DualStack: true,
		}).DialContext,

		MaxIdleConns:          config.HTTPConfig.MaxIdleConns,
		MaxIdleConnsPerHost:   config.HTTPConfig.MaxIdleConnsPerHost,
		IdleConnTimeout:       time.Duration(config.HTTPConfig.IdleConnTimeout),
		MaxConnsPerHost:       config.HTTPConfig.MaxConnsPerHost,
		TLSHandshakeTimeout:   time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
		ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
		ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
		DisableCompression:    config.HTTPConfig.DisableCompression,
		TLSClientConfig:       &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
	}
}

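// A minimal usage sketch (not part of the original file): DefaultTransport can also back a
// plain *http.Client outside the Azure pipeline. `conf` is assumed to be a populated Config
// for this package.
//
//	client := &http.Client{Transport: DefaultTransport(conf)}
//	resp, err := client.Get(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
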
func getContainer(ctx context.Context, logger log.Logger, conf Config) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, logger, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	// Check whether the container exists by fetching its properties; the returned error is parsed further by the caller.
	_, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
	return c, err
}

func createContainer(ctx context.Context, logger log.Logger, conf Config) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, logger, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	_, err = c.Create(
		ctx,
		blob.Metadata{},
		blob.PublicAccessNone)
	return c, err
}

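// A hedged usage sketch (not part of the original file): a caller such as the bucket
// constructor could probe for the container with getContainer and create it on demand.
// The "ContainerNotFound" check is an assumption about how parseError's output would be
// consumed, not a verbatim copy of the calling code.
//
//	container, err := getContainer(ctx, logger, conf)
//	if err != nil && parseError(err.Error()) == "ContainerNotFound" {
//		container, err = createContainer(ctx, logger, conf)
//	}
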
// getBlobURL returns the BlockBlobURL for the given blob name within the container.
func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
	return c.NewBlockBlobURL(blobName)
}

// parseError extracts the Azure error code from a raw error string; if no code is found,
// the original string is returned unchanged.
func parseError(errorCode string) string {
	match := errorCodeRegex.FindStringSubmatch(errorCode)
	if len(match) == 2 {
		return match[1]
	}
	return errorCode
}
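
// Example (a sketch, not part of the original file): given an SDK error string containing
// "X-Ms-Error-Code: [BlobNotFound]", parseError returns "BlobNotFound"; storageErr below is
// a hypothetical error received from the Azure SDK.
//
//	if parseError(storageErr.Error()) == "BlobNotFound" {
//		// Treat the object as missing rather than failing hard.
//	}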