physical/gcs: use separate client for updating locks (#9424)
* physical/gcs: use separate client for updating locks

* Address review comments

Co-authored-by: Calvin Leung Huang <[email protected]>
2 people authored and andaley committed Jul 17, 2020
1 parent 7658898 commit c76e1e3
Showing 2 changed files with 34 additions and 30 deletions.
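The gist of the change, sketched before the diff: the backend keeps its throttled client for data operations, while HA lock maintenance gets its own storage.Client so a flood of data requests cannot delay refreshing the TTLs on the lock. The sketch below is illustrative only, not the Vault code; the names backend, newBackend, sem, and Get are assumptions, and the permit pool is modeled as a plain buffered channel.

package gcsexample

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
)

// backend pairs a shared, throttled client for data operations with a
// dedicated client reserved for HA lock maintenance.
type backend struct {
	bucket string

	// client serves normal storage requests and is limited by sem.
	client *storage.Client
	sem    chan struct{}

	// haClient is used only for lock reads and writes. It is nil when HA is
	// disabled and is deliberately not limited by sem, so bulk traffic on the
	// data path cannot starve lock TTL refreshes.
	haClient *storage.Client
}

func newBackend(ctx context.Context, bucket string, haEnabled bool, maxParallel int) (*backend, error) {
	client, err := storage.NewClient(ctx, option.WithUserAgent("gcs-example"))
	if err != nil {
		return nil, fmt.Errorf("failed to create storage client: %w", err)
	}

	b := &backend{
		bucket: bucket,
		client: client,
		sem:    make(chan struct{}, maxParallel),
	}

	if haEnabled {
		// A second client with its own connections, reserved for the lock.
		haClient, err := storage.NewClient(ctx, option.WithUserAgent("gcs-example"))
		if err != nil {
			return nil, fmt.Errorf("failed to create HA storage client: %w", err)
		}
		b.haClient = haClient
	}
	return b, nil
}

// Get is a data-path read: it waits for a permit before touching the bucket.
func (b *backend) Get(ctx context.Context, key string) ([]byte, error) {
	b.sem <- struct{}{}
	defer func() { <-b.sem }()

	r, err := b.client.Bucket(b.bucket).Object(key).NewReader(ctx)
	if err == storage.ErrObjectNotExist {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}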
48 changes: 30 additions & 18 deletions physical/gcs/gcs.go
@@ -71,15 +71,23 @@ type Backend struct {
// chunkSize is the chunk size to use for requests.
chunkSize int

// client is the underlying API client for talking to gcs.
client *storage.Client
// client is the API client and permitPool is the allowed concurrent uses of
// the client.
client *storage.Client
permitPool *physical.PermitPool

// haEnabled indicates if HA is enabled.
haEnabled bool

// logger and permitPool are internal constructs
logger log.Logger
permitPool *physical.PermitPool
// haClient is the API client. This is managed separately from the main client
// because a flood of requests should not block refreshing the TTLs on the
// lock.
//
// This value will be nil if haEnabled is false.
haClient *storage.Client

// logger is an internal logger.
logger log.Logger
}

// NewBackend constructs a Google Cloud Storage backend with the given
@@ -115,6 +123,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
chunkSize = chunkSize * 1024

// HA configuration
haClient := (*storage.Client)(nil)
haEnabled := false
haEnabledStr := os.Getenv(envHAEnabled)
if haEnabledStr == "" {
@@ -127,6 +136,15 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
}
}
if haEnabled {
logger.Debug("creating client")
var err error
ctx := context.Background()
haClient, err = storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
if err != nil {
return nil, errwrap.Wrapf("failed to create HA storage client: {{err}}", err)
}
}

// Max parallel
maxParallel, err := extractInt(c["max_parallel"])
@@ -140,30 +158,24 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
"ha_enabled", haEnabled,
"max_parallel", maxParallel,
)
logger.Debug("creating client")

// Client
opts := []option.ClientOption{option.WithUserAgent(useragent.String())}
if credentialsFile := c["credentials_file"]; credentialsFile != "" {
logger.Warn("specifying credentials_file as an option is " +
"deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " +
"variable or instance credentials instead.")
opts = append(opts, option.WithCredentialsFile(credentialsFile))
}

logger.Debug("creating client")
ctx := context.Background()
client, err := storage.NewClient(ctx, opts...)
client, err := storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
if err != nil {
return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err)
}

return &Backend{
bucket: bucket,
haEnabled: haEnabled,
chunkSize: chunkSize,
client: client,
permitPool: physical.NewPermitPool(maxParallel),
logger: logger,

haEnabled: haEnabled,
haClient: haClient,

logger: logger,
}, nil
}

16 changes: 4 additions & 12 deletions physical/gcs/gcs_ha.go
@@ -44,7 +44,7 @@ var (
// metricLockUnlock is the metric to register for a lock delete.
metricLockUnlock = []string{"gcs", "lock", "unlock"}

// metricLockGet is the metric to register for a lock get.
// metricLockLock is the metric to register for a lock acquisition.
metricLockLock = []string{"gcs", "lock", "lock"}

// metricLockValue is the metric to register for a lock create/update.
@@ -194,7 +194,7 @@ func (l *Lock) Unlock() error {
MetagenerationMatch: r.attrs.Metageneration,
}

obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
if err := obj.If(conds).Delete(ctx); err != nil {
// If the pre-condition failed, it means that someone else has already
// acquired the lock and we don't want to delete it.
@@ -324,10 +324,6 @@ OUTER:
// - if key is empty or identity is the same or timestamp exceeds TTL
// - update the lock to self
func (l *Lock) writeLock() (bool, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Create a transaction to read and the update (maybe)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -376,7 +372,7 @@ func (l *Lock) writeLock() (bool, error) {
}

// Write the object
obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
w := obj.If(conds).NewWriter(ctx)
w.ObjectAttrs.CacheControl = "no-cache; no-store; max-age=0"
w.ObjectAttrs.Metadata = map[string]string{
@@ -395,12 +391,8 @@ func (l *Lock) writeLock() (bool, error) {

// get retrieves the value for the lock.
func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Read
attrs, err := l.backend.client.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
attrs, err := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
if err == storage.ErrObjectNotExist {
return nil, nil
}
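On the lock side, the effect of the change is that writeLock and get issue their reads and precondition-guarded writes on the dedicated haClient and no longer pass through the backend permit pool. Extending the assumed backend type from the sketch above, a minimal illustrative version of such a conditional lock write might look like this; writeLockRecord and its parameters are hypothetical names, not the Vault API.

// writeLockRecord refreshes or claims the lock with a precondition-guarded
// write on the dedicated HA client. generation and metageneration come from a
// prior read of the lock object; a zero generation means "create if absent".
func (b *backend) writeLockRecord(ctx context.Context, key string, record []byte, generation, metageneration int64) error {
	conds := storage.Conditions{
		GenerationMatch:     generation,
		MetagenerationMatch: metageneration,
	}
	if generation == 0 {
		// No existing lock object: only succeed if nobody else has created one.
		conds = storage.Conditions{DoesNotExist: true}
	}

	w := b.haClient.Bucket(b.bucket).Object(key).If(conds).NewWriter(ctx)
	w.ObjectAttrs.CacheControl = "no-cache; no-store; max-age=0"
	if _, err := w.Write(record); err != nil {
		w.Close()
		return err
	}
	// Close commits the upload; a failed precondition (someone else holds or
	// has changed the lock) surfaces as an error here.
	return w.Close()
}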
