Skip to content

Commit

Permalink
[native pos] Report estimated size and row count of broadcast data
Browse files Browse the repository at this point in the history
Report the exact number of rows broadcast along with an estimate of the
broadcast size in bytes. The estimate is based on the serialized data size and
roughly matches the size of the broadcast file. This size may differ from the
size of the hash table that will be built from the broadcast data on the
reducer.

The estimate of the broadcast size will be used to check whether the total size
of broadcast data is within a configured limit. If not, the map stage will be
retried with broadcast disabled.
  • Loading branch information
singcha committed Oct 11, 2023
1 parent 627da72 commit 72763e6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ BroadcastFileWriter::BroadcastFileWriter(
: fileSystem_(std::move(fileSystem)),
filename_(filename),
numRows_(0),
maxSerializedSize_(0),
pool_(pool),
serde_(std::make_unique<serializer::presto::PrestoVectorSerde>()),
inputType_(inputType) {}
Expand All @@ -83,21 +84,32 @@ void BroadcastFileWriter::collect(const RowVectorPtr& input) {

// No-op: the body is empty, so calling this changes no writer state.
// NOTE(review): presumably an end-of-input notification — confirm against
// callers whether any flush/close is expected to happen here.
void BroadcastFileWriter::noMoreData() {}

// Builds a one-row RowVector describing the broadcast file written so far.
// Returns nullptr when no rows have been written (numRows_ == 0).
//
// NOTE(review): this span is a rendered diff without +/- markers, so it
// appears to interleave pre-change lines (the `data` vector, the one-column
// ROW type, the one-element children vector) with their replacements —
// confirm against the actual file before editing the code.
//
// TODO: Add remaining file stats, e.g. checksum. The three-column version
// below already carries a serialized-size value and a row count.
RowVectorPtr BroadcastFileWriter::fileStats() {
// No rows written.
if (numRows_ == 0) {
return nullptr;
}

// NOTE(review): looks like the pre-change version — a single VARCHAR vector
// holding the file name; superseded by fileNameVector below. Confirm.
auto data = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 1, pool_);
data->set(0, StringView(filename_));
// Column "filepath": one-element VARCHAR vector set to filename_.
auto fileNameVector =
BaseVector::create<FlatVector<StringView>>(VARCHAR(), 1, pool_);
fileNameVector->set(0, StringView(filename_));
// Column "maxserializedsize": one-element BIGINT vector set to
// maxSerializedSize_, which write() increments by
// serializer->maxSerializedSize() for each call.
auto maxSerializedSizeVector =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), 1, pool_);
maxSerializedSizeVector->set(0, maxSerializedSize_);
// Column "numrows": one-element BIGINT vector set to numRows_.
auto numRowsVector =
BaseVector::create<FlatVector<int64_t>>(BIGINT(), 1, pool_);
numRowsVector->set(0, numRows_);

// Assemble the result: a RowVector of size 1 with a nullptr nulls buffer.
return std::make_shared<RowVector>(
pool_,
// NOTE(review): the single-column ROW type on the next line looks like the
// pre-change schema; the three-column ROW after it is its replacement.
ROW({"filepath"}, {VARCHAR()}),
ROW({"filepath", "maxserializedsize", "numrows"},
{VARCHAR(), BIGINT(), BIGINT()}),
nullptr,
1,
// NOTE(review): the one-element children vector on the next line looks like
// the pre-change argument; the three-element vector after it is its
// replacement, with children ordered to match the ROW column names above.
std::vector<VectorPtr>({std::move(data)}));
std::vector<VectorPtr>(
{std::move(fileNameVector),
std::move(maxSerializedSizeVector),
std::move(numRowsVector)}));
}

void BroadcastFileWriter::initializeWriteFile() {
Expand All @@ -121,6 +133,7 @@ void BroadcastFileWriter::write(const RowVectorPtr& rowVector) {
auto serializer = serde_->createSerializer(inputType_, numRows, arena.get());

serializer->append(rowVector, folly::Range(&allRows, 1));
maxSerializedSize_ += serializer->maxSerializedSize();
IOBufOutputStream out(*pool_);
serializer->flush(&out);
auto iobuf = out.getIOBuf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class BroadcastFileWriter {
std::shared_ptr<velox::filesystems::FileSystem> fileSystem_;
std::string filename_;
int64_t numRows_;
int64_t maxSerializedSize_;
velox::memory::MemoryPool* pool_;
std::unique_ptr<velox::VectorSerde> serde_;
const velox::RowTypePtr& inputType_;
Expand Down

0 comments on commit 72763e6

Please sign in to comment.