Skip to content

Commit

Permalink
Create keyspaces & tables for redis
Browse files Browse the repository at this point in the history
  • Loading branch information
fastio committed Apr 25, 2019
1 parent 70a3368 commit b12cade
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 32 deletions.
2 changes: 1 addition & 1 deletion redis/abstract_command.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class redis_message;
}
namespace redis {

static inline decltype(auto) simple_objects() { return redis::SIMPLE_OBJECTS; }
static inline decltype(auto) simple_objects() { return redis::STRINGS; }
static inline decltype(auto) lists() { return redis::LISTS; }
static inline decltype(auto) sets() { return redis::SETS; }
static inline decltype(auto) maps() { return redis::MAPS; }
Expand Down
162 changes: 136 additions & 26 deletions redis/redis_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,151 @@
#include <memory>
#include "log.hh"
#include "db/query_context.hh"
#include "auth/service.hh"
#include "service/client_state.hh"
#include "transport/server.hh"
#include "db/system_keyspace.hh"
#include "schema.hh"
using namespace seastar;
namespace redis {

static logging::logger log("redis_keyspace");
schema_ptr strings_schema(sstring ks_name) {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(ks_name, redis::STRINGS), ks_name, redis::STRINGS,
// partition key
{{"pkey", utf8_type}},
// clustering key
{},
// regular columns
{{"data", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"save strings for redis"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(db::system_keyspace::generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}

schema_ptr lists_schema(sstring ks_name) {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(ks_name, redis::LISTS), ks_name, redis::LISTS,
// partition key
{{"pkey", utf8_type}},
// clustering key
{{"ckey", bytes_type}},
// regular columns
{{"data", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"save lists for redis"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(db::system_keyspace::generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}

schema_ptr maps_schema(sstring ks_name) {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(ks_name, redis::MAPS), ks_name, redis::MAPS,
// partition key
{{"pkey", utf8_type}},
// clustering key
{{"ckey", utf8_type}},
// regular columns
{{"data", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"save maps for redis"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(db::system_keyspace::generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}

schema_ptr sets_schema(sstring ks_name) {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(ks_name, redis::SETS), ks_name, redis::SETS,
// partition key
{{"pkey", utf8_type}},
// clustering key
{{"ckey", utf8_type}},
// regular columns
{{"data", boolean_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"save sets for redis"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(db::system_keyspace::generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}

schema_ptr zsets_schema(sstring ks_name) {
schema_builder builder(make_lw_shared(schema(generate_legacy_id(ks_name, redis::ZSETS), ks_name, redis::ZSETS,
// partition key
{{"pkey", utf8_type}},
// clustering key
{{"ckey", utf8_type}},
// regular columns
{{"data", utf8_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"save sorted sets for redis"
)));
builder.set_gc_grace_seconds(0);
builder.with_version(db::system_keyspace::generate_schema_version(builder.uuid()));
return builder.build(schema_builder::compact_storage::yes);
}
future<> redis_keyspace_helper::create_if_not_exists(lw_shared_ptr<db::config> config) {
// FIXME: read the properties from config.
auto keyspace_maker = [config] (sstring keyspace_name) {
auto create_keyspace = sprint("create keyspace if not exists %s with replication = {'class' : '%s', 'replication_factor' : %d }",
keyspace_name, "SimpleStrategy", 1);
return db::execute_cql(create_keyspace).then_wrapped([keyspace_name] (auto f) {
try {
f.get();
} catch(std::exception& e) {
throw e;
}
return when_all(
db::execute_cql(sprint("create table if not exists %s.%s (pkey text primary key, data text)", keyspace_name, redis::SIMPLE_OBJECTS)).discard_result(),
db::execute_cql(sprint("create table if not exists %s.%s (pkey text, ckey blob, data text, primary key(pkey, ckey))", keyspace_name, redis::LISTS)).discard_result(),
db::execute_cql(sprint("create table if not exists %s.%s (pkey text, ckey text, data boolean, primary key(pkey, ckey))", keyspace_name, redis::SETS)).discard_result(),
db::execute_cql(sprint("create table if not exists %s.%s (pkey text, ckey text, data text, primary key(pkey, ckey))", keyspace_name, redis::MAPS)).discard_result(),
db::execute_cql(sprint("create table if not exists %s.%s (pkey text, ckey text, data text, primary key(pkey, ckey))", keyspace_name, redis::ZSETS)).discard_result()
).then_wrapped([] (auto f) {
try {
f.get();
} catch(std::exception& e) {
throw e;
}
auto keyspace_gen = [config] (sstring name) {
auto& proxy = service::get_local_storage_proxy();
if (proxy.get_db().local().has_keyspace(name)) {
return make_ready_future<>();
}
auto attrs = make_shared<cql3::statements::ks_prop_defs>();
attrs->add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
std::map<sstring, sstring> replication_properties;
for (auto&& e : config->redis_keyspace_replication_properties()) {
replication_properties.emplace(e.first, e.second);
}
attrs->add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs->validate();
return service::get_local_migration_manager().announce_new_keyspace(attrs->as_ks_metadata(name), false);
};
auto table_gen = [] (sstring ks_name, sstring cf_name, schema_ptr schema) {
auto& proxy = service::get_local_storage_proxy();
if (proxy.get_db().local().has_schema(ks_name, cf_name)) {
return make_ready_future<>();
}
return service::get_local_migration_manager().announce_new_column_family(schema, false);
};
// create 16 default database for redis.
return parallel_for_each(boost::irange<unsigned>(0, 16), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) {
auto ks_name = sprint("redis_%d", c);
return keyspace_gen(ks_name).then([ks_name, table_gen] {
return when_all_succeed(
table_gen(ks_name, redis::STRINGS, strings_schema(ks_name)),
table_gen(ks_name, redis::LISTS, lists_schema(ks_name)),
table_gen(ks_name, redis::SETS, sets_schema(ks_name)),
table_gen(ks_name, redis::MAPS, maps_schema(ks_name)),
table_gen(ks_name, redis::ZSETS, zsets_schema(ks_name))
).then([] {
return make_ready_future<>();
});
});
};
return parallel_for_each(boost::irange<unsigned>(0, 16), [keyspace_maker = std::move(keyspace_maker)] (auto c) {
auto keyspace_name = sprint("redis_%d", c);
return keyspace_maker(keyspace_name);
});
}
}
10 changes: 8 additions & 2 deletions redis/redis_keyspace.hh
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
#include "db/config.hh"
#include "seastar/core/future.hh"
#include "seastar/core/shared_ptr.hh"

#include "seastar/core/sharded.hh"
using namespace seastar;
namespace auth {
class service;
}
namespace service {
class client_state;
}
namespace redis {

static constexpr auto REDIS_DATABASE_NAME_PREFIX = "redis_";
static constexpr auto DEFAULT_DATABASE_NAME = "redis_0";
static constexpr auto SIMPLE_OBJECTS = "simple_objects";
static constexpr auto STRINGS = "strings";
static constexpr auto LISTS = "lists";
static constexpr auto SETS = "sets";
static constexpr auto MAPS = "maps";
Expand Down
6 changes: 3 additions & 3 deletions tests/redis/kv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ SEASTAR_TEST_CASE(test_redis_set) {
assert_that(std::move(reply)).is_redis_reply()
.with_status(bytes("OK"));

auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a\'", redis::SIMPLE_OBJECTS)).get0();
auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a\'", redis::STRINGS)).get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({
Expand All @@ -32,7 +32,7 @@ SEASTAR_TEST_CASE(test_redis_set_3_args) {
assert_that(std::move(reply)).is_redis_reply()
.with_error(bytes("ERR syntax error"));

auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a1\'", redis::SIMPLE_OBJECTS)).get0();
auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a1\'", redis::STRINGS)).get0();
assert_that(msg).is_rows()
.is_empty();
return make_ready_future<>();
Expand All @@ -45,7 +45,7 @@ SEASTAR_TEST_CASE(test_redis_get) {
assert_that(std::move(reply)).is_redis_reply()
.with_status(bytes("OK"));

auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a\'", redis::SIMPLE_OBJECTS)).get0();
auto msg = e.execute_cql(sprint("select * from redis.%s where pkey = \'a\'", redis::STRINGS)).get0();
assert_that(msg).is_rows()
.with_size(1)
.with_row({
Expand Down

0 comments on commit b12cade

Please sign in to comment.