Skip to content

Commit

Permalink
[native pos] Process PartitionAndSerialize in batches to reduce memor…
Browse files Browse the repository at this point in the history
…y pressure

As PartitionAndSerialize can be memory intensive, enforce
preferredOutputBatchBytes by processing input in batches.
  • Loading branch information
vermapratyush committed Oct 20, 2023
1 parent 70ecc39 commit 411ea36
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
return obj["id"].asString();
}

/// The output of this operator has 2 columns:
/// (1) partition number (INTEGER);
/// (2) serialized row (VARBINARY)
///
/// When replicateNullsAndAny is true, there is an extra boolean column that
/// indicated whether a row should be replicated to all partitions.
// The output of this operator has 2 columns:
// (1) partition number (INTEGER);
// (2) serialized row (VARBINARY)
//
// When replicateNullsAndAny is true, there is an extra boolean column that
// indicated whether a row should be replicated to all partitions.
class PartitionAndSerializeOperator : public Operator {
public:
PartitionAndSerializeOperator(
Expand Down Expand Up @@ -74,32 +74,73 @@ class PartitionAndSerializeOperator : public Operator {

void addInput(RowVectorPtr input) override {
input_ = std::move(input);
const auto numInput = input_->size();

// Reset state variables.
nextOutputRow_ = 0;
rowSizes_.clear();
rowSizes_.resize(numInput);

compactRow_ =
std::make_unique<velox::row::CompactRow>(reorderInputsIfNeeded());
calculateRowSize();

// Process partitionVector and replicateVector once, and reuse on subsequent
// batch.
prepareOutput();
auto* partitionVector = output_->childAt(0)->asFlatVector<int32_t>();
computePartitions(*partitionVector);
if (replicateNullsAndAny_) {
auto* replicateVector = output_->childAt(1)->asFlatVector<bool>();
populateReplicateFlags(*replicateVector);
}
}

RowVectorPtr getOutput() override {
if (!input_) {
return nullptr;
}

const auto numInput = input_->size();

// TODO Reuse output vector.
auto output = BaseVector::create<RowVector>(outputType_, numInput, pool());
auto partitionVector = output->childAt(0)->asFlatVector<int32_t>();
partitionVector->resize(numInput);
auto dataVector = output->childAt(1)->asFlatVector<StringView>();
dataVector->resize(numInput);

computePartitions(*partitionVector);
serializeRows(*dataVector);

vector_size_t endOutputRow;
uint32_t outputBufferSize{0};
prepareNextOutput(endOutputRow, outputBufferSize);
VELOX_CHECK_LT(nextOutputRow_, endOutputRow);

const auto batchSize = endOutputRow - nextOutputRow_;
auto dataVector = BaseVector::create<FlatVector<StringView>>(
VARBINARY(), batchSize, pool());
serializeRows(*dataVector, outputBufferSize, nextOutputRow_, endOutputRow);

// Extract slice from output_ and construct the output vector.
std::vector<VectorPtr> childrenVectors;
childrenVectors.push_back(
output_->childAt(0)->slice(nextOutputRow_, batchSize));
childrenVectors.push_back(dataVector);
RowVectorPtr outputBatch;
// Handle replicateVector based on 'replicateNullsAndAny_' as it
// is optional.
if (replicateNullsAndAny_) {
auto replicateVector = output->childAt(2)->asFlatVector<bool>();
populateReplicateFlags(*replicateVector);
childrenVectors.push_back(
output_->childAt(1)->slice(nextOutputRow_, batchSize));
outputBatch = std::make_shared<RowVector>(
pool(),
outputType_,
nullptr /*nulls*/,
batchSize,
std::move(childrenVectors));
} else {
outputBatch = std::make_shared<RowVector>(
pool(),
ROW({INTEGER(), VARBINARY()}),
nullptr /*nulls*/,
batchSize,
std::move(childrenVectors));
}

input_.reset();
return output;
nextOutputRow_ = endOutputRow;
if (nextOutputRow_ == input_->size()) {
input_ = nullptr;
}
return outputBatch;
}

BlockingReason isBlocked(ContinueFuture* future) override {
Expand All @@ -111,11 +152,63 @@ class PartitionAndSerializeOperator : public Operator {
}

private:
void prepareOutput() {
const auto size = input_->size();
// Try to re-use memory for the output vectors that contain partitionVector
// and replicateVector.
if (output_) {
VectorPtr output = std::move(output_);
BaseVector::prepareForReuse(output, size);
output_ = std::static_pointer_cast<RowVector>(output);
} else {
output_ = BaseVector::create<RowVector>(
ROW({INTEGER(), BOOLEAN()}), size, pool());
}
}

// Invoked to calculate how many rows to output: ['nextOutputRow_',
// 'endOutputRow_') and the corresponding output buffer size returns in
// 'outputBufferSize'.
size_t prepareNextOutput(
vector_size_t& endOutputRow,
uint32_t& outputBufferSize) {
const auto& queryConfig = operatorCtx_->driverCtx()->queryConfig();
const auto preferredOutputBytes = queryConfig.preferredOutputBatchBytes();
const auto preferredOutputRows = queryConfig.preferredOutputBatchRows();
endOutputRow = nextOutputRow_;

VELOX_DCHECK(!rowSizes_.empty(), "rowSizes_ can not be empty");
do {
outputBufferSize += rowSizes_[endOutputRow++];
} while (endOutputRow < input_->size() &&
outputBufferSize < preferredOutputBytes &&
(endOutputRow - nextOutputRow_) < preferredOutputRows);

return outputBufferSize;
}

// Calculates the size of each row. This is done once per input and reused for
// each batch.
void calculateRowSize() {
if (auto fixedRowSize =
compactRow_->fixedRowSize(asRowType(serializedRowType_))) {
std::fill(rowSizes_.begin(), rowSizes_.end(), fixedRowSize.value());
} else {
const auto numInput = input_->size();
for (auto i = 0; i < numInput; ++i) {
const size_t rowSize = compactRow_->rowSize(i);
rowSizes_[i] = rowSize;
}
}
}

// Populate the replicate flags for each row. This is done once per input and
// reused for each batch.
void populateReplicateFlags(FlatVector<bool>& replicateVector) {
const auto numInput = input_->size();
replicateVector.resize(numInput);
auto* rawValues = replicateVector.mutableRawValues<uint64_t>();
memset(rawValues, 0, bits::nbytes(numInput));
auto* rawReplicatedValues = replicateVector.mutableRawValues<uint64_t>();
memset(rawReplicatedValues, 0, bits::nbytes(numInput));

decodedVectors_.resize(keyChannels_.size());

Expand All @@ -124,22 +217,26 @@ class PartitionAndSerializeOperator : public Operator {
if (keyVector->mayHaveNulls()) {
decodedVectors_[partitionKey].decode(*keyVector);
if (auto* rawNulls = decodedVectors_[partitionKey].nulls()) {
bits::orWithNegatedBits(rawValues, rawNulls, 0, numInput);
bits::orWithNegatedBits(rawReplicatedValues, rawNulls, 0, numInput);
}
}
}

if (!replicatedAny_) {
if (bits::countBits(rawValues, 0, numInput) == 0) {
if (bits::countBits(rawReplicatedValues, 0, numInput) == 0) {
replicateVector.set(0, true);
}
replicatedAny_ = true;
}
}

// Computes the partition id for each row. This is done once per input and
// reused for each batch.
void computePartitions(FlatVector<int32_t>& partitionsVector) {
auto numInput = input_->size();
partitions_.resize(numInput);
partitionsVector.resize(numInput);

if (numPartitions_ == 1) {
std::fill(partitions_.begin(), partitions_.end(), 0);
} else {
Expand Down Expand Up @@ -170,53 +267,43 @@ class PartitionAndSerializeOperator : public Operator {
}

// The logic of this method is logically identical with
// UnsafeRowVectorSerializer::append() and UnsafeRowVectorSerializer::flush().
// Rewriting of the serialization logic here to avoid additional copies so
// that contents are directly written into passed in vector.
void serializeRows(FlatVector<StringView>& dataVector) {
const auto numInput = input_->size();

// Compute row sizes.
rowSizes_.resize(numInput);

velox::row::CompactRow compactRow(reorderInputsIfNeeded());

size_t totalSize = 0;
if (auto fixedRowSize =
compactRow.fixedRowSize(asRowType(serializedRowType_))) {
totalSize += fixedRowSize.value() * numInput;
std::fill(rowSizes_.begin(), rowSizes_.end(), fixedRowSize.value());
} else {
for (auto i = 0; i < numInput; ++i) {
const size_t rowSize = compactRow.rowSize(i);
rowSizes_[i] = rowSize;
totalSize += rowSize;
}
}

// UnsafeRowVectorSerializer::append() and
// UnsafeRowVectorSerializer::flush(). Rewriting of the serialization logic
// here to avoid additional copies so that contents are directly written into
// passed in vector.
// Since serialization can be memory intensive, we process only one batch at
// a time.
void serializeRows(
FlatVector<StringView>& dataVector,
size_t outputBufferSize,
vector_size_t from,
vector_size_t to) {
vector_size_t batchSize = to - from;
// Allocate memory.
auto buffer = dataVector.getBufferWithSpace(totalSize);
auto buffer = dataVector.getBufferWithSpace(outputBufferSize);

// getBufferWithSpace() may return a buffer that already has content, so we
// only use the space after that.
auto rawBuffer = buffer->asMutable<char>() + buffer->size();
buffer->setSize(buffer->size() + totalSize);
memset(rawBuffer, 0, totalSize);
buffer->setSize(buffer->size() + outputBufferSize);
memset(rawBuffer, 0, outputBufferSize);

// Serialize rows.
size_t offset = 0;
for (auto i = 0; i < numInput; ++i) {
for (auto i = 0; i < batchSize; ++i) {
// Write row data.
auto size = compactRow.serialize(i, rawBuffer + offset);
VELOX_DCHECK_EQ(size, rowSizes_[i]);

dataVector.setNoCopy(i, StringView(rawBuffer + offset, rowSizes_[i]));
auto size = compactRow_->serialize(from + i, rawBuffer + offset);
VELOX_DCHECK_EQ(size, rowSizes_[from + i]);

dataVector.setNoCopy(
i, StringView(rawBuffer + offset, rowSizes_[from + i]));
offset += size;
}

{
auto lockedStats = stats_.wlock();
lockedStats->addRuntimeStat("serializedBytes", RuntimeCounter(totalSize));
lockedStats->addRuntimeStat(
"serializedBytes", RuntimeCounter(outputBufferSize));
}
}

Expand All @@ -227,13 +314,17 @@ class PartitionAndSerializeOperator : public Operator {
const bool replicateNullsAndAny_;
bool replicatedAny_{false};
std::vector<column_index_t> serializedColumnIndices_;
// Holder for partitionVector and replicateVector.
RowVectorPtr output_;

std::unique_ptr<velox::row::CompactRow> compactRow_;
// Decoded 'keyChannels_' columns.
std::vector<velox::DecodedVector> decodedVectors_;
// Reusable vector for storing partition id for each input row.
std::vector<uint32_t> partitions_;
// Reusable vector for storing serialised row size for each input row.
std::vector<uint32_t> rowSizes_;
vector_size_t nextOutputRow_{0};
};
} // namespace

Expand Down
Loading

0 comments on commit 411ea36

Please sign in to comment.