diff --git a/go.mod b/go.mod index 07fe14270b7..21e19ce32c7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/tektoncd/pipeline go 1.17 require ( - github.com/cloudevents/sdk-go/v2 v2.5.0 + github.com/cloudevents/sdk-go/v2 v2.10.1 github.com/containerd/containerd v1.5.10 github.com/google/go-cmp v0.5.7 github.com/google/go-containerregistry v0.8.1-0.20220216220642-00c59d91847c diff --git a/go.sum b/go.sum index 7a22be7753f..7e20bc10f4a 100644 --- a/go.sum +++ b/go.sum @@ -287,8 +287,8 @@ github.com/cilium/ebpf v0.6.2/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudevents/sdk-go/v2 v2.5.0 h1:Ts6aLHbBUJfcNcZ4ouAfJ4+Np7SE1Yf2w4ADKRCd7Fo= -github.com/cloudevents/sdk-go/v2 v2.5.0/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U= +github.com/cloudevents/sdk-go/v2 v2.10.1 h1:qNFovJ18fWOd8Q9ydWJPk1oiFudXyv1GxJIP7MwPjuM= +github.com/cloudevents/sdk-go/v2 v2.10.1/go.mod h1:GpCBmUj7DIRiDhVvsK5d6WCbgTWs8DxAWTRtAwQmIXs= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go index 0be62d7fc95..ea8fbfbb4db 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -97,6 +97,7 @@ type ceClient struct { receiverMu sync.Mutex eventDefaulterFns []EventDefaulter pollGoroutines int + blockingCallback bool } func (c *ceClient) applyOptions(opts ...Option) error { @@ -128,11 +129,10 @@ func (c *ceClient) Send(ctx context.Context, e event.Event) protocol.Result { return err } - // Event has been defaulted and validated, record we are going to preform send. + // Event has been defaulted and validated, record we are going to perform send. ctx, cb := c.observabilityService.RecordSendingEvent(ctx, e) - defer cb(err) - err = c.sender.Send(ctx, (*binding.EventMessage)(&e)) + defer cb(err) return err } @@ -160,7 +160,6 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, pr // Event has been defaulted and validated, record we are going to perform request. ctx, cb := c.observabilityService.RecordRequestEvent(ctx, e) - defer cb(err, resp) // If provided a requester, use it to do request/response. var msg binding.Message @@ -186,7 +185,7 @@ func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, pr } else { resp = rs } - + defer cb(err, resp) return resp, err } @@ -250,14 +249,22 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { continue } - // Do not block on the invoker. - wg.Add(1) - go func() { + callback := func() { if err := c.invoker.Invoke(ctx, msg, respFn); err != nil { cecontext.LoggerFrom(ctx).Warn("Error while handling a message: ", err) } - wg.Done() - }() + } + + if c.blockingCallback { + callback() + } else { + // Do not block on the invoker. + wg.Add(1) + go func() { + defer wg.Done() + callback() + }() + } } }() } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go index e6d11f55f35..403fb0f5598 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go @@ -81,9 +81,9 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p var cb func(error) ctx, cb = r.observabilityService.RecordCallingInvoker(ctx, e) - defer cb(result) resp, result = r.fn.invoke(ctx, e) + defer cb(result) return }() @@ -127,6 +127,9 @@ func (r *receiveInvoker) IsResponder() bool { func computeInboundContext(message binding.Message, fallback context.Context, inboundContextDecorators []func(context.Context, binding.Message) context.Context) context.Context { result := fallback + if mctx, ok := message.(binding.MessageContext); ok { + result = cecontext.ValuesDelegating(mctx.Context(), fallback) + } for _, f := range inboundContextDecorators { result = f(result, message) } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go index d0fe9dbaa9b..938478162b0 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go @@ -8,6 +8,7 @@ package client import ( "context" "fmt" + "github.com/cloudevents/sdk-go/v2/binding" ) @@ -113,3 +114,15 @@ func WithInboundContextDecorator(dec func(context.Context, binding.Message) cont return nil } } + +// WithBlockingCallback makes the callback passed into StartReceiver is executed as a blocking call, +// i.e. in each poll go routine, the next event will not be received until the callback on current event completes. +// To make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1) +func WithBlockingCallback() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.blockingCallback = true + } + return nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/context/delegating.go b/vendor/github.com/cloudevents/sdk-go/v2/context/delegating.go new file mode 100644 index 00000000000..434a4da7a01 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/context/delegating.go @@ -0,0 +1,25 @@ +package context + +import "context" + +type valuesDelegating struct { + context.Context + parent context.Context +} + +// ValuesDelegating wraps a child and parent context. It will perform Value() +// lookups first on the child, and then fall back to the child. All other calls +// go solely to the child context. +func ValuesDelegating(child, parent context.Context) context.Context { + return &valuesDelegating{ + Context: child, + parent: parent, + } +} + +func (c *valuesDelegating) Value(key interface{}) interface{} { + if val := c.Context.Value(key); val != nil { + return val + } + return c.parent.Value(key) +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event_data.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event_data.go index 0f183148271..8fc449ed94e 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/event_data.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event_data.go @@ -73,7 +73,7 @@ func (e Event) Data() []byte { } // DataAs attempts to populate the provided data object with the event payload. -// data should be a pointer type. +// obj should be a pointer type. func (e Event) DataAs(obj interface{}) error { data := e.Data() diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go index 561f4c5dfbc..c511c81c458 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go @@ -20,6 +20,17 @@ const ( CloudEventsVersionV03 = "0.3" ) +var specV03Attributes = map[string]struct{}{ + "type": {}, + "source": {}, + "subject": {}, + "id": {}, + "time": {}, + "schemaurl": {}, + "datacontenttype": {}, + "datacontentencoding": {}, +} + // EventContextV03 represents the non-data attributes of a CloudEvents v0.3 // event. type EventContextV03 struct { @@ -78,11 +89,17 @@ func (ec EventContextV03) ExtensionAs(name string, obj interface{}) error { } } -// SetExtension adds the extension 'name' with value 'value' to the CloudEvents context. +// SetExtension adds the extension 'name' with value 'value' to the CloudEvents +// context. This function fails if the name uses a reserved event context key. func (ec *EventContextV03) SetExtension(name string, value interface{}) error { if ec.Extensions == nil { ec.Extensions = make(map[string]interface{}) } + + if _, ok := specV03Attributes[strings.ToLower(name)]; ok { + return fmt.Errorf("bad key %q: CloudEvents spec attribute MUST NOT be overwritten by extension", name) + } + if value == nil { delete(ec.Extensions, name) if len(ec.Extensions) == 0 { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go index 01f97586f61..8f164502b05 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go @@ -21,6 +21,17 @@ const ( CloudEventsVersionV1 = "1.0" ) +var specV1Attributes = map[string]struct{}{ + "id": {}, + "source": {}, + "type": {}, + "datacontenttype": {}, + "subject": {}, + "time": {}, + "specversion": {}, + "dataschema": {}, +} + // EventContextV1 represents the non-data attributes of a CloudEvents v1.0 // event. type EventContextV1 struct { @@ -73,13 +84,18 @@ func (ec EventContextV1) ExtensionAs(name string, obj interface{}) error { return fmt.Errorf("unknown extension type %T", obj) } -// SetExtension adds the extension 'name' with value 'value' to the CloudEvents context. -// This function fails if the name doesn't respect the regex ^[a-zA-Z0-9]+$ +// SetExtension adds the extension 'name' with value 'value' to the CloudEvents +// context. This function fails if the name doesn't respect the regex +// ^[a-zA-Z0-9]+$ or if the name uses a reserved event context key. func (ec *EventContextV1) SetExtension(name string, value interface{}) error { if err := validateExtensionName(name); err != nil { return err } + if _, ok := specV1Attributes[strings.ToLower(name)]; ok { + return fmt.Errorf("bad key %q: CloudEvents spec attribute MUST NOT be overwritten by extension", name) + } + name = strings.ToLower(name) if ec.Extensions == nil { ec.Extensions = make(map[string]interface{}) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go new file mode 100644 index 00000000000..0eec396a1e6 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/context.go @@ -0,0 +1,48 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package http + +import ( + "context" + + nethttp "net/http" + "net/url" +) + +type requestKey struct{} + +// RequestData holds the http.Request information subset that can be +// used to retrieve HTTP information for an incoming CloudEvent. +type RequestData struct { + URL *url.URL + Header nethttp.Header + RemoteAddr string + Host string +} + +// WithRequestDataAtContext uses the http.Request to add RequestData +// information to the Context. +func WithRequestDataAtContext(ctx context.Context, r *nethttp.Request) context.Context { + if r == nil { + return ctx + } + + return context.WithValue(ctx, requestKey{}, &RequestData{ + URL: r.URL, + Header: r.Header, + RemoteAddr: r.RemoteAddr, + Host: r.Host, + }) +} + +// RequestDataFromContext retrieves RequestData from the Context. +// If not set nil is returned. +func RequestDataFromContext(ctx context.Context) *RequestData { + if req := ctx.Value(requestKey{}); req != nil { + return req.(*RequestData) + } + return nil +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go index 55031939c60..5e400905a70 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go @@ -248,7 +248,7 @@ func WithDefaultOptionsHandlerFunc(methods []string, rate int, origins []string, if p == nil { return fmt.Errorf("http OPTIONS handler func can not set nil protocol") } - p.OptionsHandlerFn = p.DeleteHandlerFn + p.OptionsHandlerFn = p.OptionsHandler p.WebhookConfig = &WebhookConfig{ AllowedMethods: methods, AllowedRate: &rate, @@ -277,3 +277,25 @@ func WithIsRetriableFunc(isRetriable IsRetriable) Option { return nil } } + +func WithRateLimiter(rl RateLimiter) Option { + return func(p *Protocol) error { + if p == nil { + return fmt.Errorf("http OPTIONS handler func can not set nil protocol") + } + p.limiter = rl + return nil + } +} + +// WithRequestDataAtContextMiddleware adds to the Context RequestData. +// This enables a user's dispatch handler to inspect HTTP request information by +// retrieving it from the Context. +func WithRequestDataAtContextMiddleware() Option { + return WithMiddleware(func(next nethttp.Handler) nethttp.Handler { + return nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { + ctx := WithRequestDataAtContext(r.Context(), r) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + }) +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go index a17028795dd..06204b2a1f0 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go @@ -13,6 +13,7 @@ import ( "io" "net/http" "net/url" + "strconv" "sync" "sync/atomic" "time" @@ -86,6 +87,7 @@ type Protocol struct { server *http.Server handlerRegistered bool middleware []Middleware + limiter RateLimiter isRetriableFunc IsRetriable } @@ -115,6 +117,10 @@ func New(opts ...Option) (*Protocol, error) { p.isRetriableFunc = defaultIsRetriableFunc } + if p.limiter == nil { + p.limiter = noOpLimiter{} + } + return p, nil } @@ -151,7 +157,14 @@ func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ... buf := new(bytes.Buffer) buf.ReadFrom(message.BodyReader) errorStr := buf.String() - err = NewResult(res.StatusCode, "%s", errorStr) + // If the error is not wrapped, then append the original error string. + if og, ok := err.(*Result); ok { + og.Format = og.Format + "%s" + og.Args = append(og.Args, errorStr) + err = og + } else { + err = NewResult(res.StatusCode, "%w: %s", err, errorStr) + } } } } @@ -277,6 +290,20 @@ func (p *Protocol) Respond(ctx context.Context) (binding.Message, protocol.Respo // ServeHTTP implements http.Handler. // Blocks until ResponseFn is invoked. func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + // always apply limiter first using req context + ok, reset, err := p.limiter.Allow(req.Context(), req) + if err != nil { + p.incoming <- msgErr{msg: nil, err: fmt.Errorf("unable to acquire rate limit token: %w", err)} + rw.WriteHeader(http.StatusInternalServerError) + return + } + + if !ok { + rw.Header().Add("Retry-After", strconv.Itoa(int(reset))) + http.Error(rw, "limit exceeded", 429) + return + } + // Filter the GET style methods: switch req.Method { case http.MethodOptions: diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_rate.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_rate.go new file mode 100644 index 00000000000..9c4c10a293c --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_rate.go @@ -0,0 +1,34 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package http + +import ( + "context" + "net/http" +) + +type RateLimiter interface { + // Allow attempts to take one token from the rate limiter for the specified + // request. It returns ok when this operation was successful. In case ok is + // false, reset will indicate the time in seconds when it is safe to perform + // another attempt. An error is returned when this operation failed, e.g. due to + // a backend error. + Allow(ctx context.Context, r *http.Request) (ok bool, reset uint64, err error) + // Close terminates rate limiter and cleans up any data structures or + // connections that may remain open. After a store is stopped, Take() should + // always return zero values. + Close(ctx context.Context) error +} + +type noOpLimiter struct{} + +func (n noOpLimiter) Allow(ctx context.Context, r *http.Request) (bool, uint64, error) { + return true, 0, nil +} + +func (n noOpLimiter) Close(ctx context.Context) error { + return nil +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go index fb7bcd27efa..71e7346f304 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_retry.go @@ -6,8 +6,11 @@ package http import ( + "bytes" "context" "errors" + "io" + "io/ioutil" "net/http" "net/url" "time" @@ -53,6 +56,24 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam retry := 0 results := make([]protocol.Result, 0) + var ( + body []byte + err error + ) + + if req != nil && req.Body != nil { + defer func() { + if err = req.Body.Close(); err != nil { + cecontext.LoggerFrom(ctx).Warnw("could not close request body", zap.Error(err)) + } + }() + body, err = ioutil.ReadAll(req.Body) + if err != nil { + panic(err) + } + resetBody(req, body) + } + for { msg, result := p.doOnce(req) @@ -90,6 +111,8 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam } DoBackoff: + resetBody(req, body) + // Wait for the correct amount of backoff time. // total tries = retry + 1 @@ -103,3 +126,20 @@ func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParam results = append(results, result) } } + +// reset body to allow it to be read multiple times, e.g. when retrying http +// requests +func resetBody(req *http.Request, body []byte) { + if req == nil || req.Body == nil { + return + } + + req.Body = ioutil.NopCloser(bytes.NewReader(body)) + + // do not modify existing GetBody function + if req.GetBody == nil { + req.GetBody = func() (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader(body)), nil + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c4876943e33..932f7ea370d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -159,7 +159,7 @@ github.com/cespare/xxhash/v2 github.com/chrismellard/docker-credential-acr-env/pkg/credhelper github.com/chrismellard/docker-credential-acr-env/pkg/registry github.com/chrismellard/docker-credential-acr-env/pkg/token -# github.com/cloudevents/sdk-go/v2 v2.5.0 +# github.com/cloudevents/sdk-go/v2 v2.10.1 ## explicit; go 1.14 github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding