Skip to content

Commit

Permalink
Move redis keyspace from the system keyspace
Browse files Browse the repository at this point in the history
  • Loading branch information
fastio committed Feb 22, 2019
1 parent f458cbf commit 7a86341
Show file tree
Hide file tree
Showing 16 changed files with 531 additions and 233 deletions.
9 changes: 1 addition & 8 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri
};

static const std::unordered_set<sstring> system_keyspaces = {
db::system_keyspace::NAME, db::schema_tables::NAME, db::system_keyspace::redis::NAME
db::system_keyspace::NAME, db::schema_tables::NAME
};

bool is_system_keyspace(const sstring& name) {
Expand All @@ -183,7 +183,6 @@ bool is_system_keyspace(const sstring& name) {
static const std::unordered_set<sstring> internal_keyspaces = {
db::system_distributed_keyspace::NAME,
db::system_keyspace::NAME,
db::system_keyspace::redis::NAME,
db::schema_tables::NAME,
auth::meta::AUTH_KS,
tracing::trace_keyspace_helper::KEYSPACE_NAME
Expand Down Expand Up @@ -2603,12 +2602,6 @@ future<> distributed_loader::init_system_keyspace(distributed<database>& db) {
bool durable = cfg.data_file_directories().size() > 0;
db::system_keyspace::make(db, durable, cfg.volatile_system_keyspace_for_testing());
}).get();
// redis keyspace
db.invoke_on_all([] (database& db) {
auto& cfg = db.get_config();
bool durable = cfg.data_file_directories().size() > 0;
db::system_keyspace::redis::make(db, durable, cfg.volatile_system_keyspace_for_testing());
}).get();

const auto& cfg = db.local().get_config();
for (auto& data_dir : cfg.data_file_directories()) {
Expand Down
140 changes: 0 additions & 140 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,144 +116,6 @@ table_schema_version generate_schema_version(utils::UUID table_id) {
// problems (we need the type variables to be constructed first), and using
// functions will solve this problem. So we use functions right now.

namespace redis {
schema_ptr simple_objects() {
static thread_local auto simple_object = [] {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(NAME, SIMPLE_OBJECTS), NAME, SIMPLE_OBJECTS,
// partition key
{{"key", utf8_type}},
// clustering key
{},
// regular columns
{{"data", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"redis simple object"
)));
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy_options({{ "enabled", "true" }});
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}();
return simple_object;
}

schema_ptr lists() {
static thread_local auto list_schema = [] {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(NAME, LISTS), NAME, LISTS,
// partition key
{{"key", utf8_type}},
// clustering key
{},
// regular columns
{{"data", list_type_impl::get_instance(utf8_type, true)}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"redis list"
)));
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy_options({{ "enabled", "true" }});
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}();
return list_schema;
}

schema_ptr sets() {
static thread_local auto set_schema = [] {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(NAME, SETS), NAME, SETS,
// partition key
{{"key", utf8_type}},
// clustering key
{},
// regular columns
{{"data", set_type_impl::get_instance(utf8_type, true)}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"redis set"
)));
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy_options({{ "enabled", "true" }});
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}();
return set_schema;
}

schema_ptr maps() {
static thread_local auto map_schema = [] {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(NAME, MAPS), NAME, MAPS,
// partition key
{{"key", utf8_type}},
// clustering key
{},
// regular columns
{{"data", map_type_impl::get_instance(utf8_type, utf8_type, true)}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"redis map"
)));
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy_options({{ "enabled", "true" }});
builder.with_version(generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}();
return map_schema;
}

sstring redis_keyspace_name() {
return NAME;
}

std::vector<schema_ptr> all_tables() {
return { simple_objects(), lists(), sets(), maps() };
}

void make(database& db, bool durable, bool volatile_testing_only) {
for (auto&& table : redis::all_tables()) {
auto ks_name = table->ks_name();
if (!db.has_keyspace(ks_name)) {
auto ksm = make_lw_shared<keyspace_metadata>(ks_name,
"org.apache.cassandra.locator.SimpleStrategy",
std::map<sstring, sstring>{},
durable
);
auto kscfg = db.make_keyspace_config(*ksm);
kscfg.enable_disk_reads = !volatile_testing_only;
kscfg.enable_disk_writes = !volatile_testing_only;
kscfg.enable_commitlog = !volatile_testing_only;
kscfg.enable_cache = true;
// don't make system keyspace reads wait for user reads
kscfg.read_concurrency_semaphore = &db.user_read_concurrency_sem();
// don't make system keyspace writes wait for user writes (if under pressure)
kscfg.dirty_memory_manager = &db._dirty_memory_manager;
keyspace _ks{ksm, std::move(kscfg)};
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "SimpleStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
_ks.set_replication_strategy(std::move(rs));
db.add_keyspace(ks_name, std::move(_ks));
}
auto& ks = db.find_keyspace(ks_name);
auto cfg = ks.make_column_family_config(*table, db.get_config(), db.get_large_partition_handler());
cfg.dirty_memory_manager = &db._dirty_memory_manager;
// cfg.memtable_scheduling_group = default_scheduling_group();
// cfg.memtable_to_cache_scheduling_group = default_scheduling_group();
db.add_column_family(ks, table, std::move(cfg));
}
}

} // end redis namespace

schema_ptr hints() {
static thread_local auto hints = [] {
Expand Down Expand Up @@ -1253,8 +1115,6 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
}).then([] {
// #2514 - make sure "system" is written to system_schema.keyspaces.
return db::schema_tables::save_system_schema(NAME);
}).then([] {
return db::schema_tables::save_system_schema(redis::NAME);
}).then([] {
return netw::get_messaging_service().invoke_on_all([] (auto& ms){
return ms.init_local_preferred_ip_cache();
Expand Down
19 changes: 0 additions & 19 deletions db/system_keyspace.hh
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,6 @@ static constexpr auto SSTABLE_ACTIVITY = "sstable_activity";
static constexpr auto SIZE_ESTIMATES = "size_estimates";
static constexpr auto LARGE_PARTITIONS = "large_partitions";

namespace redis {
static constexpr auto NAME = "redis";
static constexpr auto SIMPLE_OBJECTS = "simple_objects";
static constexpr auto LISTS = "lists";
static constexpr auto SETS = "sets";
static constexpr auto MAPS = "maps";
}

namespace v3 {
static constexpr auto BATCHES = "batches";
static constexpr auto PAXOS = "paxos";
Expand Down Expand Up @@ -147,16 +139,6 @@ extern schema_ptr batchlog();
extern schema_ptr paxos();
extern schema_ptr built_indexes(); // TODO (from Cassandra): make private

namespace redis {
extern schema_ptr simple_objects();
extern schema_ptr lists();
extern schema_ptr sets();
extern schema_ptr maps();

std::vector<schema_ptr> all_tables();
void make(database& db, bool durable, bool volatile_testing_only = false);
}

namespace legacy {

schema_ptr keyspaces();
Expand Down Expand Up @@ -194,7 +176,6 @@ future<> update_hints_dropped(gms::inet_address ep, utils::UUID time_period, int
std::vector<schema_ptr> all_tables();
void make(database& db, bool durable, bool volatile_testing_only = false);


future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
query_mutations(distributed<service::storage_proxy>& proxy, const sstring& cf_name);

Expand Down
6 changes: 2 additions & 4 deletions redis/commands/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ shared_ptr<abstract_command> get::prepare(service::storage_proxy& proxy, request

future<reply> get::execute(service::storage_proxy& proxy, db::consistency_level cl, db::timeout_clock::time_point now, const timeout_config& tc, service::client_state& cs)
{
auto& db = proxy.get_db().local();
auto schema = db.find_schema(db::system_keyspace::redis::NAME, db::system_keyspace::redis::SIMPLE_OBJECTS);
auto timeout = now + tc.read_timeout;
auto fetched = prefetch_partition_helper::prefetch_simple(proxy, schema, _key, cl, timeout, cs);
return fetched.then([this, &proxy, cl, timeout, &cs, schema] (auto pd) {
auto fetched = prefetch_partition_helper::prefetch_simple(proxy, _schema, _key, cl, timeout, cs);
return fetched.then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->fetched()) {
return reply_builder::build<message_tag>(std::move(pd->_data));
}
Expand Down
38 changes: 38 additions & 0 deletions redis/commands/lindex.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "redis/commands/lindex.hh"
#include "redis/commands/unexpected.hh"
#include "redis/reply_builder.hh"
#include "redis/request.hh"
#include "redis/reply.hh"
#include "timeout_config.hh"
#include "service/client_state.hh"
#include "service/storage_proxy.hh"
#include "db/system_keyspace.hh"
#include "partition_slice_builder.hh"
#include "gc_clock.hh"
#include "dht/i_partitioner.hh"
#include "log.hh"
namespace redis {
namespace commands {

shared_ptr<abstract_command> lindex::prepare(service::storage_proxy& proxy, request&& req)
{
if (req._args_count < 2) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
}
return make_shared<lindex>(std::move(req._command), lists_schema(proxy), std::move(req._args[0]), bytes2long(req._args[1]));
}

future<reply> lindex::execute(service::storage_proxy& proxy, db::consistency_level cl, db::timeout_clock::time_point now, const timeout_config& tc, service::client_state& cs)
{
auto timeout = now + tc.read_timeout;
auto fetched = prefetch_partition_helper::prefetch_list(proxy, _schema, _key, cl, timeout, cs, _index);
return fetched.then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->fetched()) {
auto& e = pd->cells().front();
return reply_builder::build<message_tag>(e._value);
}
return reply_builder::build<null_message_tag>();
});
}
}
}
33 changes: 33 additions & 0 deletions redis/commands/lindex.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once
#include "redis/command_with_single_schema.hh"
#include "redis/request.hh"

namespace service {
class storage_proxy;
}
class timeout_config;
namespace redis {
namespace commands {
class lindex : public command_with_single_schema {
protected:
bytes _key;
long _index;
public:
lindex(bytes&& name, const schema_ptr schema, bytes&& key, long index)
: command_with_single_schema(std::move(name), schema)
, _key(std::move(key))
, _index(index)
{
}
~lindex() {}
static shared_ptr<abstract_command> prepare(service::storage_proxy& proxy, request&& req);
virtual future<reply> execute(service::storage_proxy&,
db::consistency_level,
db::timeout_clock::time_point,
const timeout_config& tc,
service::client_state& cs
) override;
};

}
}
37 changes: 37 additions & 0 deletions redis/commands/llen.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include "redis/commands/llen.hh"
#include "redis/commands/unexpected.hh"
#include "redis/reply_builder.hh"
#include "redis/request.hh"
#include "redis/reply.hh"
#include "timeout_config.hh"
#include "service/client_state.hh"
#include "service/storage_proxy.hh"
#include "db/system_keyspace.hh"
#include "partition_slice_builder.hh"
#include "gc_clock.hh"
#include "dht/i_partitioner.hh"
#include "log.hh"
namespace redis {
namespace commands {

shared_ptr<abstract_command> llen::prepare(service::storage_proxy& proxy, request&& req)
{
if (req._args_count < 1) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
}
return make_shared<llen>(std::move(req._command), lists_schema(proxy), std::move(req._args[0]));
}

future<reply> llen::execute(service::storage_proxy& proxy, db::consistency_level cl, db::timeout_clock::time_point now, const timeout_config& tc, service::client_state& cs)
{
auto timeout = now + tc.read_timeout;
auto fetched = prefetch_partition_helper::prefetch_list(proxy, _schema, _key, cl, timeout, cs, only_size_tag {});
return fetched.then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->fetched()) {
return reply_builder::build<number_tag>(pd->only_size());
}
return reply_builder::build<error_message_tag>();
});
}
}
}
31 changes: 31 additions & 0 deletions redis/commands/llen.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include "redis/command_with_single_schema.hh"
#include "redis/request.hh"

namespace service {
class storage_proxy;
}
class timeout_config;
namespace redis {
namespace commands {
class llen : public command_with_single_schema {
protected:
bytes _key;
public:
llen(bytes&& name, const schema_ptr schema, bytes&& key)
: command_with_single_schema(std::move(name), schema)
, _key(std::move(key))
{
}
~llen() {}
static shared_ptr<abstract_command> prepare(service::storage_proxy& proxy, request&& req);
virtual future<reply> execute(service::storage_proxy&,
db::consistency_level,
db::timeout_clock::time_point,
const timeout_config& tc,
service::client_state& cs
) override;
};

}
}
Loading

0 comments on commit 7a86341

Please sign in to comment.