From cfc10ac4c3c20cae378f836f1dea7bc5f50fa998 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Aug 2024 06:41:24 +0000 Subject: [PATCH] Bump github.com/tektoncd/chains from 0.21.1 to 0.22.0 Bumps [github.com/tektoncd/chains](https://github.com/tektoncd/chains) from 0.21.1 to 0.22.0. - [Release notes](https://github.com/tektoncd/chains/releases) - [Changelog](https://github.com/tektoncd/chains/blob/main/releases.md) - [Commits](https://github.com/tektoncd/chains/compare/v0.21.1...v0.22.0) --- updated-dependencies: - dependency-name: github.com/tektoncd/chains dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 8 +- go.sum | 17 +- .../chains/formats/slsa/extract/extract.go | 19 +- .../formats/slsa/extract/v1beta1/extract.go | 18 +- .../chains/pkg/chains/objects/objects.go | 26 ++- .../tektoncd/chains/pkg/chains/signing.go | 10 +- .../chains/pkg/chains/signing/kms/kms.go | 30 ++- .../chains/pkg/chains/storage/docdb/docdb.go | 206 +++++++++++++++++- .../chains/pkg/chains/storage/storage.go | 56 +++++ .../tektoncd/chains/pkg/config/config.go | 34 ++- .../mongo-driver/event/monitoring.go | 1 + .../mongo-driver/internal/csot/csot.go | 6 +- .../mongo-driver/mongo/change_stream.go | 8 +- .../mongo-driver/mongo/collection.go | 30 ++- .../mongo/writeconcern/writeconcern.go | 2 +- .../mongo-driver/version/version.go | 2 +- .../mongo-driver/x/mongo/driver/errors.go | 19 +- .../mongo-driver/x/mongo/driver/operation.go | 54 +++-- .../x/mongo/driver/operation/aggregate.go | 14 ++ .../x/mongo/driver/operation/find.go | 14 ++ .../x/mongo/driver/topology/connection.go | 18 +- .../x/mongo/driver/topology/pool.go | 159 ++++++++++++-- .../x/mongo/driver/topology/rtt_monitor.go | 9 +- .../docstore/mongodocstore/urls.go | 34 +-- vendor/modules.txt | 14 +- 25 files changed, 681 insertions(+), 127 deletions(-) diff --git a/go.mod b/go.mod index 0e288c9865..a84053d691 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/sigstore/sigstore v1.8.8 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 - github.com/tektoncd/chains v0.21.1 + github.com/tektoncd/chains v0.22.0 github.com/tektoncd/hub v1.17.0 github.com/tektoncd/pipeline v0.62.1 github.com/tektoncd/plumbing v0.0.0-20230907180608-5625252a2de1 @@ -292,9 +292,9 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect - github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 // indirect github.com/zeebo/errs v1.3.0 // indirect - go.mongodb.org/mongo-driver v1.14.0 // indirect + go.mongodb.org/mongo-driver v1.15.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect @@ -304,7 +304,7 @@ require ( go.step.sm/crypto v0.51.1 // indirect goa.design/goa/v3 v3.16.1 // indirect gocloud.dev v0.37.0 // indirect - gocloud.dev/docstore/mongodocstore v0.37.0 // indirect + gocloud.dev/docstore/mongodocstore v0.37.1-0.20240501181211-d8b9c9401f18 // indirect gocloud.dev/pubsub/kafkapubsub v0.37.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect diff --git a/go.sum b/go.sum index 3b535509a9..629c754fcc 100644 --- a/go.sum +++ b/go.sum @@ -1282,8 +1282,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDd github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= github.com/tchap/go-patricia/v2 v2.3.1 h1:6rQp39lgIYZ+MHmdEq4xzuk1t7OdC35z/xm0BGhTkes= github.com/tchap/go-patricia/v2 v2.3.1/go.mod h1:VZRHKAb53DLaG+nA9EaYYiaEx6YztwDlLElMsnSHD4k= -github.com/tektoncd/chains v0.21.1 h1:Q3Bw4XS9bImzTplVZZOUD6Yn67sIVSF1oo7WeEFeeY4= -github.com/tektoncd/chains v0.21.1/go.mod h1:iC6MunbSGJrES1RH+zR0gBOCXXr15hi2SPgVBseYq4Y= +github.com/tektoncd/chains v0.22.0 h1:9rgm+skfKpmIAh0CpHSPT6i2R2kmH815YF5iVnvNEMM= +github.com/tektoncd/chains v0.22.0/go.mod h1:5FsO4gIKUIlJ4ohmmMXep0GPMWN1oEwRLXiETmU7XhY= github.com/tektoncd/hub v1.17.0 h1:BKUDeQoC7PLlJmeNt86eEP4lYBXE5pIUZYlrgHcVwl4= github.com/tektoncd/hub v1.17.0/go.mod h1:8SnC66jMZtYFVuh70U1wN91/tJpi7nQX+7V5YsnWIAE= github.com/tektoncd/pipeline v0.62.1 h1:l+EvRCrLqTHuHwas+C4bRP6jzln8E1J9I7tnfwmWfJQ= @@ -1348,8 +1348,8 @@ github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yashtewari/glob-intersection v0.2.0 h1:8iuHdN88yYuCzCdjt0gDe+6bAhUwBeEWqThExu54RFg= github.com/yashtewari/glob-intersection v0.2.0/go.mod h1:LK7pIC3piUjovexikBbJ26Yml7g8xa5bsjfx2v1fwok= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= -github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 h1:tBiBTKHnIjovYoLX/TPkcf+OjqqKGQrPtGT3Foz+Pgo= +github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76/go.mod h1:SQliXeA7Dhkt//vS29v3zpbEwoa+zb2Cn5xj5uO4K5U= github.com/ysmood/fetchup v0.2.3 h1:ulX+SonA0Vma5zUFXtv52Kzip/xe7aj4vqT5AJwQ+ZQ= github.com/ysmood/fetchup v0.2.3/go.mod h1:xhibcRKziSvol0H1/pj33dnKrYyI2ebIvz5cOOkYGns= github.com/ysmood/goob v0.4.0 h1:HsxXhyLBeGzWXnqVKtmT9qM7EuVs/XOgkX7T6r1o1AQ= @@ -1382,8 +1382,8 @@ go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lL go.etcd.io/etcd/pkg/v3 v3.5.0/go.mod h1:UzJGatBQ1lXChBkQF0AuAtkRQMYnHubxAEYIrC3MSsE= go.etcd.io/etcd/raft/v3 v3.5.0/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc= go.etcd.io/etcd/server/v3 v3.5.0/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4= -go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= -go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= +go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc= +go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -1450,8 +1450,8 @@ goa.design/goa/v3 v3.16.1 h1:yZwbKrfMpE8+sz0uf+n+BtArVOFQ0kNSC0twQKwQb04= goa.design/goa/v3 v3.16.1/go.mod h1:Yd42LR0PYDbHSbsbF3vNd4YY/O+LG20Jb7+IyNdkQic= gocloud.dev v0.37.0 h1:XF1rN6R0qZI/9DYjN16Uy0durAmSlf58DHOcb28GPro= gocloud.dev v0.37.0/go.mod h1:7/O4kqdInCNsc6LqgmuFnS0GRew4XNNYWpA44yQnwco= -gocloud.dev/docstore/mongodocstore v0.37.0 h1:A0dE2rKHgfRFr6wn0wPRTS2beinV2/i+aSVswEryKy8= -gocloud.dev/docstore/mongodocstore v0.37.0/go.mod h1:Y5m5zbUUBWispcLtQWppVVB9TKFR8wtwrjQIKA0TuJM= +gocloud.dev/docstore/mongodocstore v0.37.1-0.20240501181211-d8b9c9401f18 h1:71QcVqNRzyPI+f3v38XK+TjrLusNUFIYA7Wro48aX2I= +gocloud.dev/docstore/mongodocstore v0.37.1-0.20240501181211-d8b9c9401f18/go.mod h1:IN/uj8zLRQ0sZ5UffdERqoB5MT7DsmyMQtZDht9cknQ= gocloud.dev/pubsub/kafkapubsub v0.37.0 h1:rH122Q2COVNhAGRyid/FHY164LAZhss9V5DiIKQ5Gos= gocloud.dev/pubsub/kafkapubsub v0.37.0/go.mod h1:y0a+Rv5JvNuVyv1qGic2m1bDoy0ZArWLqcxm/1WxuB8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1464,7 +1464,6 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/extract.go b/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/extract.go index a47d558f7c..706fc57183 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/extract.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/extract.go @@ -80,15 +80,20 @@ func subjectsFromPipelineRun(ctx context.Context, obj objects.TektonObject, slsa pipelineTasks := pSpec.Tasks pipelineTasks = append(pipelineTasks, pSpec.Finally...) for _, t := range pipelineTasks { - tr := pro.GetTaskRunFromTask(t.Name) - // Ignore Tasks that did not execute during the PipelineRun. - if tr == nil || tr.Status.CompletionTime == nil { - logger.Infof("taskrun status not found for task %s", t.Name) + taskRuns := pro.GetTaskRunsFromTask(t.Name) + if len(taskRuns) == 0 { + logger.Infof("no taskruns found for task %s", t.Name) continue } - - trSubjects := subjectsFromTektonObject(ctx, tr) - result = artifact.AppendSubjects(result, trSubjects...) + for _, tr := range taskRuns { + // Ignore Tasks that did not execute during the PipelineRun. + if tr == nil || tr.Status.CompletionTime == nil { + logger.Infof("taskrun status not found for task %s", t.Name) + continue + } + trSubjects := subjectsFromTektonObject(ctx, tr) + result = artifact.AppendSubjects(result, trSubjects...) + } } } diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/v1beta1/extract.go b/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/v1beta1/extract.go index 043ec0deb1..293cb168f7 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/v1beta1/extract.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/formats/slsa/extract/v1beta1/extract.go @@ -74,14 +74,20 @@ func SubjectsFromPipelineRunV1Beta1(ctx context.Context, obj objects.TektonObjec if pSpec != nil { pipelineTasks := append(pSpec.Tasks, pSpec.Finally...) for _, t := range pipelineTasks { - tr := pro.GetTaskRunFromTask(t.Name) - // Ignore Tasks that did not execute during the PipelineRun. - if tr == nil || tr.Status.CompletionTime == nil { - logger.Infof("taskrun status not found for task %s", t.Name) + taskRuns := pro.GetTaskRunsFromTask(t.Name) + if len(taskRuns) == 0 { + logger.Infof("no taskruns found for task %s", t.Name) continue } - trSubjects := SubjectsFromTektonObjectV1Beta1(ctx, tr) - result = artifact.AppendSubjects(result, trSubjects...) + for _, tr := range taskRuns { + // Ignore Tasks that did not execute during the PipelineRun. + if tr == nil || tr.Status.CompletionTime == nil { + logger.Infof("taskrun status not found for task %s", t.Name) + continue + } + trSubjects := SubjectsFromTektonObjectV1Beta1(ctx, tr) + result = artifact.AppendSubjects(result, trSubjects...) + } } } diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/objects/objects.go b/vendor/github.com/tektoncd/chains/pkg/chains/objects/objects.go index 0d89bec6bf..559e3c5ba0 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/objects/objects.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/objects/objects.go @@ -315,14 +315,15 @@ func (pro *PipelineRunObjectV1) GetTaskRuns() []*v1.TaskRun { } // Get the associated TaskRun via the Task name -func (pro *PipelineRunObjectV1) GetTaskRunFromTask(taskName string) *TaskRunObjectV1 { +func (pro *PipelineRunObjectV1) GetTaskRunsFromTask(taskName string) []*TaskRunObjectV1 { + var taskRuns []*TaskRunObjectV1 for _, tr := range pro.taskRuns { val, ok := tr.Labels[PipelineTaskLabel] if ok && val == taskName { - return NewTaskRunObjectV1(tr) + taskRuns = append(taskRuns, NewTaskRunObjectV1(tr)) } } - return nil + return taskRuns } // Get the imgPullSecrets from the pod template @@ -388,13 +389,17 @@ func (pro *PipelineRunObjectV1) GetExecutedTasks() (tro []*TaskRunObjectV1) { tasks := pSpec.Tasks tasks = append(tasks, pSpec.Finally...) for _, task := range tasks { - tr := pro.GetTaskRunFromTask(task.Name) - - if tr == nil || tr.Status.CompletionTime == nil { + taskRuns := pro.GetTaskRunsFromTask(task.Name) + if len(taskRuns) == 0 { continue } + for _, tr := range taskRuns { + if tr == nil || tr.Status.CompletionTime == nil { + continue + } - tro = append(tro, tr) + tro = append(tro, tr) + } } return @@ -514,14 +519,15 @@ func (pro *PipelineRunObjectV1Beta1) AppendTaskRun(tr *v1beta1.TaskRun) { //noli } // Get the associated TaskRun via the Task name -func (pro *PipelineRunObjectV1Beta1) GetTaskRunFromTask(taskName string) *TaskRunObjectV1Beta1 { +func (pro *PipelineRunObjectV1Beta1) GetTaskRunsFromTask(taskName string) []*TaskRunObjectV1Beta1 { + var taskRuns []*TaskRunObjectV1Beta1 for _, tr := range pro.taskRuns { val, ok := tr.Labels[PipelineTaskLabel] if ok && val == taskName { - return NewTaskRunObjectV1Beta1(tr) + taskRuns = append(taskRuns, NewTaskRunObjectV1Beta1(tr)) } } - return nil + return taskRuns } // Get the imgPullSecrets from the pod template diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/signing.go b/vendor/github.com/tektoncd/chains/pkg/chains/signing.go index ce0bb380af..112f5c7849 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/signing.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/signing.go @@ -30,6 +30,7 @@ import ( "github.com/tektoncd/chains/pkg/chains/storage" "github.com/tektoncd/chains/pkg/config" versioned "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" + "golang.org/x/exp/maps" "google.golang.org/protobuf/encoding/protojson" "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/logging" @@ -186,7 +187,14 @@ func (o *ObjectSigner) Sign(ctx context.Context, tektonObj objects.TektonObject) // Now store those! for _, backend := range sets.List[string](signableType.StorageBackend(cfg)) { - b := o.Backends[backend] + b, ok := o.Backends[backend] + if !ok { + backendErr := fmt.Errorf("could not find backend '%s' in configured backends (%v) while trying sign: %s/%s", backend, maps.Keys(o.Backends), tektonObj.GetKindName(), tektonObj.GetName()) + logger.Error(backendErr) + merr = multierror.Append(merr, backendErr) + continue + } + storageOpts := config.StorageOpts{ ShortKey: signableType.ShortKey(obj), FullKey: signableType.FullKey(obj), diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/signing/kms/kms.go b/vendor/github.com/tektoncd/chains/pkg/chains/signing/kms/kms.go index 560b3b2543..fb5845e046 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/signing/kms/kms.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/signing/kms/kms.go @@ -20,6 +20,8 @@ import ( "fmt" "net" "net/url" + "os" + "strings" "time" "github.com/sigstore/sigstore/pkg/signature" @@ -95,12 +97,26 @@ func NewSigner(ctx context.Context, cfg config.KMSSigner) (*Signer, error) { // pass through configuration options to RPCAuth used by KMS in sigstore rpcAuth := options.RPCAuth{ Address: cfg.Auth.Address, - Token: cfg.Auth.Token, OIDC: options.RPCAuthOIDC{ Role: cfg.Auth.OIDC.Role, Path: cfg.Auth.OIDC.Path, }, } + + // get token from file KMS_AUTH_TOKEN, a mounted secret at signers.kms.auth.token-dir or + // as direct value set from signers.kms.auth.token. + // If both values are set, priority will be given to token-dir. + + if cfg.Auth.TokenPath != "" { + rpcAuthToken, err := getKMSAuthToken(cfg.Auth.TokenPath) + if err != nil { + return nil, err + } + rpcAuth.Token = rpcAuthToken + } else { + rpcAuth.Token = cfg.Auth.Token + } + // get token from spire if cfg.Auth.Spire.Sock != "" { token, err := newSpireToken(ctx, cfg) @@ -120,6 +136,18 @@ func NewSigner(ctx context.Context, cfg config.KMSSigner) (*Signer, error) { }, nil } +// getKMSAuthToken retreives token from the given mount path +func getKMSAuthToken(path string) (string, error) { + fileData, err := os.ReadFile(path) + if err != nil { + return "", fmt.Errorf("reading file in %q: %w", path, err) + } + + // A trailing newline is fairly common in mounted files, so remove it. + fileDataNormalized := strings.TrimSuffix(string(fileData), "\n") + return fileDataNormalized, nil +} + // newSpireToken retrieves an SVID token from Spire func newSpireToken(ctx context.Context, cfg config.KMSSigner) (string, error) { jwtSource, err := workloadapi.NewJWTSource( diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/storage/docdb/docdb.go b/vendor/github.com/tektoncd/chains/pkg/chains/storage/docdb/docdb.go index d9b82c979d..b3fcd09483 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/storage/docdb/docdb.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/storage/docdb/docdb.go @@ -17,19 +17,31 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" + "net/url" + "os" + "path/filepath" + "slices" + "strings" + "github.com/fsnotify/fsnotify" "github.com/tektoncd/chains/pkg/chains/objects" "github.com/tektoncd/chains/pkg/config" "gocloud.dev/docstore" _ "gocloud.dev/docstore/awsdynamodb" _ "gocloud.dev/docstore/gcpfirestore" + "gocloud.dev/docstore/mongodocstore" _ "gocloud.dev/docstore/mongodocstore" + "knative.dev/pkg/logging" ) const ( StorageTypeDocDB = "docdb" ) +// ErrNothingToWatch is an error that's returned when the backend doesn't have anything to "watch" +var ErrNothingToWatch = fmt.Errorf("backend has nothing to watch") + // Backend is a storage backend that stores signed payloads in the TaskRun metadata as an annotation. // It is stored as base64 encoded JSON. type Backend struct { @@ -47,8 +59,21 @@ type SignedDocument struct { // NewStorageBackend returns a new Tekton StorageBackend that stores signatures on a TaskRun func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error) { - url := cfg.Storage.DocDB.URL - coll, err := docstore.OpenCollection(ctx, url) + docdbURL := cfg.Storage.DocDB.URL + + u, err := url.Parse(docdbURL) + if err != nil { + return nil, err + } + + if u.Scheme == mongodocstore.Scheme { + // MONGO_SERVER_URL can be passed in as an environment variable or via config fields + if err := populateMongoServerURL(ctx, cfg); err != nil { + return nil, err + } + } + + coll, err := docstore.OpenCollection(ctx, docdbURL) if err != nil { return nil, err } @@ -58,6 +83,110 @@ func NewStorageBackend(ctx context.Context, cfg config.Config) (*Backend, error) }, nil } +// WatchBackend returns a channel that receives a new Backend each time it needs to be updated +func WatchBackend(ctx context.Context, cfg config.Config, watcherStop chan bool) (chan *Backend, error) { + logger := logging.FromContext(ctx) + docDBURL := cfg.Storage.DocDB.URL + + u, err := url.Parse(docDBURL) + if err != nil { + return nil, err + } + + // Set up the watcher only for mongo backends + if u.Scheme != mongodocstore.Scheme { + return nil, ErrNothingToWatch + } + + // Set up watcher only when `storage.docdb.mongo-server-url-dir` is set + if cfg.Storage.DocDB.MongoServerURLDir == "" { + return nil, ErrNothingToWatch + } + + logger.Infof("setting up fsnotify watcher for directory: %s", cfg.Storage.DocDB.MongoServerURLDir) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + pathsToWatch := []string{ + // mongo-server-url-dir/MONGO_SERVER_URL is where the MONGO_SERVER_URL environment + // variable is expected to be mounted, either manually or via a Kubernetes secret, etc. + filepath.Join(cfg.Storage.DocDB.MongoServerURLDir, "MONGO_SERVER_URL"), + // When a Kubernetes secret is mounted on a path, the `data` in that secret is mounted + // under path/..data that is then `symlink`ed to the key of the data. In this instance, + // the mounted path is going to look like: + // file 1 - ..2024_05_03_11_23_23.1253599725 + // file 2 - ..data -> ..2024_05_03_11_23_23.1253599725 + // file 3 - MONGO_SERVER_URL -> ..data/MONGO_SERVER_URL + // So each time the secret is updated, the file `MONGO_SERVER_URL` is not updated, + // instead the underlying symlink at `..data` is updated and that's what we want to + // capture via the fsnotify event watcher + filepath.Join(cfg.Storage.DocDB.MongoServerURLDir, "..data"), + } + + backendChan := make(chan *Backend) + // Start listening for events. + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + logger.Infof("received event: %s, path: %s", event.Op.String(), event.Name) + // Only respond to create/write/remove events in the directory + if !(event.Has(fsnotify.Create) || event.Has(fsnotify.Write) || event.Has(fsnotify.Remove)) { + continue + } + + if !slices.Contains(pathsToWatch, event.Name) { + continue + } + + updatedEnv, err := getMongoServerURLFromDir(cfg.Storage.DocDB.MongoServerURLDir) + if err != nil { + logger.Error(err) + backendChan <- nil + } + if updatedEnv != os.Getenv("MONGO_SERVER_URL") { + logger.Infof("directory %s has been updated, reconfiguring backend...", cfg.Storage.DocDB.MongoServerURLDir) + + // Now that MONGO_SERVER_URL has been updated, we should update docdb backend again + newDocDBBackend, err := NewStorageBackend(ctx, cfg) + if err != nil { + logger.Error(err) + backendChan <- nil + } else { + // Storing the backend in the signer so everyone has access to the up-to-date backend + backendChan <- newDocDBBackend + } + } else { + logger.Infof("MONGO_SERVER_URL has not changed in path: %s, backend will not be reconfigured", cfg.Storage.DocDB.MongoServerURLDir) + } + + case err, ok := <-watcher.Errors: + if !ok { + return + } + logger.Error(err) + + case <-watcherStop: + logger.Info("stopping fsnotify context...") + return + } + } + }() + + // Add a path. + err = watcher.Add(cfg.Storage.DocDB.MongoServerURLDir) + if err != nil { + return nil, err + } + return backendChan, nil +} + // StorePayload implements the Payloader interface. func (b *Backend) StorePayload(ctx context.Context, _ objects.TektonObject, rawPayload []byte, signature string, opts config.StorageOpts) error { var obj interface{} @@ -125,3 +254,76 @@ func (b *Backend) retrieveDocuments(ctx context.Context, opts config.StorageOpts } return []SignedDocument{d}, nil } + +func populateMongoServerURL(ctx context.Context, cfg config.Config) error { + // First preference is given to the key `storage.docdb.mongo-server-url-dir`. + // If that doesn't exist, then we move on to `storage.docdb.mongo-server-url`. + // If that doesn't exist, then we check if `MONGO_SERVER_URL` env var is set. + logger := logging.FromContext(ctx) + mongoEnv := "MONGO_SERVER_URL" + + if cfg.Storage.DocDB.MongoServerURLDir != "" { + logger.Infof("setting %s from storage.docdb.mongo-server-url-dir: %s", mongoEnv, cfg.Storage.DocDB.MongoServerURLDir) + if err := setMongoServerURLFromDir(cfg.Storage.DocDB.MongoServerURLDir); err != nil { + return err + } + } else if cfg.Storage.DocDB.MongoServerURL != "" { + logger.Infof("setting %s from storage.docdb.mongo-server-url", mongoEnv) + if err := os.Setenv(mongoEnv, cfg.Storage.DocDB.MongoServerURL); err != nil { + return err + } + } + + if _, envExists := os.LookupEnv(mongoEnv); !envExists { + return fmt.Errorf("mongo docstore configured but %s environment variable not set, "+ + "supply one of storage.docdb.mongo-server-url-dir, storage.docdb.mongo-server-url or set %s", mongoEnv, mongoEnv) + } + + return nil +} + +func setMongoServerURLFromDir(dir string) error { + fileDataNormalized, err := getMongoServerURLFromDir(dir) + if err != nil { + return err + } + + if err = os.Setenv("MONGO_SERVER_URL", fileDataNormalized); err != nil { + return err + } + + return nil +} + +func getMongoServerURLFromDir(dir string) (string, error) { + mongoEnv := "MONGO_SERVER_URL" + + stat, err := os.Stat(dir) + if err != nil { + if os.IsNotExist(err) { + // If directory does not exist, then create it. This is needed for + // the fsnotify watcher. + // fsnotify does not receive events if the path that it's watching + // is created later. + if err := os.MkdirAll(dir, 0755); err != nil { + return "", err + } + return "", nil + } + return "", err + } + // If the path exists but is not a directory, then throw an error + if !stat.IsDir() { + return "", fmt.Errorf("path specified at storage.docdb.mongo-server-url-dir: %s is not a directory", dir) + } + + filePath := filepath.Join(dir, mongoEnv) + fileData, err := os.ReadFile(filePath) + if err != nil { + return "", err + } + // A trailing newline is fairly common in mounted files, let's remove it. + fileDataNormalized := strings.TrimSuffix(string(fileData), "\n") + + return fileDataNormalized, nil +} diff --git a/vendor/github.com/tektoncd/chains/pkg/chains/storage/storage.go b/vendor/github.com/tektoncd/chains/pkg/chains/storage/storage.go index be3561c965..dbb07a37b2 100644 --- a/vendor/github.com/tektoncd/chains/pkg/chains/storage/storage.go +++ b/vendor/github.com/tektoncd/chains/pkg/chains/storage/storage.go @@ -15,6 +15,7 @@ package storage import ( "context" + "errors" "github.com/tektoncd/chains/pkg/chains/objects" "github.com/tektoncd/chains/pkg/chains/storage/docdb" @@ -25,8 +26,10 @@ import ( "github.com/tektoncd/chains/pkg/chains/storage/tekton" "github.com/tektoncd/chains/pkg/config" "github.com/tektoncd/pipeline/pkg/client/clientset/versioned" + "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "knative.dev/pkg/logging" ) // Backend is an interface to store a chains Payload @@ -42,6 +45,8 @@ type Backend interface { // InitializeBackends creates and initializes every configured storage backend. func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernetes.Interface, cfg config.Config) (map[string]Backend, error) { + logger := logging.FromContext(ctx) + // Add an entry here for every configured backend configuredBackends := []string{} if cfg.Artifacts.TaskRuns.Enabled() { @@ -53,6 +58,7 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet if cfg.Artifacts.PipelineRuns.Enabled() { configuredBackends = append(configuredBackends, sets.List[string](cfg.Artifacts.PipelineRuns.StorageBackend)...) } + logger.Infof("configured backends from config: %v", configuredBackends) // Now only initialize and return the configured ones. backends := map[string]Backend{} @@ -90,5 +96,55 @@ func InitializeBackends(ctx context.Context, ps versioned.Interface, kc kubernet } } + + logger.Infof("successfully initialized backends: %v", maps.Keys(backends)) return backends, nil } + +// WatchBackends watches backends for any update and keeps them up to date. +func WatchBackends(ctx context.Context, watcherStop chan bool, backends map[string]Backend, cfg config.Config) error { + logger := logging.FromContext(ctx) + for backend := range backends { + switch backend { + case docdb.StorageTypeDocDB: + docdbWatcherStop := make(chan bool) + backendChan, err := docdb.WatchBackend(ctx, cfg, docdbWatcherStop) + if err != nil { + if errors.Is(err, docdb.ErrNothingToWatch) { + logger.Info(err) + continue + } + return err + } + go func() { + for { + select { + case newBackend := <-backendChan: + if newBackend == nil { + logger.Errorf("removing backend %s from backends", docdb.StorageTypeDocDB) + delete(backends, docdb.StorageTypeDocDB) + continue + } + logger.Infof("adding to backends: %s", docdb.StorageTypeDocDB) + backends[docdb.StorageTypeDocDB] = newBackend + case <-watcherStop: + // Stop the DocDB watcher first + select { + case docdbWatcherStop <- true: + logger.Info("sent close event to docdb.WatchBackend()...") + default: + logger.Info("could not send close event to docdb.WatchBackend()...") + } + + // Now stop this backend + logger.Info("stop watching backends...") + return + } + } + }() + default: + logger.Debugf("no backends to watch...") + } + } + return nil +} diff --git a/vendor/github.com/tektoncd/chains/pkg/config/config.go b/vendor/github.com/tektoncd/chains/pkg/config/config.go index c1df774f68..7175ac2bc1 100644 --- a/vendor/github.com/tektoncd/chains/pkg/config/config.go +++ b/vendor/github.com/tektoncd/chains/pkg/config/config.go @@ -91,10 +91,11 @@ type KMSSigner struct { // KMSAuth configures authentication to the KMS server type KMSAuth struct { - Address string - Token string - OIDC KMSAuthOIDC - Spire KMSAuthSpire + Address string + Token string + TokenPath string + OIDC KMSAuthOIDC + Spire KMSAuthSpire } // KMSAuthOIDC configures settings to authenticate with OIDC @@ -122,7 +123,9 @@ type TektonStorageConfig struct { } type DocDBStorageConfig struct { - URL string + URL string + MongoServerURL string + MongoServerURLDir string } type GrafeasConfig struct { @@ -165,13 +168,16 @@ const ( ociStorageKey = "artifacts.oci.storage" ociSignerKey = "artifacts.oci.signer" - gcsBucketKey = "storage.gcs.bucket" - ociRepositoryKey = "storage.oci.repository" - ociRepositoryInsecureKey = "storage.oci.repository.insecure" - docDBUrlKey = "storage.docdb.url" - grafeasProjectIDKey = "storage.grafeas.projectid" - grafeasNoteIDKey = "storage.grafeas.noteid" - grafeasNoteHint = "storage.grafeas.notehint" + gcsBucketKey = "storage.gcs.bucket" + ociRepositoryKey = "storage.oci.repository" + ociRepositoryInsecureKey = "storage.oci.repository.insecure" + docDBUrlKey = "storage.docdb.url" + docDBMongoServerURLKey = "storage.docdb.mongo-server-url" + docDBMongoServerURLDirKey = "storage.docdb.mongo-server-url-dir" + + grafeasProjectIDKey = "storage.grafeas.projectid" + grafeasNoteIDKey = "storage.grafeas.noteid" + grafeasNoteHint = "storage.grafeas.notehint" // PubSub - General pubsubProvider = "storage.pubsub.provider" @@ -187,6 +193,7 @@ const ( kmsAuthAddress = "signers.kms.auth.address" kmsAuthToken = "signers.kms.auth.token" kmsAuthOIDCPath = "signers.kms.auth.oidc.path" + kmsAuthTokenPath = "signers.kms.auth.token-path" // #nosec G101 kmsAuthOIDCRole = "signers.kms.auth.oidc.role" kmsAuthSpireSock = "signers.kms.auth.spire.sock" kmsAuthSpireAudience = "signers.kms.auth.spire.audience" @@ -293,6 +300,8 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { asString(ociRepositoryKey, &cfg.Storage.OCI.Repository), asBool(ociRepositoryInsecureKey, &cfg.Storage.OCI.Insecure), asString(docDBUrlKey, &cfg.Storage.DocDB.URL), + asString(docDBMongoServerURLKey, &cfg.Storage.DocDB.MongoServerURL), + asString(docDBMongoServerURLDirKey, &cfg.Storage.DocDB.MongoServerURLDir), asString(grafeasProjectIDKey, &cfg.Storage.Grafeas.ProjectID), asString(grafeasNoteIDKey, &cfg.Storage.Grafeas.NoteID), asString(grafeasNoteHint, &cfg.Storage.Grafeas.NoteHint), @@ -304,6 +313,7 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { asString(kmsSignerKMSRef, &cfg.Signers.KMS.KMSRef), asString(kmsAuthAddress, &cfg.Signers.KMS.Auth.Address), asString(kmsAuthToken, &cfg.Signers.KMS.Auth.Token), + asString(kmsAuthTokenPath, &cfg.Signers.KMS.Auth.TokenPath), asString(kmsAuthOIDCPath, &cfg.Signers.KMS.Auth.OIDC.Path), asString(kmsAuthOIDCRole, &cfg.Signers.KMS.Auth.OIDC.Role), asString(kmsAuthSpireSock, &cfg.Signers.KMS.Auth.Spire.Sock), diff --git a/vendor/go.mongodb.org/mongo-driver/event/monitoring.go b/vendor/go.mongodb.org/mongo-driver/event/monitoring.go index cc2c7a4e6c..ddc7abacf7 100644 --- a/vendor/go.mongodb.org/mongo-driver/event/monitoring.go +++ b/vendor/go.mongodb.org/mongo-driver/event/monitoring.go @@ -117,6 +117,7 @@ type PoolEvent struct { Address string `json:"address"` ConnectionID uint64 `json:"connectionId"` PoolOptions *MonitorPoolOptions `json:"options"` + Duration time.Duration `json:"duration"` Reason string `json:"reason"` // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. diff --git a/vendor/go.mongodb.org/mongo-driver/internal/csot/csot.go b/vendor/go.mongodb.org/mongo-driver/internal/csot/csot.go index 678252c51a..43801a5d4f 100644 --- a/vendor/go.mongodb.org/mongo-driver/internal/csot/csot.go +++ b/vendor/go.mongodb.org/mongo-driver/internal/csot/csot.go @@ -21,11 +21,13 @@ type timeoutKey struct{} // TODO default behavior. func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context, context.CancelFunc) { // Only use the passed in Duration as a timeout on the Context if it - // is non-zero. + // is non-zero and if the Context doesn't already have a timeout. cancelFunc := func() {} - if to != 0 { + if _, deadlineSet := ctx.Deadline(); to != 0 && !deadlineSet { ctx, cancelFunc = context.WithTimeout(ctx, to) } + + // Add timeoutKey either way to indicate CSOT is enabled. return context.WithValue(ctx, timeoutKey{}, true), cancelFunc } diff --git a/vendor/go.mongodb.org/mongo-driver/mongo/change_stream.go b/vendor/go.mongodb.org/mongo-driver/mongo/change_stream.go index c4c2fb2590..1bedcc3f8a 100644 --- a/vendor/go.mongodb.org/mongo-driver/mongo/change_stream.go +++ b/vendor/go.mongodb.org/mongo-driver/mongo/change_stream.go @@ -277,10 +277,10 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err cs.aggregate.Pipeline(plArr) } - // If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already - // a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution - // and potential retry. - if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) { + // If cs.client.timeout is set and context is not already a Timeout context, + // honor cs.client.timeout in new Timeout context for change stream + // operation execution and potential retry. + if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout) // Redefine ctx to be the new timeout-derived context. ctx = newCtx diff --git a/vendor/go.mongodb.org/mongo-driver/mongo/collection.go b/vendor/go.mongodb.org/mongo-driver/mongo/collection.go index c7b2a8a113..555035ff51 100644 --- a/vendor/go.mongodb.org/mongo-driver/mongo/collection.go +++ b/vendor/go.mongodb.org/mongo-driver/mongo/collection.go @@ -863,6 +863,15 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) { Timeout(a.client.timeout). MaxTime(ao.MaxTime) + // Omit "maxTimeMS" from operations that return a user-managed cursor to + // prevent confusing "cursor not found" errors. To maintain existing + // behavior for users who set "timeoutMS" with no context deadline, only + // omit "maxTimeMS" when a context deadline is set. + // + // See DRIVERS-2722 for more detail. + _, deadlineSet := a.ctx.Deadline() + op.OmitCSOTMaxTimeMS(deadlineSet) + if ao.AllowDiskUse != nil { op.AllowDiskUse(*ao.AllowDiskUse) } @@ -1191,6 +1200,22 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/find/. func (coll *Collection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (cur *Cursor, err error) { + // Omit "maxTimeMS" from operations that return a user-managed cursor to + // prevent confusing "cursor not found" errors. To maintain existing + // behavior for users who set "timeoutMS" with no context deadline, only + // omit "maxTimeMS" when a context deadline is set. + // + // See DRIVERS-2722 for more detail. + _, deadlineSet := ctx.Deadline() + return coll.find(ctx, filter, deadlineSet, opts...) +} + +func (coll *Collection) find( + ctx context.Context, + filter interface{}, + omitCSOTMaxTimeMS bool, + opts ...*options.FindOptions, +) (cur *Cursor, err error) { if ctx == nil { ctx = context.Background() @@ -1230,7 +1255,8 @@ func (coll *Collection) Find(ctx context.Context, filter interface{}, CommandMonitor(coll.client.monitor).ServerSelector(selector). ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name). Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI). - Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger) + Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger). + OmitCSOTMaxTimeMS(omitCSOTMaxTimeMS) cursorOpts := coll.client.createBaseCursorOptions() @@ -1408,7 +1434,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{}, // by the server. findOpts = append(findOpts, options.Find().SetLimit(-1)) - cursor, err := coll.Find(ctx, filter, findOpts...) + cursor, err := coll.find(ctx, filter, false, findOpts...) return &SingleResult{ ctx: ctx, cur: cursor, diff --git a/vendor/go.mongodb.org/mongo-driver/mongo/writeconcern/writeconcern.go b/vendor/go.mongodb.org/mongo-driver/mongo/writeconcern/writeconcern.go index 8e288d10b7..1d9472ec0b 100644 --- a/vendor/go.mongodb.org/mongo-driver/mongo/writeconcern/writeconcern.go +++ b/vendor/go.mongodb.org/mongo-driver/mongo/writeconcern/writeconcern.go @@ -42,7 +42,7 @@ var ErrNegativeW = errors.New("write concern `w` field cannot be a negative numb // Deprecated: ErrNegativeWTimeout will be removed in Go Driver 2.0. var ErrNegativeWTimeout = errors.New("write concern `wtimeout` field cannot be negative") -// A WriteConcern defines a MongoDB read concern, which describes the level of acknowledgment +// A WriteConcern defines a MongoDB write concern, which describes the level of acknowledgment // requested from MongoDB for write operations to a standalone mongod, to replica sets, or to // sharded clusters. // diff --git a/vendor/go.mongodb.org/mongo-driver/version/version.go b/vendor/go.mongodb.org/mongo-driver/version/version.go index 13c18479af..879bbdb7a4 100644 --- a/vendor/go.mongodb.org/mongo-driver/version/version.go +++ b/vendor/go.mongodb.org/mongo-driver/version/version.go @@ -8,4 +8,4 @@ package version // import "go.mongodb.org/mongo-driver/version" // Driver is the current version of the driver. -var Driver = "v1.14.0" +var Driver = "v1.15.0" diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/errors.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/errors.go index 3b8b9823b7..177c2d4501 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/errors.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/errors.go @@ -14,6 +14,7 @@ import ( "strings" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/internal/csot" "go.mongodb.org/mongo-driver/mongo/description" "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) @@ -377,7 +378,7 @@ func (e Error) NamespaceNotFound() bool { // ExtractErrorFromServerResponse extracts an error from a server response bsoncore.Document // if there is one. Also used in testing for SDAM. -func ExtractErrorFromServerResponse(doc bsoncore.Document) error { +func ExtractErrorFromServerResponse(ctx context.Context, doc bsoncore.Document) error { var errmsg, codeName string var code int32 var labels []string @@ -514,7 +515,7 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error { errmsg = "command failed" } - return Error{ + err := Error{ Code: code, Message: errmsg, Name: codeName, @@ -522,6 +523,20 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error { TopologyVersion: tv, Raw: doc, } + + // If CSOT is enabled and we get a MaxTimeMSExpired error, assume that + // the error was caused by setting "maxTimeMS" on the command based on + // the context deadline or on "timeoutMS". In that case, make the error + // wrap context.DeadlineExceeded so that users can always check + // + // errors.Is(err, context.DeadlineExceeded) + // + // for either client-side or server-side timeouts. + if csot.IsTimeoutContext(ctx) && err.Code == 50 { + err.Wrapped = context.DeadlineExceeded + } + + return err } if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil { diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation.go index b39a63abe4..eb1acec88f 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation.go @@ -310,6 +310,11 @@ type Operation struct { // OP_MSG as well as for logging server selection data. Name string + // OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the + // command when CSOT is enabled. It does not effect "maxTimeMS" set by + // [Operation.MaxTime]. + OmitCSOTMaxTimeMS bool + // omitReadPreference is a boolean that indicates whether to omit the // read preference from the command. This omition includes the case // where a default read preference is used when the operation @@ -499,9 +504,9 @@ func (op Operation) Execute(ctx context.Context) error { return err } - // If no deadline is set on the passed-in context, op.Timeout is set, and context is not already - // a Timeout context, honor op.Timeout in new Timeout context for operation execution. - if _, deadlineSet := ctx.Deadline(); !deadlineSet && op.Timeout != nil && !csot.IsTimeoutContext(ctx) { + // If op.Timeout is set, and context is not already a Timeout context, honor + // op.Timeout in new Timeout context for operation execution. + if op.Timeout != nil && !csot.IsTimeoutContext(ctx) { newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *op.Timeout) // Redefine ctx to be the new timeout-derived context. ctx = newCtx @@ -617,6 +622,13 @@ func (op Operation) Execute(ctx context.Context) error { } }() for { + // If we're starting a retry and the the error from the previous try was + // a context canceled or deadline exceeded error, stop retrying and + // return that error. + if errors.Is(prevErr, context.Canceled) || errors.Is(prevErr, context.DeadlineExceeded) { + return prevErr + } + requestID := wiremessage.NextRequestID() // If the server or connection are nil, try to select a new server and get a new connection. @@ -683,8 +695,7 @@ func (op Operation) Execute(ctx context.Context) error { first = false } - // Calculate maxTimeMS value to potentially be appended to the wire message. - maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor().P90(), srvr.RTTMonitor().Stats()) + maxTimeMS, err := op.calculateMaxTimeMS(ctx, srvr.RTTMonitor()) if err != nil { return err } @@ -777,7 +788,7 @@ func (op Operation) Execute(ctx context.Context) error { } else if deadline, ok := ctx.Deadline(); ok { if csot.IsTimeoutContext(ctx) && time.Now().Add(srvr.RTTMonitor().P90()).After(deadline) { err = fmt.Errorf( - "remaining time %v until context deadline is less than 90th percentile RTT: %w\n%v", + "remaining time %v until context deadline is less than 90th percentile network round-trip time: %w\n%v", time.Until(deadline), ErrDeadlineWouldBeExceeded, srvr.RTTMonitor().Stats()) @@ -1089,7 +1100,7 @@ func (op Operation) readWireMessage(ctx context.Context, conn Connection) (resul } // decode - res, err := op.decodeResult(opcode, rem) + res, err := op.decodeResult(ctx, opcode, rem) // Update cluster/operation time and recovery tokens before handling the error to ensure we're properly updating // everything. op.updateClusterTimes(res) @@ -1562,10 +1573,15 @@ func (op Operation) addClusterTime(dst []byte, desc description.SelectedServer) // if the ctx is a Timeout context. If the context is not a Timeout context, it uses the // operation's MaxTimeMS if set. If no MaxTimeMS is set on the operation, and context is // not a Timeout context, calculateMaxTimeMS returns 0. -func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, rttStats string) (uint64, error) { +func (op Operation) calculateMaxTimeMS(ctx context.Context, mon RTTMonitor) (uint64, error) { if csot.IsTimeoutContext(ctx) { + if op.OmitCSOTMaxTimeMS { + return 0, nil + } + if deadline, ok := ctx.Deadline(); ok { remainingTimeout := time.Until(deadline) + rtt90 := mon.P90() maxTime := remainingTimeout - rtt90 // Always round up to the next millisecond value so we never truncate the calculated @@ -1573,11 +1589,21 @@ func (op Operation) calculateMaxTimeMS(ctx context.Context, rtt90 time.Duration, maxTimeMS := int64((maxTime + (time.Millisecond - 1)) / time.Millisecond) if maxTimeMS <= 0 { return 0, fmt.Errorf( - "remaining time %v until context deadline is less than or equal to 90th percentile RTT: %w\n%v", + "negative maxTimeMS: remaining time %v until context deadline is less than 90th percentile network round-trip time (%v): %w", remainingTimeout, - ErrDeadlineWouldBeExceeded, - rttStats) + mon.Stats(), + ErrDeadlineWouldBeExceeded) + } + + // The server will return a "BadValue" error if maxTimeMS is greater + // than the maximum positive int32 value (about 24.9 days). If the + // user specified a timeout value greater than that, omit maxTimeMS + // and let the client-side timeout handle cancelling the op if the + // timeout is ever reached. + if maxTimeMS > math.MaxInt32 { + return 0, nil } + return uint64(maxTimeMS), nil } } else if op.MaxTime != nil { @@ -1827,7 +1853,7 @@ func (Operation) decodeOpReply(wm []byte) opReply { return reply } -func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) { +func (op Operation) decodeResult(ctx context.Context, opcode wiremessage.OpCode, wm []byte) (bsoncore.Document, error) { switch opcode { case wiremessage.OpReply: reply := op.decodeOpReply(wm) @@ -1845,7 +1871,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore return nil, NewCommandResponseError("malformed OP_REPLY: invalid document", err) } - return rdr, ExtractErrorFromServerResponse(rdr) + return rdr, ExtractErrorFromServerResponse(ctx, rdr) case wiremessage.OpMsg: _, wm, ok := wiremessage.ReadMsgFlags(wm) if !ok { @@ -1882,7 +1908,7 @@ func (op Operation) decodeResult(opcode wiremessage.OpCode, wm []byte) (bsoncore return nil, NewCommandResponseError("malformed OP_MSG: invalid document", err) } - return res, ExtractErrorFromServerResponse(res) + return res, ExtractErrorFromServerResponse(ctx, res) default: return nil, fmt.Errorf("cannot decode result from %s", opcode) } diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/aggregate.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/aggregate.go index ca0e796523..44467df8fd 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/aggregate.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/aggregate.go @@ -50,6 +50,7 @@ type Aggregate struct { hasOutputStage bool customOptions map[string]bsoncore.Value timeout *time.Duration + omitCSOTMaxTimeMS bool result driver.CursorResponse } @@ -113,6 +114,7 @@ func (a *Aggregate) Execute(ctx context.Context) error { MaxTime: a.maxTime, Timeout: a.timeout, Name: driverutil.AggregateOp, + OmitCSOTMaxTimeMS: a.omitCSOTMaxTimeMS, }.Execute(ctx) } @@ -419,3 +421,15 @@ func (a *Aggregate) Timeout(timeout *time.Duration) *Aggregate { a.timeout = timeout return a } + +// OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the +// command when CSOT is enabled. It does not effect "maxTimeMS" set by +// [Aggregate.MaxTime]. +func (a *Aggregate) OmitCSOTMaxTimeMS(omit bool) *Aggregate { + if a == nil { + a = new(Aggregate) + } + + a.omitCSOTMaxTimeMS = omit + return a +} diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/find.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/find.go index 27bb5b4f99..8950fde86d 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/find.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/operation/find.go @@ -62,6 +62,7 @@ type Find struct { result driver.CursorResponse serverAPI *driver.ServerAPIOptions timeout *time.Duration + omitCSOTMaxTimeMS bool logger *logger.Logger } @@ -110,6 +111,7 @@ func (f *Find) Execute(ctx context.Context) error { Timeout: f.timeout, Logger: f.logger, Name: driverutil.FindOp, + OmitCSOTMaxTimeMS: f.omitCSOTMaxTimeMS, }.Execute(ctx) } @@ -552,6 +554,18 @@ func (f *Find) Timeout(timeout *time.Duration) *Find { return f } +// OmitCSOTMaxTimeMS omits the automatically-calculated "maxTimeMS" from the +// command when CSOT is enabled. It does not effect "maxTimeMS" set by +// [Find.MaxTime]. +func (f *Find) OmitCSOTMaxTimeMS(omit bool) *Find { + if f == nil { + f = new(Find) + } + + f.omitCSOTMaxTimeMS = omit + return f +} + // Logger sets the logger for this operation. func (f *Find) Logger(logger *logger.Logger) *Find { if f == nil { diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/connection.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/connection.go index 13035abc0f..476459e8e6 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/connection.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/connection.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "go.mongodb.org/mongo-driver/internal/csot" "go.mongodb.org/mongo-driver/mongo/address" "go.mongodb.org/mongo-driver/mongo/description" "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" @@ -77,6 +78,10 @@ type connection struct { // TODO(GODRIVER-2824): change driverConnectionID type to int64. driverConnectionID uint64 generation uint64 + + // awaitingResponse indicates that the server response was not completely + // read before returning the connection to the pool. + awaitingResponse bool } // newConnection handles the creation of a connection. It does not connect the connection. @@ -414,8 +419,17 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) { dst, errMsg, err := c.read(ctx) if err != nil { - // We closeConnection the connection because we don't know if there are other bytes left to read. - c.close() + if nerr := net.Error(nil); errors.As(err, &nerr) && nerr.Timeout() && csot.IsTimeoutContext(ctx) { + // If the error was a timeout error and CSOT is enabled, instead of + // closing the connection mark it as awaiting response so the pool + // can read the response before making it available to other + // operations. + c.awaitingResponse = true + } else { + // Otherwise, use the pre-CSOT behavior and close the connection + // because we don't know if there are other bytes left to read. + c.close() + } message := errMsg if errors.Is(err, io.EOF) { message = "socket was unexpectedly closed" diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool.go index 6ca23c071b..52461eb681 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool.go @@ -467,6 +467,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { }) } + start := time.Now() // Check the pool state while holding a stateMu read lock. If the pool state is not "ready", // return an error. Do all of this while holding the stateMu read lock to prevent a state change between // checking the state and entering the wait queue. Not holding the stateMu read lock here may @@ -477,8 +478,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { case poolClosed: p.stateMu.RUnlock() + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed, } @@ -487,9 +490,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonPoolClosed, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonPoolClosed, }) } return nil, ErrPoolClosed @@ -497,8 +501,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { err := poolClearedError{err: p.lastClearErr, address: p.address} p.stateMu.RUnlock() + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, } @@ -507,10 +513,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: err, }) } return nil, err @@ -539,9 +546,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { // or an error, so unlock the stateMu lock here. p.stateMu.RUnlock() + duration := time.Since(start) if w.err != nil { if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, } @@ -550,18 +559,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: w.err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: w.err, }) } return nil, w.err } + duration = time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, w.conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) @@ -572,6 +584,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Type: event.GetSucceeded, Address: p.address.String(), ConnectionID: w.conn.driverConnectionID, + Duration: duration, }) } @@ -584,12 +597,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { p.stateMu.RUnlock() // Wait for either the wantConn to be ready or for the Context to time out. - start := time.Now() + waitQueueStart := time.Now() select { case <-w.ready: if w.err != nil { + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, logger.KeyError, w.err.Error(), } @@ -599,19 +614,22 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: w.err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: w.err, }) } return nil, w.err } + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, w.conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) @@ -622,14 +640,17 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Type: event.GetSucceeded, Address: p.address.String(), ConnectionID: w.conn.driverConnectionID, + Duration: duration, }) } return w.conn, nil case <-ctx.Done(): - duration := time.Since(start) + waitQueueDuration := time.Since(waitQueueStart) + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedTimout, } @@ -638,10 +659,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonTimedOut, - Error: ctx.Err(), + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonTimedOut, + Error: ctx.Err(), }) } @@ -650,7 +672,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { maxPoolSize: p.maxSize, totalConnections: p.totalConnectionCount(), availableConnections: p.availableConnectionCount(), - waitDuration: duration, + waitDuration: waitQueueDuration, } if p.loadBalanced { err.pinnedConnections = &pinnedConnections{ @@ -742,6 +764,81 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro return nil } +var ( + // BGReadTimeout is the maximum amount of the to wait when trying to read + // the server reply on a connection after an operation timed out. The + // default is 1 second. + // + // Deprecated: BGReadTimeout is intended for internal use only and may be + // removed or modified at any time. + BGReadTimeout = 1 * time.Second + + // BGReadCallback is a callback for monitoring the behavior of the + // background-read-on-timeout connection preserving mechanism. + // + // Deprecated: BGReadCallback is intended for internal use only and may be + // removed or modified at any time. + BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool) +) + +// bgRead sets a new read deadline on the provided connection (1 second in the +// future) and tries to read any bytes returned by the server. If successful, it +// checks the connection into the provided pool. If there are any errors, it +// closes the connection. +// +// It calls the package-global BGReadCallback function, if set, with the +// address, timings, and any errors that occurred. +func bgRead(pool *pool, conn *connection) { + var start, read time.Time + start = time.Now() + errs := make([]error, 0) + connClosed := false + + defer func() { + // No matter what happens, always check the connection back into the + // pool, which will either make it available for other operations or + // remove it from the pool if it was closed. + err := pool.checkInNoEvent(conn) + if err != nil { + errs = append(errs, fmt.Errorf("error checking in: %w", err)) + } + + if BGReadCallback != nil { + BGReadCallback(conn.addr.String(), start, read, errs, connClosed) + } + }() + + err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout)) + if err != nil { + errs = append(errs, fmt.Errorf("error setting a read deadline: %w", err)) + + connClosed = true + err := conn.close() + if err != nil { + errs = append(errs, fmt.Errorf("error closing conn after setting read deadline: %w", err)) + } + + return + } + + // The context here is only used for cancellation, not deadline timeout, so + // use context.Background(). The read timeout is set by calling + // SetReadDeadline above. + _, _, err = conn.read(context.Background()) + read = time.Now() + if err != nil { + errs = append(errs, fmt.Errorf("error reading: %w", err)) + + connClosed = true + err := conn.close() + if err != nil { + errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err)) + } + + return + } +} + // checkIn returns an idle connection to the pool. If the connection is perished or the pool is // closed, it is removed from the connection pool and closed. func (p *pool) checkIn(conn *connection) error { @@ -781,6 +878,20 @@ func (p *pool) checkInNoEvent(conn *connection) error { return ErrWrongPool } + // If the connection has an awaiting server response, try to read the + // response in another goroutine before checking it back into the pool. + // + // Do this here because we want to publish checkIn events when the operation + // is done with the connection, not when it's ready to be used again. That + // means that connections in "awaiting response" state are checked in but + // not usable, which is not covered by the current pool events. We may need + // to add pool event information in the future to communicate that. + if conn.awaitingResponse { + conn.awaitingResponse = false + go bgRead(p, conn) + return nil + } + // Bump the connection idle deadline here because we're about to make the connection "available". // The idle deadline is used to determine when a connection has reached its max idle time and // should be closed. A connection reaches its max idle time when it has been "available" in the @@ -1085,6 +1196,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { }) } + start := time.Now() // Pass the createConnections context to connect to allow pool close to cancel connection // establishment so shutdown doesn't block indefinitely if connectTimeout=0. err := conn.connect(ctx) @@ -1111,9 +1223,11 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { continue } + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionReady, keysAndValues...) @@ -1124,6 +1238,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { Type: event.ConnectionReady, Address: p.address.String(), ConnectionID: conn.driverConnectionID, + Duration: duration, }) } diff --git a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/rtt_monitor.go b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/rtt_monitor.go index 3dd031f2ea..07f508caae 100644 --- a/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/rtt_monitor.go +++ b/vendor/go.mongodb.org/mongo-driver/x/mongo/driver/topology/rtt_monitor.go @@ -322,7 +322,10 @@ func (r *rttMonitor) Stats() string { } } - return fmt.Sprintf(`Round-trip-time monitor statistics:`+"\n"+ - `average RTT: %v, minimum RTT: %v, 90th percentile RTT: %v, standard dev: %v`+"\n", - time.Duration(avg), r.minRTT, r.rtt90, time.Duration(stdDev)) + return fmt.Sprintf( + "network round-trip time stats: avg: %v, min: %v, 90th pct: %v, stddev: %v", + time.Duration(avg), + r.minRTT, + r.rtt90, + time.Duration(stdDev)) } diff --git a/vendor/gocloud.dev/docstore/mongodocstore/urls.go b/vendor/gocloud.dev/docstore/mongodocstore/urls.go index 3e7ce282e2..b6d2ed14d4 100644 --- a/vendor/gocloud.dev/docstore/mongodocstore/urls.go +++ b/vendor/gocloud.dev/docstore/mongodocstore/urls.go @@ -34,27 +34,31 @@ func init() { // defaultDialer dials a default Mongo server based on the environment variable // MONGO_SERVER_URL. type defaultDialer struct { - init sync.Once - opener *URLOpener - err error + mongoServerURL string + mu sync.Mutex + opener *URLOpener + err error } func (o *defaultDialer) OpenCollectionURL(ctx context.Context, u *url.URL) (*docstore.Collection, error) { - o.init.Do(func() { - serverURL := os.Getenv("MONGO_SERVER_URL") - if serverURL == "" { - o.err = errors.New("MONGO_SERVER_URL environment variable is not set") - return - } - client, err := Dial(ctx, serverURL) + o.mu.Lock() + defer o.mu.Unlock() + currentEnv := os.Getenv("MONGO_SERVER_URL") + + if currentEnv == "" { + o.err = errors.New("MONGO_SERVER_URL environment variable is not set") + return nil, fmt.Errorf("open collection %s: %v", u, o.err) + } + + // If MONGO_SERVER_URL has been updated, then update o.opener as well + if currentEnv != o.mongoServerURL { + client, err := Dial(ctx, currentEnv) if err != nil { - o.err = fmt.Errorf("failed to dial default Mongo server at %q: %v", serverURL, err) - return + o.err = fmt.Errorf("failed to dial default Mongo server at %q: %v", currentEnv, err) + return nil, fmt.Errorf("open collection %s: %v", u, o.err) } + o.mongoServerURL = currentEnv o.opener = &URLOpener{Client: client} - }) - if o.err != nil { - return nil, fmt.Errorf("open collection %s: %v", u, o.err) } return o.opener.OpenCollectionURL(ctx, u) } diff --git a/vendor/modules.txt b/vendor/modules.txt index a7431b6a62..abcce9cc28 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1366,8 +1366,8 @@ github.com/syndtr/goleveldb/leveldb/opt github.com/syndtr/goleveldb/leveldb/storage github.com/syndtr/goleveldb/leveldb/table github.com/syndtr/goleveldb/leveldb/util -# github.com/tektoncd/chains v0.21.1 -## explicit; go 1.21 +# github.com/tektoncd/chains v0.22.0 +## explicit; go 1.22 github.com/tektoncd/chains/internal/backport github.com/tektoncd/chains/pkg/artifacts github.com/tektoncd/chains/pkg/chains @@ -1627,13 +1627,13 @@ github.com/xdg-go/stringprep # github.com/xlab/treeprint v1.2.0 ## explicit; go 1.13 github.com/xlab/treeprint -# github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a -## explicit; go 1.12 +# github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 +## explicit; go 1.22 github.com/youmark/pkcs8 # github.com/zeebo/errs v1.3.0 ## explicit; go 1.12 github.com/zeebo/errs -# go.mongodb.org/mongo-driver v1.14.0 +# go.mongodb.org/mongo-driver v1.15.0 ## explicit; go 1.18 go.mongodb.org/mongo-driver/bson go.mongodb.org/mongo-driver/bson/bsoncodec @@ -1796,8 +1796,8 @@ gocloud.dev/pubsub gocloud.dev/pubsub/batcher gocloud.dev/pubsub/driver gocloud.dev/pubsub/mempubsub -# gocloud.dev/docstore/mongodocstore v0.37.0 -## explicit; go 1.20 +# gocloud.dev/docstore/mongodocstore v0.37.1-0.20240501181211-d8b9c9401f18 +## explicit; go 1.22 gocloud.dev/docstore/mongodocstore # gocloud.dev/pubsub/kafkapubsub v0.37.0 ## explicit; go 1.20