Skip to content

Commit

Permalink
retry after transient network failures
Browse files Browse the repository at this point in the history
  • Loading branch information
ryansb committed Nov 18, 2013
1 parent 57f8d5a commit 4d053d5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
6 changes: 3 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func NewConnFromConfig(conf Config) (c EurekaConnection) {
// NewConn is a default connection with just a list of ServiceUrls. Most basic
// way to make a new connection. Generally only if you know what you're doing
// and are going to do the configuration yourself some other way.
func NewConn(address ...string) (c EurekaConnection) {
c.ServiceUrls = address
return c
func NewConn(address ...string) (e EurekaConnection) {
e.ServiceUrls = address
return e
}

// UpdateApp creates a goroutine that continues to keep an application updated
Expand Down
28 changes: 19 additions & 9 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,22 @@ import (
"fmt"
"github.com/pmylund/go-cache"
"net/http"
"strings"
"time"
)

// expire cached items after 30 seconds, cleanup every 10
var eurekaCache = cache.New(30*time.Second, 10*time.Second)

func (e *EurekaConnection) generateUrl(slugs ...string) string {
return strings.Join(append([]string{e.SelectServiceURL()}, slugs...), "/")
}

// GetApp returns a single eureka application by name. This may be cached.
func (e *EurekaConnection) GetApp(name string) (Application, error) {
url := fmt.Sprintf("%s/%s/%s", e.SelectServiceURL(), EurekaURLSlugs["Apps"], name)
cachedApp, found := eurekaCache.Get(url)
slug := fmt.Sprintf("%s/%s", EurekaURLSlugs["Apps"], name)
url := e.generateUrl(slug)
cachedApp, found := eurekaCache.Get(slug)
if found {
log.Notice("Got %s from cache", url)
return cachedApp.(Application), nil
Expand All @@ -63,14 +69,15 @@ func (e *EurekaConnection) GetApp(name string) (Application, error) {
if rcode > 299 || rcode < 200 {
log.Warning("Non-200 rcode of %d", rcode)
}
eurekaCache.Set(url, v, 0)
eurekaCache.Set(slug, v, 0)
return v, nil
}

// GetApps returns a map of all Applications. Note: May be cached
func (e *EurekaConnection) GetApps() (map[string]Application, error) {
url := fmt.Sprintf("%s/%s", e.SelectServiceURL(), EurekaURLSlugs["Apps"])
cachedApps, found := eurekaCache.Get(url)
slug := EurekaURLSlugs["Apps"]
url := e.generateUrl(slug)
cachedApps, found := eurekaCache.Get(slug)
if found {
log.Notice("Got %s from cache", url)
return cachedApps.(map[string]Application), nil
Expand All @@ -94,15 +101,16 @@ func (e *EurekaConnection) GetApps() (map[string]Application, error) {
if rcode > 299 || rcode < 200 {
log.Warning("Non-200 rcode of %d", rcode)
}
eurekaCache.Set(url, apps, 0)
eurekaCache.Set(slug, apps, 0)
return apps, nil
}

// RegisterInstance will register the relevant Instance with eureka but DOES
// NOT automatically send heartbeats. See HeartBeatInstance for that
// functionality
func (e *EurekaConnection) RegisterInstance(ins *Instance) error {
url := fmt.Sprintf("%s/%s/%s", e.SelectServiceURL(), EurekaURLSlugs["Apps"], ins.App)
slug := fmt.Sprintf("%s/%s", EurekaURLSlugs["Apps"], ins.App)
url := e.generateUrl(slug)
log.Debug("Registering instance with url %s", url)
_, rcode, err := getXML(url + "/" + ins.HostName)
if err != nil {
Expand Down Expand Up @@ -141,7 +149,8 @@ func (e *EurekaConnection) RegisterInstance(ins *Instance) error {
// HeartBeatInstance sends a single eureka heartbeat. Does not continue sending
// heartbeats. Errors if the response is not 200.
func (e *EurekaConnection) HeartBeatInstance(ins *Instance) error {
url := fmt.Sprintf("%s/%s/%s/%s", e.SelectServiceURL(), EurekaURLSlugs["Apps"], ins.App, ins.HostName)
slug := fmt.Sprintf("%s/%s/%s", EurekaURLSlugs["Apps"], ins.App, ins.HostName)
url := e.generateUrl(slug)
log.Debug("Sending heartbeat with url %s", url)
req, err := http.NewRequest("PUT", url, nil)
if err != nil {
Expand All @@ -162,7 +171,8 @@ func (e *EurekaConnection) HeartBeatInstance(ins *Instance) error {

func (e *EurekaConnection) readAppInto(name string, app *Application) error {
//TODO: This should probably use the cache, but it's only called at PollInterval anyways
url := fmt.Sprintf("%s/%s/%s", e.SelectServiceURL(), EurekaURLSlugs["Apps"], name)
slug := fmt.Sprintf("%s/%s", EurekaURLSlugs["Apps"], name)
url := e.generateUrl(slug)
log.Debug("Getting app %s from url %s", name, url)
out, rcode, err := getXML(url)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ import (
)

func TestGetApps(t *testing.T) {
e, _ := fargo.NewConnFromConfigFile("./config_sample/local.gcfg")
Convey("Pull applications", t, func() {
e := fargo.NewConn("http://172.16.0.11:8080/eureka/v2")
a, _ := e.GetApps()
So(len(a["EUREKA"].Instances), ShouldEqual, 2)
})
Convey("Pull single application", t, func() {
e := fargo.NewConn("http://172.16.0.11:8080/eureka/v2")
a, _ := e.GetApp("EUREKA")
So(len(a.Instances), ShouldEqual, 2)
for idx, ins := range a.Instances {
Expand All @@ -54,8 +53,8 @@ func TestGetApps(t *testing.T) {
}

func TestRegistration(t *testing.T) {
e, _ := fargo.NewConnFromConfigFile("./config_sample/local.gcfg")
Convey("Register an instance to TESTAPP", t, func() {
e := fargo.NewConn("http://172.16.0.11:8080/eureka/v2")
i := fargo.Instance{
HostName: "i-123456",
Port: 9090,
Expand All @@ -70,7 +69,6 @@ func TestRegistration(t *testing.T) {
So(err, ShouldBeNil)
})
Convey("Check in for TESTAPP", t, func() {
e := fargo.NewConn("http://172.16.0.11:8080/eureka/v2")
i := fargo.Instance{
HostName: "i-123456",
Port: 9090,
Expand Down
15 changes: 14 additions & 1 deletion rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ package fargo
import (
"bytes"
"io/ioutil"
"net"
"net/http"
"time"
)

func postXML(url string, reqBody []byte) ([]byte, int, error) {
Expand Down Expand Up @@ -66,7 +68,18 @@ func reqXML(req *http.Request) ([]byte, int, error) {

// Send the request via a client
client := &http.Client{}
resp, err := client.Do(req)
var resp *http.Response
var err error
for i := 0; i < 3; i++ {
resp, err = client.Do(req)
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// it's a transient network error so we sleep for a bit and try
// again in case it's a short-lived issue
log.Warning("Retrying after temporary network failure, error: %s",
nerr.Error())
time.Sleep(10)
}
}
if err != nil {
return nil, -1, err
}
Expand Down

0 comments on commit 4d053d5

Please sign in to comment.