Skip to content

Commit

Permalink
Merge branch 'master' into fix/consumer-pause-resume-functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Nevon authored Jun 27, 2022
2 parents cebbfce + ddf4f64 commit 214427f
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 5 deletions.
61 changes: 58 additions & 3 deletions src/admin/__tests__/createAcls.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const createAdmin = require('../index')
const createProducer = require('../../producer/index')

const {
secureRandom,
Expand All @@ -13,7 +14,7 @@ const ACL_OPERATION_TYPES = require('../../protocol/aclOperationTypes')
const ACL_PERMISSION_TYPES = require('../../protocol/aclPermissionTypes')
const RESOURCE_PATTERN_TYPES = require('../../protocol/resourcePatternTypes')

const createSASLAdminClientForUser = ({ username, password }) => {
const createSASLClientForUser = createClient => ({ username, password }) => {
const saslConnectionOpts = () => {
return Object.assign(sslConnectionOpts(), {
port: 9094,
Expand All @@ -25,7 +26,7 @@ const createSASLAdminClientForUser = ({ username, password }) => {
})
}

const admin = createAdmin({
const client = createClient({
logger: newLogger(),
cluster: createCluster(
{
Expand All @@ -36,9 +37,12 @@ const createSASLAdminClientForUser = ({ username, password }) => {
),
})

return admin
return client
}

const createSASLAdminClientForUser = createSASLClientForUser(createAdmin)
const createSASLProducerClientForUser = createSASLClientForUser(createProducer)

describe('Admin', () => {
let admin

Expand Down Expand Up @@ -247,5 +251,56 @@ describe('Admin', () => {

await expect(admin.fetchTopicMetadata({ topics: [topicName] })).resolves.toBeTruthy()
})

test('can produce to allowed topic after failing to produce to not-allowed topic', async () => {
const allowedTopic = `allowed-${secureRandom()}`
const notAllowedTopic = `disallowed-${secureRandom()}`

admin = createSASLAdminClientForUser({ username: 'test', password: 'testtest' })

await admin.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [allowedTopic, notAllowedTopic].map(topic => ({ topic, numPartitions: 1 })),
})
await admin.createAcls({
acl: [
{
resourceType: ACL_RESOURCE_TYPES.TOPIC,
resourceName: notAllowedTopic,
resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL,
principal: 'User:bob',
host: '*',
operation: ACL_OPERATION_TYPES.WRITE,
permissionType: ACL_PERMISSION_TYPES.DENY,
},
{
resourceType: ACL_RESOURCE_TYPES.TOPIC,
resourceName: allowedTopic,
resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL,
principal: 'User:bob',
host: '*',
operation: ACL_OPERATION_TYPES.WRITE,
permissionType: ACL_PERMISSION_TYPES.ALLOW,
},
],
})

await admin.disconnect()
const producer = createSASLProducerClientForUser({ username: 'bob', password: 'bobbob' })
await producer.connect()

await expect(
producer.send({ topic: allowedTopic, messages: [{ value: 'hello' }] })
).resolves.not.toBeUndefined()
await expect(
producer.send({ topic: notAllowedTopic, messages: [{ value: 'whoops' }] })
).rejects.not.toBeUndefined()
await expect(
producer.send({ topic: allowedTopic, messages: [{ value: 'world' }] })
).resolves.not.toBeUndefined()

await producer.disconnect()
})
})
})
6 changes: 5 additions & 1 deletion src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ module.exports = class Cluster {
try {
await this.refreshMetadata()
} catch (e) {
if (e.type === 'INVALID_TOPIC_EXCEPTION' || e.type === 'UNKNOWN_TOPIC_OR_PARTITION') {
if (
e.type === 'INVALID_TOPIC_EXCEPTION' ||
e.type === 'UNKNOWN_TOPIC_OR_PARTITION' ||
e.type === 'TOPIC_AUTHORIZATION_FAILED'
) {
this.targetTopics = previousTopics
}

Expand Down
2 changes: 1 addition & 1 deletion src/consumer/fetchManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const createFetchManager = ({
const current = getNodeIds()
const hasChanged =
nodeIds.length !== current.length || nodeIds.some(nodeId => !current.includes(nodeId))
if (hasChanged) {
if (hasChanged && current.length !== 0) {
throw new KafkaJSFetcherRebalanceError()
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/consumer/fetchManager.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const createFetchManager = require('./fetchManager')
const Batch = require('./batch')
const { newLogger } = require('testHelpers')
const waitFor = require('../utils/waitFor')
const { KafkaJSNonRetriableError } = require('../errors')

describe('FetchManager', () => {
let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize
Expand Down Expand Up @@ -72,4 +73,25 @@ describe('FetchManager', () => {
fetchers = fetchManager.getFetchers()
expect(fetchers).toHaveLength(3)
})

describe('when all brokers have become unavailable', () => {
it('should not rebalance and let the error bubble up', async () => {
const fetchMock = jest.fn().mockImplementation(async nodeId => {
if (!getNodeIds().includes(nodeId)) {
throw new KafkaJSNonRetriableError('Node not found')
}

return fetch(nodeId)
})
getNodeIds.mockImplementation(() => seq(1))

fetchManager = createTestFetchManager({ concurrency: 1, fetch: fetchMock })
const fetchManagerPromise = fetchManager.start()

expect(fetchManager.getFetchers()).toHaveLength(1)

getNodeIds.mockImplementation(() => seq(0))
await expect(fetchManagerPromise).rejects.toThrow('Node not found')
})
})
})
38 changes: 38 additions & 0 deletions src/producer/eosManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ module.exports = ({
*/
let transactionTopicPartitions = {}

/**
* Offsets have been added to the transaction
*/
let hasOffsetsAddedToTransaction = false

const stateMachine = createStateMachine({ logger })
stateMachine.on('transition', ({ to }) => {
if (to === STATES.READY) {
transactionTopicPartitions = {}
hasOffsetsAddedToTransaction = false
}
})

Expand All @@ -95,6 +101,22 @@ module.exports = ({
}
}

/**
* A transaction is ongoing when offsets or partitions added to it
*
* @returns {boolean}
*/
const isOngoing = () => {
return (
hasOffsetsAddedToTransaction ||
Object.entries(transactionTopicPartitions).some(([, partitions]) => {
return Object.entries(partitions).some(
([, isPartitionAddedToTransaction]) => isPartitionAddedToTransaction
)
})
)
}

const eosManager = stateMachine.createGuarded(
{
/**
Expand Down Expand Up @@ -267,6 +289,13 @@ module.exports = ({
transactionalGuard()
stateMachine.transitionTo(STATES.COMMITTING)

if (!isOngoing()) {
logger.debug('No partitions or offsets registered, not sending EndTxn')

stateMachine.transitionTo(STATES.READY)
return
}

const broker = await findTransactionCoordinator()
await broker.endTxn({
producerId,
Expand All @@ -285,6 +314,13 @@ module.exports = ({
transactionalGuard()
stateMachine.transitionTo(STATES.ABORTING)

if (!isOngoing()) {
logger.debug('No partitions or offsets registered, not sending EndTxn')

stateMachine.transitionTo(STATES.READY)
return
}

const broker = await findTransactionCoordinator()
await broker.endTxn({
producerId,
Expand Down Expand Up @@ -353,6 +389,8 @@ module.exports = ({
groupId: consumerGroupId,
})

hasOffsetsAddedToTransaction = true

let groupCoordinator = await cluster.findGroupCoordinator({
groupId: consumerGroupId,
coordinatorType: COORDINATOR_TYPES.GROUP,
Expand Down
78 changes: 78 additions & 0 deletions src/producer/eosManager/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ describe('Producer > eosManager', () => {
})

test('committing a transaction', async () => {
const consumerGroupId = 'consumer-group-id'
const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }]
const eosManager = createEosManager({
logger: newLogger(),
cluster,
Expand All @@ -211,6 +213,26 @@ describe('Producer > eosManager', () => {
await eosManager.beginTransaction()

cluster.findGroupCoordinator.mockClear()
await eosManager.addPartitionsToTransaction(topics)
await eosManager.commit()

expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({
groupId: transactionalId,
coordinatorType: COORDINATOR_TYPES.TRANSACTION,
})
expect(broker.endTxn).toHaveBeenCalledWith({
producerId,
producerEpoch,
transactionalId,
transactionResult: true,
})

await eosManager.beginTransaction()

cluster.findGroupCoordinator.mockClear()
broker.endTxn.mockClear()

await eosManager.sendOffsets({ consumerGroupId, topics })
await eosManager.commit()

expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({
Expand All @@ -226,6 +248,8 @@ describe('Producer > eosManager', () => {
})

test('aborting a transaction', async () => {
const consumerGroupId = 'consumer-group-id'
const topics = [{ topic: 'test-topic-1', partitions: [{ partition: 0 }] }]
const eosManager = createEosManager({
logger: newLogger(),
cluster,
Expand All @@ -248,6 +272,26 @@ describe('Producer > eosManager', () => {
await eosManager.beginTransaction()

cluster.findGroupCoordinator.mockClear()
await eosManager.addPartitionsToTransaction(topics)
await eosManager.abort()

expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({
groupId: transactionalId,
coordinatorType: COORDINATOR_TYPES.TRANSACTION,
})
expect(broker.endTxn).toHaveBeenCalledWith({
producerId,
producerEpoch,
transactionalId,
transactionResult: false,
})

await eosManager.beginTransaction()

cluster.findGroupCoordinator.mockClear()
broker.endTxn.mockClear()

await eosManager.sendOffsets({ consumerGroupId, topics })
await eosManager.abort()

expect(cluster.findGroupCoordinator).toHaveBeenCalledWith({
Expand Down Expand Up @@ -308,6 +352,40 @@ describe('Producer > eosManager', () => {
topics,
})
})

test('aborting transaction when no operation have been made should not send EndTxn', async () => {
const eosManager = createEosManager({
logger: newLogger(),
cluster,
transactionTimeout: 30000,
transactional: true,
transactionalId,
})

await eosManager.initProducerId()
await eosManager.beginTransaction()

await expect(eosManager.abort()).resolves.not.toThrow()
expect(eosManager.isInTransaction()).toEqual(false)
expect(broker.endTxn).not.toBeCalled()
})

test('commiting transaction when no operation have been made should not send EndTxn', async () => {
const eosManager = createEosManager({
logger: newLogger(),
cluster,
transactionTimeout: 30000,
transactional: true,
transactionalId,
})

await eosManager.initProducerId()
await eosManager.beginTransaction()

await expect(eosManager.commit()).resolves.not.toThrow()
expect(eosManager.isInTransaction()).toEqual(false)
expect(broker.endTxn).not.toBeCalled()
})
})

describe('if transactional=false', () => {
Expand Down
Loading

0 comments on commit 214427f

Please sign in to comment.