Merge pull request #894 from github/kv-pair-auditing
Consul KV consistency checks
Shlomi Noach authored May 26, 2019
2 parents a0ee2af + 1c37be3 commit f6fc25c
Showing 9 changed files with 103 additions and 100 deletions.
5 changes: 5 additions & 0 deletions docs/configuration-kv.md
@@ -12,6 +12,7 @@
"KVClusterMasterPrefix": "mysql/master",
"ConsulAddress": "127.0.0.1:8500",
"ZkAddress": "srv-a,srv-b:12181,srv-c",
"ConsulCrossDataCenterDistribution": true,
```

`KVClusterMasterPrefix` is the prefix to use for master discovery entries. As an example, if your cluster alias is `mycluster` and the master host is `some.host-17.com`, then you can expect an entry where:
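
For illustration only, and assuming the master serves MySQL on the default port `3306` (a hypothetical value, not taken from this diff), such an entry would look roughly like:

```
mysql/master/mycluster = some.host-17.com:3306
```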
@@ -41,3 +42,7 @@ If specified, `ZkAddress` indicates one or more ZooKeeper servers to connect to.
- `srv-a,srv-b:12181,srv-c`
- `srv-a,srv-b:12181,srv-c:2181`
- `srv-a:2181,srv-b:12181,srv-c:2181`

### Consul specific

See the [kv](kv.md) documentation for Consul-specific settings.
16 changes: 16 additions & 0 deletions docs/kv.md
@@ -62,3 +62,19 @@ If your Consul runs cross-DC replication, then it is possible that the same KV u
If your Consul setups do not replicate from each other, `orchestrator` is the _only_ means by which your master discovery is made consistent across your Consul clusters. You get all the nice traits that come with `raft`: if one DC is network partitioned, the `orchestrator` node in that DC will not receive the KV update event, and for a while, neither will the local Consul cluster. However, once network access is regained, `orchestrator` will catch up with the event log and apply the KV update to the local Consul cluster. The setup is eventually consistent.

Shortly following a master failover, `orchestrator` generates a `raft` snapshot. This isn't strictly required but is a useful operation: in the event the `orchestrator` node restarts, the snapshot prevents `orchestrator` from replaying the KV write. This is particularly relevant in the event of a failover-and-failback, where a remote KV store such as Consul might get two updates for the same cluster. The snapshot mitigates such incidents.

### Consul specific

Optionally, you may configure:

```json
"ConsulCrossDataCenterDistribution": true,
```

...which can (and will) take place in addition to the flow illustrated above.

With `ConsulCrossDataCenterDistribution`, `orchestrator` runs an additional, periodic update to an extended list of Consul clusters.

Once per minute, the `orchestrator` leader node queries its configured Consul server for the list of [known datacenters](https://www.consul.io/api/catalog.html#list-datacenters). It then iterates through those datacenters and updates each and every one with the current identities of the masters.

This functionality is required when you have more Consul datacenters than the one-local-Consul-cluster-per-`orchestrator`-node setup covers. We illustrated above how, in an `orchestrator/raft` setup, each node updates its local Consul cluster. However, Consul clusters that are not local to any `orchestrator` node are unaffected by that approach. `ConsulCrossDataCenterDistribution` is the way to include all those other DCs.
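
The changes to `go/kv/consul.go` below implement this distribution. The following is a minimal sketch of the flow just described, assuming the `armon/consul-api` client this repository already uses; it omits the caching, re-entry guard, and per-datacenter goroutines of the actual change:

```go
package kvsketch

import (
	consulapi "github.com/armon/consul-api"
)

// distributeToAllDatacenters writes each KV pair to every Consul datacenter
// known to the local Consul server. This mirrors, in simplified form, what
// ConsulCrossDataCenterDistribution enables in orchestrator.
func distributeToAllDatacenters(client *consulapi.Client, pairs []*consulapi.KVPair) error {
	// Ask the local Consul server which datacenters it knows about
	// (the "list datacenters" catalog API referenced above).
	datacenters, err := client.Catalog().Datacenters()
	if err != nil {
		return err
	}
	for _, datacenter := range datacenters {
		writeOptions := &consulapi.WriteOptions{Datacenter: datacenter}
		for _, pair := range pairs {
			// Write the master identity into this datacenter's KV store.
			if _, err := client.KV().Put(pair, writeOptions); err != nil {
				return err
			}
		}
	}
	return nil
}
```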
19 changes: 2 additions & 17 deletions go/config/config.go
@@ -227,8 +227,8 @@ type Configuration struct {
RecoverIntermediateMasterClusterFilters []string // Only do IM recovery on clusters matching these regexp patterns (of course the ".*" pattern matches everything)
ProcessesShellCommand string // Shell that executes command scripts
OnFailureDetectionProcesses []string // Processes to execute when detecting a failover scenario (before making a decision whether to failover or not). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {autoMasterRecovery}, {autoIntermediateMasterRecovery}
PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}
PreFailoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}
PreGracefulTakeoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed}
PreFailoverProcesses []string // Processes to execute before doing a failover (aborting operation should any once of them exits with non-zero code; order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {countReplicas}, {replicaHosts}, {isDowntimed}
PostFailoverProcesses []string // Processes to execute after doing a failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas}
PostUnsuccessfulFailoverProcesses []string // Processes to execute after a not-completely-successful failover (order of execution undefined). May and should use some of these placeholders: {failureType}, {failureDescription}, {command}, {failedHost}, {failureCluster}, {failureClusterAlias}, {failureClusterDomain}, {failedPort}, {successorHost}, {successorPort}, {successorAlias}, {countReplicas}, {replicaHosts}, {isDowntimed}, {isSuccessful}, {lostReplicas}, {countLostReplicas}
PostMasterFailoverProcesses []string // Processes to execute after doing a master failover (order of execution undefined). Uses same placeholders as PostFailoverProcesses
@@ -248,9 +248,6 @@ type Configuration struct {
DelayMasterPromotionIfSQLThreadNotUpToDate bool // when true, and a master failover takes place, if candidate master has not consumed all relay logs, delay promotion until the sql thread has caught up
PostponeSlaveRecoveryOnLagMinutes uint // Synonym to PostponeReplicaRecoveryOnLagMinutes
PostponeReplicaRecoveryOnLagMinutes uint // On crash recovery, replicas that are lagging more than given minutes are only resurrected late in the recovery process, after master/IM has been elected and processes executed. Value of 0 disables this feature
RemoteSSHForMasterFailover bool // Should orchestrator attempt a remote-ssh relaylog-synching upon master failover? Requires RemoteSSHCommand
RemoteSSHCommand string // A `ssh` command to be used by recovery process to read/apply relaylogs. If provided, this variable must contain the text "{hostname}". The remote SSH login must have the privileges to read/write relay logs. Example: "setuidgid remoteuser ssh {hostname}"
RemoteSSHCommandUseSudo bool // Should orchestrator apply 'sudo' on the remote host upon SSH command
OSCIgnoreHostnameFilters []string // OSC replicas recommendation will ignore replica hostnames matching given patterns
GraphiteAddr string // Optional; address of graphite port. If supplied, metrics will be written here
GraphitePath string // Prefix for graphite path. May include {hostname} magic placeholder
@@ -412,9 +409,6 @@ func newConfiguration() *Configuration {
FailMasterPromotionIfSQLThreadNotUpToDate: false,
DelayMasterPromotionIfSQLThreadNotUpToDate: false,
PostponeSlaveRecoveryOnLagMinutes: 0,
RemoteSSHForMasterFailover: false,
RemoteSSHCommand: "",
RemoteSSHCommandUseSudo: true,
OSCIgnoreHostnameFilters: []string{},
GraphiteAddr: "",
GraphitePath: "",
@@ -531,21 +525,12 @@ func (this *Configuration) postReadAdjustments() error {
this.URLPrefix = "/" + this.URLPrefix
}

if this.RemoteSSHCommand != "" {
if !strings.Contains(this.RemoteSSHCommand, "{hostname}") {
return fmt.Errorf("config's RemoteSSHCommand must either be empty, or contain a '{hostname}' placeholder")
}
}

if this.IsSQLite() && this.SQLite3DataFile == "" {
return fmt.Errorf("SQLite3DataFile must be set when BackendDB is sqlite3")
}
if this.IsSQLite() {
// this.HostnameResolveMethod = "none"
}
if this.RemoteSSHForMasterFailover && this.RemoteSSHCommand == "" {
return fmt.Errorf("RemoteSSHCommand is required when RemoteSSHForMasterFailover is set")
}
if this.RaftEnabled && this.RaftDataDir == "" {
return fmt.Errorf("RaftDataDir must be defined since raft is enabled (RaftEnabled)")
}
91 changes: 72 additions & 19 deletions go/kv/consul.go
@@ -17,20 +17,32 @@
package kv

import (
consulapi "github.com/armon/consul-api"
"fmt"
"sync"
"sync/atomic"

"github.com/github/orchestrator/go/config"

consulapi "github.com/armon/consul-api"
"github.com/patrickmn/go-cache"

"github.com/openark/golib/log"
)

// A Consul store based on config's `ConsulAddress` and `ConsulKVPrefix`
type consulStore struct {
client *consulapi.Client
client *consulapi.Client
kvCache *cache.Cache
pairsDistributionSuccessMutex sync.Mutex
distributionReentry int64
}

// NewConsulStore creates a new consul store. It is possible that the client for this store is nil,
// which is the case if no consul config is provided.
func NewConsulStore() KVStore {
store := &consulStore{}
store := &consulStore{
kvCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
}

if config.Config.ConsulAddress != "" {
consulConfig := consulapi.DefaultConfig()
@@ -66,29 +78,70 @@ func (this *consulStore) GetKeyValue(key string) (value string, found bool, err
return string(pair.Value), (pair != nil), nil
}

func (this *consulStore) AddKeyValue(key string, value string) (added bool, err error) {
err = this.PutKeyValue(key, value)
return (err != nil), err
}

func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
if config.Config.ConsulCrossDataCenterDistribution {
datacenters, err := this.client.Catalog().Datacenters()
if err != nil {
return err
}
consulPairs := [](*consulapi.KVPair){}
for _, kvPair := range kvPairs {
consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
}
for _, datacenter := range datacenters {
// This function is non re-entrant (it can only be running once at any point in time)
if atomic.CompareAndSwapInt64(&this.distributionReentry, 0, 1) {
defer atomic.StoreInt64(&this.distributionReentry, 0)
} else {
return
}

if !config.Config.ConsulCrossDataCenterDistribution {
return nil
}

datacenters, err := this.client.Catalog().Datacenters()
if err != nil {
return err
}
log.Debugf("consulStore.DistributePairs(): distributing %d pairs to %d datacenters", len(kvPairs), len(datacenters))
consulPairs := [](*consulapi.KVPair){}
for _, kvPair := range kvPairs {
consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
}
var wg sync.WaitGroup
for _, datacenter := range datacenters {
datacenter := datacenter
wg.Add(1)
go func() {
defer wg.Done()

writeOptions := &consulapi.WriteOptions{Datacenter: datacenter}
queryOptions := &consulapi.QueryOptions{Datacenter: datacenter}
skipped := 0
existing := 0
written := 0
failed := 0

for _, consulPair := range consulPairs {
val := string(consulPair.Value)
kcCacheKey := fmt.Sprintf("%s;%s", datacenter, consulPair.Key)

if value, found := this.kvCache.Get(kcCacheKey); found && val == value {
skipped++
continue
}
if pair, _, err := this.client.KV().Get(consulPair.Key, queryOptions); err == nil && pair != nil {
if val == string(pair.Value) {
existing++
this.kvCache.SetDefault(kcCacheKey, val)
continue
}
}

if _, e := this.client.KV().Put(consulPair, writeOptions); e != nil {
log.Errorf("consulStore.DistributePairs(): failed %s", kcCacheKey)
failed++
err = e
} else {
log.Debugf("consulStore.DistributePairs(): written %s=%s", kcCacheKey, val)
written++
this.kvCache.SetDefault(kcCacheKey, val)
}
}
}
log.Debugf("consulStore.DistributePairs(): datacenter: %s; skipped: %d, existing: %d, written: %d, failed: %d", datacenter, skipped, existing, written, failed)
}()
}
wg.Wait()
return err
}
22 changes: 1 addition & 21 deletions go/kv/internal.go
@@ -62,26 +62,6 @@ func (this *internalKVStore) GetKeyValue(key string) (value string, found bool,
return value, found, log.Errore(err)
}

func (this *internalKVStore) AddKeyValue(key string, value string) (added bool, err error) {
sqlResult, err := db.ExecOrchestrator(`
insert ignore
into kv_store (
store_key, store_value, last_updated
) values (
?, ?, now()
)
`, key, value,
)
if err != nil {
return false, log.Errore(err)
}
rowsAffected, err := sqlResult.RowsAffected()
if err != nil {
return false, log.Errore(err)
}
return (rowsAffected > 0), nil
}

func (this *internalKVStore) DistributePairs(pairs [](*KVPair)) (err error) {
func (this *internalKVStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
return nil
}
27 changes: 3 additions & 24 deletions go/kv/kv.go
@@ -37,8 +37,7 @@ func (this *KVPair) String() string {
type KVStore interface {
PutKeyValue(key string, value string) (err error)
GetKeyValue(key string) (value string, found bool, err error)
AddKeyValue(key string, value string) (added bool, err error)
DistributePairs(pairs [](*KVPair)) (err error)
DistributePairs(kvPairs [](*KVPair)) (err error)
}

var kvMutex sync.Mutex
@@ -92,29 +91,9 @@ func PutKVPair(kvPair *KVPair) (err error) {
return PutValue(kvPair.Key, kvPair.Value)
}

func AddValue(key string, value string) (err error) {
func DistributePairs(kvPairs [](*KVPair)) (err error) {
for _, store := range getKVStores() {
added, err := store.AddKeyValue(key, value)
if err != nil {
return err
}
if !added {
return nil
}
}
return nil
}

func AddKVPair(kvPair *KVPair) (err error) {
if kvPair == nil {
return nil
}
return AddValue(kvPair.Key, kvPair.Value)
}

func DistributePairs(pairs [](*KVPair)) (err error) {
for _, store := range getKVStores() {
if err := store.DistributePairs(pairs); err != nil {
if err := store.DistributePairs(kvPairs); err != nil {
return err
}
}
7 changes: 1 addition & 6 deletions go/kv/zk.go
@@ -75,11 +75,6 @@ func (this *zkStore) GetKeyValue(key string) (value string, found bool, err erro
return string(result), true, nil
}

func (this *zkStore) AddKeyValue(key string, value string) (added bool, err error) {
err = this.PutKeyValue(key, value)
return (err != nil), err
}

func (this *zkStore) DistributePairs(pairs [](*KVPair)) (err error) {
func (this *zkStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
return nil
}
12 changes: 0 additions & 12 deletions go/logic/command_applier.go
@@ -77,8 +77,6 @@ func (applier *CommandApplier) ApplyCommand(op string, value []byte) interface{}
return applier.enableGlobalRecoveries(value)
case "put-key-value":
return applier.putKeyValue(value)
case "add-key-value":
return applier.addKeyValue(value)
case "put-instance-tag":
return applier.putInstanceTag(value)
case "delete-instance-tag":
@@ -266,16 +264,6 @@ func (applier *CommandApplier) putInstanceTag(value []byte) interface{} {
return err
}

func (applier *CommandApplier) addKeyValue(value []byte) interface{} {
kvPair := kv.KVPair{}
if err := json.Unmarshal(value, &kvPair); err != nil {
return log.Errore(err)
}
err := kv.AddKVPair(&kvPair)

return err
}

func (applier *CommandApplier) putInstanceTag(value []byte) interface{} {
instanceTag := inst.InstanceTag{}
if err := json.Unmarshal(value, &instanceTag); err != nil {
4 changes: 3 additions & 1 deletion go/logic/orchestrator.go
@@ -447,7 +447,9 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP
selectedError = err
}
}
kv.DistributePairs(submitKvPairs)
if err := kv.DistributePairs(kvPairs); err != nil {
log.Errore(err)
}
return kvPairs, submittedCount, log.Errore(selectedError)
}

