From ec78a0e6dc7e75696a63237c63792fc841b57784 Mon Sep 17 00:00:00 2001 From: dgyimesi Date: Fri, 3 Jun 2022 17:21:50 +0200 Subject: [PATCH 1/5] Revert topic when metadata fetching failes due to authorization. --- src/cluster/index.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/cluster/index.js b/src/cluster/index.js index cdbbc87f2..f0c294ee1 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -229,7 +229,11 @@ module.exports = class Cluster { try { await this.refreshMetadata() } catch (e) { - if (e.type === 'INVALID_TOPIC_EXCEPTION' || e.type === 'UNKNOWN_TOPIC_OR_PARTITION') { + if ( + e.type === 'INVALID_TOPIC_EXCEPTION' || + e.type === 'UNKNOWN_TOPIC_OR_PARTITION' || + e.type === 'TOPIC_AUTHORIZATION_FAILED' + ) { this.targetTopics = previousTopics } From 61051b12300e7e71ce35b39d92cfa2966c3a5853 Mon Sep 17 00:00:00 2001 From: dgyimesi Date: Fri, 3 Jun 2022 17:35:03 +0200 Subject: [PATCH 2/5] Add repro as test. --- src/admin/__tests__/createAcls.spec.js | 61 ++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/src/admin/__tests__/createAcls.spec.js b/src/admin/__tests__/createAcls.spec.js index ad57e1271..22c5ef6a4 100644 --- a/src/admin/__tests__/createAcls.spec.js +++ b/src/admin/__tests__/createAcls.spec.js @@ -1,4 +1,5 @@ const createAdmin = require('../index') +const createProducer = require('../../producer/index') const { secureRandom, @@ -13,7 +14,7 @@ const ACL_OPERATION_TYPES = require('../../protocol/aclOperationTypes') const ACL_PERMISSION_TYPES = require('../../protocol/aclPermissionTypes') const RESOURCE_PATTERN_TYPES = require('../../protocol/resourcePatternTypes') -const createSASLAdminClientForUser = ({ username, password }) => { +const createSASLClientForUser = createClient => ({ username, password }) => { const saslConnectionOpts = () => { return Object.assign(sslConnectionOpts(), { port: 9094, @@ -25,7 +26,7 @@ const createSASLAdminClientForUser = ({ username, password }) => { }) } - const admin = createAdmin({ + const client = createClient({ logger: newLogger(), cluster: createCluster( { @@ -36,9 +37,12 @@ const createSASLAdminClientForUser = ({ username, password }) => { ), }) - return admin + return client } +const createSASLAdminClientForUser = createSASLClientForUser(createAdmin) +const createSASLProducerClientForUser = createSASLClientForUser(createProducer) + describe('Admin', () => { let admin @@ -247,5 +251,56 @@ describe('Admin', () => { await expect(admin.fetchTopicMetadata({ topics: [topicName] })).resolves.toBeTruthy() }) + + test('can produce to allowed topic after failing to produce to not-allowed topic', async () => { + const allowedTopic = `allowed-${secureRandom()}` + const notAllowedTopic = `disallowed-${secureRandom()}` + + admin = createSASLAdminClientForUser({ username: 'test', password: 'testtest' }) + + await admin.connect() + await admin.createTopics({ + waitForLeaders: true, + topics: [allowedTopic, notAllowedTopic].map(topic => ({ topic, numPartitions: 1 })), + }) + await admin.createAcls({ + acl: [ + { + resourceType: ACL_RESOURCE_TYPES.TOPIC, + resourceName: notAllowedTopic, + resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL, + principal: 'User:bob', + host: '*', + operation: ACL_OPERATION_TYPES.WRITE, + permissionType: ACL_PERMISSION_TYPES.DENY, + }, + { + resourceType: ACL_RESOURCE_TYPES.TOPIC, + resourceName: allowedTopic, + resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL, + principal: 'User:bob', + host: '*', + operation: ACL_OPERATION_TYPES.WRITE, + permissionType: ACL_PERMISSION_TYPES.ALLOW, + }, + ], + }) + + await admin.disconnect() + const producer = createSASLProducerClientForUser({ username: 'bob', password: 'bobbob' }) + await producer.connect() + + await expect( + producer.send({ topic: allowedTopic, messages: [{ value: 'hello' }] }) + ).resolves.not.toBeUndefined() + await expect( + producer.send({ topic: notAllowedTopic, messages: [{ value: 'whoops' }] }) + ).rejects.not.toBeUndefined() + await expect( + producer.send({ topic: allowedTopic, messages: [{ value: 'world' }] }) + ).resolves.not.toBeUndefined() + + await producer.disconnect() + }) }) }) From 98149937841ccfdf523551d3bf03c19d2c198f78 Mon Sep 17 00:00:00 2001 From: arszen123 Date: Thu, 9 Jun 2022 16:59:11 +0200 Subject: [PATCH 3/5] Fixed ending a transaction in invalid state --- src/producer/eosManager/index.js | 42 +++++--- src/producer/eosManager/index.spec.js | 46 +++++++- src/producer/index.spec.js | 145 ++++++++++++++++++++++++++ 3 files changed, 219 insertions(+), 14 deletions(-) diff --git a/src/producer/eosManager/index.js b/src/producer/eosManager/index.js index e9d77d5c6..3955ef1ab 100644 --- a/src/producer/eosManager/index.js +++ b/src/producer/eosManager/index.js @@ -1,6 +1,6 @@ const createRetry = require('../../retry') const Lock = require('../../utils/lock') -const { KafkaJSNonRetriableError } = require('../../errors') +const { KafkaJSNonRetriableError, KafkaJSProtocolError } = require('../../errors') const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') const createStateMachine = require('./transactionStateMachine') const { INT_32_MAX_VALUE } = require('../../constants') @@ -268,12 +268,20 @@ module.exports = ({ stateMachine.transitionTo(STATES.COMMITTING) const broker = await findTransactionCoordinator() - await broker.endTxn({ - producerId, - producerEpoch, - transactionalId, - transactionResult: true, - }) + try { + await broker.endTxn({ + producerId, + producerEpoch, + transactionalId, + transactionResult: true, + }) + } catch (e) { + if (e instanceof KafkaJSProtocolError && e.type === 'INVALID_TXN_STATE') { + logger.debug('The producer attempted a transactional operation in an invalid state') + } else { + throw e + } + } stateMachine.transitionTo(STATES.READY) }, @@ -286,12 +294,20 @@ module.exports = ({ stateMachine.transitionTo(STATES.ABORTING) const broker = await findTransactionCoordinator() - await broker.endTxn({ - producerId, - producerEpoch, - transactionalId, - transactionResult: false, - }) + try { + await broker.endTxn({ + producerId, + producerEpoch, + transactionalId, + transactionResult: false, + }) + } catch (e) { + if (e instanceof KafkaJSProtocolError && e.type === 'INVALID_TXN_STATE') { + logger.debug('The producer attempted a transactional operation in an invalid state') + } else { + throw e + } + } stateMachine.transitionTo(STATES.READY) }, diff --git a/src/producer/eosManager/index.spec.js b/src/producer/eosManager/index.spec.js index 823e5daf5..5a94325e2 100644 --- a/src/producer/eosManager/index.spec.js +++ b/src/producer/eosManager/index.spec.js @@ -1,6 +1,6 @@ const { newLogger } = require('testHelpers') const createEosManager = require('.') -const { KafkaJSNonRetriableError } = require('../../errors') +const { KafkaJSNonRetriableError, KafkaJSProtocolError } = require('../../errors') const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') describe('Producer > eosManager', () => { @@ -308,6 +308,50 @@ describe('Producer > eosManager', () => { topics, }) }) + + test('aborting transaction in an invalid state should not throw', async () => { + broker.endTxn.mockRejectedValueOnce( + new KafkaJSProtocolError({ + type: 'INVALID_TXN_STATE', + }) + ) + + const eosManager = createEosManager({ + logger: newLogger(), + cluster, + transactionTimeout: 30000, + transactional: true, + transactionalId, + }) + + await eosManager.initProducerId() + await eosManager.beginTransaction() + + await expect(eosManager.abort()).resolves.not.toThrow() + expect(eosManager.isInTransaction()).toEqual(false) + }) + + test('commiting transaction in an invalid state should not throw', async () => { + broker.endTxn.mockRejectedValueOnce( + new KafkaJSProtocolError({ + type: 'INVALID_TXN_STATE', + }) + ) + + const eosManager = createEosManager({ + logger: newLogger(), + cluster, + transactionTimeout: 30000, + transactional: true, + transactionalId, + }) + + await eosManager.initProducerId() + await eosManager.beginTransaction() + + await expect(eosManager.commit()).resolves.not.toThrow() + expect(eosManager.isInTransaction()).toEqual(false) + }) }) describe('if transactional=false', () => { diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 6b5428e44..3a9ab4607 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -972,5 +972,150 @@ describe('Producer', () => { ]) }) }) + + describe('invalid transaction state', () => { + testIfKafkaAtLeast_0_11( + 'does not throw an error when aborting transaction in invalid state', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.abort()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + 'does not throw an error when commiting transaction in invalid state', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.commit()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + 'allows createing transaction when the previous was aborted in an invalid state', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.abort()).toResolve() + await expect(producer.transaction()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + 'allows createing transaction when the previous was commited in an invalid state', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.commit()).toResolve() + await expect(producer.transaction()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + 'the transaction that is created after the previous was ended in an invalid state should work', + async () => { + const partition = 0 + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const invalidTransaction = await producer.transaction() + await invalidTransaction.abort() + + const transaction = await producer.transaction() + await transaction.send({ + topic: topicName, + messages: [ + { + value: 'value', + partition, + }, + { + value: 'value', + partition, + }, + ], + }) + + await transaction.commit() + + const [topicOffset] = await cluster.fetchTopicsOffset([ + { topic: topicName, partitions: [{ partition }] }, + ]) + + expect(topicOffset).toEqual({ + topic: topicName, + partitions: expect.arrayContaining([ + { + partition, + offset: '3', + }, + ]), + }) + } + ) + }) }) }) From a5d740537d2d3e77547712d122390305e47f90b0 Mon Sep 17 00:00:00 2001 From: arszen123 Date: Fri, 10 Jun 2022 13:48:36 +0200 Subject: [PATCH 4/5] Added transaction ongoing check before sending EndTxn --- src/producer/eosManager/index.js | 80 ++++++++++++++-------- src/producer/eosManager/index.spec.js | 64 +++++++++++++----- src/producer/index.spec.js | 95 +++++++++++++-------------- 3 files changed, 146 insertions(+), 93 deletions(-) diff --git a/src/producer/eosManager/index.js b/src/producer/eosManager/index.js index 3955ef1ab..3010db785 100644 --- a/src/producer/eosManager/index.js +++ b/src/producer/eosManager/index.js @@ -1,6 +1,6 @@ const createRetry = require('../../retry') const Lock = require('../../utils/lock') -const { KafkaJSNonRetriableError, KafkaJSProtocolError } = require('../../errors') +const { KafkaJSNonRetriableError } = require('../../errors') const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') const createStateMachine = require('./transactionStateMachine') const { INT_32_MAX_VALUE } = require('../../constants') @@ -75,10 +75,16 @@ module.exports = ({ */ let transactionTopicPartitions = {} + /** + * Offsets have been added to the transaction + */ + let hasOffsetsAddedToTransaction = false + const stateMachine = createStateMachine({ logger }) stateMachine.on('transition', ({ to }) => { if (to === STATES.READY) { transactionTopicPartitions = {} + hasOffsetsAddedToTransaction = false } }) @@ -95,6 +101,22 @@ module.exports = ({ } } + /** + * A transaction is ongoing when offsets or partitions added to it + * + * @returns {boolean} + */ + const isOngoing = () => { + return ( + hasOffsetsAddedToTransaction || + Object.entries(transactionTopicPartitions).some(([, partitions]) => { + return Object.entries(partitions).some( + ([, isPartitionAddedToTransaction]) => isPartitionAddedToTransaction + ) + }) + ) + } + const eosManager = stateMachine.createGuarded( { /** @@ -267,22 +289,21 @@ module.exports = ({ transactionalGuard() stateMachine.transitionTo(STATES.COMMITTING) - const broker = await findTransactionCoordinator() - try { - await broker.endTxn({ - producerId, - producerEpoch, - transactionalId, - transactionResult: true, - }) - } catch (e) { - if (e instanceof KafkaJSProtocolError && e.type === 'INVALID_TXN_STATE') { - logger.debug('The producer attempted a transactional operation in an invalid state') - } else { - throw e - } + if (!isOngoing()) { + logger.debug('No partitions or offsets registered, not sending EndTxn') + + stateMachine.transitionTo(STATES.READY) + return } + const broker = await findTransactionCoordinator() + await broker.endTxn({ + producerId, + producerEpoch, + transactionalId, + transactionResult: true, + }) + stateMachine.transitionTo(STATES.READY) }, @@ -293,22 +314,21 @@ module.exports = ({ transactionalGuard() stateMachine.transitionTo(STATES.ABORTING) - const broker = await findTransactionCoordinator() - try { - await broker.endTxn({ - producerId, - producerEpoch, - transactionalId, - transactionResult: false, - }) - } catch (e) { - if (e instanceof KafkaJSProtocolError && e.type === 'INVALID_TXN_STATE') { - logger.debug('The producer attempted a transactional operation in an invalid state') - } else { - throw e - } + if (!isOngoing()) { + logger.debug('No partitions or offsets registered, not sending EndTxn') + + stateMachine.transitionTo(STATES.READY) + return } + const broker = await findTransactionCoordinator() + await broker.endTxn({ + producerId, + producerEpoch, + transactionalId, + transactionResult: false, + }) + stateMachine.transitionTo(STATES.READY) }, @@ -369,6 +389,8 @@ module.exports = ({ groupId: consumerGroupId, }) + hasOffsetsAddedToTransaction = true + let groupCoordinator = await cluster.findGroupCoordinator({ groupId: consumerGroupId, coordinatorType: COORDINATOR_TYPES.GROUP, diff --git a/src/producer/eosManager/index.spec.js b/src/producer/eosManager/index.spec.js index 5a94325e2..a6cf6a813 100644 --- a/src/producer/eosManager/index.spec.js +++ b/src/producer/eosManager/index.spec.js @@ -1,6 +1,6 @@ const { newLogger } = require('testHelpers') const createEosManager = require('.') -const { KafkaJSNonRetriableError, KafkaJSProtocolError } = require('../../errors') +const { KafkaJSNonRetriableError } = require('../../errors') const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') describe('Producer > eosManager', () => { @@ -189,6 +189,8 @@ describe('Producer > eosManager', () => { }) test('committing a transaction', async () => { + const consumerGroupId = 'consumer-group-id' + const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }] const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -211,6 +213,26 @@ describe('Producer > eosManager', () => { await eosManager.beginTransaction() cluster.findGroupCoordinator.mockClear() + await eosManager.addPartitionsToTransaction(topics) + await eosManager.commit() + + expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ + groupId: transactionalId, + coordinatorType: COORDINATOR_TYPES.TRANSACTION, + }) + expect(broker.endTxn).toHaveBeenCalledWith({ + producerId, + producerEpoch, + transactionalId, + transactionResult: true, + }) + + await eosManager.beginTransaction() + + cluster.findGroupCoordinator.mockClear() + broker.endTxn.mockClear() + + await eosManager.sendOffsets({ consumerGroupId, topics }) await eosManager.commit() expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ @@ -226,6 +248,8 @@ describe('Producer > eosManager', () => { }) test('aborting a transaction', async () => { + const consumerGroupId = 'consumer-group-id' + const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }] const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -248,6 +272,26 @@ describe('Producer > eosManager', () => { await eosManager.beginTransaction() cluster.findGroupCoordinator.mockClear() + await eosManager.addPartitionsToTransaction(topics) + await eosManager.abort() + + expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ + groupId: transactionalId, + coordinatorType: COORDINATOR_TYPES.TRANSACTION, + }) + expect(broker.endTxn).toHaveBeenCalledWith({ + producerId, + producerEpoch, + transactionalId, + transactionResult: false, + }) + + await eosManager.beginTransaction() + + cluster.findGroupCoordinator.mockClear() + broker.endTxn.mockClear() + + await eosManager.sendOffsets({ consumerGroupId, topics }) await eosManager.abort() expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ @@ -309,13 +353,7 @@ describe('Producer > eosManager', () => { }) }) - test('aborting transaction in an invalid state should not throw', async () => { - broker.endTxn.mockRejectedValueOnce( - new KafkaJSProtocolError({ - type: 'INVALID_TXN_STATE', - }) - ) - + test('aborting transaction when no operation have been made should not send EndTxn', async () => { const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -329,15 +367,10 @@ describe('Producer > eosManager', () => { await expect(eosManager.abort()).resolves.not.toThrow() expect(eosManager.isInTransaction()).toEqual(false) + expect(broker.endTxn).not.toBeCalled() }) - test('commiting transaction in an invalid state should not throw', async () => { - broker.endTxn.mockRejectedValueOnce( - new KafkaJSProtocolError({ - type: 'INVALID_TXN_STATE', - }) - ) - + test('commiting transaction when no operation have been made should not send EndTxn', async () => { const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -351,6 +384,7 @@ describe('Producer > eosManager', () => { await expect(eosManager.commit()).resolves.not.toThrow() expect(eosManager.isInTransaction()).toEqual(false) + expect(broker.endTxn).not.toBeCalled() }) }) diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 3a9ab4607..2f0b60a99 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -973,53 +973,47 @@ describe('Producer', () => { }) }) - describe('invalid transaction state', () => { - testIfKafkaAtLeast_0_11( - 'does not throw an error when aborting transaction in invalid state', - async () => { - const cluster = createCluster({ - createPartitioner: createModPartitioner, - }) + describe('without operations', () => { + testIfKafkaAtLeast_0_11('does not throw an error when aborting transaction', async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) - await createTopic({ topic: topicName }) + await createTopic({ topic: topicName }) - producer = createProducer({ - cluster, - logger: newLogger(), - transactionalId, - }) - await producer.connect() + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() - const transaction = await producer.transaction() + const transaction = await producer.transaction() - await expect(transaction.abort()).toResolve() - } - ) + await expect(transaction.abort()).toResolve() + }) - testIfKafkaAtLeast_0_11( - 'does not throw an error when commiting transaction in invalid state', - async () => { - const cluster = createCluster({ - createPartitioner: createModPartitioner, - }) + testIfKafkaAtLeast_0_11('does not throw an error when commiting transaction', async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) - await createTopic({ topic: topicName }) + await createTopic({ topic: topicName }) - producer = createProducer({ - cluster, - logger: newLogger(), - transactionalId, - }) - await producer.connect() + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() - const transaction = await producer.transaction() + const transaction = await producer.transaction() - await expect(transaction.commit()).toResolve() - } - ) + await expect(transaction.commit()).toResolve() + }) testIfKafkaAtLeast_0_11( - 'allows createing transaction when the previous was aborted in an invalid state', + 'allows createing transaction when the previous was aborted without any operations made in it', async () => { const cluster = createCluster({ createPartitioner: createModPartitioner, @@ -1042,7 +1036,7 @@ describe('Producer', () => { ) testIfKafkaAtLeast_0_11( - 'allows createing transaction when the previous was commited in an invalid state', + 'allows createing transaction when the previous was commited without any operations made in it', async () => { const cluster = createCluster({ createPartitioner: createModPartitioner, @@ -1065,9 +1059,10 @@ describe('Producer', () => { ) testIfKafkaAtLeast_0_11( - 'the transaction that is created after the previous was ended in an invalid state should work', + "transaction that is created after a transaction that hasn't made any operations should work", async () => { const partition = 0 + const retry = createRetrier({ retries: 5 }) const cluster = createCluster({ createPartitioner: createModPartitioner, }) @@ -1101,18 +1096,20 @@ describe('Producer', () => { await transaction.commit() - const [topicOffset] = await cluster.fetchTopicsOffset([ - { topic: topicName, partitions: [{ partition }] }, - ]) + await retry(async () => { + const [topicOffset] = await cluster.fetchTopicsOffset([ + { topic: topicName, partitions: [{ partition }] }, + ]) - expect(topicOffset).toEqual({ - topic: topicName, - partitions: expect.arrayContaining([ - { - partition, - offset: '3', - }, - ]), + expect(topicOffset).toEqual({ + topic: topicName, + partitions: expect.arrayContaining([ + { + partition, + offset: '3', + }, + ]), + }) }) } ) From a4b190ac584b0c8567cb084068417b647f0f9a2d Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Mon, 27 Jun 2022 12:47:03 +0200 Subject: [PATCH 5/5] Let consumer crash when no brokers are available The fetch manager rebalancing mechanism caused an infinite loop when there were no brokers available, causing the consumer to never become aware of any connection issues. Fixes #1384 --- src/consumer/fetchManager.js | 2 +- src/consumer/fetchManager.spec.js | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/consumer/fetchManager.js b/src/consumer/fetchManager.js index 7ec46eac1..309a0986e 100644 --- a/src/consumer/fetchManager.js +++ b/src/consumer/fetchManager.js @@ -38,7 +38,7 @@ const createFetchManager = ({ const current = getNodeIds() const hasChanged = nodeIds.length !== current.length || nodeIds.some(nodeId => !current.includes(nodeId)) - if (hasChanged) { + if (hasChanged && current.length !== 0) { throw new KafkaJSFetcherRebalanceError() } } diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index 5438a5410..9dbf26df8 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -4,6 +4,7 @@ const createFetchManager = require('./fetchManager') const Batch = require('./batch') const { newLogger } = require('testHelpers') const waitFor = require('../utils/waitFor') +const { KafkaJSNonRetriableError } = require('../errors') describe('FetchManager', () => { let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize @@ -72,4 +73,25 @@ describe('FetchManager', () => { fetchers = fetchManager.getFetchers() expect(fetchers).toHaveLength(3) }) + + describe('when all brokers have become unavailable', () => { + it('should not rebalance and let the error bubble up', async () => { + const fetchMock = jest.fn().mockImplementation(async nodeId => { + if (!getNodeIds().includes(nodeId)) { + throw new KafkaJSNonRetriableError('Node not found') + } + + return fetch(nodeId) + }) + getNodeIds.mockImplementation(() => seq(1)) + + fetchManager = createTestFetchManager({ concurrency: 1, fetch: fetchMock }) + const fetchManagerPromise = fetchManager.start() + + expect(fetchManager.getFetchers()).toHaveLength(1) + + getNodeIds.mockImplementation(() => seq(0)) + await expect(fetchManagerPromise).rejects.toThrow('Node not found') + }) + }) })