From e60395beb1bb707dc686ec303be821ad2f71161e Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 21 Mar 2017 16:23:31 -0400 Subject: [PATCH] update grpc-go and update non-pinned deps Switch from Peter's fork of grpc to my own fork, since Peter's out of office. My fork is https://github.com/grpc/grpc-go head + Peter's one commit increasing the per-stream flow-control window. The motivation of syncing to newer gRPC is to include grpc/grpc-go#993 for helping with https://github.com/cockroachdb/cockroach/issues/13989 - we will move to gRPC internal heartbeats instead of using our own connection heartbeats. Other dep updates: - the Lightstep update is a single commit that seems innocuous --- .../distribution/registry/api/v2/urls.go | 111 +++----- .../distribution/registry/api/v2/urls_test.go | 249 ++++-------------- github.com/google/go-github/github/github.go | 2 +- .../google/go-github/github/orgs_projects.go | 60 +++++ .../go-github/github/orgs_projects_test.go | 68 +++++ .../google/go-github/github/repos_projects.go | 15 +- .../go-github/github/repos_projects_test.go | 4 +- .../google/go-github/github/users_gpg_keys.go | 22 +- .../go-github/github/users_gpg_keys_test.go | 32 ++- .../thrift_rpc/logencoder.go | 38 +-- github.com/tebeka/go2xunit/ChangeLog | 3 + .../go2xunit/data/in/gotest-escaped.out | 10 + .../data/out/xunit.net/gocheck-empty.out.xml | 3 +- .../data/out/xunit.net/gocheck-fail.out.xml | 3 +- .../data/out/xunit.net/gocheck-panic.out.xml | 3 +- .../data/out/xunit.net/gocheck-pass.out.xml | 3 +- .../out/xunit.net/gocheck-setup-miss.out.xml | 3 +- .../data/out/xunit.net/gotest-0.out.xml | 3 +- .../data/out/xunit.net/gotest-1.5.out.xml | 3 +- .../data/out/xunit.net/gotest-1.6.out.xml | 3 +- .../data/out/xunit.net/gotest-1.7.out.xml | 3 +- .../out/xunit.net/gotest-datarace.out.xml | 3 +- .../data/out/xunit.net/gotest-empty.out.xml | 3 +- .../data/out/xunit.net/gotest-escaped.out.xml | 50 ++++ .../data/out/xunit.net/gotest-fail.out.xml | 3 +- .../xunit.net/gotest-fatal-nosummary.out.xml | 3 +- .../data/out/xunit.net/gotest-log.out.xml | 3 +- .../data/out/xunit.net/gotest-multi.out.xml | 3 +- .../out/xunit.net/gotest-multierror.out.xml | 3 +- .../gotest-negative-duration.out.xml | 3 +- .../data/out/xunit.net/gotest-nofiles.out.xml | 3 +- .../out/xunit.net/gotest-nosummary.out.xml | 3 +- .../data/out/xunit.net/gotest-num.out.xml | 3 +- .../data/out/xunit.net/gotest-panic.out.xml | 3 +- .../data/out/xunit.net/gotest-pass.out.xml | 3 +- .../data/out/xunit.net/gotest-print.out.xml | 3 +- .../xunit.net/gotest-testify-suite.out.xml | 3 +- .../data/out/xunit.net/gotest.out.xml | 3 +- .../data/out/xunit/gocheck-empty.out.xml | 3 +- .../data/out/xunit/gocheck-fail.out.xml | 3 +- .../data/out/xunit/gocheck-panic.out.xml | 3 +- .../data/out/xunit/gocheck-pass.out.xml | 3 +- .../data/out/xunit/gocheck-setup-miss.out.xml | 3 +- .../go2xunit/data/out/xunit/gotest-0.out.xml | 3 +- .../data/out/xunit/gotest-1.5.out.xml | 3 +- .../data/out/xunit/gotest-1.6.out.xml | 3 +- .../data/out/xunit/gotest-1.7.out.xml | 3 +- .../data/out/xunit/gotest-datarace.out.xml | 3 +- .../data/out/xunit/gotest-empty.out.xml | 3 +- .../data/out/xunit/gotest-escaped.out.xml | 16 ++ .../data/out/xunit/gotest-fail.out.xml | 3 +- .../out/xunit/gotest-fatal-nosummary.out.xml | 3 +- .../data/out/xunit/gotest-log.out.xml | 3 +- .../data/out/xunit/gotest-multi.out.xml | 3 +- .../data/out/xunit/gotest-multierror.out.xml | 3 +- .../xunit/gotest-negative-duration.out.xml | 3 +- .../data/out/xunit/gotest-nofiles.out.xml | 3 +- .../data/out/xunit/gotest-nosummary.out.xml | 3 +- .../data/out/xunit/gotest-num.out.xml | 3 +- .../data/out/xunit/gotest-panic.out.xml | 3 +- .../data/out/xunit/gotest-pass.out.xml | 3 +- .../data/out/xunit/gotest-print.out.xml | 3 +- .../out/xunit/gotest-testify-suite.out.xml | 3 +- .../go2xunit/data/out/xunit/gotest.out.xml | 3 +- github.com/tebeka/go2xunit/lib/xmlout.go | 30 ++- github.com/tebeka/go2xunit/main.go | 2 +- .../appengine/cmd/aedeploy/aedeploy.go | 245 +---------------- google.golang.org/grpc/call.go | 3 +- google.golang.org/grpc/clientconn.go | 116 ++++---- google.golang.org/grpc/clientconn_test.go | 41 +++ .../grpc/examples/helloworld/mock/hw_test.go | 2 +- google.golang.org/grpc/grpclb/grpclb_test.go | 56 ++-- google.golang.org/grpc/keepalive/keepalive.go | 52 ++++ google.golang.org/grpc/rpc_util.go | 2 +- google.golang.org/grpc/server_test.go | 2 +- google.golang.org/grpc/stream.go | 43 +-- google.golang.org/grpc/test/end2end_test.go | 155 ++++++----- google.golang.org/grpc/transport/control.go | 10 +- .../grpc/transport/handler_server_test.go | 2 +- .../grpc/transport/http2_client.go | 195 ++++++++++---- .../grpc/transport/http2_server.go | 20 +- google.golang.org/grpc/transport/transport.go | 11 +- .../grpc/transport/transport_test.go | 134 +++++++++- 83 files changed, 1109 insertions(+), 842 deletions(-) create mode 100644 github.com/google/go-github/github/orgs_projects.go create mode 100644 github.com/google/go-github/github/orgs_projects_test.go create mode 100644 github.com/tebeka/go2xunit/data/in/gotest-escaped.out create mode 100644 github.com/tebeka/go2xunit/data/out/xunit.net/gotest-escaped.out.xml create mode 100644 github.com/tebeka/go2xunit/data/out/xunit/gotest-escaped.out.xml create mode 100644 google.golang.org/grpc/keepalive/keepalive.go diff --git a/github.com/docker/distribution/registry/api/v2/urls.go b/github.com/docker/distribution/registry/api/v2/urls.go index 3eb3e09d92..1337bdb127 100644 --- a/github.com/docker/distribution/registry/api/v2/urls.go +++ b/github.com/docker/distribution/registry/api/v2/urls.go @@ -2,10 +2,8 @@ package v2 import ( "fmt" - "net" "net/http" "net/url" - "strconv" "strings" "github.com/docker/distribution/reference" @@ -49,66 +47,42 @@ func NewURLBuilderFromString(root string, relative bool) (*URLBuilder, error) { // NewURLBuilderFromRequest uses information from an *http.Request to // construct the root url. func NewURLBuilderFromRequest(r *http.Request, relative bool) *URLBuilder { - var scheme string - - forwardedProto := r.Header.Get("X-Forwarded-Proto") - // TODO: log the error - forwardedHeader, _, _ := parseForwardedHeader(r.Header.Get("Forwarded")) - - switch { - case len(forwardedProto) > 0: - scheme = forwardedProto - case len(forwardedHeader["proto"]) > 0: - scheme = forwardedHeader["proto"] - case r.TLS != nil: - scheme = "https" - case len(r.URL.Scheme) > 0: - scheme = r.URL.Scheme - default: + var ( scheme = "http" - } + host = r.Host + ) - host := r.Host - - if forwardedHost := r.Header.Get("X-Forwarded-Host"); len(forwardedHost) > 0 { - // According to the Apache mod_proxy docs, X-Forwarded-Host can be a - // comma-separated list of hosts, to which each proxy appends the - // requested host. We want to grab the first from this comma-separated - // list. - hosts := strings.SplitN(forwardedHost, ",", 2) - host = strings.TrimSpace(hosts[0]) - } else if addr, exists := forwardedHeader["for"]; exists { - host = addr - } else if h, exists := forwardedHeader["host"]; exists { - host = h + if r.TLS != nil { + scheme = "https" + } else if len(r.URL.Scheme) > 0 { + scheme = r.URL.Scheme } - portLessHost, port := host, "" - if !isIPv6Address(portLessHost) { - // with go 1.6, this would treat the last part of IPv6 address as a port - portLessHost, port, _ = net.SplitHostPort(host) - } - if forwardedPort := r.Header.Get("X-Forwarded-Port"); len(port) == 0 && len(forwardedPort) > 0 { - ports := strings.SplitN(forwardedPort, ",", 2) - forwardedPort = strings.TrimSpace(ports[0]) - if _, err := strconv.ParseInt(forwardedPort, 10, 32); err == nil { - port = forwardedPort + // Handle fowarded headers + // Prefer "Forwarded" header as defined by rfc7239 if given + // see https://tools.ietf.org/html/rfc7239 + if forwarded := r.Header.Get("Forwarded"); len(forwarded) > 0 { + forwardedHeader, _, err := parseForwardedHeader(forwarded) + if err == nil { + if fproto := forwardedHeader["proto"]; len(fproto) > 0 { + scheme = fproto + } + if fhost := forwardedHeader["host"]; len(fhost) > 0 { + host = fhost + } } - } - - if len(portLessHost) > 0 { - host = portLessHost - } - if len(port) > 0 { - // remove enclosing brackets of ipv6 address otherwise they will be duplicated - if len(host) > 1 && host[0] == '[' && host[len(host)-1] == ']' { - host = host[1 : len(host)-1] + } else { + if forwardedProto := r.Header.Get("X-Forwarded-Proto"); len(forwardedProto) > 0 { + scheme = forwardedProto + } + if forwardedHost := r.Header.Get("X-Forwarded-Host"); len(forwardedHost) > 0 { + // According to the Apache mod_proxy docs, X-Forwarded-Host can be a + // comma-separated list of hosts, to which each proxy appends the + // requested host. We want to grab the first from this comma-separated + // list. + hosts := strings.SplitN(forwardedHost, ",", 2) + host = strings.TrimSpace(hosts[0]) } - // JoinHostPort properly encloses ipv6 addresses in square brackets - host = net.JoinHostPort(host, port) - } else if isIPv6Address(host) && host[0] != '[' { - // ipv6 needs to be enclosed in square brackets in urls - host = "[" + host + "]" } basePath := routeDescriptorsMap[RouteNameBase].Path @@ -290,28 +264,3 @@ func appendValues(u string, values ...url.Values) string { return appendValuesURL(up, values...).String() } - -// isIPv6Address returns true if given string is a valid IPv6 address. No port is allowed. The address may be -// enclosed in square brackets. -func isIPv6Address(host string) bool { - if len(host) > 1 && host[0] == '[' && host[len(host)-1] == ']' { - host = host[1 : len(host)-1] - } - // The IPv6 scoped addressing zone identifier starts after the last percent sign. - if i := strings.LastIndexByte(host, '%'); i > 0 { - host = host[:i] - } - ip := net.ParseIP(host) - if ip == nil { - return false - } - if ip.To16() == nil { - return false - } - if ip.To4() == nil { - return true - } - // dot can be present in ipv4-mapped address, it needs to come after a colon though - i := strings.IndexAny(host, ":.") - return i >= 0 && host[i] == ':' -} diff --git a/github.com/docker/distribution/registry/api/v2/urls_test.go b/github.com/docker/distribution/registry/api/v2/urls_test.go index 6449c6e4b6..4f854b23b0 100644 --- a/github.com/docker/distribution/registry/api/v2/urls_test.go +++ b/github.com/docker/distribution/registry/api/v2/urls_test.go @@ -207,7 +207,7 @@ func TestBuilderFromRequest(t *testing.T) { { name: "https protocol forwarded with a non-standard header", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "X-Forwarded-Proto": []string{"https"}, + "X-Custom-Forwarded-Proto": []string{"https"}, }}, base: "http://example.com", }, @@ -253,6 +253,7 @@ func TestBuilderFromRequest(t *testing.T) { { name: "forwarded port with a non-standard header", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ + "X-Forwarded-Host": []string{"example.com:5000"}, "X-Forwarded-Port": []string{"5000"}, }}, base: "http://example.com:5000", @@ -262,16 +263,33 @@ func TestBuilderFromRequest(t *testing.T) { request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ "X-Forwarded-Port": []string{"443 , 5001"}, }}, - base: "http://example.com:443", + base: "http://example.com", + }, + { + name: "forwarded standard port with non-standard headers", + request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ + "X-Forwarded-Proto": []string{"https"}, + "X-Forwarded-Host": []string{"example.com"}, + "X-Forwarded-Port": []string{"443"}, + }}, + base: "https://example.com", + }, + { + name: "forwarded standard port with non-standard headers and explicit port", + request: &http.Request{URL: u, Host: u.Host + ":443", Header: http.Header{ + "X-Forwarded-Proto": []string{"https"}, + "X-Forwarded-Host": []string{u.Host + ":443"}, + "X-Forwarded-Port": []string{"443"}, + }}, + base: "https://example.com:443", }, { name: "several non-standard headers", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ "X-Forwarded-Proto": []string{"https"}, - "X-Forwarded-Host": []string{" first.example.com "}, - "X-Forwarded-Port": []string{" 12345 \t"}, + "X-Forwarded-Host": []string{" first.example.com:12345 "}, }}, - base: "http://first.example.com:12345", + base: "https://first.example.com:12345", }, { name: "forwarded host with port supplied takes priority", @@ -292,16 +310,16 @@ func TestBuilderFromRequest(t *testing.T) { { name: "forwarded protocol and addr using standard header", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`proto=https;for="192.168.22.30:80"`}, + "Forwarded": []string{`proto=https;host="192.168.22.30:80"`}, }}, base: "https://192.168.22.30:80", }, { - name: "forwarded addr takes priority over host", + name: "forwarded host takes priority over for", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`host=reg.example.com;for="192.168.22.30:5000"`}, + "Forwarded": []string{`host="reg.example.com:5000";for="192.168.22.30"`}, }}, - base: "http://192.168.22.30:5000", + base: "http://reg.example.com:5000", }, { name: "forwarded host and protocol using standard header", @@ -320,92 +338,65 @@ func TestBuilderFromRequest(t *testing.T) { { name: "process just the first list element of standard header", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="reg.example.com:443";proto=https, for="reg.example.com:80";proto=http`}, + "Forwarded": []string{`host="reg.example.com:443";proto=https, host="reg.example.com:80";proto=http`}, }}, base: "https://reg.example.com:443", }, { - name: "IPv6 address override port", + name: "IPv6 address use host", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="2607:f0d0:1002:51::4"`}, - "X-Forwarded-Port": []string{"5001"}, + "Forwarded": []string{`for="2607:f0d0:1002:51::4";host="[2607:f0d0:1002:51::4]:5001"`}, + "X-Forwarded-Port": []string{"5002"}, }}, base: "http://[2607:f0d0:1002:51::4]:5001", }, { name: "IPv6 address with port", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="[2607:f0d0:1002:51::4]:4000"`}, + "Forwarded": []string{`host="[2607:f0d0:1002:51::4]:4000"`}, "X-Forwarded-Port": []string{"5001"}, }}, base: "http://[2607:f0d0:1002:51::4]:4000", }, { - name: "IPv6 long address override port", - request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="2607:f0d0:1002:0051:0000:0000:0000:0004"`}, - "X-Forwarded-Port": []string{"5001"}, - }}, - base: "http://[2607:f0d0:1002:0051:0000:0000:0000:0004]:5001", - }, - { - name: "IPv6 long address enclosed in brackets - be benevolent", - request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="[2607:f0d0:1002:0051:0000:0000:0000:0004]"`}, - "X-Forwarded-Port": []string{"5001"}, - }}, - base: "http://[2607:f0d0:1002:0051:0000:0000:0000:0004]:5001", - }, - { - name: "IPv6 long address with port", - request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="[2607:f0d0:1002:0051:0000:0000:0000:0004]:4321"`}, - "X-Forwarded-Port": []string{"5001"}, - }}, - base: "http://[2607:f0d0:1002:0051:0000:0000:0000:0004]:4321", - }, - { - name: "IPv6 address with zone ID", - request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="fe80::bd0f:a8bc:6480:238b%11"`}, - "X-Forwarded-Port": []string{"5001"}, - }}, - base: "http://[fe80::bd0f:a8bc:6480:238b%2511]:5001", - }, - { - name: "IPv6 address with zone ID and port", + name: "non-standard and standard forward headers", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="[fe80::bd0f:a8bc:6480:238b%eth0]:12345"`}, - "X-Forwarded-Port": []string{"5001"}, + "X-Forwarded-Proto": []string{`https`}, + "X-Forwarded-Host": []string{`first.example.com`}, + "X-Forwarded-Port": []string{``}, + "Forwarded": []string{`host=first.example.com; proto=https`}, }}, - base: "http://[fe80::bd0f:a8bc:6480:238b%25eth0]:12345", + base: "https://first.example.com", }, { - name: "IPv6 address without port", + name: "standard header takes precedence over non-standard headers", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "Forwarded": []string{`for="::FFFF:129.144.52.38"`}, + "X-Forwarded-Proto": []string{`http`}, + "Forwarded": []string{`host=second.example.com; proto=https`}, + "X-Forwarded-Host": []string{`first.example.com`}, + "X-Forwarded-Port": []string{`4000`}, }}, - base: "http://[::FFFF:129.144.52.38]", + base: "https://second.example.com", }, { - name: "non-standard and standard forward headers", + name: "incomplete standard header uses default", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ "X-Forwarded-Proto": []string{`https`}, + "Forwarded": []string{`for=127.0.0.1`}, "X-Forwarded-Host": []string{`first.example.com`}, - "X-Forwarded-Port": []string{``}, - "Forwarded": []string{`host=first.example.com; proto=https`}, + "X-Forwarded-Port": []string{`4000`}, }}, - base: "https://first.example.com", + base: "http://" + u.Host, }, { - name: "non-standard headers take precedence over standard one", + name: "standard with just proto", request: &http.Request{URL: u, Host: u.Host, Header: http.Header{ - "X-Forwarded-Proto": []string{`http`}, - "Forwarded": []string{`host=second.example.com; proto=https`}, + "X-Forwarded-Proto": []string{`https`}, + "Forwarded": []string{`proto=https`}, "X-Forwarded-Host": []string{`first.example.com`}, "X-Forwarded-Port": []string{`4000`}, }}, - base: "http://first.example.com:4000", + base: "https://" + u.Host, }, } @@ -428,23 +419,9 @@ func TestBuilderFromRequest(t *testing.T) { continue } - var expectedURL string - proto, ok := tr.request.Header["X-Forwarded-Proto"] - if !ok { - expectedURL = testCase.expectedPath - if !relative { - expectedURL = tr.base + expectedURL - } - } else { - urlBase, err := url.Parse(tr.base) - if err != nil { - t.Fatal(err) - } - urlBase.Scheme = proto[0] - expectedURL = testCase.expectedPath - if !relative { - expectedURL = urlBase.String() + expectedURL - } + expectedURL := testCase.expectedPath + if !relative { + expectedURL = tr.base + expectedURL } if buildURL != expectedURL { @@ -541,119 +518,3 @@ func TestBuilderFromRequestWithPrefix(t *testing.T) { } } } - -func TestIsIPv6Address(t *testing.T) { - for _, tc := range []struct { - name string - address string - isIPv6 bool - }{ - { - name: "IPv6 short address", - address: `2607:f0d0:1002:51::4`, - isIPv6: true, - }, - { - name: "IPv6 short address enclosed in brackets", - address: "[2607:f0d0:1002:51::4]", - isIPv6: true, - }, - { - name: "IPv6 address", - address: `2607:f0d0:1002:0051:0000:0000:0000:0004`, - isIPv6: true, - }, - { - name: "IPv6 address with numeric zone ID", - address: `fe80::bd0f:a8bc:6480:238b%11`, - isIPv6: true, - }, - { - name: "IPv6 address with device name as zone ID", - address: `fe80::bd0f:a8bc:6480:238b%eth0`, - isIPv6: true, - }, - { - name: "IPv6 address with device name as zone ID enclosed in brackets", - address: `[fe80::bd0f:a8bc:6480:238b%eth0]`, - isIPv6: true, - }, - { - name: "IPv4-mapped address", - address: "::FFFF:129.144.52.38", - isIPv6: true, - }, - { - name: "localhost", - address: "::1", - isIPv6: true, - }, - { - name: "localhost", - address: "::1", - isIPv6: true, - }, - { - name: "long localhost address", - address: "0:0:0:0:0:0:0:1", - isIPv6: true, - }, - { - name: "IPv6 long address with port", - address: "[2607:f0d0:1002:0051:0000:0000:0000:0004]:4321", - isIPv6: false, - }, - { - name: "too many groups", - address: "2607:f0d0:1002:0051:0000:0000:0000:0004:4321", - isIPv6: false, - }, - { - name: "square brackets don't make an IPv6 address", - address: "[2607:f0d0]", - isIPv6: false, - }, - { - name: "require two consecutive colons in localhost", - address: ":1", - isIPv6: false, - }, - { - name: "more then 4 hexadecimal digits", - address: "2607:f0d0b:1002:0051:0000:0000:0000:0004", - isIPv6: false, - }, - { - name: "too short address", - address: `2607:f0d0:1002:0000:0000:0000:0004`, - isIPv6: false, - }, - { - name: "IPv4 address", - address: `192.168.100.1`, - isIPv6: false, - }, - { - name: "unclosed bracket", - address: `[2607:f0d0:1002:0051:0000:0000:0000:0004`, - isIPv6: false, - }, - { - name: "trailing bracket", - address: `2607:f0d0:1002:0051:0000:0000:0000:0004]`, - isIPv6: false, - }, - { - name: "domain name", - address: `localhost`, - isIPv6: false, - }, - } { - isIPv6 := isIPv6Address(tc.address) - if isIPv6 && !tc.isIPv6 { - t.Errorf("[%s] address %q falsely detected as IPv6 address", tc.name, tc.address) - } else if !isIPv6 && tc.isIPv6 { - t.Errorf("[%s] address %q not recognized as IPv6", tc.name, tc.address) - } - } -} diff --git a/github.com/google/go-github/github/github.go b/github.com/google/go-github/github/github.go index 40798869d6..ba34310945 100644 --- a/github.com/google/go-github/github/github.go +++ b/github.com/google/go-github/github/github.go @@ -27,7 +27,7 @@ import ( ) const ( - libraryVersion = "4" + libraryVersion = "5" defaultBaseURL = "https://api.github.com/" uploadBaseURL = "https://uploads.github.com/" userAgent = "go-github/" + libraryVersion diff --git a/github.com/google/go-github/github/orgs_projects.go b/github.com/google/go-github/github/orgs_projects.go new file mode 100644 index 0000000000..e57cba9782 --- /dev/null +++ b/github.com/google/go-github/github/orgs_projects.go @@ -0,0 +1,60 @@ +// Copyright 2017 The go-github AUTHORS. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package github + +import ( + "context" + "fmt" +) + +// ListProjects lists the projects for an organization. +// +// GitHub API docs: https://developer.github.com/v3/projects/#list-organization-projects +func (s *OrganizationsService) ListProjects(ctx context.Context, org string, opt *ProjectListOptions) ([]*Project, *Response, error) { + u := fmt.Sprintf("orgs/%v/projects", org) + u, err := addOptions(u, opt) + if err != nil { + return nil, nil, err + } + + req, err := s.client.NewRequest("GET", u, nil) + if err != nil { + return nil, nil, err + } + + // TODO: remove custom Accept header when this API fully launches. + req.Header.Set("Accept", mediaTypeProjectsPreview) + + var projects []*Project + resp, err := s.client.Do(ctx, req, &projects) + if err != nil { + return nil, resp, err + } + + return projects, resp, nil +} + +// CreateProject creates a GitHub Project for the specified organization. +// +// GitHub API docs: https://developer.github.com/v3/projects/#create-an-organization-project +func (s *OrganizationsService) CreateProject(ctx context.Context, org string, opt *ProjectOptions) (*Project, *Response, error) { + u := fmt.Sprintf("orgs/%v/projects", org) + req, err := s.client.NewRequest("POST", u, opt) + if err != nil { + return nil, nil, err + } + + // TODO: remove custom Accept header when this API fully launches. + req.Header.Set("Accept", mediaTypeProjectsPreview) + + project := &Project{} + resp, err := s.client.Do(ctx, req, project) + if err != nil { + return nil, resp, err + } + + return project, resp, nil +} diff --git a/github.com/google/go-github/github/orgs_projects_test.go b/github.com/google/go-github/github/orgs_projects_test.go new file mode 100644 index 0000000000..533f691968 --- /dev/null +++ b/github.com/google/go-github/github/orgs_projects_test.go @@ -0,0 +1,68 @@ +// Copyright 2017 The go-github AUTHORS. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package github + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "testing" +) + +func TestOrganizationsService_ListProjects(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc("/orgs/o/projects", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, "GET") + testHeader(t, r, "Accept", mediaTypeProjectsPreview) + testFormValues(t, r, values{"state": "open", "page": "2"}) + fmt.Fprint(w, `[{"id":1}]`) + }) + + opt := &ProjectListOptions{State: "open", ListOptions: ListOptions{Page: 2}} + projects, _, err := client.Organizations.ListProjects(context.Background(), "o", opt) + if err != nil { + t.Errorf("Organizations.ListProjects returned error: %v", err) + } + + want := []*Project{{ID: Int(1)}} + if !reflect.DeepEqual(projects, want) { + t.Errorf("Organizations.ListProjects returned %+v, want %+v", projects, want) + } +} + +func TestOrganizationsService_CreateProject(t *testing.T) { + setup() + defer teardown() + + input := &ProjectOptions{Name: "Project Name", Body: "Project body."} + + mux.HandleFunc("/orgs/o/projects", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, "POST") + testHeader(t, r, "Accept", mediaTypeProjectsPreview) + + v := &ProjectOptions{} + json.NewDecoder(r.Body).Decode(v) + if !reflect.DeepEqual(v, input) { + t.Errorf("Request body = %+v, want %+v", v, input) + } + + fmt.Fprint(w, `{"id":1}`) + }) + + project, _, err := client.Organizations.CreateProject(context.Background(), "o", input) + if err != nil { + t.Errorf("Organizations.CreateProject returned error: %v", err) + } + + want := &Project{ID: Int(1)} + if !reflect.DeepEqual(project, want) { + t.Errorf("Organizations.CreateProject returned %+v, want %+v", project, want) + } +} diff --git a/github.com/google/go-github/github/repos_projects.go b/github.com/google/go-github/github/repos_projects.go index 9e1a4dbb2d..770ffc76fa 100644 --- a/github.com/google/go-github/github/repos_projects.go +++ b/github.com/google/go-github/github/repos_projects.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-github AUTHORS. All rights reserved. +// Copyright 2017 The go-github AUTHORS. All rights reserved. // // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -10,10 +10,19 @@ import ( "fmt" ) +// ProjectListOptions specifies the optional parameters to the +// OrganizationsService.ListProjects and RepositoriesService.ListProjects methods. +type ProjectListOptions struct { + // Indicates the state of the projects to return. Can be either open, closed, or all. Default: open + State string `url:"state,omitempty"` + + ListOptions +} + // ListProjects lists the projects for a repo. // // GitHub API docs: https://developer.github.com/v3/projects/#list-repository-projects -func (s *RepositoriesService) ListProjects(ctx context.Context, owner, repo string, opt *ListOptions) ([]*Project, *Response, error) { +func (s *RepositoriesService) ListProjects(ctx context.Context, owner, repo string, opt *ProjectListOptions) ([]*Project, *Response, error) { u := fmt.Sprintf("repos/%v/%v/projects", owner, repo) u, err := addOptions(u, opt) if err != nil { @@ -28,7 +37,7 @@ func (s *RepositoriesService) ListProjects(ctx context.Context, owner, repo stri // TODO: remove custom Accept header when this API fully launches. req.Header.Set("Accept", mediaTypeProjectsPreview) - projects := []*Project{} + var projects []*Project resp, err := s.client.Do(ctx, req, &projects) if err != nil { return nil, resp, err diff --git a/github.com/google/go-github/github/repos_projects_test.go b/github.com/google/go-github/github/repos_projects_test.go index 7bc323bed0..57dbdd1b3f 100644 --- a/github.com/google/go-github/github/repos_projects_test.go +++ b/github.com/google/go-github/github/repos_projects_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-github AUTHORS. All rights reserved. +// Copyright 2017 The go-github AUTHORS. All rights reserved. // // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -25,7 +25,7 @@ func TestRepositoriesService_ListProjects(t *testing.T) { fmt.Fprint(w, `[{"id":1}]`) }) - opt := &ListOptions{Page: 2} + opt := &ProjectListOptions{ListOptions: ListOptions{Page: 2}} projects, _, err := client.Repositories.ListProjects(context.Background(), "o", "r", opt) if err != nil { t.Errorf("Repositories.ListProjects returned error: %v", err) diff --git a/github.com/google/go-github/github/users_gpg_keys.go b/github.com/google/go-github/github/users_gpg_keys.go index 35cce02092..be88c042ab 100644 --- a/github.com/google/go-github/github/users_gpg_keys.go +++ b/github.com/google/go-github/github/users_gpg_keys.go @@ -40,12 +40,24 @@ type GPGEmail struct { Verified *bool `json:"verified,omitempty"` } -// ListGPGKeys lists the current user's GPG keys. It requires authentication +// ListGPGKeys lists the public GPG keys for a user. Passing the empty +// string will fetch keys for the authenticated user. It requires authentication // via Basic Auth or via OAuth with at least read:gpg_key scope. -// -// GitHub API docs: https://developer.github.com/v3/users/gpg_keys/#list-your-gpg-keys -func (s *UsersService) ListGPGKeys(ctx context.Context) ([]*GPGKey, *Response, error) { - req, err := s.client.NewRequest("GET", "user/gpg_keys", nil) + +// GitHub API docs: https://developer.github.com/v3/users/gpg_keys/#list-gpg-keys-for-a-user +func (s *UsersService) ListGPGKeys(ctx context.Context, user string, opt *ListOptions) ([]*GPGKey, *Response, error) { + var u string + if user != "" { + u = fmt.Sprintf("users/%v/gpg_keys", user) + } else { + u = "user/gpg_keys" + } + u, err := addOptions(u, opt) + if err != nil { + return nil, nil, err + } + + req, err := s.client.NewRequest("GET", u, nil) if err != nil { return nil, nil, err } diff --git a/github.com/google/go-github/github/users_gpg_keys_test.go b/github.com/google/go-github/github/users_gpg_keys_test.go index 12bd7ef7af..f4180df988 100644 --- a/github.com/google/go-github/github/users_gpg_keys_test.go +++ b/github.com/google/go-github/github/users_gpg_keys_test.go @@ -14,17 +14,19 @@ import ( "testing" ) -func TestUsersService_ListGPGKeys(t *testing.T) { +func TestUsersService_ListGPGKeys_authenticatedUser(t *testing.T) { setup() defer teardown() mux.HandleFunc("/user/gpg_keys", func(w http.ResponseWriter, r *http.Request) { testMethod(t, r, "GET") testHeader(t, r, "Accept", mediaTypeGitSigningPreview) + testFormValues(t, r, values{"page": "2"}) fmt.Fprint(w, `[{"id":1,"primary_key_id":2}]`) }) - keys, _, err := client.Users.ListGPGKeys(context.Background()) + opt := &ListOptions{Page: 2} + keys, _, err := client.Users.ListGPGKeys(context.Background(), "", opt) if err != nil { t.Errorf("Users.ListGPGKeys returned error: %v", err) } @@ -35,6 +37,32 @@ func TestUsersService_ListGPGKeys(t *testing.T) { } } +func TestUsersService_ListGPGKeys_specifiedUser(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc("/users/u/gpg_keys", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, "GET") + testHeader(t, r, "Accept", mediaTypeGitSigningPreview) + fmt.Fprint(w, `[{"id":1,"primary_key_id":2}]`) + }) + + keys, _, err := client.Users.ListGPGKeys(context.Background(), "u", nil) + if err != nil { + t.Errorf("Users.ListGPGKeys returned error: %v", err) + } + + want := []*GPGKey{{ID: Int(1), PrimaryKeyID: Int(2)}} + if !reflect.DeepEqual(keys, want) { + t.Errorf("Users.ListGPGKeys = %+v, want %+v", keys, want) + } +} + +func TestUsersService_ListGPGKeys_invalidUser(t *testing.T) { + _, _, err := client.Users.ListGPGKeys(context.Background(), "%", nil) + testURLParseError(t, err) +} + func TestUsersService_GetGPGKey(t *testing.T) { setup() defer teardown() diff --git a/github.com/lightstep/lightstep-tracer-go/thrift_rpc/logencoder.go b/github.com/lightstep/lightstep-tracer-go/thrift_rpc/logencoder.go index 077ac8f31e..327ebf1990 100644 --- a/github.com/lightstep/lightstep-tracer-go/thrift_rpc/logencoder.go +++ b/github.com/lightstep/lightstep-tracer-go/thrift_rpc/logencoder.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/lightstep/lightstep-tracer-go/lightstep_thrift" - "github.com/lightstep/lightstep-tracer-go/thrift_0_9_2/lib/go/thrift" "github.com/opentracing/opentracing-go/log" ) @@ -25,27 +24,30 @@ type logFieldEncoder struct { } func (lfe *logFieldEncoder) EmitString(key, value string) { - if key == deprecatedFieldKeyEvent { - if len(value) > lfe.recorder.maxLogMessageLen { - value = value[:(lfe.recorder.maxLogMessageLen-1)] + ellipsis - } - lfe.logRecord.StableName = thrift.StringPtr(value) + if len(value) > lfe.recorder.maxLogMessageLen { + value = value[:(lfe.recorder.maxLogMessageLen-1)] + ellipsis } + lfe.logRecord.Fields = append(lfe.logRecord.Fields, &lightstep_thrift.KeyValue{ + Key: key, + Value: value, + }) } + func (lfe *logFieldEncoder) EmitObject(key string, value interface{}) { - if key == deprecatedFieldKeyPayload { - var thriftPayload string - jsonString, err := json.Marshal(value) - if err != nil { - thriftPayload = fmt.Sprintf("Error encoding payload object: %v", err) - } else { - thriftPayload = string(jsonString) - } - if len(thriftPayload) > lfe.recorder.maxLogMessageLen { - thriftPayload = thriftPayload[:(lfe.recorder.maxLogMessageLen-1)] + ellipsis - } - lfe.logRecord.PayloadJson = thrift.StringPtr(thriftPayload) + var thriftPayload string + jsonString, err := json.Marshal(value) + if err != nil { + thriftPayload = fmt.Sprintf("Error encoding payload object: %v", err) + } else { + thriftPayload = string(jsonString) + } + if len(thriftPayload) > lfe.recorder.maxLogMessageLen { + thriftPayload = thriftPayload[:(lfe.recorder.maxLogMessageLen-1)] + ellipsis } + lfe.logRecord.Fields = append(lfe.logRecord.Fields, &lightstep_thrift.KeyValue{ + Key: key, + Value: thriftPayload, + }) } func (lfe *logFieldEncoder) EmitBool(key string, value bool) { diff --git a/github.com/tebeka/go2xunit/ChangeLog b/github.com/tebeka/go2xunit/ChangeLog index 1d30e7211b..771ee1306c 100644 --- a/github.com/tebeka/go2xunit/ChangeLog +++ b/github.com/tebeka/go2xunit/ChangeLog @@ -1,3 +1,6 @@ +2017-03-21 version 1.4.4 +* Escape user-provided strings in XML output (@xperimental in PR #47) + 2016-11-23 version 1.4.3 * Experimental gotest lexer diff --git a/github.com/tebeka/go2xunit/data/in/gotest-escaped.out b/github.com/tebeka/go2xunit/data/in/gotest-escaped.out new file mode 100644 index 0000000000..f51d6a9a79 --- /dev/null +++ b/github.com/tebeka/go2xunit/data/in/gotest-escaped.out @@ -0,0 +1,10 @@ +=== RUN TestEscapedChars +=== RUN TestEscapedChars/no_special_chars +=== RUN TestEscapedChars/"needs_escape" +=== RUN TestEscapedChars/reserved_ +--- PASS: TestEscapedChars (0.00s) + --- PASS: TestEscapedChars/no_special_chars (0.00s) + --- PASS: TestEscapedChars/"needs_escape" (0.00s) + --- PASS: TestEscapedChars/reserved_ (0.00s) +PASS +ok _/go/src/github.com/tebeka/go2xunit/data 0.005s diff --git a/github.com/tebeka/go2xunit/data/out/xunit.net/gocheck-empty.out.xml b/github.com/tebeka/go2xunit/data/out/xunit.net/gocheck-empty.out.xml index 17c33648fc..b0dc5f74b5 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit.net/gocheck-empty.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit.net/gocheck-empty.out.xml @@ -1,4 +1,5 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit.net/gotest-fail.out.xml b/github.com/tebeka/go2xunit/data/out/xunit.net/gotest-fail.out.xml index 84c723cc38..c2f5f7b263 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit.net/gotest-fail.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit.net/gotest-fail.out.xml @@ -1,4 +1,5 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-fail.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-fail.out.xml index 3570888b0e..3bc08e3310 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-fail.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-fail.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-panic.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-panic.out.xml index df4fd26cd7..5b566eaaca 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-panic.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-panic.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-pass.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-pass.out.xml index d9ac257b36..1b2ecfddeb 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-pass.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-pass.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-setup-miss.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-setup-miss.out.xml index b4e1f52eb1..3a1ebb15e7 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gocheck-setup-miss.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gocheck-setup-miss.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-0.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-0.out.xml index b761761091..7d111ed363 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-0.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-0.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.5.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.5.out.xml index d7ef9f31ea..45d27c1feb 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.5.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.5.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.6.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.6.out.xml index d3709e8fb5..b258e66937 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.6.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.6.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.7.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.7.out.xml index 20c96599c3..de4c251acc 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.7.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-1.7.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-datarace.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-datarace.out.xml index 72e185229f..43efb36d6c 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-datarace.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-datarace.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-empty.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-empty.out.xml index 3af4721cd9..6a5706911a 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-empty.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-empty.out.xml @@ -1,3 +1,4 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-escaped.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-escaped.out.xml new file mode 100644 index 0000000000..0dd6800b54 --- /dev/null +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-escaped.out.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-fail.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-fail.out.xml index 20d54b841a..5e5fbfe54d 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-fail.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-fail.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-fatal-nosummary.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-fatal-nosummary.out.xml index 18ccf47bdd..0b4552fff5 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-fatal-nosummary.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-fatal-nosummary.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-log.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-log.out.xml index 7c7f3393a1..52a257767d 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-log.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-log.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-multi.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-multi.out.xml index 8fc6bd7033..f703b01700 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-multi.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-multi.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-multierror.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-multierror.out.xml index 76c8ef8ac1..6086274eb5 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-multierror.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-multierror.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-negative-duration.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-negative-duration.out.xml index 576a3dc2dd..f2cc51e7ae 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-negative-duration.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-negative-duration.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-nofiles.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-nofiles.out.xml index 8fc6bd7033..f703b01700 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-nofiles.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-nofiles.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-nosummary.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-nosummary.out.xml index 19a1d88ab5..e7a781ca97 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-nosummary.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-nosummary.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-num.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-num.out.xml index 643bc3f76e..62ee012974 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-num.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-num.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-panic.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-panic.out.xml index f2bed44ad5..5631f12b81 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-panic.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-panic.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-pass.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-pass.out.xml index 67898e70a8..c37fbcf191 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-pass.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-pass.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-print.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-print.out.xml index ef928eded2..a49621a854 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-print.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-print.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest-testify-suite.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest-testify-suite.out.xml index ee6a8fa893..ef644f774e 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest-testify-suite.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest-testify-suite.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/data/out/xunit/gotest.out.xml b/github.com/tebeka/go2xunit/data/out/xunit/gotest.out.xml index ef928eded2..a49621a854 100644 --- a/github.com/tebeka/go2xunit/data/out/xunit/gotest.out.xml +++ b/github.com/tebeka/go2xunit/data/out/xunit/gotest.out.xml @@ -1,4 +1,5 @@ - + + diff --git a/github.com/tebeka/go2xunit/lib/xmlout.go b/github.com/tebeka/go2xunit/lib/xmlout.go index 135cce2e08..a1d66e774a 100644 --- a/github.com/tebeka/go2xunit/lib/xmlout.go +++ b/github.com/tebeka/go2xunit/lib/xmlout.go @@ -2,6 +2,8 @@ package lib // XML output import ( + "bytes" + "encoding/xml" "fmt" "io" "strconv" @@ -10,12 +12,10 @@ import ( ) const ( - xmlDeclaration = `` - // XUnitTemplate is XML template for xunit style reporting XUnitTemplate string = ` -{{range $suite := .Suites}} -{{range $test := $suite.Tests}} +{{range $suite := .Suites}} +{{range $test := $suite.Tests}} {{if eq $test.Status $.Skipped }} {{end}} {{if eq $test.Status $.Failed }} @@ -31,7 +31,7 @@ const ( // XUnitNetTemplate is XML template for xunit.net // see https://xunit.codeplex.com/wikipage?title=XmlFormat XUnitNetTemplate string = ` - {{range $suite := .Suites}} - {{range $test := $suite.Tests}} - {{if eq $test.Status $.Failed }} @@ -96,6 +96,14 @@ func (r *TestResults) calcTotals() { r.Len = r.NumPassed + r.NumSkipped + r.NumFailed } +func escapeForXML(in string) (string, error) { + w := &bytes.Buffer{} + if err := xml.EscapeText(w, []byte(in)); err != nil { + return "", fmt.Errorf("error escaping text: %s", err) + } + return w.String(), nil +} + // WriteXML exits xunit XML of tests to out func WriteXML(suites []*Suite, out io.Writer, xmlTemplate string, testTime time.Time) { testsResult := TestResults{ @@ -108,9 +116,11 @@ func WriteXML(suites []*Suite, out io.Writer, xmlTemplate string, testTime time. Failed: Failed, } testsResult.calcTotals() - t := template.New("test template") + t := template.New("test template").Funcs(template.FuncMap{ + "escape": escapeForXML, + }) - t, err := t.Parse(xmlDeclaration + xmlTemplate) + t, err := t.Parse(xml.Header + xmlTemplate) if err != nil { fmt.Printf("Error in parse %v\n", err) return diff --git a/github.com/tebeka/go2xunit/main.go b/github.com/tebeka/go2xunit/main.go index 2cf9b2ac41..946fd42de8 100644 --- a/github.com/tebeka/go2xunit/main.go +++ b/github.com/tebeka/go2xunit/main.go @@ -12,7 +12,7 @@ import ( const ( // Version is the current version - Version = "1.4.3" + Version = "1.4.4" ) // getInput return input io.File from file name, if file name is - it will diff --git a/google.golang.org/appengine/cmd/aedeploy/aedeploy.go b/google.golang.org/appengine/cmd/aedeploy/aedeploy.go index 7098cbf9ef..8093c93ff4 100644 --- a/google.golang.org/appengine/cmd/aedeploy/aedeploy.go +++ b/google.golang.org/appengine/cmd/aedeploy/aedeploy.go @@ -16,25 +16,12 @@ package main import ( "flag" "fmt" - "go/build" - "io" - "io/ioutil" "log" "os" "os/exec" - "path/filepath" "strings" ) -var ( - skipFiles = map[string]bool{ - ".git": true, - ".gitconfig": true, - ".hg": true, - ".travis.yml": true, - } -) - func usage() { fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0]) fmt.Fprintf(os.Stderr, "\t%s gcloud --verbosity debug app deploy --version myversion ./app.yaml\tDeploy app to production\n", os.Args[0]) @@ -59,31 +46,18 @@ func main() { os.Exit(1) } - if err := aedeploy(); err != nil { - fmt.Fprintf(os.Stderr, os.Args[0]+": Error: %v\n", err) - os.Exit(1) - } -} - -func aedeploy() error { - tags := []string{"appenginevm"} - app, err := analyze(tags) - if err != nil { - return err - } - - tmpDir, err := app.bundle() - if tmpDir != "" { - defer os.RemoveAll(tmpDir) - } - if err != nil { - return err + notice := func() { + fmt.Fprintln(os.Stderr, `NOTICE: aedeploy is deprecated. Just use "gcloud app deploy".`) } - if err := os.Chdir(tmpDir); err != nil { - return fmt.Errorf("unable to chdir to %v: %v", tmpDir, err) + notice() + if err := deploy(); err != nil { + fmt.Fprintf(os.Stderr, os.Args[0]+": Error: %v\n", err) + notice() + fmt.Fprintln(os.Stderr, `You might need to update gcloud. Run "gcloud components update".`) + os.Exit(1) } - return deploy() + notice() // Make sure they see it at the end. } // deploy calls the provided command to deploy the app from the temporary directory. @@ -96,204 +70,3 @@ func deploy() error { } return nil } - -type app struct { - appFiles []string - imports map[string]string -} - -// analyze checks the app for building with the given build tags and returns -// app files, and a map of full directory import names to original import names. -func analyze(tags []string) (*app, error) { - ctxt := buildContext(tags) - vlogf("Using build context %#v", ctxt) - appFiles, err := appFiles(ctxt) - if err != nil { - return nil, err - } - im, err := imports(ctxt, ".") - return &app{ - appFiles: appFiles, - imports: im, - }, err -} - -// buildContext returns the context for building the source. -func buildContext(tags []string) *build.Context { - return &build.Context{ - GOARCH: "amd64", - GOOS: "linux", - GOROOT: build.Default.GOROOT, - GOPATH: build.Default.GOPATH, - Compiler: build.Default.Compiler, - BuildTags: append(defaultBuildTags, tags...), - } -} - -// All build tags except go1.7, since Go 1.6 is the runtime version. -var defaultBuildTags = []string{ - "go1.1", "go1.2", "go1.3", "go1.4", "go1.5", "go1.6"} - -// bundle bundles the app into a temporary directory. -func (s *app) bundle() (tmpdir string, err error) { - workDir, err := ioutil.TempDir("", "aedeploy") - if err != nil { - return "", fmt.Errorf("unable to create tmpdir: %v", err) - } - - for srcDir, importName := range s.imports { - dstDir := "_gopath/src/" + importName - if err := copyTree(workDir, dstDir, srcDir); err != nil { - return workDir, fmt.Errorf("unable to copy directory %v to %v: %v", srcDir, dstDir, err) - } - } - if err := copyTree(workDir, ".", "."); err != nil { - return workDir, fmt.Errorf("unable to copy root directory to /app: %v", err) - } - return workDir, nil -} - -// imports returns a map of all import directories used by the app. -// The return value maps full directory names to original import names. -func imports(ctxt *build.Context, srcDir string) (map[string]string, error) { - result := make(map[string]string) - - type importFrom struct { - path, fromDir string - } - var imports []importFrom - visited := make(map[importFrom]bool) - - pkg, err := ctxt.ImportDir(srcDir, 0) - if err != nil { - return nil, err - } - for _, v := range pkg.Imports { - imports = append(imports, importFrom{ - path: v, - fromDir: srcDir, - }) - } - - // Resolve all non-standard-library imports - for len(imports) != 0 { - i := imports[0] - imports = imports[1:] // shift - if i.path == "C" { - // ignore cgo - continue - } - if _, ok := visited[i]; ok { - // already scanned - continue - } - visited[i] = true - - abs, err := filepath.Abs(i.fromDir) - if err != nil { - return nil, fmt.Errorf("unable to get absolute directory of %q: %v", i.fromDir, err) - } - pkg, err := ctxt.Import(i.path, abs, 0) - if err != nil { - return nil, fmt.Errorf("unable to find import %s, imported from %q: %v", i.path, i.fromDir, err) - } - - // TODO(cbro): handle packages that are vendored by multiple imports correctly. - - if pkg.Goroot { - // ignore standard library imports - continue - } - - vlogf("Located %q (imported from %q) -> %q", i.path, i.fromDir, pkg.Dir) - result[pkg.Dir] = i.path - - for _, v := range pkg.Imports { - imports = append(imports, importFrom{ - path: v, - fromDir: pkg.Dir, - }) - } - } - - return result, nil -} - -// copyTree copies srcDir to dstDir relative to dstRoot, ignoring skipFiles. -func copyTree(dstRoot, dstDir, srcDir string) error { - vlogf("Copying %q to %q", srcDir, dstDir) - d := filepath.Join(dstRoot, dstDir) - if err := os.MkdirAll(d, 0755); err != nil { - return fmt.Errorf("unable to create directory %q: %v", d, err) - } - - entries, err := ioutil.ReadDir(srcDir) - if err != nil { - return fmt.Errorf("unable to read dir %q: %v", srcDir, err) - } - for _, entry := range entries { - n := entry.Name() - if skipFiles[n] { - continue - } - s := filepath.Join(srcDir, n) - if entry.Mode()&os.ModeSymlink == os.ModeSymlink { - if entry, err = os.Stat(s); err != nil { - return fmt.Errorf("unable to stat %v: %v", s, err) - } - } - d := filepath.Join(dstDir, n) - if entry.IsDir() { - if err := copyTree(dstRoot, d, s); err != nil { - return fmt.Errorf("unable to copy dir %q to %q: %v", s, d, err) - } - continue - } - if err := copyFile(dstRoot, d, s); err != nil { - return fmt.Errorf("unable to copy dir %q to %q: %v", s, d, err) - } - } - return nil -} - -// copyFile copies src to dst relative to dstRoot. -func copyFile(dstRoot, dst, src string) error { - s, err := os.Open(src) - if err != nil { - return fmt.Errorf("unable to open %q: %v", src, err) - } - defer s.Close() - - dst = filepath.Join(dstRoot, dst) - d, err := os.Create(dst) - if err != nil { - return fmt.Errorf("unable to create %q: %v", dst, err) - } - _, err = io.Copy(d, s) - if err != nil { - d.Close() // ignore error, copy already failed. - return fmt.Errorf("unable to copy %q to %q: %v", src, dst, err) - } - if err := d.Close(); err != nil { - return fmt.Errorf("unable to close %q: %v", dst, err) - } - return nil -} - -// appFiles returns a list of all Go source files in the app. -func appFiles(ctxt *build.Context) ([]string, error) { - pkg, err := ctxt.ImportDir(".", 0) - if err != nil { - return nil, err - } - if !pkg.IsCommand() { - return nil, fmt.Errorf(`the root of your app needs to be package "main" (currently %q). Please see https://cloud.google.com/appengine/docs/flexible/go/ for more details on structuring your app.`, pkg.Name) - } - var appFiles []string - for _, f := range pkg.GoFiles { - n := filepath.Join(".", f) - appFiles = append(appFiles, n) - } - vlogf("Found application files %v", appFiles) - return appFiles, nil -} diff --git a/google.golang.org/grpc/call.go b/google.golang.org/grpc/call.go index 81b52be294..c1588c6375 100644 --- a/google.golang.org/grpc/call.go +++ b/google.golang.org/grpc/call.go @@ -36,7 +36,6 @@ package grpc import ( "bytes" "io" - "math" "time" "golang.org/x/net/context" @@ -73,7 +72,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran } } for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil { + if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { if err == io.EOF { break } diff --git a/google.golang.org/grpc/clientconn.go b/google.golang.org/grpc/clientconn.go index 459ce0b641..19f9a3798b 100644 --- a/google.golang.org/grpc/clientconn.go +++ b/google.golang.org/grpc/clientconn.go @@ -36,6 +36,7 @@ package grpc import ( "errors" "fmt" + "math" "net" "strings" "sync" @@ -45,6 +46,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -78,7 +80,6 @@ var ( errConnClosing = errors.New("grpc: the connection is closing") // errConnUnavailable indicates that the connection is unavailable. errConnUnavailable = errors.New("grpc: the connection is unavailable") - errNoAddr = errors.New("grpc: there is no address available to dial") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -86,23 +87,33 @@ var ( // dialOptions configure a Dial call. dialOptions are set by the DialOption // values passed to Dial. type dialOptions struct { - unaryInt UnaryClientInterceptor - streamInt StreamClientInterceptor - codec Codec - cp Compressor - dc Decompressor - bs backoffStrategy - balancer Balancer - block bool - insecure bool - timeout time.Duration - scChan <-chan ServiceConfig - copts transport.ConnectOptions -} + unaryInt UnaryClientInterceptor + streamInt StreamClientInterceptor + codec Codec + cp Compressor + dc Decompressor + bs backoffStrategy + balancer Balancer + block bool + insecure bool + timeout time.Duration + scChan <-chan ServiceConfig + copts transport.ConnectOptions + maxMsgSize int +} + +const defaultClientMaxMsgSize = math.MaxInt32 // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. +func WithMaxMsgSize(s int) DialOption { + return func(o *dialOptions) { + o.maxMsgSize = s + } +} + // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. func WithCodec(c Codec) DialOption { return func(o *dialOptions) { @@ -249,6 +260,13 @@ func WithUserAgent(s string) DialOption { } } +// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. +func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { + return func(o *dialOptions) { + o.copts.KeepaliveParams = kp + } +} + // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { return func(o *dialOptions) { @@ -288,6 +306,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * conns: make(map[Address]*addrConn), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) + cc.dopts.maxMsgSize = defaultClientMaxMsgSize for _, opt := range opts { opt(&cc.dopts) } @@ -339,17 +358,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.authority = target[:colonPos] } - var ok bool waitC := make(chan error, 1) go func() { - var addrs []Address + defer close(waitC) if cc.dopts.balancer == nil && cc.sc.LB != nil { cc.dopts.balancer = cc.sc.LB } - if cc.dopts.balancer == nil { - // Connect to target directly if balancer is nil. - addrs = append(addrs, Address{Addr: target}) - } else { + if cc.dopts.balancer != nil { var credsClone credentials.TransportCredentials if creds != nil { credsClone = creds.Clone() @@ -362,24 +377,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return } ch := cc.dopts.balancer.Notify() - if ch == nil { - // There is no name resolver installed. - addrs = append(addrs, Address{Addr: target}) - } else { - addrs, ok = <-ch - if !ok || len(addrs) == 0 { - waitC <- errNoAddr - return + if ch != nil { + if cc.dopts.block { + doneChan := make(chan struct{}) + go cc.lbWatcher(doneChan) + <-doneChan + } else { + go cc.lbWatcher(nil) } - } - } - for _, a := range addrs { - if err := cc.resetAddrConn(a, false, nil); err != nil { - waitC <- err return } } - close(waitC) + // No balancer, or no resolver within the balancer. Connect directly. + if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { + waitC <- err + return + } }() select { case <-ctx.Done(): @@ -390,15 +403,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } } - // If balancer is nil or balancer.Notify() is nil, ok will be false here. - // The lbWatcher goroutine will not be created. - if ok { - go cc.lbWatcher() - } - if cc.dopts.scChan != nil { go cc.scWatcher() } + return cc, nil } @@ -449,7 +457,10 @@ type ClientConn struct { conns map[Address]*addrConn } -func (cc *ClientConn) lbWatcher() { +// lbWatcher watches the Notify channel of the balancer in cc and manages +// connections accordingly. If doneChan is not nil, it is closed after the +// first successfull connection is made. +func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { for addrs := range cc.dopts.balancer.Notify() { var ( add []Address // Addresses need to setup connections. @@ -476,7 +487,15 @@ func (cc *ClientConn) lbWatcher() { } cc.mu.Unlock() for _, a := range add { - cc.resetAddrConn(a, true, nil) + if doneChan != nil { + err := cc.resetAddrConn(a, true, nil) + if err == nil { + close(doneChan) + doneChan = nil + } + } else { + cc.resetAddrConn(a, false, nil) + } } for _, c := range del { c.tearDown(errConnDrain) @@ -505,7 +524,7 @@ func (cc *ClientConn) scWatcher() { // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. -func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { +func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { ac := &addrConn{ cc: cc, addr: addr, @@ -555,8 +574,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err stale.tearDown(tearDownErr) } } - // skipWait may overwrite the decision in ac.dopts.block. - if ac.dopts.block && !skipWait { + if block { if err := ac.resetTransport(false); err != nil { if err != errConnClosing { // Tear down ac and delete it from cc.conns. @@ -777,6 +795,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { Metadata: ac.addr.Metadata, } newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts) + // Don't call cancel in success path due to a race in Go 1.6: + // https://github.com/golang/go/issues/15078. if err != nil { cancel() @@ -855,9 +875,9 @@ func (ac *addrConn) transportMonitor() { // In both cases, a new ac is created. select { case <-t.Error(): - ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) default: - ac.cc.resetAddrConn(ac.addr, true, errConnDrain) + ac.cc.resetAddrConn(ac.addr, false, errConnDrain) } return case <-t.Error(): @@ -866,7 +886,7 @@ func (ac *addrConn) transportMonitor() { t.Close() return case <-t.GoAway(): - ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) return default: } diff --git a/google.golang.org/grpc/clientconn_test.go b/google.golang.org/grpc/clientconn_test.go index 93e78a5a8d..ea4eaf53db 100644 --- a/google.golang.org/grpc/clientconn_test.go +++ b/google.golang.org/grpc/clientconn_test.go @@ -253,3 +253,44 @@ func TestDialWithBlockErrorOnNonTemporaryErrorDialer(t *testing.T) { t.Fatalf("Dial(%q) = %v, want %v", "", err, context.DeadlineExceeded) } } + +// emptyBalancer returns an empty set of servers. +type emptyBalancer struct { + ch chan []Address +} + +func newEmptyBalancer() Balancer { + return &emptyBalancer{ch: make(chan []Address, 1)} +} +func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error { + b.ch <- nil + return nil +} +func (b *emptyBalancer) Up(_ Address) func(error) { + return nil +} +func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) { + return Address{}, nil, nil +} +func (b *emptyBalancer) Notify() <-chan []Address { + return b.ch +} +func (b *emptyBalancer) Close() error { + close(b.ch) + return nil +} + +func TestNonblockingDialWithEmptyBalancer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + dialDone := make(chan struct{}) + go func() { + conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer())) + if err != nil { + t.Fatalf("unexpected error dialing connection: %v", err) + } + conn.Close() + close(dialDone) + }() + <-dialDone + cancel() +} diff --git a/google.golang.org/grpc/examples/helloworld/mock/hw_test.go b/google.golang.org/grpc/examples/helloworld/mock/hw_test.go index 6f7b4f8604..b63fec7ad6 100644 --- a/google.golang.org/grpc/examples/helloworld/mock/hw_test.go +++ b/google.golang.org/grpc/examples/helloworld/mock/hw_test.go @@ -1,4 +1,4 @@ -package mock +package mock_test import ( "fmt" diff --git a/google.golang.org/grpc/grpclb/grpclb_test.go b/google.golang.org/grpc/grpclb/grpclb_test.go index f034b6ba95..767f400d6e 100644 --- a/google.golang.org/grpc/grpclb/grpclb_test.go +++ b/google.golang.org/grpc/grpclb/grpclb_test.go @@ -332,25 +332,19 @@ func TestDropRequest(t *testing.T) { if err != nil { t.Fatalf("Failed to generate the port number %v", err) } - var bes []*lbpb.Server - be := &lbpb.Server{ - IpAddress: []byte(beAddr1[0]), - Port: int32(bePort1), - LoadBalanceToken: lbToken, - DropRequest: true, - } - bes = append(bes, be) - be = &lbpb.Server{ - IpAddress: []byte(beAddr2[0]), - Port: int32(bePort2), - LoadBalanceToken: lbToken, - DropRequest: false, - } - bes = append(bes, be) - sl := &lbpb.ServerList{ - Servers: bes, - } - sls := []*lbpb.ServerList{sl} + sls := []*lbpb.ServerList{{ + Servers: []*lbpb.Server{{ + IpAddress: []byte(beAddr1[0]), + Port: int32(bePort1), + LoadBalanceToken: lbToken, + DropRequest: true, + }, { + IpAddress: []byte(beAddr2[0]), + Port: int32(bePort2), + LoadBalanceToken: lbToken, + DropRequest: false, + }}, + }} intervals := []time.Duration{0} ls := newRemoteBalancer(sls, intervals) lbpb.RegisterLoadBalancerServer(lb, ls) @@ -371,20 +365,24 @@ func TestDropRequest(t *testing.T) { if err != nil { t.Fatalf("Failed to dial to the backend %v", err) } - // The 1st fail-fast RPC should fail because the 1st backend has DropRequest set to true. helloC := hwpb.NewGreeterClient(cc) - if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable { - t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable) - } - // The 2nd fail-fast RPC should succeed since it chooses the non-drop-request backend according - // to the round robin policy. - if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil { - t.Fatalf("%v.SayHello(_, _) = _, %v, want _, ", helloC, err) - } - // The 3nd non-fail-fast RPC should succeed. + // The 1st, non-fail-fast RPC should succeed. This ensures both server + // connections are made, because the first one has DropRequest set to true. if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil { t.Fatalf("%v.SayHello(_, _) = _, %v, want _, ", helloC, err) } + for i := 0; i < 3; i++ { + // Odd fail-fast RPCs should fail, because the 1st backend has DropRequest + // set to true. + if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable { + t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable) + } + // Even fail-fast RPCs should succeed since they choose the + // non-drop-request backend according to the round robin policy. + if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil { + t.Fatalf("%v.SayHello(_, _) = _, %v, want _, ", helloC, err) + } + } cc.Close() } diff --git a/google.golang.org/grpc/keepalive/keepalive.go b/google.golang.org/grpc/keepalive/keepalive.go new file mode 100644 index 0000000000..20672e49d9 --- /dev/null +++ b/google.golang.org/grpc/keepalive/keepalive.go @@ -0,0 +1,52 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Package keepalive defines configurable parameters for point-to-point healthcheck. +package keepalive + +import ( + "time" +) + +// ClientParameters is used to set keepalive parameters on the client-side. +// These configure how the client will actively probe to notice when a connection broken +// and to cause activity so intermediaries are aware the connection is still in use. +type ClientParameters struct { + // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. + Time time.Duration // The current default value is infinity. + // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that + // the connection is closed. + Timeout time.Duration // The current default value is 20 seconds. + // If true, client runs keepalive checks even with no active RPCs. + PermitWithoutStream bool +} diff --git a/google.golang.org/grpc/rpc_util.go b/google.golang.org/grpc/rpc_util.go index d98a88376b..73c3a96655 100644 --- a/google.golang.org/grpc/rpc_util.go +++ b/google.golang.org/grpc/rpc_util.go @@ -377,7 +377,7 @@ type rpcError struct { } func (e *rpcError) Error() string { - return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc) + return fmt.Sprintf("rpc error: code = %s desc = %s", e.code, e.desc) } // Code returns the error code for err if it was produced by the rpc system. diff --git a/google.golang.org/grpc/server_test.go b/google.golang.org/grpc/server_test.go index 23838806d4..53968cc272 100644 --- a/google.golang.org/grpc/server_test.go +++ b/google.golang.org/grpc/server_test.go @@ -60,7 +60,7 @@ func TestStopBeforeServe(t *testing.T) { // server.Serve is responsible for closing the listener, even if the // server was already stopped. err = lis.Close() - if got, want := ErrorDesc(err), "use of closed network connection"; !strings.Contains(got, want) { + if got, want := ErrorDesc(err), "use of closed"; !strings.Contains(got, want) { t.Errorf("Close() error = %q, want %q", got, want) } } diff --git a/google.golang.org/grpc/stream.go b/google.golang.org/grpc/stream.go index bb468dc37e..0ef2077ce2 100644 --- a/google.golang.org/grpc/stream.go +++ b/google.golang.org/grpc/stream.go @@ -37,7 +37,6 @@ import ( "bytes" "errors" "io" - "math" "sync" "time" @@ -208,13 +207,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, - cancel: cancel, + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + maxMsgSize: cc.dopts.maxMsgSize, + cancel: cancel, put: put, t: t, @@ -259,17 +259,18 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { - opts []CallOption - c callInfo - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec - cp Compressor - cbuf *bytes.Buffer - dc Decompressor - cancel context.CancelFunc + opts []CallOption + c callInfo + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + cp Compressor + cbuf *bytes.Buffer + dc Decompressor + maxMsgSize int + cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. @@ -382,7 +383,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { Client: true, } } - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -405,7 +406,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } // Special handling for client streaming rpc. // This recv expects EOF or errors, so we don't collect inPayload. - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil) cs.closeTransportStream(err) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) diff --git a/google.golang.org/grpc/test/end2end_test.go b/google.golang.org/grpc/test/end2end_test.go index 9bcea032a4..98d590e9ea 100644 --- a/google.golang.org/grpc/test/end2end_test.go +++ b/google.golang.org/grpc/test/end2end_test.go @@ -570,6 +570,9 @@ func (te *test) clientConn() *grpc.ClientConn { if te.streamClientInt != nil { opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) } + if te.maxMsgSize > 0 { + opts = append(opts, grpc.WithMaxMsgSize(te.maxMsgSize)) + } switch te.e.security { case "tls": creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") @@ -1427,22 +1430,34 @@ func testExceedMsgLimit(t *testing.T, e env) { tc := testpb.NewTestServiceClient(te.clientConn()) argSize := int32(te.maxMsgSize + 1) - const respSize = 1 + const smallSize = 1 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) if err != nil { t.Fatal(err) } + smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) + if err != nil { + t.Fatal(err) + } + // test on server side for unary RPC req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), - ResponseSize: proto.Int32(respSize), + ResponseSize: proto.Int32(smallSize), Payload: payload, } if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Internal { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.Internal) } + // test on client side for unary RPC + req.ResponseSize = proto.Int32(int32(te.maxMsgSize) + 1) + req.Payload = smallPayload + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Internal { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.Internal) + } + // test on server side for streaming RPC stream, err := tc.FullDuplexCall(te.ctx) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) @@ -1469,6 +1484,21 @@ func testExceedMsgLimit(t *testing.T, e env) { if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Internal { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.Internal) } + + // test on client side for streaming RPC + stream, err = tc.FullDuplexCall(te.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + respParam[0].Size = proto.Int32(int32(te.maxMsgSize) + 1) + sreq.Payload = smallPayload + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Internal { + t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.Internal) + } + } func TestPeerClientSide(t *testing.T) { @@ -2417,85 +2447,6 @@ func testFailedServerStreaming(t *testing.T, e env) { } } -// checkTimeoutErrorServer is a gRPC server checks context timeout error in FullDuplexCall(). -// It is only used in TestStreamingRPCTimeoutServerError. -type checkTimeoutErrorServer struct { - t *testing.T - done chan struct{} - testpb.TestServiceServer -} - -func (s *checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { - defer close(s.done) - for { - _, err := stream.Recv() - if err != nil { - if grpc.Code(err) != codes.DeadlineExceeded { - s.t.Errorf("stream.Recv() = _, %v, want error code %s", err, codes.DeadlineExceeded) - } - return err - } - if err := stream.Send(&testpb.StreamingOutputCallResponse{ - Payload: &testpb.Payload{ - Body: []byte{'0'}, - }, - }); err != nil { - if grpc.Code(err) != codes.DeadlineExceeded { - s.t.Errorf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded) - } - return err - } - } -} - -func TestStreamingRPCTimeoutServerError(t *testing.T) { - defer leakCheck(t)() - for _, e := range listTestEnv() { - testStreamingRPCTimeoutServerError(t, e) - } -} - -// testStreamingRPCTimeoutServerError tests the server side behavior. -// When context timeout happens on client side, server should get deadline exceeded error. -func testStreamingRPCTimeoutServerError(t *testing.T, e env) { - te := newTest(t, e) - serverDone := make(chan struct{}) - te.startServer(&checkTimeoutErrorServer{t: t, done: serverDone}) - defer te.tearDown() - - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) - - req := &testpb.StreamingOutputCallRequest{} - for duration := 50 * time.Millisecond; ; duration *= 2 { - ctx, _ := context.WithTimeout(context.Background(), duration) - stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) - if grpc.Code(err) == codes.DeadlineExceeded { - // Redo test with double timeout. - continue - } - if err != nil { - t.Errorf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - return - } - for { - err := stream.Send(req) - if err != nil { - break - } - _, err = stream.Recv() - if err != nil { - break - } - } - - // Wait for context timeout on server before closing connection - // to make sure the server will get timeout error. - <-serverDone - break - } -} - // concurrentSendServer is a TestServiceServer whose // StreamingOutputCall makes ten serial Send calls, sending payloads // "0".."9", inclusive. TestServerStreamingConcurrent verifies they @@ -2702,6 +2653,48 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { } } +const defaultMaxStreamsClient = 100 + +func TestExceedDefaultMaxStreamsLimit(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testExceedDefaultMaxStreamsLimit(t, e) + } +} + +func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) { + te := newTest(t, e) + te.declareLogNoise( + "http2Client.notifyError got notified that the client transport was broken", + "Conn.resetTransport failed to create client transport", + "grpc: the connection is closing", + ) + // When masStream is set to 0 the server doesn't send a settings frame for + // MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams. + // In such a case, there should be a default cap on the client-side. + te.maxStream = 0 + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + + // Create as many streams as a client can. + for i := 0; i < defaultMaxStreamsClient; i++ { + if _, err := tc.StreamingInputCall(te.ctx); err != nil { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + } + + // Trying to create one more should timeout. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := tc.StreamingInputCall(ctx) + if err == nil || grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) + } +} + func TestStreamsQuotaRecovery(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { diff --git a/google.golang.org/grpc/transport/control.go b/google.golang.org/grpc/transport/control.go index c539ac318e..87dccd761e 100644 --- a/google.golang.org/grpc/transport/control.go +++ b/google.golang.org/grpc/transport/control.go @@ -35,7 +35,9 @@ package transport import ( "fmt" + "math" "sync" + "time" "golang.org/x/net/http2" ) @@ -44,8 +46,12 @@ const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = defaultWindowSize * 32 // for an RPC - initialConnWindowSize = defaultWindowSize * 32 // for a connection + initialWindowSize = defaultWindowSize * 32 // for an RPC + initialConnWindowSize = defaultWindowSize * 32 // for a connection + infinity = time.Duration(math.MaxInt64) + defaultKeepaliveTime = infinity + defaultKeepaliveTimeout = time.Duration(20 * time.Second) + defaultMaxStreamsClient = 100 ) // The following defines various control items which could flow through diff --git a/google.golang.org/grpc/transport/handler_server_test.go b/google.golang.org/grpc/transport/handler_server_test.go index 9843d36b61..44adf2eed8 100644 --- a/google.golang.org/grpc/transport/handler_server_test.go +++ b/google.golang.org/grpc/transport/handler_server_test.go @@ -188,7 +188,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) { }, RequestURI: "/service/foo.bar", }, - wantErr: `stream error: code = 13 desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`, + wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`, }, { name: "with metadata", diff --git a/google.golang.org/grpc/transport/http2_client.go b/google.golang.org/grpc/transport/http2_client.go index 892f8ba675..627a590a0d 100644 --- a/google.golang.org/grpc/transport/http2_client.go +++ b/google.golang.org/grpc/transport/http2_client.go @@ -41,6 +41,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "time" "golang.org/x/net/context" @@ -49,6 +50,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -80,6 +82,8 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} + // awakenKeepalive is used to wake up keepalive when after it has gone dormant. + awakenKeepalive chan struct{} framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -99,6 +103,11 @@ type http2Client struct { creds []credentials.PerRPCCredentials + // Boolean to keep track of reading activity on transport. + // 1 is true and 0 is false. + activity uint32 // Accessed atomically. + kp keepalive.ClientParameters + statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -182,6 +191,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } + kp := opts.KeepaliveParams + // Validate keepalive parameters. + if kp.Time == 0 { + kp.Time = defaultKeepaliveTime + } + if kp.Timeout == 0 { + kp.Timeout = defaultKeepaliveTimeout + } var buf bytes.Buffer t := &http2Client{ ctx: ctx, @@ -198,6 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -208,10 +226,15 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( state: reachable, activeStreams: make(map[uint32]*Stream), creds: opts.PerRPCCredentials, - maxStreams: math.MaxInt32, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), streamSendQuota: defaultWindowSize, + kp: kp, statsHandler: opts.StatsHandler, } + // Make sure awakenKeepalive can't be written upon. + // keepalive routine will make it writable, if need be. + t.awakenKeepalive <- struct{}{} if t.statsHandler != nil { t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, @@ -256,6 +279,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } } go t.controller() + if t.kp.Time != infinity { + go t.keepalive() + } t.writableChan <- 0 return t, nil } @@ -337,21 +363,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() - if checkStreamsQuota { - sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) - if err != nil { - return nil, err - } - // Returns the quota balance back. - if sq > 1 { - t.streamsQuota.add(sq - 1) - } + sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) + if err != nil { + return nil, err + } + // Returns the quota balance back. + if sq > 1 { + t.streamsQuota.add(sq - 1) } if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. - if _, ok := err.(StreamError); ok && checkStreamsQuota { + if _, ok := err.(StreamError); ok { t.streamsQuota.add(1) } return nil, err @@ -359,9 +382,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - if checkStreamsQuota { - t.streamsQuota.add(1) - } + t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 return nil, ErrStreamDrain @@ -373,17 +394,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s := t.newStream(ctx, callHdr) s.clientStatsCtx = userCtx t.activeStreams[s.id] = s - - // This stream is not counted when applySetings(...) initialize t.streamsQuota. - // Reset t.streamsQuota to the right value. - var reset bool - if !checkStreamsQuota && t.streamsQuota != nil { - reset = true + // If the number of active streams change from 0 to 1, then check if keepalive + // has gone dormant. If so, wake it up. + if len(t.activeStreams) == 1 { + select { + case t.awakenKeepalive <- struct{}{}: + t.framer.writePing(false, false, [8]byte{}) + default: + } } + t.mu.Unlock() - if reset { - t.streamsQuota.add(-1) - } // HPACK encodes various headers. Note that once WriteField(...) is // called, the corresponding headers/continuation frame has to be sent @@ -491,15 +512,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // CloseStream clears the footprint of a stream when the stream is not needed any more. // This must not be executed in reader's goroutine. func (t *http2Client) CloseStream(s *Stream, err error) { - var updateStreams bool t.mu.Lock() if t.activeStreams == nil { t.mu.Unlock() return } - if t.streamsQuota != nil { - updateStreams = true - } delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. @@ -508,10 +525,27 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - if updateStreams { - t.streamsQuota.add(1) - } + // rstStream is true in case the stream is being closed at the client-side + // and the server needs to be intimated about it by sending a RST_STREAM + // frame. + // To make sure this frame is written to the wire before the headers of the + // next stream waiting for streamsQuota, we add to streamsQuota pool only + // after having acquired the writableChan to send RST_STREAM out (look at + // the controller() routine). + var rstStream bool + var rstError http2.ErrCode + defer func() { + // In case, the client doesn't have to send RST_STREAM to server + // we can safely add back to streamsQuota pool now. + if !rstStream { + t.streamsQuota.add(1) + return + } + t.controlBuf.put(&resetStream{s.id, rstError}) + }() s.mu.Lock() + rstStream = s.rstStream + rstError = s.rstError if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { t.controlBuf.put(&windowUpdate{0, n}) @@ -527,8 +561,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { } s.state = streamDone s.mu.Unlock() - if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded { - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) + if _, ok := err.(StreamError); ok { + rstStream = true + rstError = http2.ErrCodeCancel } } @@ -742,7 +777,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { } func (t *http2Client) handleData(f *http2.DataFrame) { - size := len(f.Data()) + size := f.Header().Length if err := t.fc.onData(uint32(size)); err != nil { t.notifyError(connectionErrorf(true, err, "%v", err)) return @@ -756,6 +791,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) { return } if size > 0 { + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } + } s.mu.Lock() if s.state == streamDone { s.mu.Unlock() @@ -769,19 +809,27 @@ func (t *http2Client) handleData(f *http2.DataFrame) { s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() + s.rstStream = true + s.rstError = http2.ErrCodeFlowControl close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{s.id, w}) + } + } s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? - data := make([]byte, size) - copy(data, f.Data()) - s.write(recvMsg{data: data}) + if len(f.Data()) > 0 { + data := make([]byte, len(f.Data())) + copy(data, f.Data()) + s.write(recvMsg{data: data}) + } } // The server has closed the stream without sending trailers. Record that // the read direction is closed, and set the status appropriately. @@ -982,6 +1030,7 @@ func (t *http2Client) reader() { t.notifyError(err) return } + atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { t.notifyError(err) @@ -992,6 +1041,7 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() + atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1043,16 +1093,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) { s.Val = math.MaxInt32 } t.mu.Lock() - reset := t.streamsQuota != nil - if !reset { - t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams)) - } ms := t.maxStreams t.maxStreams = int(s.Val) t.mu.Unlock() - if reset { - t.streamsQuota.add(int(s.Val) - ms) - } + t.streamsQuota.add(int(s.Val) - ms) case http2.SettingInitialWindowSize: t.mu.Lock() for _, stream := range t.activeStreams { @@ -1085,6 +1129,12 @@ func (t *http2Client) controller() { t.framer.writeSettings(true, i.ss...) } case *resetStream: + // If the server needs to be to intimated about stream closing, + // then we need to make sure the RST_STREAM frame is written to + // the wire before the headers of the next stream waiting on + // streamQuota. We ensure this by adding to the streamsQuota pool + // only after having acquired the writableChan to send RST_STREAM. + t.streamsQuota.add(1) t.framer.writeRSTStream(true, i.streamID, i.code) case *flushIO: t.framer.flushWrite() @@ -1104,6 +1154,61 @@ func (t *http2Client) controller() { } } +// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. +func (t *http2Client) keepalive() { + p := &ping{data: [8]byte{}} + timer := time.NewTimer(t.kp.Time) + for { + select { + case <-timer.C: + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { + timer.Reset(t.kp.Time) + continue + } + // Check if keepalive should go dormant. + t.mu.Lock() + if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { + // Make awakenKeepalive writable. + <-t.awakenKeepalive + t.mu.Unlock() + select { + case <-t.awakenKeepalive: + // If the control gets here a ping has been sent + // need to reset the timer with keepalive.Timeout. + case <-t.shutdownChan: + return + } + } else { + t.mu.Unlock() + // Send ping. + t.controlBuf.put(p) + } + + // By the time control gets here a ping has been sent one way or the other. + timer.Reset(t.kp.Timeout) + select { + case <-timer.C: + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { + timer.Reset(t.kp.Time) + continue + } + t.Close() + return + case <-t.shutdownChan: + if !timer.Stop() { + <-timer.C + } + return + } + case <-t.shutdownChan: + if !timer.Stop() { + <-timer.C + } + return + } + } +} + func (t *http2Client) Error() <-chan struct{} { return t.errorChan } diff --git a/google.golang.org/grpc/transport/http2_server.go b/google.golang.org/grpc/transport/http2_server.go index a095dd0e07..f5c590f438 100644 --- a/google.golang.org/grpc/transport/http2_server.go +++ b/google.golang.org/grpc/transport/http2_server.go @@ -381,7 +381,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) { } func (t *http2Server) handleData(f *http2.DataFrame) { - size := len(f.Data()) + size := f.Header().Length if err := t.fc.onData(uint32(size)); err != nil { grpclog.Printf("transport: http2Server %v", err) t.Close() @@ -396,6 +396,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) { return } if size > 0 { + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } + } s.mu.Lock() if s.state == streamDone { s.mu.Unlock() @@ -411,13 +416,20 @@ func (t *http2Server) handleData(f *http2.DataFrame) { t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{s.id, w}) + } + } s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? - data := make([]byte, size) - copy(data, f.Data()) - s.write(recvMsg{data: data}) + if len(f.Data()) > 0 { + data := make([]byte, len(f.Data())) + copy(data, f.Data()) + s.write(recvMsg{data: data}) + } } if f.Header().Flags.Has(http2.FlagDataEndStream) { // Received the end of stream from the client. diff --git a/google.golang.org/grpc/transport/transport.go b/google.golang.org/grpc/transport/transport.go index caee54a801..beb0a520a5 100644 --- a/google.golang.org/grpc/transport/transport.go +++ b/google.golang.org/grpc/transport/transport.go @@ -45,8 +45,10 @@ import ( "sync" "golang.org/x/net/context" + "golang.org/x/net/http2" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/tap" @@ -213,6 +215,11 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string + // rstStream indicates whether a RST_STREAM frame needs to be sent + // to the server to signify that this stream is closing. + rstStream bool + // rstError is the error that needs to be sent along with the RST_STREAM frame. + rstError http2.ErrCode } // RecvCompress returns the compression algorithm applied to the inbound @@ -385,6 +392,8 @@ type ConnectOptions struct { PerRPCCredentials []credentials.PerRPCCredentials // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials + // KeepaliveParams stores the keepalive parameters. + KeepaliveParams keepalive.ClientParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler } @@ -568,7 +577,7 @@ type StreamError struct { } func (e StreamError) Error() string { - return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc) + return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc) } // ContextErr converts the error from context package into a StreamError. diff --git a/google.golang.org/grpc/transport/transport_test.go b/google.golang.org/grpc/transport/transport_test.go index 1ca6eb1a63..05686fb65a 100644 --- a/google.golang.org/grpc/transport/transport_test.go +++ b/google.golang.org/grpc/transport/transport_test.go @@ -49,6 +49,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" ) type server struct { @@ -251,6 +252,10 @@ func (s *server) stop() { } func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) { + return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{}) +} + +func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) { server := &server{startedErr: make(chan error, 1)} go server.start(t, port, maxStreams, ht) server.wait(t, 2*time.Second) @@ -262,13 +267,135 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, Client target := TargetInfo{ Addr: addr, } - ct, connErr = NewClientTransport(context.Background(), target, ConnectOptions{}) + ct, connErr = NewClientTransport(context.Background(), target, copts) if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) } return server, ct } +func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + // Launch a non responsive server. + go func() { + defer lis.Close() + conn, err := lis.Accept() + if err != nil { + t.Errorf("Error at server-side while accepting: %v", err) + close(done) + return + } + done <- conn + }() + tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) + if err != nil { + // Server clean-up. + lis.Close() + if conn, ok := <-done; ok { + conn.Close() + } + t.Fatalf("Failed to dial: %v", err) + } + return tr +} + +func TestKeepaliveClientClosesIdleTransport(t *testing.T) { + done := make(chan net.Conn, 1) + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: true, // Run keepalive even with no RPCs. + }}, done) + defer tr.Close() + conn, ok := <-done + if !ok { + t.Fatalf("Server didn't return connection object") + } + defer conn.Close() + // Sleep for keepalive to close the connection. + time.Sleep(4 * time.Second) + // Assert that the connection was closed. + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state == reachable { + t.Fatalf("Test Failed: Expected client transport to have closed.") + } +} + +func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { + done := make(chan net.Conn, 1) + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + }}, done) + defer tr.Close() + conn, ok := <-done + if !ok { + t.Fatalf("server didn't reutrn connection object") + } + defer conn.Close() + // Give keepalive some time. + time.Sleep(4 * time.Second) + // Assert that connections is still healthy. + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state != reachable { + t.Fatalf("Test failed: Expected client transport to be healthy.") + } +} + +func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { + done := make(chan net.Conn, 1) + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + }}, done) + defer tr.Close() + conn, ok := <-done + if !ok { + t.Fatalf("Server didn't return connection object") + } + defer conn.Close() + // Create a stream. + _, err := tr.NewStream(context.Background(), &CallHdr{}) + if err != nil { + t.Fatalf("Failed to create a new stream: %v", err) + } + // Give keepalive some time. + time.Sleep(4 * time.Second) + // Assert that transport was closed. + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state == reachable { + t.Fatalf("Test failed: Expected client transport to have closed.") + } +} + +func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: true, // Run keepalive even with no RPCs. + }}) + defer s.stop() + defer tr.Close() + // Give keep alive some time. + time.Sleep(4 * time.Second) + // Assert that transport is healthy. + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state != reachable { + t.Fatalf("Test failed: Expected client transport to be healthy.") + } +} + func TestClientSendAndReceive(t *testing.T) { server, ct := setUp(t, 0, math.MaxUint32, normal) callHdr := &CallHdr{ @@ -507,7 +634,10 @@ func TestMaxStreams(t *testing.T) { case <-cc.streamsQuota.acquire(): t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.") default: - if cc.streamsQuota.quota != 0 { + cc.streamsQuota.mu.Lock() + quota := cc.streamsQuota.quota + cc.streamsQuota.mu.Unlock() + if quota != 0 { t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.") } }