diff --git a/src/admin/__tests__/createAcls.spec.js b/src/admin/__tests__/createAcls.spec.js index ad57e1271..22c5ef6a4 100644 --- a/src/admin/__tests__/createAcls.spec.js +++ b/src/admin/__tests__/createAcls.spec.js @@ -1,4 +1,5 @@ const createAdmin = require('../index') +const createProducer = require('../../producer/index') const { secureRandom, @@ -13,7 +14,7 @@ const ACL_OPERATION_TYPES = require('../../protocol/aclOperationTypes') const ACL_PERMISSION_TYPES = require('../../protocol/aclPermissionTypes') const RESOURCE_PATTERN_TYPES = require('../../protocol/resourcePatternTypes') -const createSASLAdminClientForUser = ({ username, password }) => { +const createSASLClientForUser = createClient => ({ username, password }) => { const saslConnectionOpts = () => { return Object.assign(sslConnectionOpts(), { port: 9094, @@ -25,7 +26,7 @@ const createSASLAdminClientForUser = ({ username, password }) => { }) } - const admin = createAdmin({ + const client = createClient({ logger: newLogger(), cluster: createCluster( { @@ -36,9 +37,12 @@ const createSASLAdminClientForUser = ({ username, password }) => { ), }) - return admin + return client } +const createSASLAdminClientForUser = createSASLClientForUser(createAdmin) +const createSASLProducerClientForUser = createSASLClientForUser(createProducer) + describe('Admin', () => { let admin @@ -247,5 +251,56 @@ describe('Admin', () => { await expect(admin.fetchTopicMetadata({ topics: [topicName] })).resolves.toBeTruthy() }) + + test('can produce to allowed topic after failing to produce to not-allowed topic', async () => { + const allowedTopic = `allowed-${secureRandom()}` + const notAllowedTopic = `disallowed-${secureRandom()}` + + admin = createSASLAdminClientForUser({ username: 'test', password: 'testtest' }) + + await admin.connect() + await admin.createTopics({ + waitForLeaders: true, + topics: [allowedTopic, notAllowedTopic].map(topic => ({ topic, numPartitions: 1 })), + }) + await admin.createAcls({ + acl: [ + { + resourceType: ACL_RESOURCE_TYPES.TOPIC, + resourceName: notAllowedTopic, + resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL, + principal: 'User:bob', + host: '*', + operation: ACL_OPERATION_TYPES.WRITE, + permissionType: ACL_PERMISSION_TYPES.DENY, + }, + { + resourceType: ACL_RESOURCE_TYPES.TOPIC, + resourceName: allowedTopic, + resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL, + principal: 'User:bob', + host: '*', + operation: ACL_OPERATION_TYPES.WRITE, + permissionType: ACL_PERMISSION_TYPES.ALLOW, + }, + ], + }) + + await admin.disconnect() + const producer = createSASLProducerClientForUser({ username: 'bob', password: 'bobbob' }) + await producer.connect() + + await expect( + producer.send({ topic: allowedTopic, messages: [{ value: 'hello' }] }) + ).resolves.not.toBeUndefined() + await expect( + producer.send({ topic: notAllowedTopic, messages: [{ value: 'whoops' }] }) + ).rejects.not.toBeUndefined() + await expect( + producer.send({ topic: allowedTopic, messages: [{ value: 'world' }] }) + ).resolves.not.toBeUndefined() + + await producer.disconnect() + }) }) }) diff --git a/src/cluster/index.js b/src/cluster/index.js index cdbbc87f2..f0c294ee1 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -229,7 +229,11 @@ module.exports = class Cluster { try { await this.refreshMetadata() } catch (e) { - if (e.type === 'INVALID_TOPIC_EXCEPTION' || e.type === 'UNKNOWN_TOPIC_OR_PARTITION') { + if ( + e.type === 'INVALID_TOPIC_EXCEPTION' || + e.type === 'UNKNOWN_TOPIC_OR_PARTITION' || + e.type === 'TOPIC_AUTHORIZATION_FAILED' + ) { this.targetTopics = previousTopics } diff --git a/src/consumer/fetchManager.js b/src/consumer/fetchManager.js index 7ec46eac1..309a0986e 100644 --- a/src/consumer/fetchManager.js +++ b/src/consumer/fetchManager.js @@ -38,7 +38,7 @@ const createFetchManager = ({ const current = getNodeIds() const hasChanged = nodeIds.length !== current.length || nodeIds.some(nodeId => !current.includes(nodeId)) - if (hasChanged) { + if (hasChanged && current.length !== 0) { throw new KafkaJSFetcherRebalanceError() } } diff --git a/src/consumer/fetchManager.spec.js b/src/consumer/fetchManager.spec.js index 5438a5410..9dbf26df8 100644 --- a/src/consumer/fetchManager.spec.js +++ b/src/consumer/fetchManager.spec.js @@ -4,6 +4,7 @@ const createFetchManager = require('./fetchManager') const Batch = require('./batch') const { newLogger } = require('testHelpers') const waitFor = require('../utils/waitFor') +const { KafkaJSNonRetriableError } = require('../errors') describe('FetchManager', () => { let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize @@ -72,4 +73,25 @@ describe('FetchManager', () => { fetchers = fetchManager.getFetchers() expect(fetchers).toHaveLength(3) }) + + describe('when all brokers have become unavailable', () => { + it('should not rebalance and let the error bubble up', async () => { + const fetchMock = jest.fn().mockImplementation(async nodeId => { + if (!getNodeIds().includes(nodeId)) { + throw new KafkaJSNonRetriableError('Node not found') + } + + return fetch(nodeId) + }) + getNodeIds.mockImplementation(() => seq(1)) + + fetchManager = createTestFetchManager({ concurrency: 1, fetch: fetchMock }) + const fetchManagerPromise = fetchManager.start() + + expect(fetchManager.getFetchers()).toHaveLength(1) + + getNodeIds.mockImplementation(() => seq(0)) + await expect(fetchManagerPromise).rejects.toThrow('Node not found') + }) + }) }) diff --git a/src/producer/eosManager/index.js b/src/producer/eosManager/index.js index e9d77d5c6..3010db785 100644 --- a/src/producer/eosManager/index.js +++ b/src/producer/eosManager/index.js @@ -75,10 +75,16 @@ module.exports = ({ */ let transactionTopicPartitions = {} + /** + * Offsets have been added to the transaction + */ + let hasOffsetsAddedToTransaction = false + const stateMachine = createStateMachine({ logger }) stateMachine.on('transition', ({ to }) => { if (to === STATES.READY) { transactionTopicPartitions = {} + hasOffsetsAddedToTransaction = false } }) @@ -95,6 +101,22 @@ module.exports = ({ } } + /** + * A transaction is ongoing when offsets or partitions added to it + * + * @returns {boolean} + */ + const isOngoing = () => { + return ( + hasOffsetsAddedToTransaction || + Object.entries(transactionTopicPartitions).some(([, partitions]) => { + return Object.entries(partitions).some( + ([, isPartitionAddedToTransaction]) => isPartitionAddedToTransaction + ) + }) + ) + } + const eosManager = stateMachine.createGuarded( { /** @@ -267,6 +289,13 @@ module.exports = ({ transactionalGuard() stateMachine.transitionTo(STATES.COMMITTING) + if (!isOngoing()) { + logger.debug('No partitions or offsets registered, not sending EndTxn') + + stateMachine.transitionTo(STATES.READY) + return + } + const broker = await findTransactionCoordinator() await broker.endTxn({ producerId, @@ -285,6 +314,13 @@ module.exports = ({ transactionalGuard() stateMachine.transitionTo(STATES.ABORTING) + if (!isOngoing()) { + logger.debug('No partitions or offsets registered, not sending EndTxn') + + stateMachine.transitionTo(STATES.READY) + return + } + const broker = await findTransactionCoordinator() await broker.endTxn({ producerId, @@ -353,6 +389,8 @@ module.exports = ({ groupId: consumerGroupId, }) + hasOffsetsAddedToTransaction = true + let groupCoordinator = await cluster.findGroupCoordinator({ groupId: consumerGroupId, coordinatorType: COORDINATOR_TYPES.GROUP, diff --git a/src/producer/eosManager/index.spec.js b/src/producer/eosManager/index.spec.js index 823e5daf5..a6cf6a813 100644 --- a/src/producer/eosManager/index.spec.js +++ b/src/producer/eosManager/index.spec.js @@ -189,6 +189,8 @@ describe('Producer > eosManager', () => { }) test('committing a transaction', async () => { + const consumerGroupId = 'consumer-group-id' + const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }] const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -211,6 +213,26 @@ describe('Producer > eosManager', () => { await eosManager.beginTransaction() cluster.findGroupCoordinator.mockClear() + await eosManager.addPartitionsToTransaction(topics) + await eosManager.commit() + + expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ + groupId: transactionalId, + coordinatorType: COORDINATOR_TYPES.TRANSACTION, + }) + expect(broker.endTxn).toHaveBeenCalledWith({ + producerId, + producerEpoch, + transactionalId, + transactionResult: true, + }) + + await eosManager.beginTransaction() + + cluster.findGroupCoordinator.mockClear() + broker.endTxn.mockClear() + + await eosManager.sendOffsets({ consumerGroupId, topics }) await eosManager.commit() expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ @@ -226,6 +248,8 @@ describe('Producer > eosManager', () => { }) test('aborting a transaction', async () => { + const consumerGroupId = 'consumer-group-id' + const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }] const eosManager = createEosManager({ logger: newLogger(), cluster, @@ -248,6 +272,26 @@ describe('Producer > eosManager', () => { await eosManager.beginTransaction() cluster.findGroupCoordinator.mockClear() + await eosManager.addPartitionsToTransaction(topics) + await eosManager.abort() + + expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ + groupId: transactionalId, + coordinatorType: COORDINATOR_TYPES.TRANSACTION, + }) + expect(broker.endTxn).toHaveBeenCalledWith({ + producerId, + producerEpoch, + transactionalId, + transactionResult: false, + }) + + await eosManager.beginTransaction() + + cluster.findGroupCoordinator.mockClear() + broker.endTxn.mockClear() + + await eosManager.sendOffsets({ consumerGroupId, topics }) await eosManager.abort() expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({ @@ -308,6 +352,40 @@ describe('Producer > eosManager', () => { topics, }) }) + + test('aborting transaction when no operation have been made should not send EndTxn', async () => { + const eosManager = createEosManager({ + logger: newLogger(), + cluster, + transactionTimeout: 30000, + transactional: true, + transactionalId, + }) + + await eosManager.initProducerId() + await eosManager.beginTransaction() + + await expect(eosManager.abort()).resolves.not.toThrow() + expect(eosManager.isInTransaction()).toEqual(false) + expect(broker.endTxn).not.toBeCalled() + }) + + test('commiting transaction when no operation have been made should not send EndTxn', async () => { + const eosManager = createEosManager({ + logger: newLogger(), + cluster, + transactionTimeout: 30000, + transactional: true, + transactionalId, + }) + + await eosManager.initProducerId() + await eosManager.beginTransaction() + + await expect(eosManager.commit()).resolves.not.toThrow() + expect(eosManager.isInTransaction()).toEqual(false) + expect(broker.endTxn).not.toBeCalled() + }) }) describe('if transactional=false', () => { diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 6b5428e44..2f0b60a99 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -972,5 +972,147 @@ describe('Producer', () => { ]) }) }) + + describe('without operations', () => { + testIfKafkaAtLeast_0_11('does not throw an error when aborting transaction', async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.abort()).toResolve() + }) + + testIfKafkaAtLeast_0_11('does not throw an error when commiting transaction', async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.commit()).toResolve() + }) + + testIfKafkaAtLeast_0_11( + 'allows createing transaction when the previous was aborted without any operations made in it', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.abort()).toResolve() + await expect(producer.transaction()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + 'allows createing transaction when the previous was commited without any operations made in it', + async () => { + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const transaction = await producer.transaction() + + await expect(transaction.commit()).toResolve() + await expect(producer.transaction()).toResolve() + } + ) + + testIfKafkaAtLeast_0_11( + "transaction that is created after a transaction that hasn't made any operations should work", + async () => { + const partition = 0 + const retry = createRetrier({ retries: 5 }) + const cluster = createCluster({ + createPartitioner: createModPartitioner, + }) + + await createTopic({ topic: topicName }) + + producer = createProducer({ + cluster, + logger: newLogger(), + transactionalId, + }) + await producer.connect() + + const invalidTransaction = await producer.transaction() + await invalidTransaction.abort() + + const transaction = await producer.transaction() + await transaction.send({ + topic: topicName, + messages: [ + { + value: 'value', + partition, + }, + { + value: 'value', + partition, + }, + ], + }) + + await transaction.commit() + + await retry(async () => { + const [topicOffset] = await cluster.fetchTopicsOffset([ + { topic: topicName, partitions: [{ partition }] }, + ]) + + expect(topicOffset).toEqual({ + topic: topicName, + partitions: expect.arrayContaining([ + { + partition, + offset: '3', + }, + ]), + }) + }) + } + ) + }) }) })