-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplicaset.go
150 lines (126 loc) · 3.71 KB
/
replicaset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"fmt"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type ReplicaSet struct {
Name string
Members []string
}
func (r *ReplicaSet) init() error {
session, err := mgo.DialWithTimeout(fmt.Sprintf(
"%v?connect=direct", r.Members[0]), 5*time.Second)
if err != nil {
return errors.Wrapf(err, "%v connection failed", r.Members[0])
}
defer session.Close()
session.SetMode(mgo.Monotonic, true)
memberList := make([]map[string]interface{}, len(r.Members))
for i, item := range r.Members {
memberList[i] = bson.M{"_id": i, "host": item}
}
config := bson.M{
"_id": r.Name,
"members": memberList,
}
result := bson.M{}
if err := session.Run(bson.M{"replSetInitiate": config}, &result); err != nil {
if err.Error() == "already initialized" {
logrus.Warnf("%v replica set already initialized", r.Name)
} else {
return errors.Wrapf(err, "%v replSetInitiate failed", r.Name)
}
}
return nil
}
func (r *ReplicaSet) InitWithRetry(retry int, wait int) error {
for _, member := range r.Members {
err := pingWithRetry(member, retry, wait)
if err != nil {
return errors.Wrap(err, "ReplicaSet init failed")
} else {
logrus.Infof("%v member %v is online", r.Name, member)
}
}
err := r.init()
if err != nil {
return errors.Wrap(err, "ReplicaSet init failed")
}
return nil
}
func (r *ReplicaSet) hasPrimary() (bool, error) {
session, err := mgo.DialWithTimeout(fmt.Sprintf(
"%v?connect=direct", r.Members[0]), 5*time.Second)
if err != nil {
return false, errors.Wrapf(err, "%v connection failed", r.Members[0])
}
defer session.Close()
session.SetMode(mgo.Monotonic, true)
status := &ReplicaSetStatus{}
if err := session.Run("replSetGetStatus", &status); err != nil {
return false, errors.Wrapf(err, "%v replSetGetStatus failed", r.Name)
} else {
for _, m := range status.Members {
if m.StateStr == "PRIMARY" {
return true, nil
}
}
}
return false, nil
}
func (r *ReplicaSet) WaitForPrimary(retry int, wait int) (bool, error) {
var hasPrimary bool
var err error
for retry > 0 {
hasPrimary, err = r.hasPrimary()
if err != nil {
return false, errors.Wrapf(err, "connection failed")
}
if hasPrimary {
return true, nil
}
retry--
logrus.Warnf("No primary detected for set %v retying in %v seconds", r.Name, wait)
time.Sleep(time.Duration(wait) * time.Second)
}
return hasPrimary, nil
}
func (r *ReplicaSet) PrintStatus() error {
session, err := mgo.DialWithTimeout(fmt.Sprintf(
"%v?connect=direct", r.Members[0]), 5*time.Second)
if err != nil {
return errors.Wrapf(err, "%v connection failed", r.Members[0])
}
defer session.Close()
session.SetMode(mgo.Monotonic, true)
status := &ReplicaSetStatus{}
if err := session.Run("replSetGetStatus", &status); err != nil {
return errors.Wrapf(err, "%v replSetGetStatus failed", r.Name)
} else {
for _, m := range status.Members {
logrus.Infof("%v member %v state %v", status.Name, m.Name, m.StateStr)
if len(m.ErrMsg) > 0 {
logrus.Warnf("%v member %v error %v", status.Name, m.Name, m.ErrMsg)
}
}
}
return nil
}
// replica set replSetGetStatus response object
type ReplicaSetStatus struct {
Name string `bson:"set" json:"set"`
Members []ReplicaSetMemberStatus `bson:"members" json:"members"`
}
// replica set member replSetGetStatus response object
type ReplicaSetMemberStatus struct {
Id int `bson:"_id" json:"id"`
Name string `bson:"name" json:"name"`
ErrMsg string `bson:"errmsg" json:"errmsg"`
Healthy bool `bson:"health" json:"healthy"`
StateStr string `bson:"stateStr" json:"state"`
Uptime time.Duration `bson:"uptime" json:"uptime"`
}