From f74fb3fae0aa8dd6357108eb733955af810f1aea Mon Sep 17 00:00:00 2001 From: Eduardo Cabral <47820549+FerroEduardo@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:57:05 -0300 Subject: [PATCH] feat: add gRPC keepalive server parameters (#52) # Reason Due to the nature of HTTP/2 of keeping persistent connections, the usage of the default gRPC Keepalive parameters may become a problem when dealing with k8s autoscaling. The possibility of setting up [`MAX_CONNECTION_AGE`](https://grpc.io/docs/guides/keepalive/#keepalive-configuration-specification) server parameter also enables others client capabilities, such as gRPC client load balancing ([1](https://grpc.io/blog/grpc-load-balancing/), [2](https://github.com/grpc/grpc/blob/master/doc/load-balancing.md)) (without service mesh, such as [Istio](https://istio.io/)). # Description This PR proposes the addition of configurable [gRPC server Keepalive parameters](https://grpc.io/docs/guides/keepalive/#keepalive-configuration-specification). As the environment variables and `keepalive.ServerParameters` struct fields are optional, Deckard's previous behavior will remain the same. ## References - [gRPC load balancing](https://grpc.io/blog/grpc-load-balancing/) - Default keepalive values: [1](https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L214-L236), [2](https://github.com/grpc/grpc-go/blob/master/internal/transport/defaults.go) - [Lessons learned from running a large gRPC mesh at Datadog](https://www.datadoghq.com/blog/grpc-at-datadog/) --- .github/workflows/helm.yaml | 10 +- .github/workflows/lint.yaml | 6 +- .github/workflows/perform_release.yaml | 10 +- .github/workflows/release.yaml | 22 +-- .github/workflows/test.yaml | 14 +- .../workflows/test_engine_compatibility.yaml | 12 +- README.md | 16 ++ internal/config/service.go | 20 +++ internal/service/deckard_service.go | 28 ++++ internal/service/deckard_service_test.go | 151 ++++++++++++++++++ 10 files changed, 252 insertions(+), 37 deletions(-) diff --git a/.github/workflows/helm.yaml b/.github/workflows/helm.yaml index 52b245d..599fbf5 100644 --- a/.github/workflows/helm.yaml +++ b/.github/workflows/helm.yaml @@ -25,11 +25,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Pages - uses: actions/configure-pages@v3 + uses: actions/configure-pages@v5 - name: Setup Helm - uses: azure/setup-helm@v3 + uses: azure/setup-helm@v4 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Setup bitnami repo @@ -47,9 +47,9 @@ jobs: working-directory: helm run: helm repo index . - name: Upload artifact - uses: actions/upload-pages-artifact@v1 + uses: actions/upload-pages-artifact@v3 with: path: 'helm/' - name: Deploy to GitHub Pages id: deployment - uses: actions/deploy-pages@v2 \ No newline at end of file + uses: actions/deploy-pages@v4 \ No newline at end of file diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index ea8abc3..ffdf4a5 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -14,11 +14,11 @@ jobs: name: lint runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install mockgen run: go install github.com/golang/mock/mockgen@latest @@ -27,7 +27,7 @@ jobs: run: make gen-mocks - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v6 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version version: latest diff --git a/.github/workflows/perform_release.yaml b/.github/workflows/perform_release.yaml index 69ccf08..c9f1ca6 100644 --- a/.github/workflows/perform_release.yaml +++ b/.github/workflows/perform_release.yaml @@ -5,11 +5,11 @@ on: jobs: perform-release: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Change versions run: | @@ -44,13 +44,13 @@ jobs: echo "deckard_patch=$PATCH" >> $GITHUB_ENV - name: Commit changes - uses: stefanzweifel/git-auto-commit-action@v4 + uses: stefanzweifel/git-auto-commit-action@v5 with: commit_message: 'build: ${{ env.deckard_new_version }} release' file_pattern: 'internal/project/project.go java/pom.xml csharp/Deckard.csproj helm/Chart.yaml' - name: Release - uses: softprops/action-gh-release@v1 + uses: softprops/action-gh-release@v2 with: tag_name: v${{ env.deckard_new_version }} generate_release_notes: true @@ -78,7 +78,7 @@ jobs: echo "deckard_new_snapshot_version=$NEW_SNAPSHOT_VERSION" >> $GITHUB_ENV - name: Commit changes - uses: stefanzweifel/git-auto-commit-action@v4 + uses: stefanzweifel/git-auto-commit-action@v5 with: commit_message: 'chore: ${{ env.deckard_new_snapshot_version }}' file_pattern: 'internal/project/project.go java/pom.xml csharp/Deckard.csproj' diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index df7e72a..3902ff2 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -16,14 +16,14 @@ jobs: contents: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - name: Cache Go modules - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.cache/go-build @@ -67,14 +67,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Cache Go modules - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.cache/go-build @@ -84,7 +84,7 @@ jobs: ${{ runner.os }}-golang- - name: Setup Ko - uses: ko-build/setup-ko@v0.6 + uses: ko-build/setup-ko@v0.7 - name: Publish image env: @@ -99,9 +99,9 @@ jobs: contents: read packages: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - uses: actions/setup-java@v3 + - uses: actions/setup-java@v4 with: java-version: '11' distribution: 'adopt' @@ -127,10 +127,10 @@ jobs: contents: read packages: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup .NET Core SDK ${{ env.DOTNET_VERSION }} - uses: actions/setup-dotnet@v3 + uses: actions/setup-dotnet@v4 with: dotnet-version: ${{ env.DOTNET_VERSION }} diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c22ab61..10e736f 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -32,14 +32,14 @@ jobs: steps: - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Cache Go modules - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.cache/go-build @@ -67,21 +67,21 @@ jobs: run: make test - name: Publish test results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: gotest.out path: gotest.out - name: Publish JUnit report - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: junit.xml path: junit.xml - name: JUnit report action - uses: mikepenz/action-junit-report@v3 + uses: mikepenz/action-junit-report@v4 if: always() with: report_paths: junit.xml @@ -89,7 +89,7 @@ jobs: check_name: JUnit Test Report - name: Coverage Report - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 if: always() with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/test_engine_compatibility.yaml b/.github/workflows/test_engine_compatibility.yaml index fd0d7d1..a80b1eb 100644 --- a/.github/workflows/test_engine_compatibility.yaml +++ b/.github/workflows/test_engine_compatibility.yaml @@ -32,14 +32,14 @@ jobs: steps: - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Cache Go modules - uses: actions/cache@v3 + uses: actions/cache@v4 with: path: | ~/.cache/go-build @@ -67,21 +67,21 @@ jobs: run: make integration-test - name: Publish test results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: gotest.out path: gotest.out - name: Publish JUnit report - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: junit.xml path: junit.xml - name: JUnit report action - uses: mikepenz/action-junit-report@v3 + uses: mikepenz/action-junit-report@v4 if: always() with: report_paths: junit.xml diff --git a/README.md b/README.md index fbe9d43..dd0e60e 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,22 @@ All available environment variables are listed below: | `DECKARD_GRPC_ENABLED` | `true` | To enable the gRPC service. You can disable gRPC service if you want an instance to perform only housekeeper tasks. | | `DECKARD_GRPC_PORT` | `8081` | The gRPC port to listen. | +### gRPC Server Configuration + +| Environment Variable | Default | Description | +|------------------------------|---------|-------------| +| `DECKARD_GRPC_SERVER_KEEPALIVE_TIME` | | The interval after which a keepalive ping is sent. | +| `DECKARD_GRPC_SERVER_KEEPALIVE_TIMEOUT` | | The duration the gRPC server waits for a keepalive response before closing the connection. | +| `DECKARD_GRPC_SERVER_MAX_CONNECTION_IDLE` | | The maximum duration a connection can remain idle. | +| `DECKARD_GRPC_SERVER_MAX_CONNECTION_AGE` | | The maximum duration a connection can exist. | +| `DECKARD_GRPC_SERVER_MAX_CONNECTION_AGE_GRACE` | | The additional time the gRPC server allows for a connection to complete its current operations before closing it after reaching the maximum connection age. | + +> Values should be specified using time units such as `1s` for seconds, `1m` for minutes, `1h` for hours. + +The default values depends on the server implementation. For more information check these links: +- [Keepalive configuration specification](https://grpc.io/docs/guides/keepalive/#keepalive-configuration-specification) +- [`grpc-go/internal/transport/defaults.go`](https://github.com/grpc/grpc-go/blob/master/internal/transport/defaults.go) + ### Cache Configuration | Environment Variable | Default | Description | diff --git a/internal/config/service.go b/internal/config/service.go index dd0c0ba..ed28302 100644 --- a/internal/config/service.go +++ b/internal/config/service.go @@ -10,6 +10,26 @@ var GrpcPort = Create(&ViperConfigKey{ Default: 8081, }) +var GrpcServerKeepaliveTime = Create(&ViperConfigKey{ + Key: "grpc.server.keepalive_time", +}) + +var GrpcServerKeepaliveTimeout = Create(&ViperConfigKey{ + Key: "grpc.server.keepalive_timeout", +}) + +var GrpcServerMaxConnectionIdle = Create(&ViperConfigKey{ + Key: "grpc.server.max_connection_idle", +}) + +var GrpcServerMaxConnectionAge = Create(&ViperConfigKey{ + Key: "grpc.server.max_connection_age", +}) + +var GrpcServerMaxConnectionAgeGrace = Create(&ViperConfigKey{ + Key: "grpc.server.max_connection_age_grace", +}) + var TlsServerCertFilePaths = Create(&ViperConfigKey{ Key: "tls.server.cert_file_paths", }) diff --git a/internal/service/deckard_service.go b/internal/service/deckard_service.go index 0a566c4..8a1b8a6 100644 --- a/internal/service/deckard_service.go +++ b/internal/service/deckard_service.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" grpchealth "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" @@ -76,6 +77,7 @@ func (d *Deckard) ServeGRPCServer(ctx context.Context) (*grpc.Server, error) { options := []grpc.ServerOption{ grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()), + getGrpcKeepaliveParams(), } credentials, err := loadTLSCredentials() @@ -682,3 +684,29 @@ func addSpanAttributes(ctx context.Context, attributes ...attribute.KeyValue) { span.SetAttributes(attributes...) } + +func getGrpcKeepaliveParams() grpc.ServerOption { + parameters := keepalive.ServerParameters{} + + if config.GrpcServerKeepaliveTime.Get() != "" { + parameters.Time = config.GrpcServerKeepaliveTime.GetDuration() + } + + if config.GrpcServerKeepaliveTimeout.Get() != "" { + parameters.Timeout = config.GrpcServerKeepaliveTimeout.GetDuration() + } + + if config.GrpcServerMaxConnectionIdle.Get() != "" { + parameters.MaxConnectionIdle = config.GrpcServerMaxConnectionIdle.GetDuration() + } + + if config.GrpcServerMaxConnectionAge.Get() != "" { + parameters.MaxConnectionAge = config.GrpcServerMaxConnectionAge.GetDuration() + } + + if config.GrpcServerMaxConnectionAgeGrace.Get() != "" { + parameters.MaxConnectionAgeGrace = config.GrpcServerMaxConnectionAgeGrace.GetDuration() + } + + return grpc.KeepaliveParams(parameters) +} diff --git a/internal/service/deckard_service_test.go b/internal/service/deckard_service_test.go index 6afd16b..24b12b6 100644 --- a/internal/service/deckard_service_test.go +++ b/internal/service/deckard_service_test.go @@ -25,6 +25,7 @@ import ( "github.com/takenet/deckard/internal/queue/score" "github.com/takenet/deckard/internal/queue/storage" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) @@ -696,3 +697,153 @@ func loadClientCredentials(loadClientCert bool) (credentials.TransportCredential return credentials.NewTLS(config), nil } + +func TestDeckardServerKeepalive(t *testing.T) { + if testing.Short() { + return + } + t.Parallel() + + t.Run("set all fields", func(t *testing.T) { + config.Configure(true) + + config.GrpcPort.Set("8088") + + config.GrpcServerKeepaliveTime.Set(1 * time.Second) + config.GrpcServerKeepaliveTimeout.Set(5 * time.Second) + config.GrpcServerMaxConnectionIdle.Set(30 * time.Second) + config.GrpcServerMaxConnectionAge.Set(1 * time.Minute) + config.GrpcServerMaxConnectionAgeGrace.Set(2 * time.Minute) + + storage := storage.NewMemoryStorage(ctx) + cache := cache.NewMemoryCache() + + queueService := queue.NewQueueConfigurationService(ctx, storage) + + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) + + srv := NewMemoryDeckardService(queue, queueService) + + server, err := srv.ServeGRPCServer(ctx) + require.NoError(t, err) + defer server.Stop() + + // Set up a connection to the server. + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + conn, err := grpc.DialContext(ctx, fmt.Sprint("localhost:", config.GrpcPort.GetInt()), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + client := deckard.NewDeckardClient(conn) + + response, err := client.Add(ctx, &deckard.AddRequest{ + Messages: []*deckard.AddMessage{ + { + Id: "1", + Queue: "queue", + Timeless: true, + }, + }, + }) + + require.NoError(t, err) + require.Equal(t, int64(1), response.CreatedCount) + require.Equal(t, int64(0), response.UpdatedCount) + + getResponse, err := client.Pull(ctx, &deckard.PullRequest{Queue: "queue"}) + require.NoError(t, err) + require.Len(t, getResponse.Messages, 1) + require.Equal(t, "1", getResponse.Messages[0].Id) + }) + + t.Run("with max connection age", func(t *testing.T) { + config.Configure(true) + + config.GrpcPort.Set("8089") + maxConnectionAge := 1 * time.Second + config.GrpcServerMaxConnectionAge.Set(maxConnectionAge.String()) + + storage := storage.NewMemoryStorage(ctx) + cache := cache.NewMemoryCache() + + queueService := queue.NewQueueConfigurationService(ctx, storage) + + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) + + srv := NewMemoryDeckardService(queue, queueService) + + server, err := srv.ServeGRPCServer(ctx) + require.NoError(t, err) + defer server.Stop() + + // Set up a connection to the server. + conn, err := grpc.DialContext(context.Background(), fmt.Sprint("localhost:", config.GrpcPort.GetInt()), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + start := time.Now() + result := conn.WaitForStateChange(waitCtx, connectivity.Ready) + elapsed := time.Since(start) + + maxJitter := maxConnectionAge.Seconds() * float64(1.1) // https://github.com/grpc/grpc-go/blob/56df169480cdb4928a24a50b5289f909f0d81ba7/internal/transport/http2_server.go#L221-L225 + + // If the connection state changes before the context timeout and the elapsed time is less or equal to the max jitter + // then the max connection age is working as expected + if elapsed.Seconds() > maxJitter || !result { + t.Error("Expected to timeout or elapsed time being less then the jitter due to the max connection age") + } + }) + + t.Run("without max connection age", func(t *testing.T) { + config.Configure(true) + + config.GrpcPort.Set("8090") + + storage := storage.NewMemoryStorage(ctx) + cache := cache.NewMemoryCache() + + queueService := queue.NewQueueConfigurationService(ctx, storage) + + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) + + srv := NewMemoryDeckardService(queue, queueService) + + server, err := srv.ServeGRPCServer(ctx) + require.NoError(t, err) + defer server.Stop() + + // Set up a connection to the server. + conn, err := grpc.DialContext( + context.Background(), + fmt.Sprint("localhost:", config.GrpcPort.GetInt()), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + timeoutDuration := 5 * time.Second + waitCtx, cancel := context.WithTimeout(ctx, timeoutDuration) + defer cancel() + + start := time.Now() + result := conn.WaitForStateChange(waitCtx, connectivity.Ready) + elapsed := time.Since(start) + + // If the connection state changes before the context timeout and the elapsed time is less than the timeout duration + // then the connection age is not defined and the connection should not timeout + if elapsed < timeoutDuration || result { + t.Error("Expected the context to timeout, due to the default max connection age being infinite") + } + }) +}