Skip to content

Commit

Permalink
fix crash when the aggregated element was decimal (#1402)
Browse files Browse the repository at this point in the history
    1. Fix the crash first
    2. then redesign the entire aggregated data stream
  • Loading branch information
adofsauron authored and mergify[bot] committed Mar 31, 2023
1 parent 9c87c73 commit 4bd09ac
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 39 deletions.
53 changes: 53 additions & 0 deletions mysql-test/suite/tianmu/r/issue1402.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
DROP DATABASE IF EXISTS issue1402_test;
CREATE DATABASE issue1402_test;
USE issue1402_test;
CREATE TABLE t1 (id INT, d DECIMAL(10,4)) ENGINE=tianmu;
INSERT INTO t1 VALUES (1, 1.2345), (2, 2.3456), (3, 3.4567);
SELECT COUNT(*), SUM(d), AVG(d), MIN(d), MAX(d) FROM t1;
COUNT(*) SUM(d) AVG(d) MIN(d) MAX(d)
3 7.0368 2.34560000 1.2345 3.4567
SELECT id, SUM(d) FROM t1 GROUP BY id;
id SUM(d)
1 1.2345
2 2.3456
3 3.4567
SELECT * FROM t1 ORDER BY d DESC;
id d
3 3.4567
2 2.3456
1 1.2345
SELECT * FROM t1 WHERE d > (SELECT AVG(d) FROM t1);
id d
3 3.4567
SELECT id, SUM(d) FROM t1 WHERE d > (SELECT AVG(d) FROM t1) GROUP BY id;
id SUM(d)
3 3.4567
SELECT * FROM t1 WHERE d > (SELECT AVG(d) FROM t1) ORDER BY d DESC;
id d
3 3.4567
SELECT id, SUM(d) FROM t1 WHERE d > (SELECT AVG(d) FROM t1) GROUP BY id ORDER BY SUM(d) DESC;
id SUM(d)
3 3.4567
SELECT t1.id, t1.d, t2.d FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id;
id d d
2 2.3456 2.3456
3 3.4567 3.4567
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id;
id SUM(t1.d) SUM(t2.d)
2 2.3456 2.3456
3 3.4567 3.4567
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC;
id SUM(t1.d) SUM(t2.d)
3 3.4567 3.4567
2 2.3456 2.3456
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC LIMIT 1;
id SUM(t1.d) SUM(t2.d)
3 3.4567 3.4567
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;
id SUM(t1.d) SUM(t2.d)
2 2.3456 2.3456
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id HAVING SUM(t1.d) > 3 ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;
id SUM(t1.d) SUM(t2.d)
SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id WHERE t1.id > 1 GROUP BY t1.id HAVING SUM(t1.d) > 3 ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;
id SUM(t1.d) SUM(t2.d)
drop database issue1402_test;
43 changes: 43 additions & 0 deletions mysql-test/suite/tianmu/t/issue1402.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
--source include/have_tianmu.inc

--disable_warnings
DROP DATABASE IF EXISTS issue1402_test;
--enable_warnings

CREATE DATABASE issue1402_test;

USE issue1402_test;

CREATE TABLE t1 (id INT, d DECIMAL(10,4)) ENGINE=tianmu;

INSERT INTO t1 VALUES (1, 1.2345), (2, 2.3456), (3, 3.4567);

SELECT COUNT(*), SUM(d), AVG(d), MIN(d), MAX(d) FROM t1;

SELECT id, SUM(d) FROM t1 GROUP BY id;

SELECT * FROM t1 ORDER BY d DESC;

SELECT * FROM t1 WHERE d > (SELECT AVG(d) FROM t1);

SELECT id, SUM(d) FROM t1 WHERE d > (SELECT AVG(d) FROM t1) GROUP BY id;

SELECT * FROM t1 WHERE d > (SELECT AVG(d) FROM t1) ORDER BY d DESC;

SELECT id, SUM(d) FROM t1 WHERE d > (SELECT AVG(d) FROM t1) GROUP BY id ORDER BY SUM(d) DESC;

SELECT t1.id, t1.d, t2.d FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC LIMIT 1;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id GROUP BY t1.id HAVING SUM(t1.d) > 3 ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;

SELECT t1.id, SUM(t1.d), SUM(t2.d) FROM t1 JOIN (SELECT id, d FROM t1 WHERE d > 2) t2 ON t1.id = t2.id WHERE t1.id > 1 GROUP BY t1.id HAVING SUM(t1.d) > 3 ORDER BY SUM(t1.d) DESC LIMIT 1 OFFSET 1;

drop database issue1402_test;
2 changes: 1 addition & 1 deletion sql/item.h
Original file line number Diff line number Diff line change
Expand Up @@ -3570,7 +3570,7 @@ class Item_decimal :public Item_num
enum_field_types field_type() const { return MYSQL_TYPE_NEWDECIMAL; }
longlong val_int();
double val_real();
String *val_str(String*);
virtual String *val_str(String*);
my_decimal *val_decimal(my_decimal *val) { return &decimal_value; }
bool get_date(MYSQL_TIME *ltime, my_time_flags_t fuzzydate)
{
Expand Down
85 changes: 47 additions & 38 deletions storage/tianmu/core/aggregation_algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t

for (uint i = 0; i < t->NumOfAttrs(); i++) { // first pass: find all grouping attributes
TempTable::Attr &cur_a = *(t->GetAttrP(i));
if (cur_a.mode == common::ColOperation::DELAYED) // delayed column (e.g. complex exp. on
// aggregations)

// delayed column (e.g. complex exp. on aggregations)
if (cur_a.mode == common::ColOperation::DELAYED)
continue;

if ((just_distinct && cur_a.alias) || cur_a.mode == common::ColOperation::GROUP_BY) {
if (cur_a.mode == common::ColOperation::GROUP_BY)
group_by_found = true;
Expand All @@ -62,10 +64,9 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
break;
}
}
if (already_added == false) {
if (!already_added) {
int new_attr_number = gbw.NumOfGroupingAttrs();
gbw.AddGroupingColumn(new_attr_number, i,
*(t->GetAttrP(i))); // GetAttrP(i) is needed
gbw.AddGroupingColumn(new_attr_number, i, *(t->GetAttrP(i))); // GetAttrP(i) is needed

// approximate a number of groups
if (upper_approx_of_groups < mind->NumOfTuples()) {
Expand All @@ -80,8 +81,9 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t

for (uint i = 0; i < t->NumOfAttrs(); i++) { // second pass: find all aggregated attributes
TempTable::Attr &cur_a = *(t->GetAttrP(i));
if (cur_a.mode == common::ColOperation::DELAYED) { // delayed column (e.g. complex exp.
// on aggregations)

// delayed column (e.g. complex exp.on aggregations)
if (cur_a.mode == common::ColOperation::DELAYED) {
MIDummyIterator m(1);
cur_a.term.vc->LockSourcePacks(m);
continue;
Expand Down Expand Up @@ -109,19 +111,22 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
max_size = cur_a.term.vc->MaxStringSize();
min_v = cur_a.term.vc->RoughMin();
max_v = cur_a.term.vc->RoughMax();
if (cur_a.distinct && cur_a.term.vc->IsDistinct() && cur_a.mode != common::ColOperation::LISTING)
if (cur_a.distinct && cur_a.term.vc->IsDistinct() && cur_a.mode != common::ColOperation::LISTING) {
cur_a.distinct = false; // "distinct" not needed, as values are distinct anyway
else if (cur_a.distinct) {
} else if (cur_a.distinct) {
max_no_of_distinct = cur_a.term.vc->GetApproxDistVals(false); // no nulls included
if (tianmu_control_.isOn())
tianmu_control_.lock(m_conn->GetThreadID())
<< "Adding dist. column, min = " << min_v << ", max = " << max_v << ", dist = " << max_no_of_distinct
<< system::unlock;
}
}

// special case: aggregations on empty result (should not
// be 0, because it triggers max. buffer settings)
if (max_no_of_distinct == 0)
max_no_of_distinct = 1; // special case: aggregations on empty result (should not
// be 0, because it triggers max. buffer settings)
max_no_of_distinct = 1;

gbw.AddAggregatedColumn(i, cur_a, max_no_of_distinct, min_v, max_v, max_size);
}
}
Expand Down Expand Up @@ -169,8 +174,7 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
gbw.PutAggregatedValueForCount(0, row, count_distinct);
all_done_in_one_row = true;
}
} // Special case 3: SELECT MIN(col) FROM ..... or SELECT MAX(col) FROM
// .....;
} // Special case 3: SELECT MIN(col) FROM ..... or SELECT MAX(col) FROM ...;
else if (t->GetWhereConds().Size() == 0 &&
((gbw.IsMinOnly() && t->NumOfAttrs() == 1 && min_v != common::MINUS_INF_64) ||
(gbw.IsMaxOnly() && t->NumOfAttrs() == 1 && max_v != common::PLUS_INF_64))) {
Expand All @@ -189,9 +193,9 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
t->GetAttrP(i)->page_size = 1;
t->GetAttrP(i)->CreateBuffer(1);
}
if (limit == -1 || (offset == 0 && limit >= 1)) { // limit is -1 (off), or a positive
// number, 0 means nothing should be
// displayed.

// limit is -1 (off), or a positive number, 0 means nothing should be displayed.
if (limit == -1 || (offset == 0 && limit >= 1)) {
--limit;
AggregateFillOutput(gbw, row, offset);
if (sender) {
Expand All @@ -206,11 +210,13 @@ void AggregationAlgorithm::Aggregate(bool just_distinct, int64_t &limit, int64_t
if (limit != -1)
limit = local_limit;
}
t->ClearMultiIndexP(); // cleanup (i.e. regarded as materialized,
// one-dimensional)

// cleanup (i.e. regarded as materialized, one-dimensional)
t->ClearMultiIndexP();

// to prevent another execution of HAVING on DISTINCT+GROUP BY
if (t->HasHavingConditions())
t->ClearHavingConditions(); // to prevent another execution of HAVING on
// DISTINCT+GROUP BY
t->ClearHavingConditions();
}

void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset,
Expand Down Expand Up @@ -553,8 +559,7 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw

if (require_locking_gr) {
for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
gbw.LockPackAlways(gr_a, *mit); // note: ColumnNotOmitted checked
// inside
gbw.LockPackAlways(gr_a, *mit); // note: ColumnNotOmitted checked inside
require_locking_gr = false;
}
if (require_locking_ag) {
Expand All @@ -579,11 +584,14 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw
return AggregaGroupingResult::AGR_FINISH; // aggregation finished
}
}

if (skip_packrow)
gbw.packrows_omitted++;
else if (part_omitted)
gbw.packrows_part_omitted++;
if (packrow_done) { // This packrow will not be needed any more

// This packrow will not be needed any more
if (packrow_done) {
gbw.TuplesResetBetween(cur_tuple, cur_tuple + packrow_length - 1);
}

Expand All @@ -592,10 +600,6 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw
return AggregaGroupingResult::AGR_OK; // success - roughly omitted
}

// bool require_locking_ag = true; // a new packrow,
// so locking will be needed bool require_locking_gr = (uniform_pos ==
// common::NULL_VALUE_64); // do not lock if the grouping row is uniform

while (mit->IsValid()) { // becomes invalid on pack end
if (m_conn->Killed())
return AggregaGroupingResult::AGR_KILLED; // killed
Expand All @@ -608,20 +612,24 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw

int64_t pos = 0;
bool existed = true;
if (uniform_pos != common::NULL_VALUE_64) // either uniform because of KNs, or = 0,
// because there is no grouping columns
pos = uniform_pos; // existed == true, as above

// either uniform because of KNs, or = 0, because there is no grouping columns
// existed == true, as above
if (uniform_pos != common::NULL_VALUE_64)
pos = uniform_pos;
else {
for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
if (gbw.ColumnNotOmitted(gr_a))
gbw.PutGroupingValue(gr_a, *mit);
existed = gbw.FindCurrentRow(pos);
}

if (pos != common::NULL_VALUE_64) { // Any place left? If not, just omit
// the tuple.
gbw.TuplesReset(cur_tuple); // internally delayed for optimization
// purposes - must be committed at the end
// Any place left? If not, just omit the tuple.
// internally delayed for optimization
// purposes - must be committed at the end
if (pos != common::NULL_VALUE_64) {
gbw.TuplesReset(cur_tuple);

if (!existed) {
aggregations_not_changeable = false;
gbw.AddGroup(); // successfully added
Expand All @@ -636,20 +644,21 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw
if (!aggregations_not_changeable) {
// Lock packs if needed
if (require_locking_ag) {
for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
gbw.LockPack(gr_a,
*mit); // note: ColumnNotOmitted checked inside
for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++) {
gbw.LockPack(gr_a, *mit); // note: ColumnNotOmitted checked inside
}
require_locking_ag = false;
}

// Prepare packs for aggregated columns
for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++) {
if (gbw.ColumnNotOmitted(gr_a)) {
bool value_successfully_aggregated = gbw.PutAggregatedValue(gr_a, pos, *mit, factor);
if (!value_successfully_aggregated) {
gbw.DistinctlyOmitted(gr_a, cur_tuple);
}
}
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions storage/tianmu/core/item_tianmu_field.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ Item_tianmudecimal::Item_tianmudecimal(DataType t) : Item_decimal(0, false) {
}

void Item_tianmudecimal::Set(int64_t val) {
std::scoped_lock guard(mtx);
std::fill(decimal_value.buf, decimal_value.buf + decimal_value.len, 0);
if (val) {
int2my_decimal((uint)-1, val, 0, &decimal_value);
Expand All @@ -248,6 +249,11 @@ void Item_tianmudecimal::Set(int64_t val) {
decimal_value.frac = scale;
}

String *Item_tianmudecimal::val_str(String *str) {
std::scoped_lock guard(mtx);
return Item_decimal::val_str(str);
}

my_decimal *Item_tianmudatetime_base::val_decimal(my_decimal *d) {
int2my_decimal((uint)-1, val_int(), 0, d);
return d;
Expand Down
3 changes: 3 additions & 0 deletions storage/tianmu/core/item_tianmu_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once

#include <map>
#include <mutex>
#include <set>

#include "common/common_definitions.h"
Expand Down Expand Up @@ -133,10 +134,12 @@ class Item_tianmudecimal : public Item_decimal {
Item_tianmudecimal(DataType t);

void Set(int64_t val);
String *val_str(String *) override;

private:
int scale;
int64_t scaleCoef;
std::mutex mtx;
};

//! Base class for TIANMU's Item classes to store date/time values of columns
Expand Down
1 change: 1 addition & 0 deletions storage/tianmu/core/pack_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ void VCPackGuardian::LockPackrowOnLockOneByThread(const MIIterator &mit) {
last_pack_thread_[thread_id] = std::move(pack_value);
}
} else {
std::scoped_lock lock(mx_thread_);
auto &lock_thread = last_pack_thread_[thread_id];
auto &lock_dim = lock_thread[cur_dim];
lock_dim[col_index] = cur_pack;
Expand Down

0 comments on commit 4bd09ac

Please sign in to comment.