Skip to content

Commit

Permalink
support collation in group by and join operator (#771)
Browse files Browse the repository at this point in the history
* support collation in agg

* support collation in hash method based agg/set

* support collation in join

* fix bug

* refine

* refine

* refine

* support join with new collators
  • Loading branch information
windtalker authored Jul 7, 2020
1 parent cb3eabf commit 84c2650
Show file tree
Hide file tree
Showing 43 changed files with 483 additions and 338 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnAggregateFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi
}

/// Is required to support operations with Set
void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) const
void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
WriteBufferFromOwnString wbuf;
func->serialize(getData()[n], wbuf);
Expand Down Expand Up @@ -280,15 +280,15 @@ void ColumnAggregateFunction::insertDefault()
function->create(getData().back());
}

/// Serializes the aggregate state held at row `n` into `dst`.
/// The state is written by the aggregate function's own serialize() through a
/// WriteBufferFromArena built over (`dst`, `begin`); the result of out.finish()
/// is returned to the caller.
/// The trailing collator / sort-key container parameters are unnamed and never read here.
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & dst, const char *& begin) const
StringRef ColumnAggregateFunction::serializeValueIntoArena(size_t n, Arena & dst, const char *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
IAggregateFunction * function = func.get();
WriteBufferFromArena out(dst, begin);
function->serialize(getData()[n], out);
return out.finish();
}

const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * src_arena)
const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char * src_arena, std::shared_ptr<TiDB::ITiDBCollator>)
{
IAggregateFunction * function = func.get();

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega

void insertDefault() override;

/// Serializes row `n` into `arena`. The collator and sort-key container parameters are unnamed in this declaration.
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;

/// Counterpart of serializeValueIntoArena: consumes bytes starting at `pos` and returns a `const char *`.
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override;

/// Feeds row `n` into `hash`. The collator and sort-key container parameters are unnamed in this declaration.
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;

size_t byteSize() const override;

Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
}


StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);
Expand All @@ -178,33 +178,33 @@ StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char con

size_t values_size = 0;
for (size_t i = 0; i < array_size; ++i)
values_size += getData().serializeValueIntoArena(offset + i, arena, begin).size;
values_size += getData().serializeValueIntoArena(offset + i, arena, begin, collator, sort_key_container).size;

return StringRef(begin, sizeof(array_size) + values_size);
}


/// Rebuilds one array row from the bytes starting at `pos`.
/// Layout consumed: a size_t element count, followed by that many elements, each
/// decoded by the nested data column (the collator is forwarded unchanged).
/// Appends a new offset equal to the previous last offset (0 when there is none)
/// plus the element count, and returns the position just past the consumed bytes.
const char * ColumnArray::deserializeAndInsertFromArena(const char * pos)
const char * ColumnArray::deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator> collator)
{
// NOTE(review): the count is read by dereferencing a reinterpret_cast of `pos`;
// confirm `pos` is suitably aligned for size_t (ColumnDecimal uses unalignedLoad for its read).
size_t array_size = *reinterpret_cast<const size_t *>(pos);
pos += sizeof(array_size);

// Each nested deserialize call returns where the next element starts.
for (size_t i = 0; i < array_size; ++i)
pos = getData().deserializeAndInsertFromArena(pos);
pos = getData().deserializeAndInsertFromArena(pos, collator);

getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + array_size);
return pos;
}


/// Mixes array row `n` into `hash`: first the element count, then every element
/// in order via the nested data column. `collator` and `sort_key_container` are
/// passed through to each nested call unchanged.
/// NOTE(review): `collator` is received by value and handed to a by-value parameter
/// again for every element, so the shared_ptr is copied once per element.
void ColumnArray::updateHashWithValue(size_t n, SipHash & hash) const
void ColumnArray::updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
size_t array_size = sizeAt(n);
size_t offset = offsetAt(n);

// Hashing the size first makes the element count part of the hashed input.
hash.update(array_size);
for (size_t i = 0; i < array_size; ++i)
getData().updateHashWithValue(offset + i, hash);
getData().updateHashWithValue(offset + i, hash, collator, sort_key_container);
}


Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
void get(size_t n, Field & res) const override;
StringRef getDataAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
/// Serializes array row `n` into `arena`; takes a collator and a String sort-key container (unnamed here).
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
/// Reads one array row back from `pos`; takes a collator (unnamed here).
const char * deserializeAndInsertFromArena(const char * pos) override;
/// Feeds array row `n` into `hash`; takes a collator and a String sort-key container (unnamed here).
void updateHashWithValue(size_t n, SipHash & hash) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override;
void updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;
void insertFrom(const IColumn & src_, size_t n) override;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Columns/ColumnConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,23 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>
s -= n;
}

/// Serializes the constant value. The row index argument is unnamed and ignored:
/// row 0 of the nested `data` column is always the one serialized, with the
/// collator and sort-key container forwarded unchanged.
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const override
{
return data->serializeValueIntoArena(0, arena, begin);
return data->serializeValueIntoArena(0, arena, begin, collator, sort_key_container);
}

/// Consumes one serialized value starting at `pos` and grows this column by one row.
/// The nested column performs the actual decoding (so the read position advances
/// correctly), but the row it inserts is popped again right away; only the
/// counter `s` is incremented. Returns the position reported by the nested column.
const char * deserializeAndInsertFromArena(const char * pos) override
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator> collator) override
{
auto & mutable_data = data->assumeMutableRef();
auto res = mutable_data.deserializeAndInsertFromArena(pos);
auto res = mutable_data.deserializeAndInsertFromArena(pos, collator);
// Drop the row the nested column just appended.
mutable_data.popBack(1);
++s;
return res;
}

/// Mixes the constant value into `hash`. The row index argument is unnamed and
/// ignored: row 0 of the nested `data` column is hashed, with the collator and
/// sort-key container forwarded unchanged.
void updateHashWithValue(size_t, SipHash & hash) const override
void updateHashWithValue(size_t, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const override
{
data->updateHashWithValue(0, hash);
data->updateHashWithValue(0, hash, collator, sort_key_container);
}

ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ int ColumnDecimal<T>::compareAt(size_t n, size_t m, const IColumn & rhs_, int) c
}

/// Copies the raw bytes of data[n] into the arena.
/// Requests sizeof(T) bytes via allocContinue(), memcpy's the value into them and
/// returns a StringRef over exactly those bytes.
/// The collator and sort-key container parameters are unnamed and unused.
template <typename T>
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnDecimal<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
auto pos = arena.allocContinue(sizeof(T), begin);
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}

template <typename T>
const char * ColumnDecimal<T>::deserializeAndInsertFromArena(const char * pos)
const char * ColumnDecimal<T>::deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>)
{
data.push_back(unalignedLoad<T>(pos));
return pos + sizeof(T);
Expand All @@ -57,7 +57,7 @@ UInt64 ColumnDecimal<T>::get64(size_t n) const
}

/// Mixes data[n] into `hash` by passing the value directly to hash.update().
/// The collator and sort-key container parameters are unnamed and unused.
template <typename T>
void ColumnDecimal<T>::updateHashWithValue(size_t n, SipHash & hash) const
void ColumnDecimal<T>::updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
hash.update(data[n]);
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecima

void popBack(size_t n) override { data.resize_assume_reserved(data.size() - n); }

/// Serializes row `n` into `arena`; takes a collator and a String sort-key container (unnamed here).
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
/// Reads one value back from `pos`; takes a collator (unnamed here).
const char * deserializeAndInsertFromArena(const char * pos) override;
/// Feeds row `n` into `hash`; takes a collator and a String sort-key container (unnamed here).
void updateHashWithValue(size_t n, SipHash & hash) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override;
void updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,22 @@ void ColumnFixedString::insertData(const char * pos, size_t length)
memcpy(&chars[old_size], pos, length);
}

/// Copies the value at row `index` into the arena.
/// `n` is not a parameter here; it is used as the byte width of every value:
/// `n` bytes are requested via allocContinue(), filled from chars[n * index],
/// and returned as a StringRef of length `n`.
/// NOTE(review): the collator argument is unnamed and ignored, so the raw bytes
/// are serialized — confirm that is intended for collated fixed strings.
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const
StringRef ColumnFixedString::serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
auto pos = arena.allocContinue(n, begin);
memcpy(pos, &chars[n * index], n);
return StringRef(pos, n);
}

/// Appends one value read from `pos`.
/// Grows `chars` by `n` bytes, copies `n` bytes from `pos` into the new tail and
/// returns `pos + n`. The collator argument is unnamed and unused.
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos)
const char * ColumnFixedString::deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>)
{
size_t old_size = chars.size();
chars.resize(old_size + n);
memcpy(&chars[old_size], pos, n);
return pos + n;
}

/// Mixes the `n` raw bytes of the value at row `index` (starting at chars[n * index]) into `hash`.
/// NOTE(review): the collator and sort-key container are unnamed and ignored, so
/// hashing is byte-wise — confirm that is intended for collated fixed strings.
void ColumnFixedString::updateHashWithValue(size_t index, SipHash & hash) const
void ColumnFixedString::updateHashWithValue(size_t index, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const
{
hash.update(reinterpret_cast<const char *>(&chars[n * index]), n);
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnFixedString.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ class ColumnFixedString final : public COWPtrHelper<IColumn, ColumnFixedString>
chars.resize_assume_reserved(chars.size() - n * elems);
}

/// Serializes the value at `index` into `arena`. The collator and sort-key container parameters are unnamed in this declaration.
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin) const override;
StringRef serializeValueIntoArena(size_t index, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;

/// Counterpart of serializeValueIntoArena: consumes bytes starting at `pos` and returns a `const char *`.
const char * deserializeAndInsertFromArena(const char * pos) override;
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override;

/// Feeds the value at `index` into `hash`. The collator and sort-key container parameters are unnamed in this declaration.
void updateHashWithValue(size_t index, SipHash & hash) const override;
void updateHashWithValue(size_t index, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;

int compareAt(size_t p1, size_t p2, const IColumn & rhs_, int /*nan_direction_hint*/) const override
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ class ColumnFunction final : public COWPtrHelper<IColumn, ColumnFunction>
throw Exception("Cannot insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

/// Not supported for this column type: every argument is ignored and an
/// Exception with ErrorCodes::NOT_IMPLEMENTED is thrown unconditionally.
StringRef serializeValueIntoArena(size_t, Arena &, char const *&) const override
StringRef serializeValueIntoArena(size_t, Arena &, char const *&, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override
{
throw Exception("Cannot serialize from " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

/// Not supported for this column type: every argument is ignored and an
/// Exception with ErrorCodes::NOT_IMPLEMENTED is thrown unconditionally.
const char * deserializeAndInsertFromArena(const char *) override
const char * deserializeAndInsertFromArena(const char *, std::shared_ptr<TiDB::ITiDBCollator>) override
{
throw Exception("Cannot deserialize to " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

/// Not supported for this column type: every argument is ignored and an
/// Exception with ErrorCodes::NOT_IMPLEMENTED is thrown unconditionally.
void updateHashWithValue(size_t, SipHash &) const override
void updateHashWithValue(size_t, SipHash &, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override
{
throw Exception("updateHashWithValue is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Columns/ColumnNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ ColumnNullable::ColumnNullable(MutableColumnPtr && nested_column_, MutableColumn
}


/// Mixes row `n` into `hash`.
/// The null-map entry for the row is always hashed first; the nested value is
/// hashed only when that entry is 0, with the collator and sort-key container
/// forwarded to the nested column unchanged.
void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash) const
void ColumnNullable::updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
const auto & arr = getNullMapData();
hash.update(arr[n]);
// A non-zero null-map entry means the nested value is skipped entirely.
if (arr[n] == 0)
getNestedColumn().updateHashWithValue(n, hash);
getNestedColumn().updateHashWithValue(n, hash, collator, sort_key_container);
}


Expand Down Expand Up @@ -87,7 +87,7 @@ void ColumnNullable::insertData(const char * /*pos*/, size_t /*length*/)
throw Exception{"Method insertData is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED};
}

StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
const auto & arr = getNullMapData();
static constexpr auto s = sizeof(arr[0]);
Expand All @@ -98,20 +98,20 @@ StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char
size_t nested_size = 0;

if (arr[n] == 0)
nested_size = getNestedColumn().serializeValueIntoArena(n, arena, begin).size;
nested_size = getNestedColumn().serializeValueIntoArena(n, arena, begin, collator, sort_key_container).size;

return StringRef{begin, s + nested_size};
}

const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator> collator)
{
UInt8 val = *reinterpret_cast<const UInt8 *>(pos);
pos += sizeof(val);

getNullMapData().push_back(val);

if (val == 0)
pos = getNestedColumn().deserializeAndInsertFromArena(pos);
pos = getNestedColumn().deserializeAndInsertFromArena(pos, collator);
else
getNestedColumn().insertDefault();

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
UInt64 get64(size_t n) const override { return nested_column->get64(n); }
StringRef getDataAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
/// Serializes row `n` into `arena`; takes a collator and a String sort-key container (unnamed here).
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
/// Reads one row back from `pos`; takes a collator (unnamed here).
const char * deserializeAndInsertFromArena(const char * pos) override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override;
// NOTE(review): the definition is outside this view; presumably appends `length` rows of `src` starting at `start` — confirm.
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
Expand All @@ -74,7 +74,7 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
size_t byteSize() const override;
size_t allocatedBytes() const override;
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
/// Feeds row `n` into `hash`; takes a collator and a String sort-key container (unnamed here).
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override;
void getExtremes(Field & min, Field & max) const override;

MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
Expand Down
21 changes: 15 additions & 6 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
offsets.resize_assume_reserved(offsets.size() - n);
}

StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator>, String &) const override
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);
Expand All @@ -191,7 +191,7 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
return res;
}

const char * deserializeAndInsertFromArena(const char * pos) override
const char * deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator>) override
{
const size_t string_size = *reinterpret_cast<const size_t *>(pos);
pos += sizeof(string_size);
Expand All @@ -205,13 +205,22 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
return pos + string_size;
}

/// Mixes string row `n` into `hash`.
/// With a non-null `collator`, the collator's sort key for the row is hashed
/// (its length first, then its bytes), so the hash depends on the sort key rather than the raw bytes.
/// Without a collator, the raw length and raw bytes of the row are hashed.
/// `sort_key_container` is handed to collator->sortKey(); presumably it is scratch
/// storage backing the returned key — confirm against ITiDBCollator.
void updateHashWithValue(size_t n, SipHash & hash) const override
void updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const override
{
size_t string_size = sizeAt(n);
size_t offset = offsetAt(n);

hash.update(reinterpret_cast<const char *>(&string_size), sizeof(string_size));
hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
if (collator != nullptr)
{
// NOTE(review): the full sizeAt(n) length is passed to the collator. If that
// length counts a terminating byte, the byte becomes part of the collated
// input — confirm whether it should be excluded.
auto sort_key = collator->sortKey(reinterpret_cast<const char *>(&chars[offset]), string_size, sort_key_container);
// From here on string_size is the sort key's length, not the raw value's.
string_size = sort_key.size;
hash.update(reinterpret_cast<const char *>(&string_size), sizeof(string_size));
hash.update(sort_key.data, sort_key.size);
}
else
{
hash.update(reinterpret_cast<const char *>(&string_size), sizeof(string_size));
hash.update(reinterpret_cast<const char *>(&chars[offset]), string_size);
}
}

void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Columns/ColumnTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,27 @@ void ColumnTuple::popBack(size_t n)
column->assumeMutableRef().popBack(n);
}

/// Serializes tuple row `n` by serializing the row from every element column in
/// order, summing the sizes they report.
/// Returns a StringRef of that total length starting at `begin`.
/// The same `collator` and `sort_key_container` are forwarded to every element column.
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
size_t values_size = 0;
for (auto & column : columns)
values_size += column->serializeValueIntoArena(n, arena, begin).size;
values_size += column->serializeValueIntoArena(n, arena, begin, collator, sort_key_container).size;

return StringRef(begin, values_size);
}

/// Rebuilds one tuple row from `pos`: each element column, in order, deserializes
/// its own value and returns where the next element starts. The same `collator`
/// is forwarded to every element column. Returns the position after the last element.
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos)
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos, std::shared_ptr<TiDB::ITiDBCollator> collator)
{
for (auto & column : columns)
pos = column->assumeMutableRef().deserializeAndInsertFromArena(pos);
pos = column->assumeMutableRef().deserializeAndInsertFromArena(pos, collator);

return pos;
}

/// Mixes tuple row `n` into `hash` by hashing the row from every element column
/// in order. The same `collator` and `sort_key_container` are forwarded to each one.
/// NOTE(review): `collator` is received by value and passed to a by-value
/// parameter for every element column, so the shared_ptr is copied once per element.
void ColumnTuple::updateHashWithValue(size_t n, SipHash & hash) const
void ColumnTuple::updateHashWithValue(size_t n, SipHash & hash, std::shared_ptr<TiDB::ITiDBCollator> collator, String & sort_key_container) const
{
for (auto & column : columns)
column->updateHashWithValue(n, hash);
column->updateHashWithValue(n, hash, collator, sort_key_container);
}

void ColumnTuple::insertRangeFrom(const IColumn & src, size_t start, size_t length)
Expand Down
Loading

0 comments on commit 84c2650

Please sign in to comment.