From e0c3c627b80ae4468f9e1dee60306dc2ce3fb284 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 2 Nov 2023 16:20:54 +0000 Subject: [PATCH 1/2] fix(txmgr): ErrOffsetsLoadInProgress is retriable Also update the errors.go message to match Errors.COORDINATOR_LOAD_IN_PROGRESS from Kafka Signed-off-by: Dominic Evans --- errors.go | 186 ++++++++++++++++++------------------ mockresponses.go | 40 ++++++++ transaction_manager.go | 5 +- transaction_manager_test.go | 47 +++++++++ 4 files changed, 182 insertions(+), 96 deletions(-) diff --git a/errors.go b/errors.go index 8d1d16834..2c431aecb 100644 --- a/errors.go +++ b/errors.go @@ -173,98 +173,98 @@ type KError int16 // Numeric error codes returned by the Kafka server. const ( - ErrNoError KError = 0 - ErrUnknown KError = -1 - ErrOffsetOutOfRange KError = 1 - ErrInvalidMessage KError = 2 - ErrUnknownTopicOrPartition KError = 3 - ErrInvalidMessageSize KError = 4 - ErrLeaderNotAvailable KError = 5 - ErrNotLeaderForPartition KError = 6 - ErrRequestTimedOut KError = 7 - ErrBrokerNotAvailable KError = 8 - ErrReplicaNotAvailable KError = 9 - ErrMessageSizeTooLarge KError = 10 - ErrStaleControllerEpochCode KError = 11 - ErrOffsetMetadataTooLarge KError = 12 - ErrNetworkException KError = 13 - ErrOffsetsLoadInProgress KError = 14 - ErrConsumerCoordinatorNotAvailable KError = 15 - ErrNotCoordinatorForConsumer KError = 16 - ErrInvalidTopic KError = 17 - ErrMessageSetSizeTooLarge KError = 18 - ErrNotEnoughReplicas KError = 19 - ErrNotEnoughReplicasAfterAppend KError = 20 - ErrInvalidRequiredAcks KError = 21 - ErrIllegalGeneration KError = 22 - ErrInconsistentGroupProtocol KError = 23 - ErrInvalidGroupId KError = 24 - ErrUnknownMemberId KError = 25 - ErrInvalidSessionTimeout KError = 26 - ErrRebalanceInProgress KError = 27 - ErrInvalidCommitOffsetSize KError = 28 - ErrTopicAuthorizationFailed KError = 29 - ErrGroupAuthorizationFailed KError = 30 - ErrClusterAuthorizationFailed KError = 31 - ErrInvalidTimestamp KError = 32 - ErrUnsupportedSASLMechanism KError = 33 - ErrIllegalSASLState KError = 34 - ErrUnsupportedVersion KError = 35 - ErrTopicAlreadyExists KError = 36 - ErrInvalidPartitions KError = 37 - ErrInvalidReplicationFactor KError = 38 - ErrInvalidReplicaAssignment KError = 39 - ErrInvalidConfig KError = 40 - ErrNotController KError = 41 - ErrInvalidRequest KError = 42 - ErrUnsupportedForMessageFormat KError = 43 - ErrPolicyViolation KError = 44 - ErrOutOfOrderSequenceNumber KError = 45 - ErrDuplicateSequenceNumber KError = 46 - ErrInvalidProducerEpoch KError = 47 - ErrInvalidTxnState KError = 48 - ErrInvalidProducerIDMapping KError = 49 - ErrInvalidTransactionTimeout KError = 50 - ErrConcurrentTransactions KError = 51 - ErrTransactionCoordinatorFenced KError = 52 - ErrTransactionalIDAuthorizationFailed KError = 53 - ErrSecurityDisabled KError = 54 - ErrOperationNotAttempted KError = 55 - ErrKafkaStorageError KError = 56 - ErrLogDirNotFound KError = 57 - ErrSASLAuthenticationFailed KError = 58 - ErrUnknownProducerID KError = 59 - ErrReassignmentInProgress KError = 60 - ErrDelegationTokenAuthDisabled KError = 61 - ErrDelegationTokenNotFound KError = 62 - ErrDelegationTokenOwnerMismatch KError = 63 - ErrDelegationTokenRequestNotAllowed KError = 64 - ErrDelegationTokenAuthorizationFailed KError = 65 - ErrDelegationTokenExpired KError = 66 - ErrInvalidPrincipalType KError = 67 - ErrNonEmptyGroup KError = 68 - ErrGroupIDNotFound KError = 69 - ErrFetchSessionIDNotFound KError = 70 - ErrInvalidFetchSessionEpoch KError = 71 - ErrListenerNotFound KError = 
72 - ErrTopicDeletionDisabled KError = 73 - ErrFencedLeaderEpoch KError = 74 - ErrUnknownLeaderEpoch KError = 75 - ErrUnsupportedCompressionType KError = 76 - ErrStaleBrokerEpoch KError = 77 - ErrOffsetNotAvailable KError = 78 - ErrMemberIdRequired KError = 79 - ErrPreferredLeaderNotAvailable KError = 80 - ErrGroupMaxSizeReached KError = 81 - ErrFencedInstancedId KError = 82 - ErrEligibleLeadersNotAvailable KError = 83 - ErrElectionNotNeeded KError = 84 - ErrNoReassignmentInProgress KError = 85 - ErrGroupSubscribedToTopic KError = 86 - ErrInvalidRecord KError = 87 - ErrUnstableOffsetCommit KError = 88 - ErrThrottlingQuotaExceeded KError = 89 - ErrProducerFenced KError = 90 + ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR + ErrNoError KError = 0 // Errors.NONE + ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE + ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE + ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION + ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE + ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE + ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER + ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT + ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE + ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE + ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE + ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH + ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE + ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION + ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS + ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE + ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR + ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION + ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE + ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS + ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND + ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS + ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION + ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL + ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID + ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID + ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT + ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS + ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE + ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED + ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED + ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED + ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP + ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM + ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE + ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION + ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS + ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS + ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR + ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT + ErrInvalidConfig KError = 40 
// Errors.INVALID_CONFIG + ErrNotController KError = 41 // Errors.NOT_CONTROLLER + ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST + ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT + ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION + ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER + ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER + ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH + ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE + ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING + ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT + ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS + ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED + ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED + ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED + ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED + ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR + ErrLogDirNotFound KError = 57 // Errors.LOG_DIR_NOT_FOUND + ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED + ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID + ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS + ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED + ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND + ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH + ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED + ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED + ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED + ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE + ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP + ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND + ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND + ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH + ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND + ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED + ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH + ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH + ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE + ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH + ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE + ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED + ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE + ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED + ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID + ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE + ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED + ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS + ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC + ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD + ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT + 
ErrThrottlingQuotaExceeded            KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
+	ErrProducerFenced                     KError = 90 // Errors.PRODUCER_FENCED
 )
 
 func (err KError) Error() string {
@@ -302,7 +302,7 @@ func (err KError) Error() string {
 	case ErrNetworkException:
 		return "kafka server: The server disconnected before a response was received"
 	case ErrOffsetsLoadInProgress:
-		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
+		return "kafka server: The coordinator is still loading offsets and cannot currently process requests"
 	case ErrConsumerCoordinatorNotAvailable:
 		return "kafka server: Offset's topic has not yet been created"
 	case ErrNotCoordinatorForConsumer:
diff --git a/mockresponses.go b/mockresponses.go
index 688835278..d09415b49 100644
--- a/mockresponses.go
+++ b/mockresponses.go
@@ -1467,3 +1467,43 @@ func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeade
 	}
 	return res
 }
+
+// MockInitProducerIDResponse is an `InitProducerIDResponse` builder.
+type MockInitProducerIDResponse struct {
+	producerID    int64
+	producerEpoch int16
+	err           KError
+	t             TestReporter
+}
+
+func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
+	return &MockInitProducerIDResponse{
+		t: t,
+	}
+}
+
+func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
+	m.producerID = int64(id)
+	return m
+}
+
+func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
+	m.producerEpoch = int16(epoch)
+	return m
+}
+
+func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
+	m.err = err
+	return m
+}
+
+func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	req := reqBody.(*InitProducerIDRequest)
+	res := &InitProducerIDResponse{
+		Version:       req.Version,
+		Err:           m.err,
+		ProducerID:    m.producerID,
+		ProducerEpoch: m.producerEpoch,
+	}
+	return res
+}
diff --git a/transaction_manager.go b/transaction_manager.go
index 546528f26..ca7e13dab 100644
--- a/transaction_manager.go
+++ b/transaction_manager.go
@@ -569,9 +569,8 @@ func (t *transactionManager) initProducerId() (int64, int16, error) {
 			return response.ProducerID, response.ProducerEpoch, false, nil
 		}
 		switch response.Err {
-		case ErrConsumerCoordinatorNotAvailable:
-			fallthrough
-		case ErrNotCoordinatorForConsumer:
+		// Retriable errors
+		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
 			if t.isTransactional() {
 				_ = coordinator.Close()
 				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
diff --git a/transaction_manager_test.go b/transaction_manager_test.go
index 3100a9743..6a9b04ab0 100644
--- a/transaction_manager_test.go
+++ b/transaction_manager_test.go
@@ -125,6 +125,53 @@ func TestTxnmgrInitProducerIdTxn(t *testing.T) {
 	require.Equal(t, ProducerTxnFlagReady, txmng.status)
 }
 
+// TestTxnmgrInitProducerIdTxnCoordinatorLoading ensures we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress
+func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) {
+	config := NewTestConfig()
+	config.Producer.Idempotent = true
+	config.Producer.Transaction.ID = "txid-group"
+	config.Version = V0_11_0_0
+	config.Producer.RequiredAcks = WaitForAll
+	config.Net.MaxOpenRequests = 1
+
+	broker := NewMockBroker(t, 1)
+	defer broker.Close()
+
+	broker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+ SetController(broker.BrokerID()). + SetBroker(broker.Addr(), broker.BrokerID()), + "FindCoordinatorRequest": NewMockSequence( + NewMockFindCoordinatorResponse(t). + SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress), + NewMockFindCoordinatorResponse(t). + SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress), + NewMockFindCoordinatorResponse(t). + SetCoordinator(CoordinatorTransaction, "txid-group", broker), + ), + "InitProducerIDRequest": NewMockSequence( + NewMockInitProducerIDResponse(t). + SetError(ErrOffsetsLoadInProgress), + NewMockInitProducerIDResponse(t). + SetError(ErrOffsetsLoadInProgress), + NewMockInitProducerIDResponse(t). + SetProducerID(1). + SetProducerEpoch(0), + ), + }) + + client, err := NewClient([]string{broker.Addr()}, config) + require.NoError(t, err) + defer client.Close() + + txmng, err := newTransactionManager(config, client) + require.NoError(t, err) + + require.Equal(t, int64(1), txmng.producerID) + require.Equal(t, int16(0), txmng.producerEpoch) + require.Equal(t, ProducerTxnFlagReady, txmng.status) +} + func TestMaybeAddPartitionToCurrentTxn(t *testing.T) { type testCase struct { initialFlags ProducerTxnStatusFlag From 7155d51d84883025253f297233f95ba13683de33 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 26 Oct 2023 10:24:18 +0100 Subject: [PATCH 2/2] chore(ci): add kafka 3.6.0 to FVT and versions - add V3_6_0_0 to Sarama - bump default dockerfile KAFKA_VERSION to 3.6.0 - pin ubi-minimal version and digest and include in dependabot - pin apidiff cmd in that workflow - fix some linter warnings in Dockerfile - add USER 65534:65534 to Dockerfile for nobody user Signed-off-by: Dominic Evans --- .github/dependabot.yml | 10 +++++++++ .github/workflows/apidiff.yml | 2 +- .github/workflows/fvt-main.yml | 4 ++-- .github/workflows/fvt-pr.yml | 4 ++-- .github/workflows/fvt.yml | 3 ++- Dockerfile.kafka | 27 +++++++++++++---------- docker-compose.yml | 40 +++++++++++++++++----------------- entrypoint.sh | 19 +++++++++------- utils.go | 6 +++-- 9 files changed, 67 insertions(+), 48 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b87f347ea..b3737c5ca 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,6 +1,15 @@ --- version: 2 updates: + - package-ecosystem: docker + directory: "/Dockerfile.*" + schedule: + interval: daily + labels: + - "ignore-for-release" + commit-message: + prefix: chore(ci) + - package-ecosystem: github-actions directory: / open-pull-requests-limit: 2 @@ -10,6 +19,7 @@ updates: - "ignore-for-release" commit-message: prefix: chore(ci) + - package-ecosystem: gomod directory: / open-pull-requests-limit: 5 diff --git a/.github/workflows/apidiff.yml b/.github/workflows/apidiff.yml index fe6fa330f..b41c10a06 100644 --- a/.github/workflows/apidiff.yml +++ b/.github/workflows/apidiff.yml @@ -27,7 +27,7 @@ jobs: - name: Add GOBIN to PATH run: echo "$(go env GOPATH)/bin" >>$GITHUB_PATH - name: Install apidiff cmd - run: go install golang.org/x/exp/cmd/apidiff@latest + run: go install golang.org/x/exp/cmd/apidiff@v0.0.0-20231006140011-7918f672742d - name: Checkout base code uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 with: diff --git a/.github/workflows/fvt-main.yml b/.github/workflows/fvt-main.yml index c17982b56..e1be6a84b 100644 --- a/.github/workflows/fvt-main.yml +++ b/.github/workflows/fvt-main.yml @@ -17,7 +17,7 @@ jobs: fail-fast: false matrix: go-version: [1.21.x] - kafka-version: [1.0.2, 2.0.1, 2.2.2, 2.6.2, 2.8.2, 3.0.2, 
3.3.2, 3.5.1]
+        kafka-version: [1.0.2, 2.0.1, 2.2.2, 2.6.2, 2.8.2, 3.0.2, 3.3.2, 3.6.0]
         include:
           - kafka-version: 1.0.2
             scala-version: 2.11
@@ -33,7 +33,7 @@ jobs:
             scala-version: 2.12
           - kafka-version: 3.3.2
             scala-version: 2.13
-          - kafka-version: 3.5.1
+          - kafka-version: 3.6.0
             scala-version: 2.13
     uses: ./.github/workflows/fvt.yml
     with:
diff --git a/.github/workflows/fvt-pr.yml b/.github/workflows/fvt-pr.yml
index 023c5b269..13e55ea14 100644
--- a/.github/workflows/fvt-pr.yml
+++ b/.github/workflows/fvt-pr.yml
@@ -16,13 +16,13 @@ jobs:
       fail-fast: false
       matrix:
         go-version: [1.21.x]
-        kafka-version: [1.0.2, 2.6.2, 3.5.1]
+        kafka-version: [1.0.2, 2.6.2, 3.6.0]
         include:
           - kafka-version: 1.0.2
             scala-version: 2.11
           - kafka-version: 2.6.2
             scala-version: 2.12
-          - kafka-version: 3.5.1
+          - kafka-version: 3.6.0
             scala-version: 2.13
     uses: ./.github/workflows/fvt.yml
     with:
diff --git a/.github/workflows/fvt.yml b/.github/workflows/fvt.yml
index 4a570bfc0..f9d0afb50 100644
--- a/.github/workflows/fvt.yml
+++ b/.github/workflows/fvt.yml
@@ -9,7 +9,7 @@ on:
       kafka-version:
         required: false
         type: string
-        default: 3.5.1
+        default: 3.6.0
       scala-version:
         required: false
         type: string
@@ -38,6 +38,7 @@ jobs:
           builder: ${{ steps.buildx.outputs.name }}
           files: docker-compose.yml
           load: true
+          targets: kafka-1
           set: |
             *.cache-from=type=gha,scope=${{ github.workflow }}
             *.cache-to=type=gha,scope=${{ github.workflow }},mode=max
diff --git a/Dockerfile.kafka b/Dockerfile.kafka
index b11d899b9..186c2eb18 100644
--- a/Dockerfile.kafka
+++ b/Dockerfile.kafka
@@ -1,26 +1,27 @@
-FROM registry.access.redhat.com/ubi8/ubi-minimal:latest
+FROM registry.access.redhat.com/ubi8/ubi-minimal:8.8@sha256:b93deceb59a58588d5b16429fc47f98920f84740a1f2ed6454e33275f0701b59
 
 USER root
 
-RUN microdnf update \
-  && microdnf install curl gzip java-11-openjdk-headless tar tzdata-java \
-  && microdnf reinstall tzdata \
-  && microdnf clean all
+RUN microdnf update -y \
+    && microdnf install -y curl gzip java-11-openjdk-headless tar tzdata-java \
+    && microdnf reinstall -y tzdata \
+    && microdnf clean all
 
 ENV JAVA_HOME=/usr/lib/jvm/jre-11
 
 # https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html
 # Ensure Java doesn't cache any dns results
 RUN cd /etc/java/java-11-openjdk/*/conf/security \
-  && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \
-  && echo 'networkaddress.cache.ttl=0' >> java.security \
-  && echo 'networkaddress.cache.negative.ttl=0' >> java.security
+    && sed -e '/networkaddress.cache.ttl/d' -e '/networkaddress.cache.negative.ttl/d' -i java.security \
+    && echo 'networkaddress.cache.ttl=0' >> java.security \
+    && echo 'networkaddress.cache.negative.ttl=0' >> java.security
 
 ARG SCALA_VERSION="2.13"
-ARG KAFKA_VERSION="3.5.1"
+ARG KAFKA_VERSION="3.6.0"
 
-# https://github.com/apache/kafka/blob/53eeaad946cd053e9eb1a762972d4efeacb8e4fc/tests/docker/Dockerfile#L65-L69
+# https://github.com/apache/kafka/blob/9989b68d0d38c8f1357f78bf9d53a58c1476188d/tests/docker/Dockerfile#L46-L72
 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
+SHELL ["/bin/bash", "-o", "pipefail", "-c"]
 RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \
     && chmod a+rw "/opt/kafka-${KAFKA_VERSION}" \
     && curl -s "$KAFKA_MIRROR/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${KAFKA_VERSION}"
@@ -28,8 +29,8 @@ RUN mkdir -p "/opt/kafka-${KAFKA_VERSION}" \
 # older kafka versions depend upon jaxb-api being bundled with the JDK, but it
 # was removed from Java 11 so 
work around that by including it in the kafka # libs dir regardless -RUN cd /tmp \ - && curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ +WORKDIR /tmp +RUN curl -sLO "https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api/2.3.0/jaxb-api-2.3.0.jar" \ && for DIR in /opt/kafka-*; do cp -v jaxb-api-2.3.0.jar $DIR/libs/ ; done \ && rm -f jaxb-api-2.3.0.jar @@ -41,4 +42,6 @@ RUN sed -e "s/JAVA_MAJOR_VERSION=.*/JAVA_MAJOR_VERSION=${JAVA_MAJOR_VERSION}/" - COPY entrypoint.sh / +USER 65534:65534 + ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker-compose.yml b/docker-compose.yml index 4fdf1a517..e916416d5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,18 +38,18 @@ services: ZOO_4LW_COMMANDS_WHITELIST: 'mntr,conf,ruok' kafka-1: hostname: 'kafka-1' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-1:9091', ] @@ -64,7 +64,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091' @@ -83,18 +83,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-2: hostname: 'kafka-2' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-2:9091', ] @@ -109,7 +109,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092' @@ -128,18 +128,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-3: hostname: 'kafka-3' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . 
dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-3:9091', ] @@ -154,7 +154,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093' @@ -173,18 +173,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-4: hostname: 'kafka-4' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-4:9091', ] @@ -199,7 +199,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094' @@ -218,18 +218,18 @@ services: KAFKA_JVM_PERFORMANCE_OPTS: "-XX:+IgnoreUnrecognizedVMOptions" kafka-5: hostname: 'kafka-5' - image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.5.1}' + image: 'sarama/fv-kafka-${KAFKA_VERSION:-3.6.0}' build: context: . dockerfile: Dockerfile.kafka args: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} SCALA_VERSION: ${SCALA_VERSION:-2.13} healthcheck: test: [ 'CMD', - '/opt/kafka-${KAFKA_VERSION:-3.5.1}/bin/kafka-broker-api-versions.sh', + '/opt/kafka-${KAFKA_VERSION:-3.6.0}/bin/kafka-broker-api-versions.sh', '--bootstrap-server', 'kafka-5:9091', ] @@ -244,7 +244,7 @@ services: - toxiproxy restart: always environment: - KAFKA_VERSION: ${KAFKA_VERSION:-3.5.1} + KAFKA_VERSION: ${KAFKA_VERSION:-3.6.0} KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' KAFKA_CFG_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095' KAFKA_CFG_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095' diff --git a/entrypoint.sh b/entrypoint.sh index 7f405f92d..9fe9a44b1 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,6 +1,9 @@ #!/bin/bash -KAFKA_VERSION="${KAFKA_VERSION:-3.5.1}" +set -eu +set -o pipefail + +KAFKA_VERSION="${KAFKA_VERSION:-3.6.0}" KAFKA_HOME="/opt/kafka-${KAFKA_VERSION}" if [ ! 
-d "${KAFKA_HOME}" ]; then @@ -10,19 +13,19 @@ fi cd "${KAFKA_HOME}" || exit 1 -# discard all empty/commented lines -sed -e '/^#/d' -e '/^$/d' -i".orig" config/server.properties +# discard all empty/commented lines from default config and copy to /tmp +sed -e '/^#/d' -e '/^$/d' config/server.properties >/tmp/server.properties -echo "########################################################################" >>config/server.properties +echo "########################################################################" >>/tmp/server.properties # emulate kafka_configure_from_environment_variables from bitnami/bitnami-docker-kafka for var in "${!KAFKA_CFG_@}"; do key="$(echo "$var" | sed -e 's/^KAFKA_CFG_//g' -e 's/_/\./g' -e 's/.*/\L&/')" - sed -e '/^'$key'/d' -i"" config/server.properties + sed -e '/^'$key'/d' -i"" /tmp/server.properties value="${!var}" - echo "$key=$value" >>config/server.properties + echo "$key=$value" >>/tmp/server.properties done -sort config/server.properties +sort /tmp/server.properties -exec bin/kafka-server-start.sh config/server.properties +exec bin/kafka-server-start.sh /tmp/server.properties diff --git a/utils.go b/utils.go index fe5f0a52f..feadc0065 100644 --- a/utils.go +++ b/utils.go @@ -198,6 +198,7 @@ var ( V3_4_1_0 = newKafkaVersion(3, 4, 1, 0) V3_5_0_0 = newKafkaVersion(3, 5, 0, 0) V3_5_1_0 = newKafkaVersion(3, 5, 1, 0) + V3_6_0_0 = newKafkaVersion(3, 6, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, @@ -258,9 +259,10 @@ var ( V3_4_1_0, V3_5_0_0, V3_5_1_0, + V3_6_0_0, } MinVersion = V0_8_2_0 - MaxVersion = V3_5_1_0 + MaxVersion = V3_6_0_0 DefaultVersion = V2_1_0_0 // reduced set of protocol versions to matrix test @@ -275,8 +277,8 @@ var ( V2_6_2_0, V2_8_2_0, V3_1_2_0, - V3_2_3_0, V3_3_2_0, + V3_6_0_0, } )