/* * Pedis is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * You may obtain a copy of the License at * * http://www.gnu.org/licenses * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * * Copyright (c) 2016-2026, Peng Jian, pstack@163.com. All rights reserved. * */ #pragma once #include "core/app-template.hh" #include "core/future-util.hh" #include "core/timer-set.hh" #include "core/shared_ptr.hh" #include "core/stream.hh" #include "core/memory.hh" #include "core/units.hh" #include "core/distributed.hh" #include "core/vector-data-sink.hh" #include "core/bitops.hh" #include "core/slab.hh" #include "core/align.hh" #include "net/api.hh" #include "net/packet-data-source.hh" #include "cache.hh" #include "structures/dict_lsa.hh" #include "structures/sset_lsa.hh" #include "structures/geo.hh" #include "keys.hh" namespace redis { using scattered_message_ptr = foreign_ptr<lw_shared_ptr<scattered_message<char>>>; static const bytes msg_crlf {"\r\n"}; static const bytes msg_ok {"+OK\r\n"}; static const bytes msg_pong {"+PONG\r\n"}; static const bytes msg_err = {"-ERR\r\n"}; static const bytes msg_zero = {":0\r\n"}; static const bytes msg_one = {":1\r\n"}; static const bytes msg_neg_one = {":-1\r\n"}; static const bytes msg_neg_two = {":-2\r\n"}; static const bytes msg_null_blik = {"$-1\r\n"}; static const bytes msg_null_multi_bulk = {"*-1\r\n"}; static const bytes msg_empty_multi_bulk = {"*0\r\n"}; static const bytes msg_type_err = {"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"}; static const bytes msg_nokey_err = {"-ERR no such key\r\n"}; static const bytes msg_nocmd_err = {"-ERR no such command\r\n"}; static const bytes msg_syntax_err = {"-ERR syntax error\r\n"}; static const bytes msg_same_object_err = {"-ERR source and destination objects are the same\r\n"}; static const bytes msg_out_of_range_err = {"-ERR index out of range\r\n"}; static const bytes msg_not_integer_err = {"-ERR ERR hash value is not an integer\r\n" }; static const bytes msg_not_float_err = {"-ERR ERR hash value is not an float\r\n" }; static const bytes msg_str_tag = {"+"}; static const bytes msg_num_tag = {":"}; static const bytes msg_sigle_tag = {"*"}; static const bytes msg_batch_tag = {"$"}; static const bytes msg_not_found = {"+(nil)\r\n"}; static const bytes msg_nil = {"+(nil)\r\n"}; static constexpr const int REDIS_OK = 0; static constexpr const int REDIS_ERR = 1; static constexpr const int REDIS_NONE = -1; static constexpr const int REDIS_WRONG_TYPE = -2; static const bytes msg_type_string {"+string\r\n"}; static const bytes msg_type_none {"+none\r\n"}; static const bytes msg_type_list {"+list\r\n"}; static const bytes msg_type_hll {"+hyperloglog\r\n"}; static const bytes msg_type_set {"+set\r\n"}; static const bytes msg_type_zset {"+zset\r\n"}; static const bytes msg_type_hash {"+hash\r\n"}; class reply_builder final { public: static future<scattered_message_ptr> build(size_t size) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_num_tag); m->append(to_sstring(size)); m->append_static(msg_crlf); return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(double number) { auto m = make_lw_shared<scattered_message<char>>(); auto&& n = to_sstring(number); m->append_static(msg_batch_tag); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(std::move(n)); m->append_static(msg_crlf); return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(const bytes& message) { auto m = make_lw_shared<scattered_message<char>>(); m->append(message); return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } template<bool Key, bool Value> static future<scattered_message_ptr> build(const cache_entry* e) { if (e) { //build reply auto m = make_lw_shared<scattered_message<char>>(); if (Key) { m->append(msg_batch_tag); m->append(to_sstring(e->key_size())); m->append_static(msg_crlf); m->append(sstring{e->key_data(), e->key_size()}); m->append_static(msg_crlf); } if (Value) { m->append_static(msg_batch_tag); if (e->type_of_integer()) { auto&& n = to_sstring(e->value_integer()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_float()) { auto&& n = to_sstring(e->value_float()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_bytes()) { m->append(to_sstring(e->value_bytes_size())); m->append_static(msg_crlf); m->append(sstring{e->value_bytes_data(), e->value_bytes_size()}); m->append_static(msg_crlf); } else { m->append_static(msg_type_err); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } } else { return reply_builder::build(msg_not_found); } } template<bool Key, bool Value> static future<scattered_message_ptr> build(const std::vector<const dict_entry*>& entries) { if (!entries.empty()) { //build reply auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); if (Key && Value) { m->append(std::move(to_sstring(entries.size() * 2))); } else { m->append(std::move(to_sstring(entries.size()))); } m->append_static(msg_crlf); for (size_t i = 0; i < entries.size(); ++i) { const auto e = entries[i]; if (Key) { if (e) { m->append_static(msg_batch_tag); m->append(to_sstring(e->key_size())); m->append_static(msg_crlf); m->append(sstring{e->key_data(), e->key_size()}); m->append_static(msg_crlf); } else { m->append_static(msg_not_found); } } if (Value) { if (e) { m->append_static(msg_batch_tag); if (e->type_of_integer()) { auto&& n = to_sstring(e->value_integer()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_float()) { auto&& n = to_sstring(e->value_float()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_bytes()) { m->append(to_sstring(e->value_bytes_size())); m->append_static(msg_crlf); m->append(sstring{e->value_bytes_data(), e->value_bytes_size()}); m->append_static(msg_crlf); } else { m->append_static(msg_type_err); } } else { m->append_static(msg_not_found); } } } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } else { return reply_builder::build(msg_nil); } } template<bool Key, bool Value> static future<scattered_message_ptr> build(const dict_entry* e) { if (e) { //build reply auto m = make_lw_shared<scattered_message<char>>(); if (Key) { m->append(msg_batch_tag); m->append(to_sstring(e->key_size())); m->append_static(msg_crlf); m->append(sstring{e->key_data(), e->key_size()}); m->append_static(msg_crlf); } if (Value) { if (e->type_of_integer()) { auto&& n = to_sstring(e->value_integer()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_float()) { auto&& n = to_sstring(e->value_float()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } else if (e->type_of_bytes()) { m->append_static(msg_batch_tag); m->append(to_sstring(e->value_bytes_size())); m->append_static(msg_crlf); m->append(sstring{e->value_bytes_data(), e->value_bytes_size()}); m->append_static(msg_crlf); } else { m->append_static(msg_type_err); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } else { return reply_builder::build(msg_nil); } } static future<scattered_message_ptr> build(const std::vector<const managed_bytes*>& data) { auto m = make_lw_shared<scattered_message<char>>(); m->append(msg_sigle_tag); m->append(to_sstring(data.size())); m->append_static(msg_crlf); for (size_t i = 0; i < data.size(); ++i) { m->append_static(msg_batch_tag); m->append(to_sstring(data[i]->size())); m->append_static(msg_crlf); m->append(sstring{reinterpret_cast<const char*>(data[i]->data()), data[i]->size()}); m->append_static(msg_crlf); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(const managed_bytes& data) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_batch_tag); m->append(to_sstring(data.size())); m->append_static(msg_crlf); m->append(sstring{reinterpret_cast<const char*>(data.data()), data.size()}); m->append_static(msg_crlf); return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(const std::vector<const sset_entry*>& entries, bool with_score) { if (!entries.empty()) { //build reply auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); if (with_score) { m->append(std::move(to_sstring(entries.size() * 2))); } else { m->append(std::move(to_sstring(entries.size()))); } m->append_static(msg_crlf); for (size_t i = 0; i < entries.size(); ++i) { const auto& e = entries[i]; assert(e != nullptr); m->append_static(msg_batch_tag); m->append(to_sstring(e->key_size())); m->append_static(msg_crlf); m->append(sstring{e->key_data(), e->key_size()}); m->append_static(msg_crlf); if (with_score) { m->append_static(msg_batch_tag); auto&& n = to_sstring(e->score()); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } else { return reply_builder::build(msg_nil); } } static future<scattered_message_ptr> build(std::vector<bytes>& data) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); m->append(std::move(to_sstring(data.size()))); m->append_static(msg_crlf); for (size_t i = 0; i < data.size(); ++i) { auto& uu = data[i]; m->append_static(msg_batch_tag); m->append(to_sstring(uu.size())); m->append_static(msg_crlf); m->append(std::move(uu)); m->append_static(msg_crlf); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(std::vector<foreign_ptr<lw_shared_ptr<bytes>>>& entries) { if (!entries.empty()) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); m->append(std::move(to_sstring(entries.size()))); m->append_static(msg_crlf); for (size_t i = 0; i < entries.size(); ++i) { auto& e = entries[i]; m->append_static(msg_batch_tag); m->append(to_sstring(e->size())); m->append_static(msg_crlf); m->append(*e); m->append_static(msg_crlf); } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } return reply_builder::build(msg_nil); } static future<scattered_message_ptr> build(std::unordered_map<sstring, double>& data, bool with_score) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); if (with_score) { m->append(std::move(to_sstring(data.size() * 2))); } else { m->append(std::move(to_sstring(data.size()))); } m->append_static(msg_crlf); for (auto& d : data) { m->append_static(msg_batch_tag); m->append(to_sstring(d.first.size())); m->append_static(msg_crlf); m->append(std::move(d.first)); m->append_static(msg_crlf); if (with_score) { m->append_static(msg_batch_tag); auto&& n = to_sstring(d.second); m->append(to_sstring(n.size())); m->append_static(msg_crlf); m->append(n); m->append_static(msg_crlf); } } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } static future<scattered_message_ptr> build(std::vector<std::tuple<bytes, double, double, double, double>>& u, int flags) { auto m = make_lw_shared<scattered_message<char>>(); m->append_static(msg_sigle_tag); m->append(std::move(to_sstring(u.size()))); m->append_static(msg_crlf); int temp = 1, temp2 = 2; bool wd = flags & GEORADIUS_WITHDIST; bool wh = flags & GEORADIUS_WITHHASH; bool wc = flags & GEORADIUS_WITHCOORD; if (wd) temp++; if (wh) temp++; if (wc) temp++; for (size_t i = 0; i < u.size(); ++i) { m->append_static(msg_sigle_tag); m->append(std::move(to_sstring(temp))); m->append_static(msg_crlf); //key bytes& key = std::get<0>(u[i]); m->append_static(msg_batch_tag); m->append(to_sstring(key.size())); m->append_static(msg_crlf); m->append(std::move(key)); m->append_static(msg_crlf); //dist if (wd) { double dist = std::get<2>(u[i]); geo::from_meters(dist, flags); auto&& n2 = to_sstring(dist); m->append_static(msg_batch_tag); m->append(to_sstring(n2.size())); m->append_static(msg_crlf); m->append(std::move(n2)); m->append_static(msg_crlf); } //coord if (wc) { m->append_static(msg_sigle_tag); m->append(std::move(to_sstring(temp2))); m->append_static(msg_crlf); auto&& n1 = to_sstring(std::get<3>(u[i])); m->append_static(msg_batch_tag); m->append(to_sstring(n1.size())); m->append_static(msg_crlf); m->append(std::move(n1)); m->append_static(msg_crlf); auto&& n2 = to_sstring(std::get<4>(u[i])); m->append_static(msg_batch_tag); m->append(to_sstring(n2.size())); m->append_static(msg_crlf); m->append(std::move(n2)); m->append_static(msg_crlf); } //hash if (wh) { double& score = std::get<1>(u[i]); bytes hashstr; geo::encode_to_geohash_string(score, hashstr); m->append_static(msg_batch_tag); m->append(to_sstring(hashstr.size())); m->append_static(msg_crlf); m->append(std::move(hashstr)); m->append_static(msg_crlf); } } return make_ready_future<scattered_message_ptr>(foreign_ptr<lw_shared_ptr<scattered_message<char>>>(m)); } }; // end of class }