Skip to content

Commit

Permalink
Merge branch 'main' into branch-3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Mar 11, 2024
2 parents 3b574c7 + ee1d7b9 commit 916af95
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 55 deletions.
26 changes: 23 additions & 3 deletions .github/workflows/ci-pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,20 @@ concurrency:

jobs:

formatting-check:
name: Formatting Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run clang-format style check for C/C++/Protobuf programs.
uses: jidicula/[email protected]
with:
clang-format-version: '11'
exclude-regex: '.*\.(proto|hpp)'

wireshark-dissector-build:
name: Build the Wireshark dissector
needs: formatting-check
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
Expand Down Expand Up @@ -61,6 +73,7 @@ jobs:
unit-tests:
name: Run unit tests
needs: formatting-check
runs-on: ubuntu-22.04
timeout-minutes: 120

Expand All @@ -81,8 +94,8 @@ jobs:
./vcpkg/vcpkg format-manifest vcpkg.json
if [[ $(git diff | wc -l) -gt 0 ]]; then
echo "Please run `./vcpkg/vcpkg format-manifest vcpkg.json` to reformat vcpkg.json"
exit 1
fi
make check-format
- name: Build tests
run: |
Expand All @@ -103,6 +116,7 @@ jobs:
cpp20-build:
name: Build with the C++20 standard
needs: formatting-check
runs-on: ubuntu-22.04
timeout-minutes: 60

Expand Down Expand Up @@ -288,8 +302,8 @@ jobs:
cpp-build-macos:
timeout-minutes: 120
name: Build CPP Client on macOS
needs: formatting-check
runs-on: macos-12
needs: unit-tests
steps:
- name: checkout
uses: actions/checkout@v3
Expand All @@ -306,6 +320,12 @@ jobs:
run: |
cmake --build ./build-macos --parallel --config Release
- name: Build with C++20
shell: bash
run: |
cmake -B build-macos-cpp20 -DCMAKE_CXX_STANDARD=20
cmake --build build-macos-cpp20 -j8
cpp-build-macos-static:
timeout-minutes: 120
name: Build CPP Client on macOS with static dependencies
Expand All @@ -332,7 +352,7 @@ jobs:
check-completion:
name: Check Completion
runs-on: ubuntu-latest
needs: [wireshark-dissector-build, unit-tests, cpp20-build, cpp-build-windows, package, cpp-build-macos]
needs: [formatting-check, wireshark-dissector-build, unit-tests, cpp20-build, cpp-build-windows, package, cpp-build-macos]

steps:
- run: true
2 changes: 1 addition & 1 deletion lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ void ClientConnection::close(Result result, bool detach) {
}
// Remove the connection from the pool before completing any promise
if (detach) {
pool_.remove(logicalAddress_ + "-" + std::to_string(poolIndex_), this);
pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
}

auto self = shared_from_this();
Expand Down
15 changes: 11 additions & 4 deletions lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ bool ConnectionPool::close() {
return true;
}

// Builds the key used to index a connection in the pool map.
// Format: "<logicalAddress>-<physicalAddress>-<keySuffix>".
// Including the physical address distinguishes connections that share a
// logical address but resolve to different broker endpoints.
//
// Note: returns a non-const std::string so callers get move/NRVO instead of
// a forced copy (a `const` by-value return disables the automatic move).
static std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress,
                          size_t keySuffix) {
    return logicalAddress + '-' + physicalAddress + '-' + std::to_string(keySuffix);
}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress,
size_t keySuffix) {
Expand All @@ -77,9 +84,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const

std::unique_lock<std::recursive_mutex> lock(mutex_);

std::stringstream ss;
ss << logicalAddress << '-' << keySuffix;
const std::string key = ss.str();
auto key = getKey(logicalAddress, physicalAddress, keySuffix);

PoolMap::iterator cnxIt = pool_.find(key);
if (cnxIt != pool_.end()) {
Expand Down Expand Up @@ -127,7 +132,9 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
return future;
}

void ConnectionPool::remove(const std::string& key, ClientConnection* value) {
void ConnectionPool::remove(const std::string& logicalAddress, const std::string& physicalAddress,
size_t keySuffix, ClientConnection* value) {
auto key = getKey(logicalAddress, physicalAddress, keySuffix);
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto it = pool_.find(key);
if (it != pool_.end() && it->second.get() == value) {
Expand Down
3 changes: 2 additions & 1 deletion lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class PULSAR_PUBLIC ConnectionPool {
*/
bool close();

void remove(const std::string& key, ClientConnection* value);
void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix,
ClientConnection* value);

/**
* Get a connection from the pool.
Expand Down
87 changes: 50 additions & 37 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,14 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
// sending the subscribe request.
cnx->registerConsumer(consumerId_, get_shared_this_ptr());

if (duringSeek_) {
if (duringSeek()) {
ackGroupingTrackerPtr_->flushAndClean();
}

Lock lockForMessageId(mutexForMessageId_);
// Update startMessageId so that we can discard messages after delivery restarts
const auto startMessageId = clearReceiveQueue();
clearReceiveQueue();
const auto subscribeMessageId =
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none;
startMessageId_ = startMessageId;
(subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : boost::none;
lockForMessageId.unlock();

unAckedMessageTrackerPtr_->clear();
Expand Down Expand Up @@ -1048,14 +1046,21 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that
* was
* not seen by the application
* `startMessageId_` is updated so that we can discard messages after delivery restarts.
*/
boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
bool expectedDuringSeek = true;
if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
return seekMessageId_.get();
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
startMessageId_ = seekMessageId_.get();
SeekStatus expected = SeekStatus::COMPLETED;
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
auto seekCallback = seekCallback_.release();
executor_->postWork([seekCallback] { seekCallback(ResultOk); });
}
return;
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
return startMessageId_.get();
return;
}

Message nextMessageInQueue;
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
// There was at least one message pending in the queue
Expand All @@ -1071,16 +1076,12 @@ boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() - 1)
.build();
return previousMessageId;
startMessageId_ = previousMessageId;
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
// dequeued
// in the past
return lastDequedMessageId_;
} else {
// No message was received or dequeued by this consumer. Next message would still be the
// startMessageId
return startMessageId_.get();
startMessageId_ = lastDequedMessageId_;
}
}

Expand Down Expand Up @@ -1500,18 +1501,15 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {

bool ConsumerImpl::isReadCompacted() { return readCompacted_; }

inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const MessageId& messageId) {
return lastMessageIdInBroker > messageId && lastMessageIdInBroker.entryId() != -1;
}

void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
const auto startMessageId = startMessageId_.get();
Lock lock(mutexForMessageId_);
const auto messageId =
(lastDequedMessageId_ == MessageId::earliest()) ? startMessageId.value() : lastDequedMessageId_;

if (messageId == MessageId::latest()) {
lock.unlock();
bool compareMarkDeletePosition;
{
std::lock_guard<std::mutex> lock{mutexForMessageId_};
compareMarkDeletePosition =
(lastDequedMessageId_ == MessageId::earliest()) &&
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
}
if (compareMarkDeletePosition) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
Expand Down Expand Up @@ -1543,16 +1541,15 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
}
});
} else {
if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
lock.unlock();
if (hasMoreMessages()) {
callback(ResultOk, true);
return;
}
lock.unlock();

getLastMessageIdAsync([callback, messageId](Result result, const GetLastMessageIdResponse& response) {
callback(result, (result == ResultOk) && hasMoreMessages(response.getLastMessageId(), messageId));
});
auto self = get_shared_this_ptr();
getLastMessageIdAsync(
[this, self, callback](Result result, const GetLastMessageIdResponse& response) {
callback(result, (result == ResultOk) && hasMoreMessages());
});
}
}

Expand Down Expand Up @@ -1656,9 +1653,18 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
return;
}

auto expected = SeekStatus::NOT_STARTED;
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
<< static_cast<int>(expected));
callback(ResultNotAllowedError);
return;
}

const auto originalSeekMessageId = seekMessageId_.get();
seekMessageId_ = seekId;
duringSeek_ = true;
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = std::move(callback);
if (timestamp > 0) {
LOG_INFO(getName() << " Seeking subscription to " << timestamp);
} else {
Expand All @@ -1682,12 +1688,19 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = MessageId::earliest();
lock.unlock();
if (getCnx().expired()) {
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
startMessageId_ = seekMessageId_.get();
seekCallback_.release()(result);
}
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
seekMessageId_ = originalSeekMessageId;
duringSeek_ = false;
seekStatus_ = SeekStatus::NOT_STARTED;
seekCallback_.release()(result);
}
callback(result);
});
}

Expand Down
31 changes: 29 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC";
const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";

// State machine for an in-flight seek operation on a consumer.
// Observed transitions in this change set:
//  - NOT_STARTED -> IN_PROGRESS: seekAsyncInternal() claims the seek via
//    compare_exchange before sending the request (rejects concurrent seeks).
//  - IN_PROGRESS -> COMPLETED: the broker acknowledged the seek but the
//    connection expired, so the callback is deferred until reconnection.
//  - IN_PROGRESS/COMPLETED -> NOT_STARTED: the seek is finished (or failed)
//    and the stored callback has been released.
enum class SeekStatus : std::uint8_t
{
    NOT_STARTED,
    IN_PROGRESS,
    COMPLETED
};

class ConsumerImpl : public ConsumerImplBase {
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
Expand Down Expand Up @@ -193,7 +200,7 @@ class ConsumerImpl : public ConsumerImplBase {
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback);

boost::optional<MessageId> clearReceiveQueue();
void clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
ResultCallback callback);
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);
Expand Down Expand Up @@ -239,10 +246,13 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};

std::atomic_bool duringSeek_{false};
std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};

bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }

class ChunkedMessageCtx {
public:
ChunkedMessageCtx() : totalChunks_(0) {}
Expand Down Expand Up @@ -332,6 +342,23 @@ class ConsumerImpl : public ConsumerImplBase {
const proto::MessageIdData& messageIdData,
const ClientConnectionPtr& cnx, MessageId& messageId);

bool hasMoreMessages() const {
std::lock_guard<std::mutex> lock{mutexForMessageId_};
if (lastMessageIdInBroker_.entryId() == -1L) {
return false;
}

const auto inclusive = config_.isStartMessageIdInclusive();
if (lastDequedMessageId_ == MessageId::earliest()) {
// If startMessageId_ is none, use latest so that this method will return false
const auto startMessageId = startMessageId_.get().value_or(MessageId::latest());
return inclusive ? (lastMessageIdInBroker_ >= startMessageId)
: (lastMessageIdInBroker_ > startMessageId);
} else {
return lastMessageIdInBroker_ > lastDequedMessageId_;
}
}

friend class PulsarFriend;
friend class MultiTopicsConsumerImpl;

Expand Down
4 changes: 2 additions & 2 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
};
const auto state = state_.load();
if (state == Closing || state == Closed) {
callback(ResultAlreadyClosed);
callback(ResultOk);
return;
}

Expand All @@ -488,7 +488,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic() << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
callback(ResultOk);
return;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/NamespaceName.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ std::shared_ptr<NamespaceName> NamespaceName::getNamespaceObject() {
return std::shared_ptr<NamespaceName>(this);
}

bool NamespaceName::operator==(const NamespaceName& namespaceName) {
// Two NamespaceName instances are equal iff their fully-qualified
// namespace strings are identical.
bool NamespaceName::operator==(const NamespaceName& namespaceName) const {
    return namespace_ == namespaceName.namespace_;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/NamespaceName.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PULSAR_PUBLIC NamespaceName : public ServiceUnitId {
static std::shared_ptr<NamespaceName> get(const std::string& property, const std::string& cluster,
const std::string& namespaceName);
static std::shared_ptr<NamespaceName> get(const std::string& property, const std::string& namespaceName);
bool operator==(const NamespaceName& namespaceName);
bool operator==(const NamespaceName& namespaceName) const;
bool isV2();
std::string toString();

Expand Down
Loading

0 comments on commit 916af95

Please sign in to comment.