From dd7efc8fb2ee03edd8d63273abb2d1f973c2352c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Wed, 27 Nov 2024 15:25:18 +0100 Subject: [PATCH 01/11] Add 'k6/cloud' module --- cmd/cloud.go | 27 ++++++++- js/jsmodules.go | 2 + js/modules/k6/cloud/cloud.go | 93 +++++++++++++++++++++++++++++++ js/modules/k6/cloud/cloud_test.go | 54 ++++++++++++++++++ lib/options.go | 3 +- 5 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 js/modules/k6/cloud/cloud.go create mode 100644 js/modules/k6/cloud/cloud_test.go diff --git a/cmd/cloud.go b/cmd/cloud.go index 9c189606c04..64d8e9e1780 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -1,6 +1,7 @@ package cmd import ( + "bytes" "context" "encoding/json" "errors" @@ -182,11 +183,15 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { return err } - refID := cloudTestRun.ReferenceID if cloudTestRun.ConfigOverride != nil { cloudConfig = cloudConfig.Apply(*cloudTestRun.ConfigOverride) } + refID := cloudTestRun.ReferenceID + if err := storeTestRunIdInCloudOptions(testRunState, refID); err != nil { + return err + } + // Trap Interrupts, SIGINTs and SIGTERMs. gracefulStop := func(sig os.Signal) { logger.WithField("sig", sig).Print("Stopping cloud test run in response to signal...") @@ -422,3 +427,23 @@ func exactCloudArgs() cobra.PositionalArgs { return nil } } + +const testRunIdKey = "testRunId" + +func storeTestRunIdInCloudOptions(testRunState *lib.TestRunState, testRunId string) error { + tmpConfig, err := cloudapi.GetTemporaryCloudConfig(testRunState.Options.Cloud, nil) + if err != nil { + return err + } + + tmpConfig[testRunIdKey] = testRunId + + buffer := &bytes.Buffer{} + if err := json.NewEncoder(buffer).Encode(tmpConfig); err != nil { + return err + } + + testRunState.Options.Cloud = buffer.Bytes() + + return nil +} diff --git a/js/jsmodules.go b/js/jsmodules.go index e6f96eb2882..61a48e6b011 100644 --- a/js/jsmodules.go +++ b/js/jsmodules.go @@ -8,6 +8,7 @@ import ( "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules/k6" + "go.k6.io/k6/js/modules/k6/cloud" "go.k6.io/k6/js/modules/k6/crypto" "go.k6.io/k6/js/modules/k6/crypto/x509" "go.k6.io/k6/js/modules/k6/data" @@ -32,6 +33,7 @@ import ( func getInternalJSModules() map[string]interface{} { return map[string]interface{}{ "k6": k6.New(), + "k6/cloud": cloud.New(), "k6/crypto": crypto.New(), "k6/crypto/x509": x509.New(), "k6/data": data.New(), diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go new file mode 100644 index 00000000000..c15a20e7162 --- /dev/null +++ b/js/modules/k6/cloud/cloud.go @@ -0,0 +1,93 @@ +// Package cloud implements k6/cloud which lets script find out more about the Cloud execution. +package cloud + +import ( + "bytes" + "encoding/json" + "errors" + + "github.com/grafana/sobek" + + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" +) + +type ( + // RootModule is the global module instance that will create module + // instances for each VU. + RootModule struct{} + + // ModuleInstance represents an instance of the execution module. + ModuleInstance struct { + vu modules.VU + obj *sobek.Object + } +) + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &ModuleInstance{} +) + +// New returns a pointer to a new RootModule instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + mi := &ModuleInstance{vu: vu} + rt := vu.Runtime() + o := rt.NewObject() + defProp := func(name string, getter func() (sobek.Value, error)) { + err := o.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { + obj, err := getter() + if err != nil { + common.Throw(rt, err) + } + return obj + }), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE) + if err != nil { + common.Throw(rt, err) + } + } + defProp("testRunId", mi.testRunId) + + mi.obj = o + + return mi +} + +// Exports returns the exports of the execution module. +func (mi *ModuleInstance) Exports() modules.Exports { + return modules.Exports{Default: mi.obj} +} + +var errRunInInitContext = errors.New("getting cloud information outside of the VU context is not supported") + +// testRunId returns a sobek.Value(string) with the Cloud test run id. +func (mi *ModuleInstance) testRunId() (sobek.Value, error) { + rt := mi.vu.Runtime() + vuState := mi.vu.State() + if vuState == nil { + return sobek.Undefined(), errRunInInitContext + } + + if vuState.Options.Cloud == nil { + return sobek.Undefined(), nil + } + + config := make(map[string]interface{}) + reader := bytes.NewReader(vuState.Options.Cloud) + if err := json.NewDecoder(reader).Decode(&config); err != nil { + return nil, err + } + + testRunId, hasTestRunId := config["testRunId"] + if !hasTestRunId { + return sobek.Undefined(), nil + } + + return rt.ToValue(testRunId), nil +} diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go new file mode 100644 index 00000000000..7b54c6e0f8d --- /dev/null +++ b/js/modules/k6/cloud/cloud_test.go @@ -0,0 +1,54 @@ +package cloud + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.k6.io/k6/js/modulestest" + "go.k6.io/k6/lib" +) + +func setupCloudTestEnv(t *testing.T) *modulestest.Runtime { + tRt := modulestest.NewRuntime(t) + m, ok := New().NewModuleInstance(tRt.VU).(*ModuleInstance) + require.True(t, ok) + require.NoError(t, tRt.VU.Runtime().Set("cloud", m.Exports().Default)) + return tRt +} + +func TestGetTestRunId(t *testing.T) { + t.Parallel() + + t.Run("init context", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t) + _, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.ErrorIs(t, err, errRunInInitContext) + }) + + t.Run("undefined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{}, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "undefined", testRunId.String()) + }) + + t.Run("defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{ + Cloud: []byte(`{"testRunId": "123"}`), + }, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "123", testRunId.String()) + }) +} diff --git a/lib/options.go b/lib/options.go index e4f3a2d6929..55c0dcc84c8 100644 --- a/lib/options.go +++ b/lib/options.go @@ -308,8 +308,7 @@ type Options struct { // iteration is shorter than the specified value. MinIterationDuration types.NullDuration `json:"minIterationDuration" envconfig:"K6_MIN_ITERATION_DURATION"` - // Cloud is the config for the cloud - // formally known as ext.loadimpact + // Cloud is the configuration for the k6 Cloud, formerly known as ext.loadimpact. Cloud json.RawMessage `json:"cloud,omitempty"` // These values are for third party collectors' benefit. From 20fe5714a9df84cb5f480690e87a3d97ba6f1cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 28 Nov 2024 10:35:32 +0100 Subject: [PATCH 02/11] Create test run before starting output.Cloud --- cmd/cloud.go | 27 +---- cmd/outputs.go | 12 +- cmd/outputs_cloud.go | 185 ++++++++++++++++++++++++++++++ cmd/test_load.go | 2 +- js/modules/k6/cloud/cloud.go | 17 +-- js/modules/k6/cloud/cloud_test.go | 3 +- lib/options.go | 9 ++ output/cloud/output.go | 128 +++------------------ 8 files changed, 223 insertions(+), 160 deletions(-) create mode 100644 cmd/outputs_cloud.go diff --git a/cmd/cloud.go b/cmd/cloud.go index 64d8e9e1780..9c189606c04 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -1,7 +1,6 @@ package cmd import ( - "bytes" "context" "encoding/json" "errors" @@ -183,15 +182,11 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { return err } + refID := cloudTestRun.ReferenceID if cloudTestRun.ConfigOverride != nil { cloudConfig = cloudConfig.Apply(*cloudTestRun.ConfigOverride) } - refID := cloudTestRun.ReferenceID - if err := storeTestRunIdInCloudOptions(testRunState, refID); err != nil { - return err - } - // Trap Interrupts, SIGINTs and SIGTERMs. gracefulStop := func(sig os.Signal) { logger.WithField("sig", sig).Print("Stopping cloud test run in response to signal...") @@ -427,23 +422,3 @@ func exactCloudArgs() cobra.PositionalArgs { return nil } } - -const testRunIdKey = "testRunId" - -func storeTestRunIdInCloudOptions(testRunState *lib.TestRunState, testRunId string) error { - tmpConfig, err := cloudapi.GetTemporaryCloudConfig(testRunState.Options.Cloud, nil) - if err != nil { - return err - } - - tmpConfig[testRunIdKey] = testRunId - - buffer := &bytes.Buffer{} - if err := json.NewEncoder(buffer).Encode(tmpConfig); err != nil { - return err - } - - testRunState.Options.Cloud = buffer.Bytes() - - return nil -} diff --git a/cmd/outputs.go b/cmd/outputs.go index fa50cd1530f..552b3c2eb4c 100644 --- a/cmd/outputs.go +++ b/cmd/outputs.go @@ -113,7 +113,6 @@ func createOutputs( StdOut: gs.Stdout, StdErr: gs.Stderr, FS: gs.FS, - ScriptOptions: test.derivedConfig.Options, RuntimeOptions: test.preInitState.RuntimeOptions, ExecutionPlan: executionPlan, Usage: test.preInitState.Usage, @@ -142,10 +141,19 @@ func createOutputs( } } + jsonConfig := test.derivedConfig.Collectors[outputType] + if isCloudOutput(outputType) { + jsonConfig, err = createCloudTest(gs, test, executionPlan, outputType, outputArg) + if err != nil { + return nil, fmt.Errorf("could not create the '%s' output: %w", outputType, err) + } + } + params := baseParams params.OutputType = outputType params.ConfigArgument = outputArg - params.JSONConfig = test.derivedConfig.Collectors[outputType] + params.JSONConfig = jsonConfig + params.ScriptOptions = test.derivedConfig.Options out, err := outputConstructor(params) if err != nil { diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go new file mode 100644 index 00000000000..79c7526322d --- /dev/null +++ b/cmd/outputs_cloud.go @@ -0,0 +1,185 @@ +package cmd + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" + + "go.k6.io/k6/cloudapi" + "go.k6.io/k6/cmd/state" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/consts" + "go.k6.io/k6/metrics" +) + +const defaultTestName = "k6 test" + +func isCloudOutput(outputType string) bool { + return outputType == builtinOutputCloud.String() +} + +// createCloudTest performs some test and Cloud configuration validations and if everything +// looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. +// It is also responsible for filling the test run id on the test options, so it can be used later. +// It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, +// or an error if something goes wrong. +func createCloudTest( + gs *state.GlobalState, test *loadedAndConfiguredTest, executionPlan []lib.ExecutionStep, + outputType, outputArg string, +) (json.RawMessage, error) { + conf, warn, err := cloudapi.GetConsolidatedConfig( + test.derivedConfig.Collectors[outputType], + gs.Env, + outputArg, + test.derivedConfig.Options.Cloud, + test.derivedConfig.Options.External, + ) + if err != nil { + return nil, err + } + + if warn != "" { + gs.Logger.Warn(warn) + } + + logger := gs.Logger.WithFields(logrus.Fields{"output": "cloud"}) + + if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { + return nil, err + } + + if !conf.Name.Valid || conf.Name.String == "" { + scriptPath := test.source.URL.String() + if scriptPath == "" { + // Script from stdin without a name, likely from stdin + return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") + } + + conf.Name = null.StringFrom(filepath.Base(scriptPath)) + } + if conf.Name.String == "-" { + conf.Name = null.StringFrom(defaultTestName) + } + + // We need to propagate the test run id to the derived config + // and to the init runner options, so they can be used later. + setTestRunId := func(id null.String) error { + // TODO: Not sure if this is a good idea :thinking: + runnerOpts := test.initRunner.GetOptions() + runnerOpts.TestRunID = id + err := test.initRunner.SetOptions(runnerOpts) + if err != nil { + return err + } + + test.derivedConfig.TestRunID = id + return nil + } + + // If this is true, then it means that this code is being executed in the k6 Cloud. + // Therefore, we don't need to continue with the test run creation, + // as we don't need to create any test run. + // Precisely, the identifier of the test run is conf.PushRefID. + if conf.PushRefID.Valid { + if err := setTestRunId(conf.PushRefID); err != nil { + return nil, err + } + return cloudConfToRawMessage(conf) + } + + // If not, we continue with the creation of the test run. + thresholds := make(map[string][]string) + for name, t := range test.derivedConfig.Thresholds { + for _, threshold := range t.Thresholds { + thresholds[name] = append(thresholds[name], threshold.Source) + } + } + + duration, testEnds := lib.GetEndOffset(executionPlan) + if !testEnds { + return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") + } + + if conf.MetricPushConcurrency.Int64 < 1 { + return nil, fmt.Errorf("metrics push concurrency must be a positive number but is %d", + conf.MetricPushConcurrency.Int64) + } + + if conf.MaxTimeSeriesInBatch.Int64 < 1 { + return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", + conf.MaxTimeSeriesInBatch.Int64) + } + + var testArchive *lib.Archive + if !test.derivedConfig.NoArchiveUpload.Bool { + testArchive = test.initRunner.MakeArchive() + } + + testRun := &cloudapi.TestRun{ + Name: conf.Name.String, + ProjectID: conf.ProjectID.Int64, + VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), + Thresholds: thresholds, + Duration: int64(duration / time.Second), + Archive: testArchive, + } + + apiClient := cloudapi.NewClient( + logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) + + response, err := apiClient.CreateTestRun(testRun) + if err != nil { + return nil, err + } + + if response.ConfigOverride != nil { + logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options") + conf = conf.Apply(*response.ConfigOverride) + } + + testRunId := null.NewString(response.ReferenceID, true) + if err := setTestRunId(testRunId); err != nil { + return nil, err + } + + return cloudConfToRawMessage(conf) +} + +// validateRequiredSystemTags checks if all required tags are present. +func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { + var missingRequiredTags []string + requiredTags := metrics.SystemTagSet(metrics.TagName | + metrics.TagMethod | + metrics.TagStatus | + metrics.TagError | + metrics.TagCheck | + metrics.TagGroup) + for _, tag := range metrics.SystemTagValues() { + if requiredTags.Has(tag) && !scriptTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return fmt.Errorf( + "the cloud output needs the following system tags enabled: %s", + strings.Join(missingRequiredTags, ", "), + ) + } + return nil +} + +func cloudConfToRawMessage(conf cloudapi.Config) (json.RawMessage, error) { + var buff bytes.Buffer + enc := json.NewEncoder(&buff) + if err := enc.Encode(conf); err != nil { + return nil, err + } + return buff.Bytes(), nil +} diff --git a/cmd/test_load.go b/cmd/test_load.go index f022e415576..1caeedebfc5 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -265,7 +265,7 @@ func loadSystemCertPool(logger logrus.FieldLogger) { func (lct *loadedAndConfiguredTest) buildTestRunState( configToReinject lib.Options, ) (*lib.TestRunState, error) { - // This might be the full derived or just the consodlidated options + // This might be the full derived or just the consolidated options if err := lct.initRunner.SetOptions(configToReinject); err != nil { return nil, err } diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go index c15a20e7162..61c078daa34 100644 --- a/js/modules/k6/cloud/cloud.go +++ b/js/modules/k6/cloud/cloud.go @@ -2,8 +2,6 @@ package cloud import ( - "bytes" - "encoding/json" "errors" "github.com/grafana/sobek" @@ -74,20 +72,9 @@ func (mi *ModuleInstance) testRunId() (sobek.Value, error) { return sobek.Undefined(), errRunInInitContext } - if vuState.Options.Cloud == nil { + if !vuState.Options.TestRunID.Valid { return sobek.Undefined(), nil } - config := make(map[string]interface{}) - reader := bytes.NewReader(vuState.Options.Cloud) - if err := json.NewDecoder(reader).Decode(&config); err != nil { - return nil, err - } - - testRunId, hasTestRunId := config["testRunId"] - if !hasTestRunId { - return sobek.Undefined(), nil - } - - return rt.ToValue(testRunId), nil + return rt.ToValue(vuState.Options.TestRunID.String), nil } diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go index 7b54c6e0f8d..7a98f41c0c7 100644 --- a/js/modules/k6/cloud/cloud_test.go +++ b/js/modules/k6/cloud/cloud_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v3" "go.k6.io/k6/js/modulestest" "go.k6.io/k6/lib" @@ -44,7 +45,7 @@ func TestGetTestRunId(t *testing.T) { tRt := setupCloudTestEnv(t) tRt.MoveToVUContext(&lib.State{ Options: lib.Options{ - Cloud: []byte(`{"testRunId": "123"}`), + TestRunID: null.NewString("123", true), }, }) testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) diff --git a/lib/options.go b/lib/options.go index 55c0dcc84c8..dc090dd8757 100644 --- a/lib/options.go +++ b/lib/options.go @@ -311,6 +311,12 @@ type Options struct { // Cloud is the configuration for the k6 Cloud, formerly known as ext.loadimpact. Cloud json.RawMessage `json:"cloud,omitempty"` + // TestRunID represents the test run id. + // It relies on the `K6_CLOUD_PUSH_REF_ID` environment variable, + // because that's the variable the k6-agent uses to populates it. + // It is equivalent to cloudapi.Config.PushRefID. + TestRunID null.String `json:"testRunId" envconfig:"K6_CLOUD_PUSH_REF_ID"` + // These values are for third party collectors' benefit. // Can't be set through env vars. External map[string]json.RawMessage `json:"ext" ignored:"true"` @@ -475,6 +481,9 @@ func (o Options) Apply(opts Options) Options { if opts.Cloud != nil { o.Cloud = opts.Cloud } + if opts.TestRunID.Valid { + o.TestRunID = opts.TestRunID + } if opts.External != nil { o.External = opts.External } diff --git a/output/cloud/output.go b/output/cloud/output.go index c8532a65019..a860934dee7 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -4,11 +4,10 @@ package cloud import ( "errors" "fmt" - "path/filepath" - "strings" "time" "github.com/sirupsen/logrus" + "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" "go.k6.io/k6/lib" @@ -17,12 +16,8 @@ import ( "go.k6.io/k6/output" cloudv2 "go.k6.io/k6/output/cloud/expv2" "go.k6.io/k6/usage" - "gopkg.in/guregu/null.v3" ) -// TestName is the default k6 Cloud test name -const TestName = "k6 test" - // versionedOutput represents an output implementing // metrics samples aggregation and flushing to the // Cloud remote service. @@ -56,17 +51,8 @@ type Output struct { config cloudapi.Config testRunID string - executionPlan []lib.ExecutionStep - duration int64 // in seconds - thresholds map[string][]*metrics.Threshold - - // testArchive is the test archive to be uploaded to the cloud - // before the output is Start()-ed. - // - // It is set by the SetArchive method. If it is nil, the output - // will not upload any test archive, such as when the user - // uses the --no-archive-upload flag. - testArchive *lib.Archive + duration int64 // in seconds + thresholds map[string][]*metrics.Threshold client *cloudapi.Client testStopFunc func(error) @@ -103,76 +89,25 @@ func newOutput(params output.Params) (*Output, error) { params.Logger.Warn(warn) } - if err := validateRequiredSystemTags(params.ScriptOptions.SystemTags); err != nil { - return nil, err - } - logger := params.Logger.WithFields(logrus.Fields{"output": "cloud"}) - if !conf.Name.Valid || conf.Name.String == "" { - scriptPath := params.ScriptPath.String() - if scriptPath == "" { - // Script from stdin without a name, likely from stdin - return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") - } - - conf.Name = null.StringFrom(filepath.Base(scriptPath)) - } - if conf.Name.String == "-" { - conf.Name = null.StringFrom(TestName) - } - - duration, testEnds := lib.GetEndOffset(params.ExecutionPlan) - if !testEnds { - return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") - } - - if conf.MetricPushConcurrency.Int64 < 1 { - return nil, fmt.Errorf("metrics push concurrency must be a positive number but is %d", - conf.MetricPushConcurrency.Int64) - } - - if conf.MaxTimeSeriesInBatch.Int64 < 1 { - return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", - conf.MaxTimeSeriesInBatch.Int64) - } + duration, _ := lib.GetEndOffset(params.ExecutionPlan) apiClient := cloudapi.NewClient( logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) + testRunID := params.ScriptOptions.TestRunID.String + return &Output{ - config: conf, - client: apiClient, - executionPlan: params.ExecutionPlan, - duration: int64(duration / time.Second), - logger: logger, - usage: params.Usage, + config: conf, + testRunID: testRunID, + client: apiClient, + duration: int64(duration / time.Second), + logger: logger, + usage: params.Usage, }, nil } -// validateRequiredSystemTags checks if all required tags are present. -func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { - missingRequiredTags := []string{} - requiredTags := metrics.SystemTagSet(metrics.TagName | - metrics.TagMethod | - metrics.TagStatus | - metrics.TagError | - metrics.TagCheck | - metrics.TagGroup) - for _, tag := range metrics.SystemTagValues() { - if requiredTags.Has(tag) && !scriptTags.Has(tag) { - missingRequiredTags = append(missingRequiredTags, tag.String()) - } - } - if len(missingRequiredTags) > 0 { - return fmt.Errorf( - "the cloud output needs the following system tags enabled: %s", - strings.Join(missingRequiredTags, ", "), - ) - } - return nil -} - // Start calls the k6 Cloud API to initialize the test run, and then starts the // goroutine that would listen for metric samples and send them to the cloud. func (out *Output) Start() error { @@ -182,36 +117,7 @@ func (out *Output) Start() error { return out.startVersionedOutput() } - thresholds := make(map[string][]string) - for name, t := range out.thresholds { - for _, threshold := range t { - thresholds[name] = append(thresholds[name], threshold.Source) - } - } - - testRun := &cloudapi.TestRun{ - Name: out.config.Name.String, - ProjectID: out.config.ProjectID.Int64, - VUsMax: int64(lib.GetMaxPossibleVUs(out.executionPlan)), - Thresholds: thresholds, - Duration: out.duration, - Archive: out.testArchive, - } - - response, err := out.client.CreateTestRun(testRun) - if err != nil { - return err - } - out.testRunID = response.ReferenceID - - if response.ConfigOverride != nil { - out.logger.WithFields(logrus.Fields{ - "override": response.ConfigOverride, - }).Debug("overriding config options") - out.config = out.config.Apply(*response.ConfigOverride) - } - - err = out.startVersionedOutput() + err := out.startVersionedOutput() if err != nil { return fmt.Errorf("the Gateway Output failed to start a versioned output: %w", err) } @@ -244,14 +150,6 @@ func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { out.testStopFunc = stopFunc } -// SetArchive receives the test artifact to be uploaded to the cloud -// before the output is Start()-ed. -func (out *Output) SetArchive(archive *lib.Archive) { - out.testArchive = archive -} - -var _ output.WithArchive = &Output{} - // Stop gracefully stops all metric emission from the output: when all metric // samples are emitted, it makes a cloud API call to finish the test run. // From 436e0a9edc2275161dec50c030d2cdf3f917c52d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 28 Nov 2024 13:53:12 +0100 Subject: [PATCH 03/11] Create the cloud test run earlier --- cmd/outputs.go | 12 ++------- cmd/outputs_cloud.go | 63 ++++++++++++++++++++++++++------------------ cmd/run.go | 9 +++++++ 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/cmd/outputs.go b/cmd/outputs.go index 552b3c2eb4c..fa50cd1530f 100644 --- a/cmd/outputs.go +++ b/cmd/outputs.go @@ -113,6 +113,7 @@ func createOutputs( StdOut: gs.Stdout, StdErr: gs.Stderr, FS: gs.FS, + ScriptOptions: test.derivedConfig.Options, RuntimeOptions: test.preInitState.RuntimeOptions, ExecutionPlan: executionPlan, Usage: test.preInitState.Usage, @@ -141,19 +142,10 @@ func createOutputs( } } - jsonConfig := test.derivedConfig.Collectors[outputType] - if isCloudOutput(outputType) { - jsonConfig, err = createCloudTest(gs, test, executionPlan, outputType, outputArg) - if err != nil { - return nil, fmt.Errorf("could not create the '%s' output: %w", outputType, err) - } - } - params := baseParams params.OutputType = outputType params.ConfigArgument = outputArg - params.JSONConfig = jsonConfig - params.ScriptOptions = test.derivedConfig.Options + params.JSONConfig = test.derivedConfig.Collectors[outputType] out, err := outputConstructor(params) if err != nil { diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go index 79c7526322d..eb6e742bb11 100644 --- a/cmd/outputs_cloud.go +++ b/cmd/outputs_cloud.go @@ -21,8 +21,14 @@ import ( const defaultTestName = "k6 test" -func isCloudOutput(outputType string) bool { - return outputType == builtinOutputCloud.String() +func findCloudOutput(outputs []string) (string, string, bool) { + for _, outFullArg := range outputs { + outType, outArg, _ := strings.Cut(outFullArg, "=") + if outType == builtinOutputCloud.String() { + return outType, outArg, true + } + } + return "", "", false } // createCloudTest performs some test and Cloud configuration validations and if everything @@ -30,10 +36,7 @@ func isCloudOutput(outputType string) bool { // It is also responsible for filling the test run id on the test options, so it can be used later. // It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, // or an error if something goes wrong. -func createCloudTest( - gs *state.GlobalState, test *loadedAndConfiguredTest, executionPlan []lib.ExecutionStep, - outputType, outputArg string, -) (json.RawMessage, error) { +func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outputType, outputArg string) error { conf, warn, err := cloudapi.GetConsolidatedConfig( test.derivedConfig.Collectors[outputType], gs.Env, @@ -42,7 +45,7 @@ func createCloudTest( test.derivedConfig.Options.External, ) if err != nil { - return nil, err + return err } if warn != "" { @@ -52,14 +55,14 @@ func createCloudTest( logger := gs.Logger.WithFields(logrus.Fields{"output": "cloud"}) if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { - return nil, err + return err } if !conf.Name.Valid || conf.Name.String == "" { scriptPath := test.source.URL.String() if scriptPath == "" { // Script from stdin without a name, likely from stdin - return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") + return errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") } conf.Name = null.StringFrom(filepath.Base(scriptPath)) @@ -68,18 +71,17 @@ func createCloudTest( conf.Name = null.StringFrom(defaultTestName) } - // We need to propagate the test run id to the derived config - // and to the init runner options, so they can be used later. - setTestRunId := func(id null.String) error { - // TODO: Not sure if this is a good idea :thinking: - runnerOpts := test.initRunner.GetOptions() - runnerOpts.TestRunID = id - err := test.initRunner.SetOptions(runnerOpts) + // We need to propagate the test run id to the derived config, + // and update the Cloud configuration that later will be used + // by the Cloud output. + setTestRunIdAndConfig := func(id null.String, conf cloudapi.Config) error { + raw, err := cloudConfToRawMessage(conf) if err != nil { return err } test.derivedConfig.TestRunID = id + test.derivedConfig.Collectors["cloud"] = raw return nil } @@ -88,10 +90,10 @@ func createCloudTest( // as we don't need to create any test run. // Precisely, the identifier of the test run is conf.PushRefID. if conf.PushRefID.Valid { - if err := setTestRunId(conf.PushRefID); err != nil { - return nil, err + if err := setTestRunIdAndConfig(conf.PushRefID, conf); err != nil { + return err } - return cloudConfToRawMessage(conf) + return nil } // If not, we continue with the creation of the test run. @@ -102,18 +104,27 @@ func createCloudTest( } } + et, err := lib.NewExecutionTuple( + test.derivedConfig.Options.ExecutionSegment, + test.derivedConfig.Options.ExecutionSegmentSequence, + ) + if err != nil { + return err + } + executionPlan := test.derivedConfig.Options.Scenarios.GetFullExecutionRequirements(et) + duration, testEnds := lib.GetEndOffset(executionPlan) if !testEnds { - return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") + return errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") } if conf.MetricPushConcurrency.Int64 < 1 { - return nil, fmt.Errorf("metrics push concurrency must be a positive number but is %d", + return fmt.Errorf("metrics push concurrency must be a positive number but is %d", conf.MetricPushConcurrency.Int64) } if conf.MaxTimeSeriesInBatch.Int64 < 1 { - return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", + return fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", conf.MaxTimeSeriesInBatch.Int64) } @@ -136,7 +147,7 @@ func createCloudTest( response, err := apiClient.CreateTestRun(testRun) if err != nil { - return nil, err + return err } if response.ConfigOverride != nil { @@ -145,11 +156,11 @@ func createCloudTest( } testRunId := null.NewString(response.ReferenceID, true) - if err := setTestRunId(testRunId); err != nil { - return nil, err + if err := setTestRunIdAndConfig(testRunId, conf); err != nil { + return err } - return cloudConfToRawMessage(conf) + return nil } // validateRequiredSystemTags checks if all required tags are present. diff --git a/cmd/run.go b/cmd/run.go index 1a0e04c8ffc..e4e3ce05f64 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -123,6 +123,15 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } } + // Although outputs are created below, is at this point, before building + // the test run state, when we want to create the Cloud test run, if needed + // so that we can set the test run ID on the test options. + if outType, outArg, found := findCloudOutput(test.derivedConfig.Out); found { + if err := createCloudTest(c.gs, test, outType, outArg); err != nil { + return fmt.Errorf("could not create the '%s' output: %w", outType, err) + } + } + // Write the full consolidated *and derived* options back to the Runner. conf := test.derivedConfig testRunState, err := test.buildTestRunState(conf.Options) From 82400dedcf03ef28fdef9287f76f77829c3f4f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Mon, 2 Dec 2024 13:36:47 +0100 Subject: [PATCH 04/11] Move 'options.TestRunID' over 'options.cloud.TestRunID' --- cloudapi/config.go | 16 ++++++++--- cmd/outputs_cloud.go | 48 ++++++++++++------------------- js/modules/k6/cloud/cloud.go | 12 ++++++-- js/modules/k6/cloud/cloud_test.go | 3 +- lib/options.go | 9 ------ output/cloud/output.go | 4 +-- 6 files changed, 42 insertions(+), 50 deletions(-) diff --git a/cloudapi/config.go b/cloudapi/config.go index 54959d93eb4..ebe07d5a871 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -32,12 +32,17 @@ type Config struct { StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"` APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` - // PushRefID represents the test run id. - // Note: It is a legacy name used by the backend, the code in k6 open-source - // references it as test run id. - // Currently, a renaming is not planned. + // PushRefID is the identifier used by Cloud systems to correlate all the things that + // belong to the same test run/execution. Currently, it is equivalent to the TestRunID. + // But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be + // no TestRunID and we may still need an identifier to correlate all the things. PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"` + // TestRunID is the test run id, a unique identifier across all the test runs. + // It might be used to correlate all the things that belong to the same test run/execution, + // see PushRefID for more details. + TestRunID null.String `json:"testRunID" envconfig:"K6_CLOUD_TEST_RUN_ID"` + // Defines the max allowed number of time series in a single batch. MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"` @@ -121,6 +126,9 @@ func (c Config) Apply(cfg Config) Config { if cfg.PushRefID.Valid { c.PushRefID = cfg.PushRefID } + if cfg.TestRunID.Valid { + c.TestRunID = cfg.TestRunID + } if cfg.WebAppURL.Valid { c.WebAppURL = cfg.WebAppURL } diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go index eb6e742bb11..563e9ae3438 100644 --- a/cmd/outputs_cloud.go +++ b/cmd/outputs_cloud.go @@ -52,8 +52,16 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu gs.Logger.Warn(warn) } - logger := gs.Logger.WithFields(logrus.Fields{"output": "cloud"}) + // If this is true, then it means that this code is being executed in the k6 Cloud. + // Therefore, we don't need to continue with the test run creation, + // as we don't need to create any test run. + // + // Precisely, the identifier of the test run is conf.TestRunID. + if conf.TestRunID.Valid { + return nil + } + // If not, we continue with some validations and the creation of the test run. if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { return err } @@ -71,32 +79,6 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu conf.Name = null.StringFrom(defaultTestName) } - // We need to propagate the test run id to the derived config, - // and update the Cloud configuration that later will be used - // by the Cloud output. - setTestRunIdAndConfig := func(id null.String, conf cloudapi.Config) error { - raw, err := cloudConfToRawMessage(conf) - if err != nil { - return err - } - - test.derivedConfig.TestRunID = id - test.derivedConfig.Collectors["cloud"] = raw - return nil - } - - // If this is true, then it means that this code is being executed in the k6 Cloud. - // Therefore, we don't need to continue with the test run creation, - // as we don't need to create any test run. - // Precisely, the identifier of the test run is conf.PushRefID. - if conf.PushRefID.Valid { - if err := setTestRunIdAndConfig(conf.PushRefID, conf); err != nil { - return err - } - return nil - } - - // If not, we continue with the creation of the test run. thresholds := make(map[string][]string) for name, t := range test.derivedConfig.Thresholds { for _, threshold := range t.Thresholds { @@ -142,6 +124,8 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu Archive: testArchive, } + logger := gs.Logger.WithFields(logrus.Fields{"output": "cloud"}) + apiClient := cloudapi.NewClient( logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) @@ -155,11 +139,15 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu conf = conf.Apply(*response.ConfigOverride) } - testRunId := null.NewString(response.ReferenceID, true) - if err := setTestRunIdAndConfig(testRunId, conf); err != nil { - return err + conf.TestRunID = null.NewString(response.ReferenceID, true) + + raw, err := cloudConfToRawMessage(conf) + if err != nil { + return fmt.Errorf("could not serialize cloud configuration: %w", err) } + test.derivedConfig.Collectors["cloud"] = raw + return nil } diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go index 61c078daa34..a1dd2b677c2 100644 --- a/js/modules/k6/cloud/cloud.go +++ b/js/modules/k6/cloud/cloud.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/sobek" + "go.k6.io/k6/cloudapi" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" ) @@ -72,9 +73,16 @@ func (mi *ModuleInstance) testRunId() (sobek.Value, error) { return sobek.Undefined(), errRunInInitContext } - if !vuState.Options.TestRunID.Valid { + if vuState.Options.Cloud == nil { return sobek.Undefined(), nil } - return rt.ToValue(vuState.Options.TestRunID.String), nil + // We pass almost all values to zero/nil because here we only care about the cloud configuration present in options. + // TODO: Technically I guess we can do it only once and "cache" the value, as it shouldn't change over the test run. + conf, _, err := cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) + if err != nil { + return sobek.Undefined(), err + } + + return rt.ToValue(conf.TestRunID.String), nil } diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go index 7a98f41c0c7..78dcf1d402c 100644 --- a/js/modules/k6/cloud/cloud_test.go +++ b/js/modules/k6/cloud/cloud_test.go @@ -5,7 +5,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" "go.k6.io/k6/js/modulestest" "go.k6.io/k6/lib" @@ -45,7 +44,7 @@ func TestGetTestRunId(t *testing.T) { tRt := setupCloudTestEnv(t) tRt.MoveToVUContext(&lib.State{ Options: lib.Options{ - TestRunID: null.NewString("123", true), + Cloud: []byte(`{"testRunID": "123"}`), }, }) testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) diff --git a/lib/options.go b/lib/options.go index dc090dd8757..55c0dcc84c8 100644 --- a/lib/options.go +++ b/lib/options.go @@ -311,12 +311,6 @@ type Options struct { // Cloud is the configuration for the k6 Cloud, formerly known as ext.loadimpact. Cloud json.RawMessage `json:"cloud,omitempty"` - // TestRunID represents the test run id. - // It relies on the `K6_CLOUD_PUSH_REF_ID` environment variable, - // because that's the variable the k6-agent uses to populates it. - // It is equivalent to cloudapi.Config.PushRefID. - TestRunID null.String `json:"testRunId" envconfig:"K6_CLOUD_PUSH_REF_ID"` - // These values are for third party collectors' benefit. // Can't be set through env vars. External map[string]json.RawMessage `json:"ext" ignored:"true"` @@ -481,9 +475,6 @@ func (o Options) Apply(opts Options) Options { if opts.Cloud != nil { o.Cloud = opts.Cloud } - if opts.TestRunID.Valid { - o.TestRunID = opts.TestRunID - } if opts.External != nil { o.External = opts.External } diff --git a/output/cloud/output.go b/output/cloud/output.go index a860934dee7..905640fc5cf 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -96,11 +96,9 @@ func newOutput(params output.Params) (*Output, error) { apiClient := cloudapi.NewClient( logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) - testRunID := params.ScriptOptions.TestRunID.String - return &Output{ config: conf, - testRunID: testRunID, + testRunID: conf.TestRunID.String, client: apiClient, duration: int64(duration / time.Second), logger: logger, From e7eae06b19faa3546d2ad46881cc6db5c6793dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 5 Dec 2024 14:47:42 +0100 Subject: [PATCH 05/11] Cloud local execution creates the test run before delegating --- cmd/cloud_run.go | 19 +++++ cmd/outputs_cloud.go | 20 ++--- cmd/run.go | 9 -- js/modules/k6/cloud/cloud.go | 64 ++++++++++----- js/modules/k6/cloud/cloud_test.go | 76 +++++++++++------ output/cloud/output.go | 132 +++++++++++++++++++++++++++--- 6 files changed, 241 insertions(+), 79 deletions(-) diff --git a/cmd/cloud_run.go b/cmd/cloud_run.go index 54b13f5c34a..866a5b94af8 100644 --- a/cmd/cloud_run.go +++ b/cmd/cloud_run.go @@ -131,6 +131,25 @@ func (c *cmdCloudRun) preRun(cmd *cobra.Command, args []string) error { func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error { if c.localExecution { + // We know this execution requires a test run to be created in the Cloud. + // So, we create it before delegating the actual execution to the run command. + // To do that, we need to load the test and configure it. + test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig) + if err != nil { + return fmt.Errorf("could not load and configure the test: %w", err) + } + + // As we've already loaded the test, we can modify the init function to + // reuse the initialized one. + c.runCmd.loadConfiguredTest = func(*cobra.Command, []string) (*loadedAndConfiguredTest, execution.Controller, error) { + return test, local.NewController(), nil + } + + // After that, we can create the remote test run. + if err := createCloudTest(c.runCmd.gs, test); err != nil { + return fmt.Errorf("could not create the cloud test run: %w", err) + } + return c.runCmd.run(cmd, args) } diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go index 563e9ae3438..ef2f64daffa 100644 --- a/cmd/outputs_cloud.go +++ b/cmd/outputs_cloud.go @@ -21,26 +21,16 @@ import ( const defaultTestName = "k6 test" -func findCloudOutput(outputs []string) (string, string, bool) { - for _, outFullArg := range outputs { - outType, outArg, _ := strings.Cut(outFullArg, "=") - if outType == builtinOutputCloud.String() { - return outType, outArg, true - } - } - return "", "", false -} - // createCloudTest performs some test and Cloud configuration validations and if everything // looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. // It is also responsible for filling the test run id on the test options, so it can be used later. // It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, // or an error if something goes wrong. -func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outputType, outputArg string) error { +func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error { conf, warn, err := cloudapi.GetConsolidatedConfig( - test.derivedConfig.Collectors[outputType], + test.derivedConfig.Collectors[builtinOutputCloud.String()], gs.Env, - outputArg, + "", // Historically used for -o cloud=..., no longer used (deprecated). test.derivedConfig.Options.Cloud, test.derivedConfig.Options.External, ) @@ -124,7 +114,7 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu Archive: testArchive, } - logger := gs.Logger.WithFields(logrus.Fields{"output": "cloud"}) + logger := gs.Logger.WithFields(logrus.Fields{"output": builtinOutputCloud.String()}) apiClient := cloudapi.NewClient( logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) @@ -146,7 +136,7 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest, outpu return fmt.Errorf("could not serialize cloud configuration: %w", err) } - test.derivedConfig.Collectors["cloud"] = raw + test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw return nil } diff --git a/cmd/run.go b/cmd/run.go index e4e3ce05f64..1a0e04c8ffc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -123,15 +123,6 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } } - // Although outputs are created below, is at this point, before building - // the test run state, when we want to create the Cloud test run, if needed - // so that we can set the test run ID on the test options. - if outType, outArg, found := findCloudOutput(test.derivedConfig.Out); found { - if err := createCloudTest(c.gs, test, outType, outArg); err != nil { - return fmt.Errorf("could not create the '%s' output: %w", outType, err) - } - } - // Write the full consolidated *and derived* options back to the Runner. conf := test.derivedConfig testRunState, err := test.buildTestRunState(conf.Options) diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go index a1dd2b677c2..ddc7a2da6f9 100644 --- a/js/modules/k6/cloud/cloud.go +++ b/js/modules/k6/cloud/cloud.go @@ -2,9 +2,10 @@ package cloud import ( - "errors" + "sync" "github.com/grafana/sobek" + "github.com/mstoykov/envconfig" "go.k6.io/k6/cloudapi" "go.k6.io/k6/js/common" @@ -20,6 +21,9 @@ type ( ModuleInstance struct { vu modules.VU obj *sobek.Object + + once sync.Once + testRunID sobek.Value } ) @@ -37,10 +41,12 @@ func New() *RootModule { // a new instance for each VU. func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { mi := &ModuleInstance{vu: vu} + rt := vu.Runtime() - o := rt.NewObject() + + mi.obj = rt.NewObject() defProp := func(name string, getter func() (sobek.Value, error)) { - err := o.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { + err := mi.obj.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { obj, err := getter() if err != nil { common.Throw(rt, err) @@ -53,7 +59,17 @@ func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { } defProp("testRunId", mi.testRunId) - mi.obj = o + // By default, we try to load the test run id from the environment variables, + // which corresponds to those scenarios where the k6 binary is running in the Cloud. + var envConf cloudapi.Config + if err := envconfig.Process("", &envConf, vu.InitEnv().LookupEnv); err != nil { + common.Throw(vu.Runtime(), err) + } + if envConf.TestRunID.Valid { + mi.testRunID = mi.vu.Runtime().ToValue(envConf.TestRunID.String) + } else { + mi.testRunID = sobek.Undefined() // Default value. + } return mi } @@ -63,26 +79,32 @@ func (mi *ModuleInstance) Exports() modules.Exports { return modules.Exports{Default: mi.obj} } -var errRunInInitContext = errors.New("getting cloud information outside of the VU context is not supported") - // testRunId returns a sobek.Value(string) with the Cloud test run id. +// +// This code can be executed in two situations, either when the k6 binary is running in the Cloud, in which case +// the value of the test run id would be available in the environment, and we would have loaded at module initialization +// time; or when the k6 binary is running locally and test run id is present in the options, which we try to read at +// time of running this method, but only once for the whole execution as options won't change anymore. func (mi *ModuleInstance) testRunId() (sobek.Value, error) { - rt := mi.vu.Runtime() - vuState := mi.vu.State() - if vuState == nil { - return sobek.Undefined(), errRunInInitContext - } - - if vuState.Options.Cloud == nil { - return sobek.Undefined(), nil + // In case we have a value (e.g. loaded from env), we return it. + // If we're in the init context (where we cannot read the options), we return undefined (the default value). + if !sobek.IsUndefined(mi.testRunID) || mi.vu.State() == nil { + return mi.testRunID, nil } - // We pass almost all values to zero/nil because here we only care about the cloud configuration present in options. - // TODO: Technically I guess we can do it only once and "cache" the value, as it shouldn't change over the test run. - conf, _, err := cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) - if err != nil { - return sobek.Undefined(), err - } + // Otherwise, we try to read the test run id from options. + // We only try it once for the whole execution, as options won't change. + vuState := mi.vu.State() + var err error + mi.once.Do(func() { + // We pass almost all values to zero/nil because here we only care about the Cloud configuration present in options. + var optsConf cloudapi.Config + optsConf, _, err = cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) + + if optsConf.TestRunID.Valid { + mi.testRunID = mi.vu.Runtime().ToValue(optsConf.TestRunID.String) + } + }) - return rt.ToValue(conf.TestRunID.String), nil + return mi.testRunID, err } diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go index 78dcf1d402c..4adcfd18bf0 100644 --- a/js/modules/k6/cloud/cloud_test.go +++ b/js/modules/k6/cloud/cloud_test.go @@ -3,6 +3,7 @@ package cloud import ( "testing" + "github.com/grafana/sobek" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -10,8 +11,12 @@ import ( "go.k6.io/k6/lib" ) -func setupCloudTestEnv(t *testing.T) *modulestest.Runtime { +func setupCloudTestEnv(t *testing.T, env map[string]string) *modulestest.Runtime { tRt := modulestest.NewRuntime(t) + tRt.VU.InitEnv().LookupEnv = func(key string) (string, bool) { + v, ok := env[key] + return v, ok + } m, ok := New().NewModuleInstance(tRt.VU).(*ModuleInstance) require.True(t, ok) require.NoError(t, tRt.VU.Runtime().Set("cloud", m.Exports().Default)) @@ -21,34 +26,59 @@ func setupCloudTestEnv(t *testing.T) *modulestest.Runtime { func TestGetTestRunId(t *testing.T) { t.Parallel() - t.Run("init context", func(t *testing.T) { + t.Run("Cloud execution", func(t *testing.T) { t.Parallel() - tRt := setupCloudTestEnv(t) - _, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.ErrorIs(t, err, errRunInInitContext) - }) - t.Run("undefined", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t) - tRt.MoveToVUContext(&lib.State{ - Options: lib.Options{}, + t.Run("Not defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, map[string]string{"K6_CLOUD_TEST_RUN_ID": "123"}) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "123", testRunId.String()) }) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, "undefined", testRunId.String()) }) - t.Run("defined", func(t *testing.T) { + t.Run("Local execution", func(t *testing.T) { t.Parallel() - tRt := setupCloudTestEnv(t) - tRt.MoveToVUContext(&lib.State{ - Options: lib.Options{ - Cloud: []byte(`{"testRunID": "123"}`), - }, + + t.Run("Init context", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Not defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{}, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{ + Cloud: []byte(`{"testRunID": "123"}`), + }, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "123", testRunId.String()) }) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, "123", testRunId.String()) }) } diff --git a/output/cloud/output.go b/output/cloud/output.go index 905640fc5cf..5054fcfd65e 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -4,9 +4,12 @@ package cloud import ( "errors" "fmt" + "path/filepath" + "strings" "time" "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" @@ -18,6 +21,9 @@ import ( "go.k6.io/k6/usage" ) +// TestName is the default k6 Cloud test name +const TestName = "k6 test" + // versionedOutput represents an output implementing // metrics samples aggregation and flushing to the // Cloud remote service. @@ -51,8 +57,17 @@ type Output struct { config cloudapi.Config testRunID string - duration int64 // in seconds - thresholds map[string][]*metrics.Threshold + executionPlan []lib.ExecutionStep + duration int64 // in seconds + thresholds map[string][]*metrics.Threshold + + // testArchive is the test archive to be uploaded to the cloud + // before the output is Start()-ed. + // + // It is set by the SetArchive method. If it is nil, the output + // will not upload any test archive, such as when the user + // uses the --no-archive-upload flag. + testArchive *lib.Archive client *cloudapi.Client testStopFunc func(error) @@ -89,33 +104,120 @@ func newOutput(params output.Params) (*Output, error) { params.Logger.Warn(warn) } + if err := validateRequiredSystemTags(params.ScriptOptions.SystemTags); err != nil { + return nil, err + } + logger := params.Logger.WithFields(logrus.Fields{"output": "cloud"}) - duration, _ := lib.GetEndOffset(params.ExecutionPlan) + if !conf.Name.Valid || conf.Name.String == "" { + scriptPath := params.ScriptPath.String() + if scriptPath == "" { + // Script from stdin without a name, likely from stdin + return nil, errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") + } + + conf.Name = null.StringFrom(filepath.Base(scriptPath)) + } + if conf.Name.String == "-" { + conf.Name = null.StringFrom(TestName) + } + + duration, testEnds := lib.GetEndOffset(params.ExecutionPlan) + if !testEnds { + return nil, errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") + } + + if conf.MetricPushConcurrency.Int64 < 1 { + return nil, fmt.Errorf("metrics push concurrency must be a positive number but is %d", + conf.MetricPushConcurrency.Int64) + } + + if conf.MaxTimeSeriesInBatch.Int64 < 1 { + return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", + conf.MaxTimeSeriesInBatch.Int64) + } apiClient := cloudapi.NewClient( logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) return &Output{ - config: conf, - testRunID: conf.TestRunID.String, - client: apiClient, - duration: int64(duration / time.Second), - logger: logger, - usage: params.Usage, + config: conf, + client: apiClient, + executionPlan: params.ExecutionPlan, + duration: int64(duration / time.Second), + logger: logger, + usage: params.Usage, }, nil } +// validateRequiredSystemTags checks if all required tags are present. +func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { + missingRequiredTags := []string{} + requiredTags := metrics.SystemTagSet(metrics.TagName | + metrics.TagMethod | + metrics.TagStatus | + metrics.TagError | + metrics.TagCheck | + metrics.TagGroup) + for _, tag := range metrics.SystemTagValues() { + if requiredTags.Has(tag) && !scriptTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return fmt.Errorf( + "the cloud output needs the following system tags enabled: %s", + strings.Join(missingRequiredTags, ", "), + ) + } + return nil +} + // Start calls the k6 Cloud API to initialize the test run, and then starts the // goroutine that would listen for metric samples and send them to the cloud. func (out *Output) Start() error { - if out.config.PushRefID.Valid { + if out.config.TestRunID.Valid { + out.testRunID = out.config.TestRunID.String + } else if out.config.PushRefID.Valid { out.testRunID = out.config.PushRefID.String + } + + if out.testRunID != "" { out.logger.WithField("testRunId", out.testRunID).Debug("Directly pushing metrics without init") return out.startVersionedOutput() } - err := out.startVersionedOutput() + thresholds := make(map[string][]string) + for name, t := range out.thresholds { + for _, threshold := range t { + thresholds[name] = append(thresholds[name], threshold.Source) + } + } + + testRun := &cloudapi.TestRun{ + Name: out.config.Name.String, + ProjectID: out.config.ProjectID.Int64, + VUsMax: int64(lib.GetMaxPossibleVUs(out.executionPlan)), + Thresholds: thresholds, + Duration: out.duration, + Archive: out.testArchive, + } + + response, err := out.client.CreateTestRun(testRun) + if err != nil { + return err + } + out.testRunID = response.ReferenceID + + if response.ConfigOverride != nil { + out.logger.WithFields(logrus.Fields{ + "override": response.ConfigOverride, + }).Debug("overriding config options") + out.config = out.config.Apply(*response.ConfigOverride) + } + + err = out.startVersionedOutput() if err != nil { return fmt.Errorf("the Gateway Output failed to start a versioned output: %w", err) } @@ -148,6 +250,14 @@ func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { out.testStopFunc = stopFunc } +// SetArchive receives the test artifact to be uploaded to the cloud +// before the output is Start()-ed. +func (out *Output) SetArchive(archive *lib.Archive) { + out.testArchive = archive +} + +var _ output.WithArchive = &Output{} + // Stop gracefully stops all metric emission from the output: when all metric // samples are emitted, it makes a cloud API call to finish the test run. // From 6cceed4f5b2ba8d4bc17dcebb59ceb4c367660f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Tue, 17 Dec 2024 18:17:19 +0100 Subject: [PATCH 06/11] Get rid of the recently introduced options.cloud.TestRunID --- cloudapi/config.go | 12 ++---------- cmd/outputs_cloud.go | 33 +++++++++++++++------------------ output/cloud/output.go | 6 ------ 3 files changed, 17 insertions(+), 34 deletions(-) diff --git a/cloudapi/config.go b/cloudapi/config.go index ebe07d5a871..41a4078c8b8 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -33,16 +33,11 @@ type Config struct { APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` // PushRefID is the identifier used by Cloud systems to correlate all the things that - // belong to the same test run/execution. Currently, it is equivalent to the TestRunID. + // belong to the same test run/execution. Currently, it is equivalent to the test run id. // But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be - // no TestRunID and we may still need an identifier to correlate all the things. + // no test run id, and we may still need an identifier to correlate all the things. PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"` - // TestRunID is the test run id, a unique identifier across all the test runs. - // It might be used to correlate all the things that belong to the same test run/execution, - // see PushRefID for more details. - TestRunID null.String `json:"testRunID" envconfig:"K6_CLOUD_TEST_RUN_ID"` - // Defines the max allowed number of time series in a single batch. MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"` @@ -126,9 +121,6 @@ func (c Config) Apply(cfg Config) Config { if cfg.PushRefID.Valid { c.PushRefID = cfg.PushRefID } - if cfg.TestRunID.Valid { - c.TestRunID = cfg.TestRunID - } if cfg.WebAppURL.Valid { c.WebAppURL = cfg.WebAppURL } diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go index ef2f64daffa..f1335dc68e7 100644 --- a/cmd/outputs_cloud.go +++ b/cmd/outputs_cloud.go @@ -19,7 +19,10 @@ import ( "go.k6.io/k6/metrics" ) -const defaultTestName = "k6 test" +const ( + defaultTestName = "k6 test" + testRunIDKey = "K6_CLOUDRUN_TEST_RUN_ID" +) // createCloudTest performs some test and Cloud configuration validations and if everything // looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. @@ -27,6 +30,16 @@ const defaultTestName = "k6 test" // It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, // or an error if something goes wrong. func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error { + // If the "K6_CLOUDRUN_TEST_RUN_ID" is set, then it means that this code is being executed in the k6 Cloud. + // Therefore, we don't need to continue with the test run creation, as we don't need to create any test run. + // + // This should technically never happen, as k6, when executed in the Cloud, it uses the standard "run" + // command "locally", but we add this early return just in case, for safety. + if _, isSet := gs.Env[testRunIDKey]; isSet { + return nil + } + + // Otherwise, we continue normally with the creation of the test run in the k6 Cloud backend services. conf, warn, err := cloudapi.GetConsolidatedConfig( test.derivedConfig.Collectors[builtinOutputCloud.String()], gs.Env, @@ -42,15 +55,6 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error gs.Logger.Warn(warn) } - // If this is true, then it means that this code is being executed in the k6 Cloud. - // Therefore, we don't need to continue with the test run creation, - // as we don't need to create any test run. - // - // Precisely, the identifier of the test run is conf.TestRunID. - if conf.TestRunID.Valid { - return nil - } - // If not, we continue with some validations and the creation of the test run. if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { return err @@ -129,14 +133,7 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error conf = conf.Apply(*response.ConfigOverride) } - conf.TestRunID = null.NewString(response.ReferenceID, true) - - raw, err := cloudConfToRawMessage(conf) - if err != nil { - return fmt.Errorf("could not serialize cloud configuration: %w", err) - } - - test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw + test.preInitState.RuntimeOptions.Env[testRunIDKey] = response.ReferenceID return nil } diff --git a/output/cloud/output.go b/output/cloud/output.go index 5054fcfd65e..f7e63dfdb8d 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -177,12 +177,6 @@ func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { // Start calls the k6 Cloud API to initialize the test run, and then starts the // goroutine that would listen for metric samples and send them to the cloud. func (out *Output) Start() error { - if out.config.TestRunID.Valid { - out.testRunID = out.config.TestRunID.String - } else if out.config.PushRefID.Valid { - out.testRunID = out.config.PushRefID.String - } - if out.testRunID != "" { out.logger.WithField("testRunId", out.testRunID).Debug("Directly pushing metrics without init") return out.startVersionedOutput() From 8a897226fe148d0532f111ca5b4440700acd52e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Tue, 17 Dec 2024 18:18:44 +0100 Subject: [PATCH 07/11] Remove the k6/cloud JS module --- js/modules/k6/cloud/cloud.go | 110 ------------------------------ js/modules/k6/cloud/cloud_test.go | 84 ----------------------- 2 files changed, 194 deletions(-) delete mode 100644 js/modules/k6/cloud/cloud.go delete mode 100644 js/modules/k6/cloud/cloud_test.go diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go deleted file mode 100644 index ddc7a2da6f9..00000000000 --- a/js/modules/k6/cloud/cloud.go +++ /dev/null @@ -1,110 +0,0 @@ -// Package cloud implements k6/cloud which lets script find out more about the Cloud execution. -package cloud - -import ( - "sync" - - "github.com/grafana/sobek" - "github.com/mstoykov/envconfig" - - "go.k6.io/k6/cloudapi" - "go.k6.io/k6/js/common" - "go.k6.io/k6/js/modules" -) - -type ( - // RootModule is the global module instance that will create module - // instances for each VU. - RootModule struct{} - - // ModuleInstance represents an instance of the execution module. - ModuleInstance struct { - vu modules.VU - obj *sobek.Object - - once sync.Once - testRunID sobek.Value - } -) - -var ( - _ modules.Module = &RootModule{} - _ modules.Instance = &ModuleInstance{} -) - -// New returns a pointer to a new RootModule instance. -func New() *RootModule { - return &RootModule{} -} - -// NewModuleInstance implements the modules.Module interface to return -// a new instance for each VU. -func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { - mi := &ModuleInstance{vu: vu} - - rt := vu.Runtime() - - mi.obj = rt.NewObject() - defProp := func(name string, getter func() (sobek.Value, error)) { - err := mi.obj.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { - obj, err := getter() - if err != nil { - common.Throw(rt, err) - } - return obj - }), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE) - if err != nil { - common.Throw(rt, err) - } - } - defProp("testRunId", mi.testRunId) - - // By default, we try to load the test run id from the environment variables, - // which corresponds to those scenarios where the k6 binary is running in the Cloud. - var envConf cloudapi.Config - if err := envconfig.Process("", &envConf, vu.InitEnv().LookupEnv); err != nil { - common.Throw(vu.Runtime(), err) - } - if envConf.TestRunID.Valid { - mi.testRunID = mi.vu.Runtime().ToValue(envConf.TestRunID.String) - } else { - mi.testRunID = sobek.Undefined() // Default value. - } - - return mi -} - -// Exports returns the exports of the execution module. -func (mi *ModuleInstance) Exports() modules.Exports { - return modules.Exports{Default: mi.obj} -} - -// testRunId returns a sobek.Value(string) with the Cloud test run id. -// -// This code can be executed in two situations, either when the k6 binary is running in the Cloud, in which case -// the value of the test run id would be available in the environment, and we would have loaded at module initialization -// time; or when the k6 binary is running locally and test run id is present in the options, which we try to read at -// time of running this method, but only once for the whole execution as options won't change anymore. -func (mi *ModuleInstance) testRunId() (sobek.Value, error) { - // In case we have a value (e.g. loaded from env), we return it. - // If we're in the init context (where we cannot read the options), we return undefined (the default value). - if !sobek.IsUndefined(mi.testRunID) || mi.vu.State() == nil { - return mi.testRunID, nil - } - - // Otherwise, we try to read the test run id from options. - // We only try it once for the whole execution, as options won't change. - vuState := mi.vu.State() - var err error - mi.once.Do(func() { - // We pass almost all values to zero/nil because here we only care about the Cloud configuration present in options. - var optsConf cloudapi.Config - optsConf, _, err = cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) - - if optsConf.TestRunID.Valid { - mi.testRunID = mi.vu.Runtime().ToValue(optsConf.TestRunID.String) - } - }) - - return mi.testRunID, err -} diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go deleted file mode 100644 index 4adcfd18bf0..00000000000 --- a/js/modules/k6/cloud/cloud_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package cloud - -import ( - "testing" - - "github.com/grafana/sobek" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.k6.io/k6/js/modulestest" - "go.k6.io/k6/lib" -) - -func setupCloudTestEnv(t *testing.T, env map[string]string) *modulestest.Runtime { - tRt := modulestest.NewRuntime(t) - tRt.VU.InitEnv().LookupEnv = func(key string) (string, bool) { - v, ok := env[key] - return v, ok - } - m, ok := New().NewModuleInstance(tRt.VU).(*ModuleInstance) - require.True(t, ok) - require.NoError(t, tRt.VU.Runtime().Set("cloud", m.Exports().Default)) - return tRt -} - -func TestGetTestRunId(t *testing.T) { - t.Parallel() - - t.Run("Cloud execution", func(t *testing.T) { - t.Parallel() - - t.Run("Not defined", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t, nil) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, sobek.Undefined(), testRunId) - }) - - t.Run("Defined", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t, map[string]string{"K6_CLOUD_TEST_RUN_ID": "123"}) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, "123", testRunId.String()) - }) - }) - - t.Run("Local execution", func(t *testing.T) { - t.Parallel() - - t.Run("Init context", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t, nil) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, sobek.Undefined(), testRunId) - }) - - t.Run("Not defined", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t, nil) - tRt.MoveToVUContext(&lib.State{ - Options: lib.Options{}, - }) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, sobek.Undefined(), testRunId) - }) - - t.Run("Defined", func(t *testing.T) { - t.Parallel() - tRt := setupCloudTestEnv(t, nil) - tRt.MoveToVUContext(&lib.State{ - Options: lib.Options{ - Cloud: []byte(`{"testRunID": "123"}`), - }, - }) - testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) - require.NoError(t, err) - assert.Equal(t, "123", testRunId.String()) - }) - }) -} From 893b76054296f24d04e6508e4347c62ec13854bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 19 Dec 2024 12:31:45 +0100 Subject: [PATCH 08/11] Move the cloud test run creation into the test configuration --- cmd/cloud_run.go | 34 ++++++++++++++------------- cmd/outputs_cloud.go | 41 +++++++++++++++++++-------------- cmd/tests/cmd_cloud_run_test.go | 40 ++++++++++++++++++++++++++++---- output/cloud/output.go | 9 +++++--- 4 files changed, 83 insertions(+), 41 deletions(-) diff --git a/cmd/cloud_run.go b/cmd/cloud_run.go index 866a5b94af8..744c4803325 100644 --- a/cmd/cloud_run.go +++ b/cmd/cloud_run.go @@ -131,25 +131,27 @@ func (c *cmdCloudRun) preRun(cmd *cobra.Command, args []string) error { func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error { if c.localExecution { - // We know this execution requires a test run to be created in the Cloud. - // So, we create it before delegating the actual execution to the run command. - // To do that, we need to load the test and configure it. - test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig) - if err != nil { - return fmt.Errorf("could not load and configure the test: %w", err) - } - - // As we've already loaded the test, we can modify the init function to - // reuse the initialized one. c.runCmd.loadConfiguredTest = func(*cobra.Command, []string) (*loadedAndConfiguredTest, execution.Controller, error) { - return test, local.NewController(), nil - } + test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig) + if err != nil { + return nil, nil, fmt.Errorf("could not load and configure the test: %w", err) + } + + // If the "K6_CLOUDRUN_TEST_RUN_ID" is set, then it means that this code is being executed in the k6 Cloud. + // Therefore, we don't need to continue with the test run creation, as we don't need to create any test run. + // This should technically never happen, as k6, when executed in the Cloud, it uses the standard "run" + // command "locally", but we add this early return just in case, for safety. + // + // If not, we know this execution requires a test run to be created in the Cloud. + // So, we create it as part of the process of loading and configuring the test. + if _, isSet := c.runCmd.gs.Env[testRunIDKey]; !isSet { + if err := createCloudTest(c.runCmd.gs, test); err != nil { + return nil, nil, fmt.Errorf("could not create the cloud test run: %w", err) + } + } - // After that, we can create the remote test run. - if err := createCloudTest(c.runCmd.gs, test); err != nil { - return fmt.Errorf("could not create the cloud test run: %w", err) + return test, local.NewController(), nil } - return c.runCmd.run(cmd, args) } diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go index f1335dc68e7..06368315f87 100644 --- a/cmd/outputs_cloud.go +++ b/cmd/outputs_cloud.go @@ -25,20 +25,15 @@ const ( ) // createCloudTest performs some test and Cloud configuration validations and if everything -// looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. -// It is also responsible for filling the test run id on the test options, so it can be used later. -// It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, -// or an error if something goes wrong. +// looks good, then it creates a test run in the k6 Cloud, using the Cloud API, meant to be used +// for streaming test results. +// +// This method is also responsible for filling the test run id on the test environment, so it can be used later, +// and to populate the Cloud configuration back in case the Cloud API returned some overrides, +// as expected by the Cloud output. +// +//nolint:funlen func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error { - // If the "K6_CLOUDRUN_TEST_RUN_ID" is set, then it means that this code is being executed in the k6 Cloud. - // Therefore, we don't need to continue with the test run creation, as we don't need to create any test run. - // - // This should technically never happen, as k6, when executed in the Cloud, it uses the standard "run" - // command "locally", but we add this early return just in case, for safety. - if _, isSet := gs.Env[testRunIDKey]; isSet { - return nil - } - // Otherwise, we continue normally with the creation of the test run in the k6 Cloud backend services. conf, warn, err := cloudapi.GetConsolidatedConfig( test.derivedConfig.Collectors[builtinOutputCloud.String()], @@ -112,7 +107,7 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error testRun := &cloudapi.TestRun{ Name: conf.Name.String, ProjectID: conf.ProjectID.Int64, - VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), + VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), //nolint:gosec Thresholds: thresholds, Duration: int64(duration / time.Second), Archive: testArchive, @@ -128,12 +123,24 @@ func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error return err } + // We store the test run id in the environment, so it can be used later. + test.preInitState.RuntimeOptions.Env[testRunIDKey] = response.ReferenceID + + // If the Cloud API returned configuration overrides, we apply them to the current configuration. + // Then, we serialize the overridden configuration back, so it can be used by the Cloud output. if response.ConfigOverride != nil { logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options") - conf = conf.Apply(*response.ConfigOverride) - } - test.preInitState.RuntimeOptions.Env[testRunIDKey] = response.ReferenceID + raw, err := cloudConfToRawMessage(conf.Apply(*response.ConfigOverride)) + if err != nil { + return fmt.Errorf("could not serialize overridden cloud configuration: %w", err) + } + + if test.derivedConfig.Collectors == nil { + test.derivedConfig.Collectors = make(map[string]json.RawMessage) + } + test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw + } return nil } diff --git a/cmd/tests/cmd_cloud_run_test.go b/cmd/tests/cmd_cloud_run_test.go index b15d7269305..c6706a93b92 100644 --- a/cmd/tests/cmd_cloud_run_test.go +++ b/cmd/tests/cmd_cloud_run_test.go @@ -7,16 +7,16 @@ import ( "io" "net/http" "path/filepath" + "strconv" "testing" - "go.k6.io/k6/errext/exitcodes" - "go.k6.io/k6/lib/fsext" - + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/cloudapi" - "github.com/stretchr/testify/assert" + "go.k6.io/k6/cloudapi" "go.k6.io/k6/cmd" + "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/lib/fsext" ) func TestK6CloudRun(t *testing.T) { @@ -169,6 +169,36 @@ export default function() {};` assert.Contains(t, stdout, "execution: local") assert.Contains(t, stdout, "output: cloud (https://some.other.url/foo/tests/org/1337?bar=baz)") }) + + t.Run("the script can read the test run id to the environment", func(t *testing.T) { + t.Parallel() + + script := ` +export const options = { + cloud: { + name: 'Hello k6 Cloud!', + projectID: 123456, + }, +}; + +export default function() { + ` + "console.log(`The test run id is ${__ENV.K6_CLOUDRUN_TEST_RUN_ID}`);" + ` +};` + + ts := makeTestState(t, script, []string{"--local-execution", "--log-output=stdout"}, 0) + + const testRunID = 1337 + srv := getCloudTestEndChecker(t, testRunID, nil, cloudapi.RunStatusFinished, cloudapi.ResultStatusPassed) + ts.Env["K6_CLOUD_HOST"] = srv.URL + + cmd.ExecuteWithGlobalState(ts.GlobalState) + + stdout := ts.Stdout.String() + t.Log(stdout) + assert.Contains(t, stdout, "execution: local") + assert.Contains(t, stdout, "output: cloud (https://app.k6.io/runs/1337)") + assert.Contains(t, stdout, "The test run id is "+strconv.Itoa(testRunID)) + }) } func makeTestState(tb testing.TB, script string, cliFlags []string, expExitCode exitcodes.ExitCode) *GlobalTestState { diff --git a/output/cloud/output.go b/output/cloud/output.go index 1d2ccbef1d7..c4294c816f4 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -21,8 +21,10 @@ import ( "go.k6.io/k6/usage" ) -// TestName is the default k6 Cloud test name -const TestName = "k6 test" +const ( + defaultTestName = "k6 test" + testRunIDKey = "K6_CLOUDRUN_TEST_RUN_ID" +) // versionedOutput represents an output implementing // metrics samples aggregation and flushing to the @@ -120,7 +122,7 @@ func newOutput(params output.Params) (*Output, error) { conf.Name = null.StringFrom(filepath.Base(scriptPath)) } if conf.Name.String == "-" { - conf.Name = null.StringFrom(TestName) + conf.Name = null.StringFrom(defaultTestName) } duration, testEnds := lib.GetEndOffset(params.ExecutionPlan) @@ -148,6 +150,7 @@ func newOutput(params output.Params) (*Output, error) { duration: int64(duration / time.Second), logger: logger, usage: params.Usage, + testRunID: params.RuntimeOptions.Env[testRunIDKey], }, nil } From 167f3bd683faac9c2536ed4923e17be3ce4e2ca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Thu, 19 Dec 2024 12:43:38 +0100 Subject: [PATCH 09/11] Keep the compatibility w/old behavior in Cloud output --- output/cloud/output.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/output/cloud/output.go b/output/cloud/output.go index c4294c816f4..1d877d526cf 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -180,6 +180,10 @@ func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { // Start calls the k6 Cloud API to initialize the test run, and then starts the // goroutine that would listen for metric samples and send them to the cloud. func (out *Output) Start() error { + if out.config.PushRefID.Valid { + out.testRunID = out.config.PushRefID.String + } + if out.testRunID != "" { out.logger.WithField("testRunId", out.testRunID).Debug("Directly pushing metrics without init") return out.startVersionedOutput() From a22628f509d5f1af166a2c2fe2ab165304ab5c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= Date: Fri, 20 Dec 2024 10:23:52 +0100 Subject: [PATCH 10/11] Remove the Cloud-specific handling code of the cloud local execution --- cmd/cloud_run.go | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/cmd/cloud_run.go b/cmd/cloud_run.go index 744c4803325..6824bbe1100 100644 --- a/cmd/cloud_run.go +++ b/cmd/cloud_run.go @@ -137,17 +137,8 @@ func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error { return nil, nil, fmt.Errorf("could not load and configure the test: %w", err) } - // If the "K6_CLOUDRUN_TEST_RUN_ID" is set, then it means that this code is being executed in the k6 Cloud. - // Therefore, we don't need to continue with the test run creation, as we don't need to create any test run. - // This should technically never happen, as k6, when executed in the Cloud, it uses the standard "run" - // command "locally", but we add this early return just in case, for safety. - // - // If not, we know this execution requires a test run to be created in the Cloud. - // So, we create it as part of the process of loading and configuring the test. - if _, isSet := c.runCmd.gs.Env[testRunIDKey]; !isSet { - if err := createCloudTest(c.runCmd.gs, test); err != nil { - return nil, nil, fmt.Errorf("could not create the cloud test run: %w", err) - } + if err := createCloudTest(c.runCmd.gs, test); err != nil { + return nil, nil, fmt.Errorf("could not create the cloud test run: %w", err) } return test, local.NewController(), nil From d22f376af2517eac0684413ffaac1497bb025a11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joan=20L=C3=B3pez=20de=20la=20Franca=20Beltran?= <5459617+joanlopez@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:55:57 +0100 Subject: [PATCH 11/11] Apply suggestions from code review Co-authored-by: Oleg Bespalov --- cloudapi/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudapi/config.go b/cloudapi/config.go index 41a4078c8b8..0dd796c73c9 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -32,7 +32,7 @@ type Config struct { StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"` APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` - // PushRefID is the identifier used by Cloud systems to correlate all the things that + // PushRefID is the identifier used by k6 Cloud to correlate all the things that // belong to the same test run/execution. Currently, it is equivalent to the test run id. // But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be // no test run id, and we may still need an identifier to correlate all the things.