Skip to content

Commit

Permalink
Merge branch 'master' into feature/KIP-580
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab authored Sep 27, 2023
2 parents 1bd95c8 + 73cf87c commit 8e65bf5
Show file tree
Hide file tree
Showing 21 changed files with 405 additions and 118 deletions.
10 changes: 2 additions & 8 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ name: 'librdkafka build and release artifact pipeline'
agent:
machine:
type: s1-prod-macos-arm64
execution_time_limit:
hours: 3
global_job_config:
prologue:
commands:
Expand Down Expand Up @@ -239,11 +241,8 @@ blocks:
value: UCRT64
prologue:
commands:
- cache restore msys2-x64-${Env:ARTIFACT_KEY}
# Set up msys2
- "& .\\win32\\setup-msys2.ps1"
- cache delete msys2-x64-${Env:ARTIFACT_KEY}
- cache store msys2-x64-${Env:ARTIFACT_KEY} c:/msys64
epilogue:
commands:
- if ($env:SEMAPHORE_GIT_TAG_NAME -ne "") { artifact push workflow artifacts/ --destination artifacts/$Env:ARTIFACT_KEY/ }
Expand Down Expand Up @@ -277,8 +276,6 @@ blocks:
# install vcpkg in the parent directory.
- pwd
- cd ..
# Restore vcpkg caches, if any.
- cache restore vcpkg-archives-$Env:ARTIFACT_KEY
# Setup vcpkg
- "& .\\librdkafka\\win32\\setup-vcpkg.ps1"
- cd librdkafka
Expand All @@ -287,11 +284,8 @@ blocks:
- ..\vcpkg\vcpkg --feature-flags=versions install --triplet $Env:triplet
- cd ..
- pwd
# Store vcpkg caches
- ls vcpkg/
- echo $Env:VCPKG_ROOT
- cache delete vcpkg-archives-$Env:ARTIFACT_KEY
- cache store vcpkg-archives-$Env:ARTIFACT_KEY C:/Users/semaphore/AppData/Local/vcpkg/archives
- pwd
- cd librdkafka
epilogue:
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# librdkafka v2.2.1

librdkafka v2.2.1 is a maintenance release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
* Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
* Add missing destroy that leads to leaking partition structure memory when there
are partition leader changes and a stale leader epoch is received (#4429).
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).



# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:
Expand Down
19 changes: 12 additions & 7 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-84 - SASL SCRAM | 0.10.2.0 | Supported |
| KIP-85 - SASL config properties | 0.10.2.0 | Supported |
| KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported |
| KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported |
| KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported |
Expand All @@ -1901,7 +1901,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-249 - AdminAPI: Deletegation Tokens | 2.0.0 | Not supported |
| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
Expand Down Expand Up @@ -1939,6 +1939,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported |
| KIP-511 - Collect Client's Name and Version | 2.4.0 | Supported |
| KIP-514 - Bounded flush() | 2.4.0 | Supported |
| KIP-516 - Topic Identifiers | 2.8.0 (WIP) | Partially Supported |
| KIP-517 - Consumer poll() metrics | 2.4.0 | Not supported |
| KIP-518 - Allow listing consumer groups per state | 2.6.0 | Supported |
| KIP-519 - Make SSL engine configurable | 2.6.0 | Supported |
Expand Down Expand Up @@ -1966,17 +1967,17 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf

### Supported protocol versions

"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.4.0, while
"Kafka max" is the maximum ApiVersion supported in Apache Kafka 3.5.0, while
"librdkafka max" is the maximum ApiVersion supported in the latest
release of librdkafka.


| ApiKey | Request name | Kafka max | librdkafka max |
| ------- | ------------------------------| ----------- | ----------------------- |
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
| 3 | Metadata | 12 | 9 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 5 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand All @@ -1992,10 +1993,14 @@ release of librdkafka.
| 20 | DeleteTopics | 6 | 1 |
| 21 | DeleteRecords | 2 | 1 |
| 22 | InitProducerId | 4 | 4 |
| 24 | AddPartitionsToTxn | 3 | 0 |
| 23 | OffsetForLeaderEpoch | 4 | 2 |
| 24 | AddPartitionsToTxn | 4 | 0 |
| 25 | AddOffsetsToTxn | 3 | 0 |
| 26 | EndTxn | 3 | 1 |
| 28 | TxnOffsetCommit | 3 | 3 |
| 29 | DescribeAcls | 3 | 1 |
| 30 | CreateAcls | 3 | 1 |
| 31 | DeleteAcls | 3 | 1 |
| 32 | DescribeConfigs | 4 | 1 |
| 33 | AlterConfigs | 2 | 2 |
| 36 | SaslAuthenticate | 2 | 1 |
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
127 changes: 127 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdbase64.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif

/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out. out->ptr will be NULL in case of some issue
* with the conversion or the conversion is not supported.
*
* @remark out->ptr must be freed after use.
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t max_len;

/* OpenSSL takes an |int| argument so the input cannot exceed that. */
if (in->size > INT_MAX) {
out->ptr = NULL;
return;
}

max_len = (((in->size + 2) / 3) * 4) + 1;
out->ptr = rd_malloc(max_len);

out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
(unsigned char *)in->ptr, (int)in->size);

rd_assert(out->size < max_len);
out->ptr[out->size] = 0;
#else
out->ptr = NULL;
#endif
}


/**
* @brief Base64 encode binary input \p in.
* @returns a newly allocated, base64-encoded string or NULL in case of some
* issue with the conversion or the conversion is not supported.
*
* @remark Returned string must be freed after use.
*/
char *rd_base64_encode_str(const rd_chariov_t *in) {
rd_chariov_t out;
rd_base64_encode(in, &out);
return out.ptr;
}


/**
* @brief Base64 decode input string \p in. Ignores leading and trailing
* whitespace.
* @returns * 0 on successes in which case a newly allocated binary string is
* set in \p out (and size).
* * -1 on invalid Base64.
* * -2 on conversion not supported.
*/
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t ret_len;

/* OpenSSL takes an |int| argument, so |in->size| must not exceed
* that. */
if (in->size % 4 != 0 || in->size > INT_MAX) {
return -1;
}

ret_len = ((in->size / 4) * 3);
out->ptr = rd_malloc(ret_len + 1);

if (EVP_DecodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr,
(int)in->size) == -1) {
rd_free(out->ptr);
out->ptr = NULL;
return -1;
}

/* EVP_DecodeBlock will pad the output with trailing NULs and count
* them in the return value. */
if (in->size > 1 && in->ptr[in->size - 1] == '=') {
if (in->size > 2 && in->ptr[in->size - 2] == '=') {
ret_len -= 2;
} else {
ret_len -= 1;
}
}

out->ptr[ret_len] = 0;
out->size = ret_len;

return 0;
#else
return -2;
#endif
}
41 changes: 41 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef _RDBASE64_H_
#define _RDBASE64_H_

#include "rd.h"

void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

char *rd_base64_encode_str(const rd_chariov_t *in);

int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);

#endif /* _RDBASE64_H_ */
18 changes: 17 additions & 1 deletion src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1428,4 +1427,21 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
rd_kafka_make_req_cb_t *make_cb,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


#define rd_kafka_buf_read_uuid(rkbuf, uuid) \
do { \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->most_significant_bits)); \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->least_significant_bits)); \
(uuid)->base64str[0] = '\0'; \
} while (0)

static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
rd_kafka_uuid_t *uuid) {
rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
10 changes: 7 additions & 3 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
rd_list_destroy(&rkcg->rkcg_toppars);
rd_list_destroy(rkcg->rkcg_subscribed_topics);
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
rd_free(rkcg);
Expand Down Expand Up @@ -1917,7 +1918,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
"Unsupported assignment strategy \"%s\"",
protocol_name);
if (rkcg->rkcg_assignor) {
if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor
->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor
->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
Expand Down Expand Up @@ -1955,7 +1958,8 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
}

if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
if (rkcg->rkcg_assignor->rkas_destroy_state_cb &&
rkcg->rkcg_assignor_state)
rkcg->rkcg_assignor->rkas_destroy_state_cb(
rkcg->rkcg_assignor_state);
rkcg->rkcg_assignor_state = NULL;
Expand Down
Loading

0 comments on commit 8e65bf5

Please sign in to comment.