diff --git a/physical/gcs/gcs_ha.go b/physical/gcs/gcs_ha.go index feca0a7a14a9..e6934df68c9b 100644 --- a/physical/gcs/gcs_ha.go +++ b/physical/gcs/gcs_ha.go @@ -180,10 +180,30 @@ func (l *Lock) Unlock() error { } l.stopLock.Unlock() - // Delete + // Read the record value before deleting. This needs to be a CAS operation or + // else we might be deleting someone else's lock. ctx := context.Background() - if err := l.backend.Delete(ctx, l.key); err != nil { - return err + r, err := l.get(ctx) + if err != nil { + return errwrap.Wrapf("failed to read lock for deletion: {{err}}", err) + } + if r != nil && r.Identity == l.identity { + ctx := context.Background() + conds := storage.Conditions{ + GenerationMatch: r.attrs.Generation, + MetagenerationMatch: r.attrs.Metageneration, + } + + obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key) + if err := obj.If(conds).Delete(ctx); err != nil { + // If the pre-condition failed, it means that someone else has already + // acquired the lock and we don't want to delete it. + if terr, ok := err.(*googleapi.Error); ok && terr.Code == 412 { + l.backend.logger.Debug("unlock: preconditions failed (lock already taken by someone else?)") + } else { + return errwrap.Wrapf("failed to delete lock: {{err}}", err) + } + } } // We are no longer holding the lock @@ -254,20 +274,26 @@ func (l *Lock) watchLock() { retries := 0 ticker := time.NewTicker(l.watchRetryInterval) +OUTER: for { // Check if the channel is already closed select { case <-l.stopCh: + break OUTER default: } // Check if we've exceeded retries if retries >= l.watchRetryMax-1 { - break + break OUTER } // Wait for the timer - <-ticker.C + select { + case <-ticker.C: + case <-l.stopCh: + break OUTER + } // Attempt to read the key r, err := l.get(context.Background()) @@ -278,7 +304,7 @@ func (l *Lock) watchLock() { // Verify the identity is the same if r == nil || r.Identity != l.identity { - break + break OUTER } } diff --git a/physical/spanner/spanner_ha.go b/physical/spanner/spanner_ha.go index cc4b8bfe2633..2e7b73965d69 100644 --- a/physical/spanner/spanner_ha.go +++ b/physical/spanner/spanner_ha.go @@ -277,20 +277,26 @@ func (l *Lock) watchLock() { retries := 0 ticker := time.NewTicker(l.watchRetryInterval) +OUTER: for { // Check if the channel is already closed select { case <-l.stopCh: + break OUTER default: } // Check if we've exceeded retries if retries >= l.watchRetryMax-1 { - break + break OUTER } // Wait for the timer - <-ticker.C + select { + case <-ticker.C: + case <-l.stopCh: + break OUTER + } // Attempt to read the key r, err := l.get(context.Background()) @@ -301,7 +307,7 @@ func (l *Lock) watchLock() { // Verify the identity is the same if r == nil || r.Identity != l.identity { - break + break OUTER } }