[util] Add support for 32 & 64 byte alignment to Arena allocator
After adding a couple of member variables and methods to the BlockBloomFilter
class to import the Or() function, predicate-test would sometimes crash
randomly with SIGSEGV in AVX operations.

Debugging showed that the crash happens only when the "directory_" structure
in BlockBloomFilter is not 32-byte aligned. Surprisingly, without the addition
of those methods, "directory_" had always happened to be 32-byte aligned, even
though the Arena allocator doesn't support alignment values greater than
16 bytes.
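
For context, aligned AVX2 loads such as _mm256_load_si256() fault when given
an address that is not 32-byte aligned, while the unaligned variants do not.
A minimal illustration, not part of this patch ("buf" and "Demo" are
hypothetical; compile with -mavx2):

  #include <cstdint>
  #include <immintrin.h>

  void Demo() {
    alignas(32) static uint8_t buf[64] = {};
    // OK: "buf" is 32-byte aligned.
    __m256i a = _mm256_load_si256(reinterpret_cast<const __m256i*>(buf));
    // SIGSEGV at runtime: "buf + 16" is only 16-byte aligned.
    __m256i b = _mm256_load_si256(reinterpret_cast<const __m256i*>(buf + 16));
    (void)a; (void)b;
  }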

With input from Todd, we explored three options to fix the alignment problem:
1) Update the HeapBufferAllocator in util/memory to align allocations to
64 bytes. See the AllocateInternal() implementation. Surprisingly,
FLAGS_allocator_aligned_mode is OFF by default, so we appear to be relying
on the allocator to always return 16-byte-aligned buffers. This option would
therefore require turning the FLAGS_allocator_aligned_mode flag ON by default.
2) Update the Arena allocator so that, when needed, extra bytes are allocated
to allow aligning to 32/64 bytes, given that a new component is always
16-byte aligned. This requires updating some address/alignment logic around
offset_ and new component allocation; see the sketch after this list.
3) Don't touch the Arena allocator and simply add padding in
ArenaBlockBloomFilterBufferAllocator, minimizing any risk to other parts of
the codebase.
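
A minimal sketch of the padding arithmetic behind option #2 (illustrative
only; the actual change is in arena.cc below): a new component is only
guaranteed to be 16-byte aligned, so at most align - 16 extra bytes are
needed to reach the next 32/64-byte boundary:

  // Worst case: the 16-byte-aligned start of a fresh component sits
  // align - 16 bytes before the next align-byte boundary.
  size_t aligned_size = (align <= 16) ? size : size + align - 16;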

Opted for option #2 since it adds general support for 32/64-byte alignment,
unlike the limited scope of option #3. Option #1 is tempting, but it's unclear
what unknowns turning on allocator_aligned_mode by default would bring.

Although only 32-byte alignment is needed for AVX operations, support for
64 bytes was also added to match the cache line size.

Additionally, this change:
- Adds a simple BlockBloomFilter unit test that reproduces the alignment
problem; the end-to-end predicate-test was proving difficult to debug.
- Fixes and enhances arena-test with a bunch of variations.
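
With this change, callers can request 32/64-byte-aligned buffers directly
from an Arena. A minimal usage sketch (sizes are arbitrary):

  Arena arena(1024);
  void* p = arena.AllocateBytesAligned(256, 32);
  CHECK_EQ(0, reinterpret_cast<uintptr_t>(p) % 32);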

Change-Id: Ib665115fa0fc262a8b76c48f52947dedb84be2a7
Reviewed-on: http://gerrit.cloudera.org:8080/15372
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
bbhavsar authored and adembo committed Mar 10, 2020
1 parent eaed5fb commit cf555ce
Showing 5 changed files with 167 additions and 52 deletions.
16 changes: 16 additions & 0 deletions src/kudu/util/block_bloom_filter-test.cc
@@ -32,6 +32,7 @@

#include "kudu/util/hash.pb.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -114,6 +115,21 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) {
bf.Close();
}

// Simple Arena-allocator-based test that would sometimes trigger
// SIGSEGV due to misalignment of the "directory_" ptr in AVX operations
// before support for 32/64-byte alignment was added to the Arena allocator.
// It simulates the allocation pattern in wire-protocol.cc.
TEST_F(BlockBloomFilterTest, ArenaAligned) {
Arena a(64);
auto* allocator = a.NewObject<ArenaBlockBloomFilterBufferAllocator>(&a);
auto* bf = a.NewObject<BlockBloomFilter>(allocator);
bf->Init(6, FAST_HASH, 0);
bool key = true;
Slice s(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
bf->Insert(s);
ASSERT_TRUE(bf->Find(s));
}

TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) {
BlockBloomFilter bf(allocator_);
Status s = bf.Init(4, UNKNOWN_HASH, 0);
7 changes: 3 additions & 4 deletions src/kudu/util/block_bloom_filter.cc
@@ -306,10 +306,9 @@ ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() {

Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
DCHECK_NOTNULL(arena_);
-  // 16-bytes is the max alignment supported in arena currently whereas CACHELINE_SIZE
-  // is typically 64 bytes on modern CPUs.
-  // TODO(bankim): Needs investigation to support larger alignment values.
-  *ptr = arena_->AllocateBytesAligned(bytes, 16);
+  static_assert(CACHELINE_SIZE >= 32,
+                "For AVX operations, need buffers to be 32-bytes aligned or higher");
+  *ptr = arena_->AllocateBytesAligned(bytes, CACHELINE_SIZE);
return *ptr == nullptr ?
Status::RuntimeError(Substitute("Arena bad_alloc. bytes: $0", bytes)) :
Status::OK();
123 changes: 97 additions & 26 deletions src/kudu/util/memory/arena-test.cc
@@ -15,21 +15,26 @@
// specific language governing permissions and limitations
// under the License.

#include "kudu/util/memory/arena.h"

#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/stringprintf.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/memory/memory.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/memory.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util.h"

DEFINE_int32(num_threads, 16, "Number of threads to test");
DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
@@ -42,19 +47,52 @@ using std::string;
using std::thread;
using std::vector;

// From the "arena" allocate number of bytes required to copy the "to_write" buffer
// and add the allocated buffer to the output "ptrs" vector.
template<class ArenaType>
-static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
-  std::vector<void *> ptrs;
+static void AllocateBytesAndWrite(ArenaType* arena, const Slice& to_write, vector<void*>* ptrs) {
+  void *allocated_bytes = arena->AllocateBytes(to_write.size());
+  ASSERT_NE(nullptr, allocated_bytes);
+  memcpy(allocated_bytes, to_write.data(), to_write.size());
+  ptrs->push_back(allocated_bytes);
+}

// From the "arena" allocate aligned bytes as specified by "alignment". Number of bytes
// must be at least the size required to copy the "to_write" buffer.
// Add the allocated aligned buffer to the output "ptrs" vector.
template<class ArenaType, class RNG>
static void AllocateAlignedBytesAndWrite(ArenaType* arena, const size_t alignment,
const Slice& to_write, RNG* r, vector<void*>* ptrs) {
// To test alignment, we allocate a random number of bytes within bounds
// but write only the fixed-size "to_write" buffer.
size_t num_bytes = FLAGS_alloc_size + r->Uniform32(128 - FLAGS_alloc_size + 1);
ASSERT_LE(to_write.size(), num_bytes);
void* allocated_bytes = arena->AllocateBytesAligned(num_bytes, alignment);
ASSERT_NE(nullptr, allocated_bytes);
ASSERT_EQ(0, reinterpret_cast<uintptr_t>(allocated_bytes) % alignment) <<
"failed to align on " << alignment << "b boundary: " << allocated_bytes;
memcpy(allocated_bytes, to_write.data(), to_write.size());
ptrs->push_back(allocated_bytes);
}

// Thread callback function used by a bunch of test cases below.
template<class ArenaType, class RNG, bool InvokeAligned = false>
static void AllocateAndTestThreadFunc(ArenaType *arena, uint8_t thread_index, RNG* r) {
vector<void *> ptrs;
ptrs.reserve(FLAGS_allocs_per_thread);

-  char buf[FLAGS_alloc_size];
+  uint8_t buf[FLAGS_alloc_size];
   memset(buf, thread_index, FLAGS_alloc_size);
+  Slice data(buf, FLAGS_alloc_size);

for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
-    void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
-    CHECK(alloced);
-    memcpy(alloced, buf, FLAGS_alloc_size);
-    ptrs.push_back(alloced);
+    if (InvokeAligned) {
+      // Test alignment up to 64 bytes.
+      const size_t alignment = 1 << (i % 7);
+      AllocateAlignedBytesAndWrite(arena, alignment, data, r, &ptrs);
+    } else {
+      AllocateBytesAndWrite(arena, data, &ptrs);
+    }
}

for (void *p : ptrs) {
Expand All @@ -65,43 +103,76 @@ static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
}

// Non-templated function to forward to above -- simplifies thread creation
-static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
-  AllocateThread(arena, thread_index);
+static void AllocateAndTestTSArenaFunc(ThreadSafeArena *arena, uint8_t thread_index,
+                                       ThreadSafeRandom* r) {
+  AllocateAndTestThreadFunc(arena, thread_index, r);
}

template<typename ArenaType, typename RNG>
static void TestArenaAlignmentHelper() {
RNG r(SeedRandom());

for (size_t initial_size = 16; initial_size <= 2048; initial_size <<= 1) {
ArenaType arena(initial_size);
static constexpr bool kIsMultiThreaded = std::is_same<ThreadSafeArena, ArenaType>::value;
if (kIsMultiThreaded) {
vector<thread> threads;
threads.reserve(FLAGS_num_threads);
for (auto i = 0; i < FLAGS_num_threads; i++) {
threads.emplace_back(
AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokeAligned */>, &arena, i, &r);
}
for (thread& thr : threads) {
thr.join();
}
} else {
// Invoke the helper method on the same thread avoiding separate single
// thread creation/join.
AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokeAligned */>(&arena, 0, &r);
}
}
}

TEST(TestArena, TestSingleThreaded) {
Arena arena(128);
-  AllocateThread(&arena, 0);
+  Random r(SeedRandom());
+  AllocateAndTestThreadFunc(&arena, 0, &r);
}



TEST(TestArena, TestMultiThreaded) {
CHECK(FLAGS_num_threads < 256);

ThreadSafeRandom r(SeedRandom());
ThreadSafeArena arena(1024);

vector<thread> threads;
-  for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
-    threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+  threads.reserve(FLAGS_num_threads);
+  for (auto i = 0; i < FLAGS_num_threads; i++) {
+    threads.emplace_back(AllocateAndTestTSArenaFunc, &arena, i, &r);
}

for (thread& thr : threads) {
thr.join();
}
}

-TEST(TestArena, TestAlignment) {
-  ThreadSafeArena arena(1024);
-  for (int i = 0; i < 1000; i++) {
-    int alignment = 1 << (1 % 5);
-    void *ret = arena.AllocateBytesAligned(5, alignment);
-    ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
-        "failed to align on " << alignment << "b boundary: " <<
-        ret;
-  }
-}
+TEST(TestArena, TestAlignmentThreadSafe) {
+  TestArenaAlignmentHelper<ThreadSafeArena, ThreadSafeRandom>();
+}
+
+TEST(TestArena, TestAlignmentNotThreadSafe) {
+  TestArenaAlignmentHelper<Arena, Random>();
+}

TEST(TestArena, TestAlignmentSmallArena) {
// Start with a small initial size, then allocate more bytes than the size of the current
// component to trigger the fallback code path in Arena. Moreover, allocate a number of
// bytes with alignment such that "aligned_size" exceeds "next_component_size".
Arena arena(16);
constexpr size_t alignment = 32;
void *ret = arena.AllocateBytesAligned(33, alignment);
ASSERT_NE(nullptr, ret);
ASSERT_EQ(0, reinterpret_cast<uintptr_t>(ret) % alignment) <<
"failed to align on " << alignment << "b boundary: " << ret;
}

TEST(TestArena, TestObjectAlignment) {
31 changes: 22 additions & 9 deletions src/kudu/util/memory/arena.cc
@@ -76,23 +76,36 @@ void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size
// Really need to allocate more space.
size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
// But, allocate enough, even if the request is large. In this case,
-  // might violate the max_element_size bound.
-  if (next_component_size < size) {
-    next_component_size = size;
+  // might violate the "max_buffer_size_" bound.
+  // Component allocation is guaranteed to be 16-byte aligned, see NewComponent(),
+  // but we also need to support higher alignment values of 32 and 64 bytes, and
+  // hence we add padding so that the first request to allocate bytes after new
+  // component creation doesn't fail.
+  size_t aligned_size;
+  if (align <= 16) {
+    aligned_size = size;
+  } else {
+    DCHECK(align == 32 || align == 64);
+    aligned_size = size + align - 16;
+  }
+
+  if (next_component_size < aligned_size) {
+    next_component_size = aligned_size;
}

// If soft quota is exhausted we will only get the "minimal" amount of memory
-  // we ask for. In this case if we always use "size" as minimal, we may degrade
+  // we ask for. In this case if we always use "aligned_size" as minimal, we may degrade
   // to allocating a lot of tiny components, one for each string added to the
   // arena. This would be very inefficient, so let's first try something between
-  // "size" and "next_component_size". If it fails due to hard quota being
-  // exhausted, we'll fall back to using "size" as minimal.
-  size_t minimal = (size + next_component_size) / 2;
-  CHECK_LE(size, minimal);
+  // "aligned_size" and "next_component_size". If it fails due to hard quota being
+  // exhausted, we'll fall back to using "aligned_size" as minimal.
+  size_t minimal = (aligned_size + next_component_size) / 2;
+  CHECK_LE(aligned_size, minimal);
CHECK_LE(minimal, next_component_size);
// Now, just make sure we can actually get the memory.
Component* component = NewComponent(next_component_size, minimal);
if (component == nullptr) {
-    component = NewComponent(next_component_size, size);
+    component = NewComponent(next_component_size, aligned_size);
}
if (!component) return nullptr;

42 changes: 29 additions & 13 deletions src/kudu/util/memory/arena.h
@@ -21,13 +21,13 @@
// Memory arena for variable-length datatypes and STL collections.
#pragma once

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <new>
#include <ostream>
#include <utility>
#include <vector>

#include <boost/signals2/dummy_mutex.hpp>
@@ -160,8 +160,8 @@ class ArenaBase {
}

// Allocate bytes, ensuring a specified alignment.
-  // NOTE: alignment MUST be a power of two, or else this will break.
-  void* AllocateBytesAligned(const size_t size, const size_t alignment);
+  // NOTE: alignment MUST be a power of two, and only alignments up to 64 bytes
+  // are supported.
+  void* AllocateBytesAligned(size_t size, size_t alignment);

// Removes all data from the arena. (Invalidates all pointers returned by
// AddSlice and AllocateBytes). Does not cause memory allocation.
Expand All @@ -183,8 +183,9 @@ class ArenaBase {
class Component;

// Fallback for AllocateBytes non-fast-path
-  void* AllocateBytesFallback(const size_t size, const size_t align);
+  void* AllocateBytesFallback(size_t size, size_t align);

// Returned component is guaranteed to be 16-byte aligned.
Component* NewComponent(size_t requested_size, size_t minimum_size);
void AddComponent(Component *component);

@@ -367,6 +368,19 @@ class ArenaBase<THREADSAFE>::Component {
}

private:
// Adjusts the supplied "offset" such that the combined "data" ptr and "offset"
// align with "alignment" bytes.
//
// The component start address "data_" is only guaranteed to be 16-byte aligned, with
// enough bytes for the first request size plus any padding needed for alignment.
// So, to support alignment values greater than 16 bytes, align the destination address
// ptr that'll be returned by AllocateBytesAligned(), and not just the "offset_".
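//
// Worked example (illustrative, not from the original patch): with "data_" at
// address 0x7f10 (16-byte aligned) and "offset" 8, AlignOffset(data_, 8, 32)
// returns KUDU_ALIGN_UP(0x7f18, 32) - 0x7f10 = 0x7f20 - 0x7f10 = 16, so the
// destination data_ + 16 = 0x7f20 is 32-byte aligned.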
template<typename T>
static inline T AlignOffset(const uint8_t* data, const T offset, const size_t alignment) {
const auto data_start_addr = reinterpret_cast<uintptr_t>(data);
return KUDU_ALIGN_UP((data_start_addr + offset), alignment) - data_start_addr;
}

// Mark the given range unpoisoned in ASAN.
// This is a no-op in a non-ASAN build.
void AsanUnpoison(const void* addr, size_t size);
Expand All @@ -386,21 +400,21 @@ class ArenaBase<THREADSAFE>::Component {
DISALLOW_COPY_AND_ASSIGN(Component);
};


// Thread-safe implementation
template <>
inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
const size_t size, const size_t alignment) {
// Special case check the allowed alignments. Currently, we only ensure
-  // the allocated buffer components are 16-byte aligned, and the code path
-  // doesn't support larger alignment.
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  // the allocated buffer components are 16-byte aligned and add extra padding
+  // to support 32/64 byte alignment, but the code path hasn't been tested
+  // with larger alignment values, nor has there been a need.
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
<< "bad alignment: " << alignment;
retry:
Atomic32 offset = Acquire_Load(&offset_);

-  Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+  Atomic32 aligned = AlignOffset(data_, offset, alignment);
Atomic32 new_offset = aligned + size;

if (PREDICT_TRUE(new_offset <= size_)) {
@@ -421,13 +435,15 @@ inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
template <>
inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
const size_t size, const size_t alignment) {
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
       << "bad alignment: " << alignment;
-  size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+  size_t aligned = AlignOffset(data_, offset_, alignment);
uint8_t* destination = data_ + aligned;
size_t save_offset = offset_;
offset_ = aligned + size;

if (PREDICT_TRUE(offset_ <= size_)) {
AsanUnpoison(data_ + aligned, size);
return destination;
