Skip to content

Commit

Permalink
[native pos] Update ExchangeSources to use the new request API
Browse files Browse the repository at this point in the history
Update UnsafeRowExchangeSource and BroadcastExchangeSource to use the new
ExchangeSource::request API.
  • Loading branch information
mbasmanova committed Aug 28, 2023
1 parent 5207f26 commit fc399fc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ std::optional<std::string> getBroadcastInfo(folly::Uri& uri) {
}
} // namespace

void BroadcastExchangeSource::request() {
ContinueFuture BroadcastExchangeSource::request(uint32_t maxBytes) {
std::vector<velox::ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (atEnd_) {
return;
return folly::makeFuture<folly::Unit>(folly::Unit());
}

if (!reader_->hasNext()) {
Expand All @@ -54,6 +54,8 @@ void BroadcastExchangeSource::request() {
for (auto& promise : promises) {
promise.setValue();
}

return folly::makeFuture<folly::Unit>(folly::Unit());
}

folly::F14FastMap<std::string, int64_t> BroadcastExchangeSource::stats() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BroadcastExchangeSource : public velox::exec::ExchangeSource {
return !atEnd_;
}

void request() override;
ContinueFuture request(uint32_t maxBytes) override;

void close() override {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ namespace facebook::presto::operators {
VELOX_FAIL("ShuffleReader::{} failed: {}", methodName, e.what()); \
}

void UnsafeRowExchangeSource::request() {
velox::ContinueFuture UnsafeRowExchangeSource::request(uint32_t maxBytes) {
std::vector<velox::ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (atEnd_) {
return;
return folly::makeFuture<folly::Unit>(folly::Unit());
}

bool hasNext;
Expand Down Expand Up @@ -63,6 +63,8 @@ void UnsafeRowExchangeSource::request() {
for (auto& promise : promises) {
promise.setValue();
}

return folly::makeFuture<folly::Unit>(folly::Unit());
}

folly::F14FastMap<std::string, int64_t> UnsafeRowExchangeSource::stats() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UnsafeRowExchangeSource : public velox::exec::ExchangeSource {
return !atEnd_;
}

void request() override;
velox::ContinueFuture request(uint32_t maxBytes) override;

void close() override {}

Expand Down

0 comments on commit fc399fc

Please sign in to comment.