diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index 705d4c325c..63fd8a0154 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -15,7 +15,6 @@ import ( "net/http" "os" "os/signal" - "runtime" "strconv" "strings" "syscall" @@ -215,10 +214,6 @@ func TestMainFlow(t *testing.T) { }) t.Run("kafka", func(t *testing.T) { - if runtime.GOARCH == "arm64" && !overrideArm64Check { - t.Skip("arm64 is not supported yet") - } - kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Port) // Create new consumer @@ -389,20 +384,18 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} { require.NoError(t, err) containersGroup, containersCtx := errgroup.WithContext(context.TODO()) - if runtime.GOARCH != "arm64" || overrideArm64Check { - containersGroup.Go(func() (err error) { - kafkaContainer, err = destination.SetupKafka(pool, t, - destination.WithLogger(&testLogger{logger.NewLogger().Child("kafka")}), - destination.WithBrokers(1), - ) - if err != nil { - return err - } - kafkaCtx, kafkaCancel := context.WithTimeout(containersCtx, 3*time.Minute) - defer kafkaCancel() - return waitForKafka(kafkaCtx, t, kafkaContainer.Port) - }) - } + containersGroup.Go(func() (err error) { + kafkaContainer, err = destination.SetupKafka(pool, t, + destination.WithLogger(&testLogger{logger.NewLogger().Child("kafka")}), + destination.WithBrokers(1), + ) + if err != nil { + return err + } + kafkaCtx, kafkaCancel := context.WithTimeout(containersCtx, 3*time.Minute) + defer kafkaCancel() + return waitForKafka(kafkaCtx, t, kafkaContainer.Port) + }) containersGroup.Go(func() (err error) { redisContainer, err = destination.SetupRedis(pool, t) return err @@ -462,9 +455,7 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} { "minioEndpoint": minioContainer.Endpoint, "minioBucketName": minioContainer.BucketName, } - if runtime.GOARCH != "arm64" || overrideArm64Check { - mapWorkspaceConfig["kafkaPort"] = kafkaContainer.Port - } + mapWorkspaceConfig["kafkaPort"] = kafkaContainer.Port workspaceConfigPath := workspaceConfig.CreateTempFile(t, "testdata/workspaceConfigTemplate.json", mapWorkspaceConfig, diff --git a/integration_test/multi_tentant_test/multi_tenant_test.go b/integration_test/multi_tentant_test/multi_tenant_test.go index cdd7c43956..13df85fe56 100644 --- a/integration_test/multi_tentant_test/multi_tenant_test.go +++ b/integration_test/multi_tentant_test/multi_tenant_test.go @@ -220,7 +220,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) { // Pushing valid configuration via ETCD etcdReqKey := getETCDWorkspacesReqKey(releaseName, serverInstanceID, appType) - _, err = etcdContainer.Client.Put(ctx, etcdReqKey, `{"workspaces":"`+workspaceID+`","ack_key":"test-ack/1"}`) + _, err = etcdContainer.Client.Put(ctx, etcdReqKey, `{"workspaces":"`+workspaceID+`","ack_key":"test-ack-1/1"}`) require.NoError(t, err) // Checking now that the configuration has been processed and the server can start @@ -233,7 +233,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) { ) select { - case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/1"): + case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-1/1", clientv3.WithRev(1)): v, err := unmarshalWorkspaceAckValue(t, &ack) require.NoError(t, err) require.Equal(t, "RELOADED", v.Status) @@ -245,7 +245,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) { grpc.WithBlock(), // block until the underlying connection is up }, }) - t.Fatalf("Timeout waiting for test-ack/1 (etcd status error: %v)", err) + t.Fatalf("Timeout waiting for test-ack-1/1 (etcd status error: %v)", err) } cleanupGwJobs := func() { @@ -299,7 +299,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) { t.Log("Triggering degraded mode") select { - case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix()): + case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix(), clientv3.WithRev(1)): require.Len(t, ack.Events, 1) require.Equal(t, "test-ack/normal", string(ack.Events[0].Kv.Key)) require.Equal(t, `{"status":"NORMAL"}`, string(ack.Events[0].Kv.Value)) @@ -321,17 +321,17 @@ func testMultiTenantByAppType(t *testing.T, appType string) { serverModeReqKey := getETCDServerModeReqKey(releaseName, serverInstanceID) t.Logf("Server mode ETCD key: %s", serverModeReqKey) - _, err := etcdContainer.Client.Put(ctx, serverModeReqKey, `{"mode":"DEGRADED","ack_key":"test-ack/2"}`) + _, err := etcdContainer.Client.Put(ctx, serverModeReqKey, `{"mode":"DEGRADED","ack_key":"test-ack-2/2"}`) require.NoError(t, err) t.Log("Triggering degraded mode") select { - case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/", clientv3.WithPrefix()): + case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-2/", clientv3.WithPrefix(), clientv3.WithRev(1)): require.Len(t, ack.Events, 1) - require.Equal(t, "test-ack/2", string(ack.Events[0].Kv.Key)) + require.Equal(t, "test-ack-2/2", string(ack.Events[0].Kv.Key)) require.Equal(t, `{"status":"DEGRADED"}`, string(ack.Events[0].Kv.Value)) case <-time.After(60 * time.Second): - t.Fatal("Timeout waiting for server-mode test-ack") + t.Fatal("Timeout waiting for server-mode test-ack-2") } sendEventsToGateway(t, httpPort, writeKey) @@ -350,17 +350,17 @@ func testMultiTenantByAppType(t *testing.T, appType string) { // workspaces it is serving. t.Run("empty workspaces are accepted", func(t *testing.T) { _, err := etcdContainer.Client.Put(ctx, - etcdReqKey, `{"workspaces":"","ack_key":"test-ack/3"}`, + etcdReqKey, `{"workspaces":"","ack_key":"test-ack-3/3"}`, ) require.NoError(t, err) select { - case ack := <-etcdContainer.Client.Watch(ctx, "test-ack/3"): + case ack := <-etcdContainer.Client.Watch(ctx, "test-ack-3/3", clientv3.WithRev(1)): v, err := unmarshalWorkspaceAckValue(t, &ack) require.NoError(t, err) require.Equal(t, "RELOADED", v.Status) require.Equal(t, "", v.Error) case <-time.After(60 * time.Second): - t.Fatal("Timeout waiting for test-ack/3") + t.Fatal("Timeout waiting for test-ack-3/3") } }) } diff --git a/services/streammanager/kafka/client/client_test.go b/services/streammanager/kafka/client/client_test.go index c3def12c39..2086d6cbd1 100644 --- a/services/streammanager/kafka/client/client_test.go +++ b/services/streammanager/kafka/client/client_test.go @@ -7,7 +7,6 @@ import ( "io" "os" "path/filepath" - "runtime" "strings" "sync" "sync/atomic" @@ -32,10 +31,6 @@ func TestMain(m *testing.M) { if os.Getenv("OVERRIDE_ARM64_CHECK") == "1" { overrideArm64Check = true } - if runtime.GOARCH == "arm64" && !overrideArm64Check { - fmt.Println("arm64 is not supported yet") - os.Exit(0) - } os.Exit(m.Run()) } diff --git a/services/streammanager/kafka/kafkamanager_test.go b/services/streammanager/kafka/kafkamanager_test.go index ed0f409437..11f3db135b 100644 --- a/services/streammanager/kafka/kafkamanager_test.go +++ b/services/streammanager/kafka/kafkamanager_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "os" - "runtime" "testing" "time" @@ -119,10 +118,6 @@ func TestNewProducer(t *testing.T) { }) t.Run("ok", func(t *testing.T) { - if runtime.GOARCH == "arm64" && !overrideArm64Check { - t.Skip("arm64 is not supported yet") - } - kafkaStats.creationTime = getMockedTimer(t, gomock.NewController(t)) pool, err := dockertest.NewPool("") diff --git a/testhelper/destination/kafka.go b/testhelper/destination/kafka.go index ccb2480c26..242843c7d5 100644 --- a/testhelper/destination/kafka.go +++ b/testhelper/destination/kafka.go @@ -3,6 +3,7 @@ package destination import ( _ "encoding/json" "fmt" + "runtime" "strconv" _ "github.com/lib/pq" @@ -141,9 +142,13 @@ func SetupKafka(pool *dockertest.Pool, cln cleaner, opts ...Option) (*KafkaResou if err != nil { return nil, err } + zkImage := "bitnami/zookeeper" + if runtime.GOARCH == "arm64" { + zkImage = "zcube/bitnami-compat-zookeeper" + } zookeeperPort := fmt.Sprintf("%s/tcp", strconv.Itoa(zookeeperPortInt)) zookeeperContainer, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "bitnami/zookeeper", + Repository: zkImage, Tag: "latest", NetworkID: network.ID, Hostname: "zookeeper", @@ -255,8 +260,12 @@ func SetupKafka(pool *dockertest.Pool, cln cleaner, opts ...Option) (*KafkaResou nodeID := fmt.Sprintf("%d", i+1) hostname := "kafka" + nodeID + kImage := "bitnami/kafka" + if runtime.GOARCH == "arm64" { + kImage = "zcube/bitnami-compat-kafka" + } containers[i], err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "bitnami/kafka", + Repository: kImage, Tag: "latest", NetworkID: network.ID, Hostname: hostname, diff --git a/testhelper/etcd/etcd.go b/testhelper/etcd/etcd.go index 1ce1784ee5..3d7cf6f262 100644 --- a/testhelper/etcd/etcd.go +++ b/testhelper/etcd/etcd.go @@ -2,6 +2,7 @@ package etcd import ( "fmt" + "runtime" "strconv" "github.com/ory/dockertest/v3" @@ -21,7 +22,11 @@ type Resource struct { } func Setup(pool *dockertest.Pool, cln cleaner) (*Resource, error) { - container, err := pool.Run("bitnami/etcd", "3", []string{ + etcdImage := "bitnami/etcd" + if runtime.GOARCH == "arm64" { + etcdImage = "zcube/bitnami-compat-etcd" + } + container, err := pool.Run(etcdImage, "3.5", []string{ "ALLOW_NONE_AUTHENTICATION=yes", }) if err != nil {