Skip to content

Commit

Permalink
keep the query result alive util the data was send to clients
Browse files Browse the repository at this point in the history
  • Loading branch information
fastio committed Jun 16, 2019
1 parent 26b1694 commit 801f9ab
Show file tree
Hide file tree
Showing 46 changed files with 178 additions and 215 deletions.
3 changes: 3 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ def endswith(self, end):
'cql3/restrictions/statement_restrictions.cc',
'cql3/result_set.cc',
'cql3/variable_specifications.cc',
'redis/reply.cc',
'redis/query_processor.cc',
'redis/redis_keyspace.cc',
'redis/protocol_parser.cc',
Expand All @@ -577,6 +578,7 @@ def endswith(self, end):
'redis/abstract_command.cc',
'redis/prefetcher.cc',
'redis/redis_mutation.cc',
'redis/native_protocol_parser.cc',
'redis/commands/set.cc',
'redis/commands/get.cc',
'redis/commands/append.cc',
Expand All @@ -597,6 +599,7 @@ def endswith(self, end):
'redis/commands/hget.cc',
'redis/commands/hdel.cc',
'redis/commands/hexists.cc',
'redis/commands/hincrby.cc',
'redis/commands/sset.cc',
'redis/commands/spop.cc',
'redis/commands/srandmember.cc',
Expand Down
13 changes: 9 additions & 4 deletions redis/command_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "redis/commands/hset.hh"
#include "redis/commands/hget.hh"
#include "redis/commands/hdel.hh"
#include "redis/commands/hincrby.hh"
#include "redis/commands/hexists.hh"
#include "redis/commands/sset.hh"
#include "redis/commands/smembers.hh"
Expand All @@ -38,17 +39,19 @@
#include "redis/commands/spop.hh"
#include "redis/commands/srandmember.hh"
#include "redis/commands/scard.hh"
#include "log.hh"
namespace redis {
static logging::logger logging("command_factory");
shared_ptr<abstract_command> command_factory::create(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
static thread_local std::unordered_map<bytes, std::function<shared_ptr<abstract_command> (service::storage_proxy& proxy, const service::client_state& cs, request&& req)>> _commands = {
{ "set", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::set::prepare(proxy, cs, std::move(req)); } },
{ "setnx", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::setnx::prepare(proxy, cs, std::move(req)); } },
// { "setnx", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::setnx::prepare(proxy, cs, std::move(req)); } },
{ "setex", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::setex::prepare(proxy, cs, std::move(req)); } },
{ "mset", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::mset::prepare(proxy, cs, std::move(req)); } },
{ "msetnx", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::msetnx::prepare(proxy, cs, std::move(req)); } },
// { "msetnx", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::msetnx::prepare(proxy, cs, std::move(req)); } },
{ "get", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::get::prepare(proxy, cs, std::move(req)); } },
{ "getset", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::getset::prepare(proxy, cs, std::move(req)); } },
// { "getset", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::getset::prepare(proxy, cs, std::move(req)); } },
{ "mget", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::mget::prepare(proxy, cs, std::move(req)); } },
{ "del", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::del::prepare(proxy, cs, std::move(req)); } },
{ "exists", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::exists::prepare(proxy, cs, std::move(req)); } },
Expand Down Expand Up @@ -77,6 +80,7 @@ shared_ptr<abstract_command> command_factory::create(service::storage_proxy& pro
{ "hget", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hget::prepare(proxy, cs, std::move(req), false); } },
{ "hmget", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hget::prepare(proxy, cs, std::move(req), true); } },
{ "hdel", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hdel::prepare(proxy, cs, std::move(req)); } },
{ "hincrby", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hincrby::prepare(proxy, cs, std::move(req)); } },
{ "hexists", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hexists::prepare(proxy, cs, std::move(req)); } },
{ "hkeys", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hkeys::prepare(proxy, cs, std::move(req)); } },
{ "hvals", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::hvals::prepare(proxy, cs, std::move(req)); } },
Expand All @@ -101,12 +105,13 @@ shared_ptr<abstract_command> command_factory::create(service::storage_proxy& pro
{ "zrem", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::zrem::prepare(proxy, cs, std::move(req)); } },
{ "zremrangebyrank", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::zremrangebyrank::prepare(proxy, cs, std::move(req)); } },
{ "zremrangebyscore", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::zremrangebyscore::prepare(proxy, cs, std::move(req)); } },
{ "cluster", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::cluster_slots::prepare(proxy, cs, std::move(req)); } },
//{ "cluster", [] (service::storage_proxy& proxy, const service::client_state& cs, request&& req) { return commands::cluster_slots::prepare(proxy, cs, std::move(req)); } },
};
auto&& command = _commands.find(req._command);
if (command != _commands.end()) {
return (command->second)(proxy, cs, std::move(req));
}
logging.error("unkown command = {}", req._command);
return commands::unexpected::prepare(std::move(req._command));
}

Expand Down
2 changes: 1 addition & 1 deletion redis/commands/append.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace commands {
shared_ptr<abstract_command> append::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count < 2) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
return unexpected::make_wrong_arguments_exception(std::move(req._command), 2, req._args_count);
}
return make_shared<append>(std::move(req._command), simple_objects_schema(proxy, cs.get_keyspace()), std::move(req._args[0]), std::move(req._args[1]));
}
Expand Down
7 changes: 4 additions & 3 deletions redis/commands/cluster_slots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ future<redis_message> cluster_slots::execute(service::storage_proxy& proxy, db::
return gms::inet_address::lookup(proxy.get_db().local().get_config().listen_address()).then([peers = std::move(peers), &proxy] (auto&& local) mutable {
peers.emplace_back(local);
auto slots_per_peer = REDIS_CLUSTER_SLOTS / peers.size();
std::vector<std::tuple<size_t, size_t, bytes, uint16_t>> ret;
using slots_type = std::vector<std::tuple<size_t, size_t, bytes, uint16_t>>;
lw_shared_ptr<slots_type> ret = make_lw_shared<slots_type>();
auto port = proxy.get_db().local().get_config().redis_transport_port();
size_t start = 0;
for (auto& peer : peers) {
auto end = start + slots_per_peer - 1;
if (end >= (REDIS_CLUSTER_SLOTS - slots_per_peer)) {
end = REDIS_CLUSTER_SLOTS - 1;
}
ret.emplace_back(std::tuple<size_t, size_t, bytes, uint16_t> (start, end, to_bytes(peer.to_sstring()), uint16_t { port }));
ret->emplace_back(std::tuple<size_t, size_t, bytes, uint16_t> (start, end, to_bytes(peer.to_sstring()), uint16_t { port }));
start = end + 1;
}
return redis_message::make(std::move(ret));
return redis_message::make_slots(ret);
});
});
}
Expand Down
6 changes: 3 additions & 3 deletions redis/commands/counter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace commands {
shared_ptr<abstract_command> counter_by_prepare_impl(service::storage_proxy& proxy, const service::client_state& cs, request&& req, bool incr)
{
if (req._args_count < 2) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
return unexpected::make_wrong_arguments_exception(std::move(req._command), 2, req._args_count);
}
return make_shared<counter>(std::move(req._command), simple_objects_schema(proxy, cs.get_keyspace()), std::move(req._args[0]), std::move(req._args[1]), incr);
}
Expand Down Expand Up @@ -49,7 +49,7 @@ future<redis_message> counter::execute(service::storage_proxy& proxy, db::consis
long result = 0;
if (pd && pd->has_data()) {
if (is_number(pd->_data) == false) {
return redis_message::make(bytes("-ERR value is not an integer or out of range\r\n"));
return redis_message::make_exception(sstring("-ERR value is not an integer or out of range\r\n"));
}
result = bytes2long(pd->_data);
}
Expand All @@ -62,7 +62,7 @@ future<redis_message> counter::execute(service::storage_proxy& proxy, db::consis
} catch(...) {
return redis_message::err();
}
return redis_message::make(result);
return redis_message::make_long(result);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion redis/commands/del.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace commands {
shared_ptr<abstract_command> del::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count < 1) {
return unexpected::prepare(std::move(req._command), std::move(bytes {msg_syntax_err}));
return unexpected::make_wrong_arguments_exception(std::move(req._command), 1, req._args_count);
}
std::vector<schema_ptr> schemas {
simple_objects_schema(proxy, cs.get_keyspace()),
Expand Down
2 changes: 1 addition & 1 deletion redis/commands/exists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace commands {
shared_ptr<abstract_command> exists::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count < 1) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
return unexpected::make_wrong_arguments_exception(std::move(req._command), 1, req._args_count);
}
std::vector<schema_ptr> schemas {
simple_objects_schema(proxy, cs.get_keyspace()),
Expand Down
16 changes: 7 additions & 9 deletions redis/commands/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace commands {
shared_ptr<abstract_command> get::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count != 1) {
return unexpected::prepare(std::move(req._command), std::move(to_bytes(sprint("-wrong number of arguments (given %ld, expected 1)\r\n", req._args_count))));
return unexpected::make_wrong_arguments_exception(std::move(req._command), 1, req._args_count);
}
return make_shared<get>(std::move(req._command), simple_objects_schema(proxy, cs.get_keyspace()), std::move(req._args[0]));
}
Expand All @@ -33,12 +33,12 @@ future<redis_message> get::execute(service::storage_proxy& proxy, db::consistenc
auto fetched = prefetch_simple(proxy, _schema, _key, cl, timeout, cs);
return fetched.then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->has_data()) {
return redis_message::make(std::move(pd->_data));
return redis_message::make_bytes(std::move(pd));
}
return redis_message::null();
});
}

/*
shared_ptr<abstract_command> getset::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count < 2) {
Expand All @@ -58,17 +58,18 @@ future<redis_message> getset::execute(service::storage_proxy& proxy, db::consist
return redis_message::err();
}
if (pd && pd->has_data()) {
return redis_message::make(std::move(pd->_data));
return redis_message::make_bytes(std::move(pd));
}
return redis_message::null();
});
});
}
*/
shared_ptr<abstract_command> mget::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count < 1) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
return unexpected::make_wrong_arguments_exception(std::move(req._command), 1, req._args_count);
}
return seastar::make_shared<mget>(std::move(req._command), simple_objects_schema(proxy, cs.get_keyspace()), std::move(req._args));
}
Expand All @@ -78,13 +79,10 @@ future<redis_message> mget::execute(service::storage_proxy& proxy, db::consisten
auto timeout = now + tc.read_timeout;
return prefetch_simple(proxy, _schema, _keys, cl, timeout, cs).then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->has_data()) {
//FIXME: if key was not exists, we should return nil message to client.
auto&& values = boost::copy_range<std::vector<std::optional<bytes>>>(pd->data() | boost::adaptors::transformed([] (auto& p) { return std::optional<bytes>(std::move(p.second)); }));
return redis_message::make(std::move(values));
return redis_message::make_mbytes(pd);
}
return redis_message::null();
});
}

}
}
3 changes: 2 additions & 1 deletion redis/commands/get.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public:
~get() {}
future<redis_message> execute(service::storage_proxy&, db::consistency_level, db::timeout_clock::time_point, const timeout_config& tc, service::client_state& cs) override;
};

/*
class getset : public get {
bytes _data;
public:
Expand All @@ -36,6 +36,7 @@ public:
future<redis_message> execute(service::storage_proxy&, db::consistency_level, db::timeout_clock::time_point, const timeout_config& tc, service::client_state& cs) override;
};
*/
class mget : public command_with_single_schema {
protected:
std::vector<bytes> _keys;
Expand Down
2 changes: 1 addition & 1 deletion redis/commands/hdel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace commands {
shared_ptr<abstract_command> hdel::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count != 2) {
return unexpected::prepare(std::move(req._command), std::move(bytes {msg_syntax_err}));
return unexpected::make_wrong_arguments_exception(std::move(req._command), 2, req._args_count);
}
std::vector<bytes> map_keys;
map_keys.reserve(req._args.size() - 1);
Expand Down
2 changes: 1 addition & 1 deletion redis/commands/hexists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace commands {
shared_ptr<abstract_command> hexists::prepare(service::storage_proxy& proxy, const service::client_state& cs, request&& req)
{
if (req._args_count != 2) {
return unexpected::prepare(std::move(req._command), std::move(bytes {msg_syntax_err}));
return unexpected::make_wrong_arguments_exception(std::move(req._command), 2, req._args_count);
}
return seastar::make_shared<hexists> (std::move(req._command), maps_schema(proxy, cs.get_keyspace()), std::move(req._args[0]), std::move(req._args[1]));
}
Expand Down
27 changes: 6 additions & 21 deletions redis/commands/hget.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ shared_ptr<abstract_command> hget::prepare(service::storage_proxy& proxy, const
template<typename Type>
shared_ptr<abstract_command> prepare_impl(service::storage_proxy& proxy, const service::client_state& cs, request&& req) {
if (req._args_count != 1) {
return unexpected::prepare(std::move(req._command), std::move(bytes { msg_syntax_err }) );
return unexpected::make_wrong_arguments_exception(std::move(req._command), 1, req._args_count);
}
return seastar::make_shared<Type>(std::move(req._command), maps_schema(proxy, cs.get_keyspace()), std::move(req._args[0]));
}
Expand All @@ -51,10 +51,7 @@ future<redis_message> hget::execute(service::storage_proxy& proxy, db::consisten
auto timeout = now + tc.read_timeout;
return prefetch_map(proxy, _schema, _key, _map_keys, fetch_options::values, cl, timeout, cs).then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->has_data()) {
auto&& vals = boost::copy_range<std::vector<std::optional<bytes>>> (pd->data() | boost::adaptors::transformed([this] (auto& data) {
return std::move(data.first);
}));
return redis_message::make(std::move(vals));
return redis_message::make_map_key_bytes(pd);
}
return redis_message::null();
});
Expand All @@ -64,26 +61,14 @@ future<redis_message> hall::execute(service::storage_proxy& proxy, db::consisten
auto timeout = now + tc.read_timeout;
return prefetch_map(proxy, _schema, _key, _option, cl, timeout, cs).then([this, &proxy, cl, timeout, &cs] (auto pd) {
if (pd && pd->has_data()) {
std::vector<std::optional<bytes>> result;
//std::vector<std::optional<bytes>> result;
if (_option == redis::fetch_options::keys) {
auto&& keys = boost::copy_range<std::vector<std::optional<bytes>>> (pd->data() | boost::adaptors::transformed([this] (auto& data) {
return std::move(data.first);
}));
result = std::move(keys);
return redis_message::make_map_key_bytes(pd);
} else if (_option == redis::fetch_options::values) {
// values was saved in the first of the pair.
auto&& vals = boost::copy_range<std::vector<std::optional<bytes>>> (pd->data() | boost::adaptors::transformed([this] (auto& data) {
return std::move(data.first);
}));
result = std::move(vals);
return redis_message::make_map_key_bytes(pd);
} else {
result.reserve(pd->data().size() * 2);
for (auto&& data : pd->data()) {
result.emplace_back(data.first);
result.emplace_back(data.second);
}
return redis_message::make_map_bytes(pd);
}
return redis_message::make(std::move(result));
}
return redis_message::null();
});
Expand Down
2 changes: 1 addition & 1 deletion redis/commands/hset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ future<redis_message> hset::execute(service::storage_proxy& proxy, db::consisten
} catch (std::exception& e) {
return redis_message::err();
}
return _multi == false ? redis_message::make(total) : redis_message::ok();
return _multi == false ? redis_message::make_long(static_cast<long>(total)) : redis_message::ok();
});
}

Expand Down
Loading

0 comments on commit 801f9ab

Please sign in to comment.