Skip to content

Commit

Permalink
merge scylla-3.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
fastio committed Apr 23, 2019
1 parent 35bdb67 commit a680b4b
Show file tree
Hide file tree
Showing 31 changed files with 878 additions and 893 deletions.
6 changes: 5 additions & 1 deletion auth/password_authenticator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,11 @@ future<authenticated_user> password_authenticator::authenticate(
}).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
try {
auto res = f.get0();
if (res->empty() || !passwords::check(password, res->one().get_as<sstring>(SALTED_HASH))) {
auto salted_hash = std::experimental::optional<sstring>();
if (!res->empty()) {
salted_hash = res->one().get_opt<sstring>(SALTED_HASH);
}
if (!salted_hash || !passwords::check(password, *salted_hash)) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
return make_ready_future<authenticated_user>(username);
Expand Down
85 changes: 37 additions & 48 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def endswith(self, end):
'tests/perf/perf_sstable',
'tests/cql_query_test',
'tests/secondary_index_test',
'tests/json_cql_query_test',
'tests/filtering_test',
'tests/storage_proxy_test',
'tests/schema_change_test',
Expand Down Expand Up @@ -365,21 +366,17 @@ def endswith(self, end):
'tests/perf/perf_idl',
]


redis_tests = [
'tests/redis/kv_test',
]
apps = [
'pedis',
'scylla',
]

tests = scylla_tests + perf_tests + redis_tests
tests = scylla_tests + perf_tests

other = [
'iotune',
]

all_artifacts = apps + tests + other
all_artifacts = apps + other

arg_parser = argparse.ArgumentParser('Configure scylla')
arg_parser.add_argument('--static', dest='static', action='store_const', default='',
Expand All @@ -397,7 +394,7 @@ def endswith(self, end):
help='Extra flags for the linker')
arg_parser.add_argument('--target', action='store', dest='target', default=default_target_arch(),
help='Target architecture (-march)')
arg_parser.add_argument('--compiler', action='store', dest='cxx', default='g++',
arg_parser.add_argument('--compiler', action='store', dest='cxx', default='/opt/scylladb/bin/g++-7.3',
help='C++ compiler path')
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='gcc',
help='C compiler path')
Expand Down Expand Up @@ -542,7 +539,37 @@ def endswith(self, end):
'thrift/handler.cc',
'thrift/server.cc',
'thrift/thrift_validation.cc',
'redis/query_processor.cc',
'utils/runtime.cc',
'utils/murmur_hash.cc',
'utils/uuid.cc',
'utils/big_decimal.cc',
'types.cc',
'validation.cc',
'service/priority_manager.cc',
'service/migration_manager.cc',
'service/storage_proxy.cc',
'cql3/operator.cc',
'cql3/relation.cc',
'cql3/column_identifier.cc',
'cql3/column_specification.cc',
'cql3/constants.cc',
'cql3/query_processor.cc',
'cql3/query_options.cc',
'cql3/single_column_relation.cc',
'cql3/token_relation.cc',
'cql3/column_condition.cc',
'cql3/user_types.cc',
'cql3/untyped_result_set.cc',
'cql3/selection/abstract_function_selector.cc',
'cql3/selection/simple_selector.cc',
'cql3/selection/selectable.cc',
'cql3/selection/selector_factories.cc',
'cql3/selection/selection.cc',
'cql3/selection/selector.cc',
'cql3/restrictions/statement_restrictions.cc',
'cql3/result_set.cc',
'cql3/variable_specifications.cc',
'redis/query_processor.cc',
'redis/redis_keyspace.cc',
'redis/protocol_parser.cc',
'redis/ragel_protocol_parser.rl',
Expand Down Expand Up @@ -582,36 +609,6 @@ def endswith(self, end):
'redis/commands/zrangebyscore.cc',
'redis/commands/zrank.cc',
'redis/commands/zrem.cc',
'utils/runtime.cc',
'utils/murmur_hash.cc',
'utils/uuid.cc',
'utils/big_decimal.cc',
'types.cc',
'validation.cc',
'service/priority_manager.cc',
'service/migration_manager.cc',
'service/storage_proxy.cc',
'cql3/operator.cc',
'cql3/relation.cc',
'cql3/column_identifier.cc',
'cql3/column_specification.cc',
'cql3/constants.cc',
'cql3/query_processor.cc',
'cql3/query_options.cc',
'cql3/single_column_relation.cc',
'cql3/token_relation.cc',
'cql3/column_condition.cc',
'cql3/user_types.cc',
'cql3/untyped_result_set.cc',
'cql3/selection/abstract_function_selector.cc',
'cql3/selection/simple_selector.cc',
'cql3/selection/selectable.cc',
'cql3/selection/selector_factories.cc',
'cql3/selection/selection.cc',
'cql3/selection/selector.cc',
'cql3/restrictions/statement_restrictions.cc',
'cql3/result_set.cc',
'cql3/variable_specifications.cc',
'db/consistency_level.cc',
'db/system_keyspace.cc',
'db/system_distributed_keyspace.cc',
Expand Down Expand Up @@ -814,7 +811,7 @@ def endswith(self, end):
]

deps = {
'pedis': idls + ['main.cc', 'release.cc'] + scylla_core + api,
'scylla': idls + ['main.cc', 'release.cc'] + scylla_core + api,
}

pure_boost_tests = set([
Expand Down Expand Up @@ -889,14 +886,6 @@ def endswith(self, end):
for t in perf_tests:
deps[t] = [t + '.cc'] + scylla_tests_dependencies + perf_tests_seastar_deps

for t in redis_tests:
deps[t] = [t + '.cc']
if t not in tests_not_using_seastar_test_framework:
deps[t] += scylla_tests_dependencies
deps[t] += scylla_tests_seastar_deps
else:
deps[t] += scylla_core + idls + ['tests/cql_test_env.cc']

deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc', 'tests/sstable_utils.cc', 'tests/normalizing_reader.cc']
deps['tests/mutation_reader_test'] += ['tests/sstable_utils.cc']

Expand Down
7 changes: 6 additions & 1 deletion cql3/Cql.g
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ insertStatement returns [::shared_ptr<raw::modification_statement> expr]
std::vector<::shared_ptr<cql3::column_identifier::raw>> column_names;
std::vector<::shared_ptr<cql3::term::raw>> values;
bool if_not_exists = false;
bool default_unset = false;
::shared_ptr<cql3::term::raw> json_value;
}
: K_INSERT K_INTO cf=columnFamilyName
Expand All @@ -487,13 +488,15 @@ insertStatement returns [::shared_ptr<raw::modification_statement> expr]
}
| K_JSON
json_token=jsonValue { json_value = $json_token.value; }
( K_DEFAULT K_UNSET { default_unset = true; } | K_DEFAULT K_NULL )?
( K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
( usingClause[attrs] )?
{
$expr = ::make_shared<raw::insert_json_statement>(std::move(cf),
std::move(attrs),
std::move(json_value),
if_not_exists);
if_not_exists,
default_unset);
}
)
;
Expand Down Expand Up @@ -1835,6 +1838,8 @@ K_OR: O R;
K_REPLACE: R E P L A C E;
K_DETERMINISTIC: D E T E R M I N I S T I C;
K_JSON: J S O N;
K_DEFAULT: D E F A U L T;
K_UNSET: U N S E T;

K_EMPTY: E M P T Y;

Expand Down
3 changes: 2 additions & 1 deletion cql3/statements/raw/insert_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private:
::shared_ptr<attributes::raw> _attrs;
::shared_ptr<term::raw> _json_value;
bool _if_not_exists;
bool _default_unset;
public:
/**
* A parsed <code>INSERT JSON</code> statement.
Expand All @@ -95,7 +96,7 @@ public:
* @param json_value JSON string representing names and values
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
insert_json_statement(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, ::shared_ptr<term::raw> json_value, bool if_not_exists);
insert_json_statement(::shared_ptr<cf_name> name, ::shared_ptr<attributes::raw> attrs, ::shared_ptr<term::raw> json_value, bool if_not_exists, bool default_unset);

virtual ::shared_ptr<cql3::statements::modification_statement> prepare_internal(database& db, schema_ptr schema,
::shared_ptr<variable_specifications> bound_names, std::unique_ptr<attributes> attrs, cql_stats& stats) override;
Expand Down
21 changes: 15 additions & 6 deletions cql3/statements/update_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ parse(const sstring& json_string, const std::vector<column_definition>& expected
for (const auto& def : expected_receivers) {
sstring cql_name = def.name_as_text();
auto value_it = prepared_map.find(cql_name);
if (value_it == prepared_map.end() || value_it->second.isNull()) {
if (value_it == prepared_map.end()) {
continue;
} else if (value_it->second.isNull()) {
json_map.emplace(std::move(cql_name), bytes_opt{});
prepared_map.erase(value_it);
} else {
json_map.emplace(std::move(cql_name), def.type->from_json_object(value_it->second, sf));
prepared_map.erase(value_it);
Expand Down Expand Up @@ -255,8 +258,12 @@ void insert_prepared_json_statement::execute_operations_for_key(mutation& m, con
throw exceptions::invalid_request_exception(sprint("Cannot set the value of counter column %s in JSON", def.name_as_text()));
}

auto value = json_cache->at(def.name_as_text());
execute_set_value(m, prefix, params, def, value);
auto it = json_cache->find(def.name_as_text());
if (it != json_cache->end()) {
execute_set_value(m, prefix, params, def, it->second);
} else if (!_default_unset) {
execute_set_value(m, prefix, params, def, bytes_opt{});
}
}
}

Expand Down Expand Up @@ -322,12 +329,14 @@ insert_statement::prepare_internal(database& db, schema_ptr schema,
insert_json_statement::insert_json_statement( ::shared_ptr<cf_name> name,
::shared_ptr<attributes::raw> attrs,
::shared_ptr<term::raw> json_value,
bool if_not_exists)
bool if_not_exists,
bool default_unset)
: raw::modification_statement{name, attrs, conditions_vector{}, if_not_exists, false}
, _name(name)
, _attrs(attrs)
, _json_value(json_value)
, _if_not_exists(if_not_exists) { }
, _if_not_exists(if_not_exists)
, _default_unset(default_unset) { }

::shared_ptr<cql3::statements::modification_statement>
insert_json_statement::prepare_internal(database& db, schema_ptr schema,
Expand All @@ -337,7 +346,7 @@ insert_json_statement::prepare_internal(database& db, schema_ptr schema,
auto json_column_placeholder = ::make_shared<column_identifier>("", true);
auto prepared_json_value = _json_value->prepare(db, "", ::make_shared<column_specification>("", "", json_column_placeholder, utf8_type));
prepared_json_value->collect_marker_specification(bound_names);
return ::make_shared<cql3::statements::insert_prepared_json_statement>(bound_names->size(), schema, std::move(attrs), &stats.inserts, std::move(prepared_json_value));
return ::make_shared<cql3::statements::insert_prepared_json_statement>(bound_names->size(), schema, std::move(attrs), &stats.inserts, std::move(prepared_json_value), _default_unset);
}

update_statement::update_statement( ::shared_ptr<cf_name> name,
Expand Down
5 changes: 3 additions & 2 deletions cql3/statements/update_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ private:
*/
class insert_prepared_json_statement : public update_statement {
::shared_ptr<term> _term;
bool _default_unset;
public:
insert_prepared_json_statement(uint32_t bound_terms, schema_ptr s, std::unique_ptr<attributes> attrs, uint64_t* cql_stats_counter_ptr, ::shared_ptr<term> t)
: update_statement(statement_type::INSERT, bound_terms, s, std::move(attrs), cql_stats_counter_ptr), _term(t) {
insert_prepared_json_statement(uint32_t bound_terms, schema_ptr s, std::unique_ptr<attributes> attrs, uint64_t* cql_stats_counter_ptr, ::shared_ptr<term> t, bool default_unset)
: update_statement(statement_type::INSERT, bound_terms, s, std::move(attrs), cql_stats_counter_ptr), _term(t), _default_unset(default_unset) {
_restrictions = ::make_shared<restrictions::statement_restrictions>(s, false);
}
private:
Expand Down
4 changes: 0 additions & 4 deletions database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ class rp_handle;

namespace system_keyspace {
void make(database& db, bool durable, bool volatile_testing_only);
namespace redis {
void make(database& db, bool durable, bool volatile_testing_only);
}
}
}

Expand Down Expand Up @@ -1282,7 +1279,6 @@ private:
void add_keyspace(sstring name, keyspace k);
void create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only);
friend void db::system_keyspace::redis::make(database& db, bool durable, bool volatile_testing_only);
void setup_metrics();

friend class db_apply_executor;
Expand Down
2 changes: 2 additions & 0 deletions db/commitlog/commitlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,8 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
// but all previous write/flush pairs.
return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable { ///////////////////////////////////////////////////
auto view = fragmented_temporary_buffer::view(buf);
view.remove_suffix(buf.size_bytes() - size);
assert(size == view.size_bytes());
return do_with(off, view, [&] (uint64_t& off, fragmented_temporary_buffer::view& view) {
if (view.empty()) {
return make_ready_future<>();
Expand Down
26 changes: 24 additions & 2 deletions db/view/view.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,16 @@ class view_builder::consumer {
built_views _built_views;
std::vector<view_ptr> _views_to_build;
std::deque<mutation_fragment> _fragments;

// The compact_for_query<> that feeds this consumer is already configured
// to feed us up to view_builder::batchsize (128) rows and not an entire
// partition. Still, if rows contain large blobs, saving 128 of them in
// _fragments may be too much. So we want to track _fragment's memory
// usage, and flush the _fragments if it has grown too large.
// Additionally, limiting _fragment's size also solves issue #4213:
// A single view mutation can be as large as the size of the base rows
// used to build it, and we cannot allow its serialized size to grow
// beyond our limit on mutation size (by default 32 MB).
size_t _fragments_memory_usage = 0;
public:
consumer(view_builder& builder, build_step& step)
: _builder(builder)
Expand Down Expand Up @@ -1527,15 +1536,23 @@ class view_builder::consumer {
return stop_iteration::yes;
}

_fragments_memory_usage += cr.memory_usage(*_step.base->schema());
_fragments.push_back(std::move(cr));
if (_fragments_memory_usage > 1024*1024) {
// Although we have not yet completed the batch of base rows that
// compact_for_query<> planned for us (view_builder::batchsize),
// we've still collected enough rows to reach sizeable memory use,
// so let's flush these rows now.
flush_fragments();
}
return stop_iteration::no;
}

stop_iteration consume(range_tombstone&&) {
return stop_iteration::no;
}

stop_iteration consume_end_of_partition() {
void flush_fragments() {
_builder._as.check();
if (!_fragments.empty()) {
_fragments.push_front(partition_start(_step.current_key, tombstone()));
Expand All @@ -1544,7 +1561,12 @@ class view_builder::consumer {
_step.current_token(),
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
_fragments.clear();
_fragments_memory_usage = 0;
}
}

stop_iteration consume_end_of_partition() {
flush_fragments();
return stop_iteration(_step.build_status.empty());
}

Expand Down
20 changes: 11 additions & 9 deletions dht/boot_strapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,22 @@ future<> boot_strapper::bootstrap() {

auto streamer = make_lw_shared<range_streamer>(_db, _token_metadata, _tokens, _address, "Bootstrap", streaming::stream_reason::bootstrap);
streamer->add_source_filter(std::make_unique<range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector()));
for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) {
auto keyspaces = make_lw_shared<std::vector<sstring>>(_db.local().get_non_system_keyspaces());
return do_for_each(*keyspaces, [this, keyspaces, streamer] (sstring& keyspace_name) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
dht::token_range_vector ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
blogger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
streamer->add_ranges(keyspace_name, ranges);
}

return streamer->stream_async().then([streamer] () {
service::get_local_storage_service().finish_bootstrapping();
}).handle_exception([streamer] (std::exception_ptr eptr) {
blogger.warn("Error during bootstrap: {}", eptr);
return make_exception_future<>(std::move(eptr));
return streamer->add_ranges(keyspace_name, ranges);
}).then([this, streamer] {
return streamer->stream_async().then([streamer] () {
service::get_local_storage_service().finish_bootstrapping();
}).handle_exception([streamer] (std::exception_ptr eptr) {
blogger.warn("Error during bootstrap: {}", eptr);
return make_exception_future<>(std::move(eptr));
});
});

}

std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata metadata, database& db) {
Expand Down
Loading

0 comments on commit a680b4b

Please sign in to comment.