-
Notifications
You must be signed in to change notification settings - Fork 170
/
Copy pathmutation_partition_view.cc
326 lines (295 loc) · 13.6 KB
/
mutation_partition_view.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/simple-stream.hh>
#include "mutation_partition_view.hh"
#include "schema.hh"
#include "atomic_cell.hh"
#include "utils/data_input.hh"
#include "mutation_partition_serializer.hh"
#include "mutation_partition.hh"
#include "counters.hh"
#include "frozen_mutation.hh"
#include "partition_builder.hh"
#include "converting_mutation_partition_applier.hh"
#include "utils/UUID.hh"
#include "serializer.hh"
#include "idl/uuid.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/mutation.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
using namespace db;
namespace {
// Every serialized form an atomic cell can arrive in: live, expiring, dead or
// counter, plus ser::unknown_variant_type, which read_atomic_cell() rejects by
// throwing std::runtime_error.
using atomic_cell_variant = boost::variant<ser::live_cell_view,
ser::expiring_cell_view,
ser::dead_cell_view,
ser::counter_cell_view,
ser::unknown_variant_type>;
// Converts a deserialized cell into an atomic_cell.
//
// type: passed to atomic_cell::make_live() for live and expiring cells.
// cv:   the serialized cell to convert.
// cm:   passed to atomic_cell::make_live() for live and expiring cells;
//       defaults to atomic_cell::collection_member::no.
//
// Throws std::runtime_error if the cell itself, or the state of a counter
// cell, is ser::unknown_variant_type.
atomic_cell read_atomic_cell(const abstract_type& type, atomic_cell_variant cv, atomic_cell::collection_member cm = atomic_cell::collection_member::no)
{
    // Converts the state carried by a counter cell; the resulting cell is
    // created with the timestamp given at construction.
    class counter_state_reader : public boost::static_visitor<atomic_cell> {
    public:
        explicit counter_state_reader(api::timestamp_type created_at)
            : _created_at(created_at) { }

        atomic_cell operator()(ser::counter_cell_full_view& full) const {
            // TODO: a lot of copying for something called view
            counter_cell_builder builder;
            for (auto shard : full.shards()) {
                builder.add_maybe_unsorted_shard(counter_shard(shard));
            }
            builder.sort_and_remove_duplicates();
            return builder.build(_created_at);
        }

        atomic_cell operator()(ser::counter_cell_update_view& update) const {
            return atomic_cell::make_live_counter_update(_created_at, update.delta());
        }

        atomic_cell operator()(ser::unknown_variant_type&) const {
            throw std::runtime_error("Trying to deserialize counter cell in unknown state");
        }

    private:
        api::timestamp_type _created_at;
    };

    // Dispatches on the kind of serialized cell.
    class cell_reader : public boost::static_visitor<atomic_cell> {
    public:
        explicit cell_reader(const abstract_type& value_type, atomic_cell::collection_member member)
            : _value_type(value_type), _member(member) { }

        atomic_cell operator()(ser::live_cell_view& live) const {
            return atomic_cell::make_live(_value_type, live.created_at(), live.value().view(), _member);
        }

        atomic_cell operator()(ser::expiring_cell_view& expiring) const {
            return atomic_cell::make_live(_value_type, expiring.c().created_at(), expiring.c().value().view(),
                                          expiring.expiry(), expiring.ttl(), _member);
        }

        atomic_cell operator()(ser::dead_cell_view& dead) const {
            return atomic_cell::make_dead(dead.tomb().timestamp(), dead.tomb().deletion_time());
        }

        atomic_cell operator()(ser::counter_cell_view& counter) const {
            // The counter state is itself a variant; hand it to the nested reader.
            auto state = counter.value();
            return boost::apply_visitor(counter_state_reader(counter.created_at()), state);
        }

        atomic_cell operator()(ser::unknown_variant_type&) const {
            throw std::runtime_error("Trying to deserialize cell in unknown state");
        }

    private:
        const abstract_type& _value_type;
        atomic_cell::collection_member _member;
    };

    return boost::apply_visitor(cell_reader(type, cm), cv);
}
// Rebuilds a collection from its serialized view and returns the result of
// ctype.serialize_mutation_form() on it.
//
// ctype: collection type; supplies the element value type and the final
//        serialization.
// cv:    serialized collection (a tombstone plus key/value elements).
//
// Every element value is read with atomic_cell::collection_member::yes.
collection_mutation read_collection_cell(const collection_type_impl& ctype, ser::collection_cell_view cv)
{
    collection_type_impl::mutation unpacked;
    unpacked.tomb = cv.tomb();

    auto&& entries = cv.elements();
    unpacked.cells.reserve(entries.size());
    for (auto&& entry : entries) {
        unpacked.cells.emplace_back(
                entry.key(),
                read_atomic_cell(*ctype.value_comparator(), entry.value(), atomic_cell::collection_member::yes));
    }

    return ctype.serialize_mutation_form(unpacked);
}
// Walks the columns of a serialized row and reports each cell to `visitor`.
//
// rv:      serialized row.
// cm:      column mapping used to look up each column by (kind, id).
// kind:    column kind passed to cm.column_at().
// visitor: must provide accept_atomic_cell(column_id, atomic_cell) and
//          accept_collection(column_id, <collection>).
//
// Throws std::runtime_error when the kind of a serialized cell disagrees with
// the column's is_atomic(), or when the cell is ser::unknown_variant_type.
template<typename Visitor>
void read_and_visit_row(ser::row_view rv, const column_mapping& cm, column_kind kind, Visitor&& visitor)
{
    // Routes one serialized cell to the matching visitor callback.
    class cell_dispatcher : public boost::static_visitor<> {
    public:
        explicit cell_dispatcher(Visitor& target, column_id id, const column_mapping_entry& column)
            : _target(target), _id(id), _column(column) { }

        void operator()(atomic_cell_variant& atomic) const {
            if (!_column.is_atomic()) {
                throw std::runtime_error("A collection expected, got an atomic cell");
            }
            _target.accept_atomic_cell(_id, read_atomic_cell(*_column.type(), atomic));
        }

        void operator()(ser::collection_cell_view& serialized) const {
            if (_column.is_atomic()) {
                throw std::runtime_error("An atomic cell expected, got a collection");
            }
            // FIXME: Pass view to cell to avoid copy
            // The collection is built under standard_allocator(); the visitor is
            // then invoked under the allocator that was current on entry.
            auto&& entry_allocator = current_allocator();
            with_allocator(standard_allocator(), [&] {
                auto collection = read_collection_cell(*static_pointer_cast<const collection_type_impl>(_column.type()), serialized);
                with_allocator(entry_allocator, [&] {
                    _target.accept_collection(_id, collection);
                });
            });
        }

        void operator()(ser::unknown_variant_type&) const {
            throw std::runtime_error("Trying to deserialize unknown cell type");
        }

    private:
        Visitor& _target;
        column_id _id;
        const column_mapping_entry& _column;
    };

    for (auto&& serialized_column : rv.columns()) {
        auto id = serialized_column.id();
        auto& mapping_entry = cm.column_at(kind, id);
        auto&& payload = serialized_column.c();
        boost::apply_visitor(cell_dispatcher(visitor, id, mapping_entry), payload);
    }
}
// Converts a serialized row marker into a row_marker.
//
// Live markers carry only a creation timestamp, expiring markers add a TTL
// and an expiry, dead markers carry a tombstone, and "no marker" yields a
// default-constructed row_marker.
//
// Throws std::runtime_error if the marker is ser::unknown_variant_type.
row_marker read_row_marker(boost::variant<ser::live_marker_view, ser::expiring_marker_view, ser::dead_marker_view, ser::no_marker_view, ser::unknown_variant_type> rmv)
{
    class marker_reader : public boost::static_visitor<row_marker> {
    public:
        row_marker operator()(ser::live_marker_view& live) const {
            return row_marker(live.created_at());
        }

        row_marker operator()(ser::expiring_marker_view& expiring) const {
            return row_marker(expiring.lm().created_at(), expiring.ttl(), expiring.expiry());
        }

        row_marker operator()(ser::dead_marker_view& dead) const {
            return row_marker(dead.tomb());
        }

        row_marker operator()(ser::no_marker_view&) const {
            return row_marker();
        }

        row_marker operator()(ser::unknown_variant_type&) const {
            throw std::runtime_error("Trying to deserialize unknown row marker type");
        }
    };

    return boost::apply_visitor(marker_reader(), rmv);
}
}
// Deserializes the partition and replays it into `visitor`.
//
// Calls are made in this order: the partition tombstone, the static row's
// cells, every range tombstone, then for each clustering row accept_row()
// followed by that row's cells.
//
// cm:      column mapping used to resolve column ids.
// visitor: receiver of the accept_* callbacks.
template<typename Visitor>
GCC6_CONCEPT(requires MutationViewVisitor<Visitor>)
void mutation_partition_view::do_accept(const column_mapping& cm, Visitor& visitor) const {
    // Adapts read_and_visit_row() callbacks to the static-cell callbacks.
    struct static_cell_forwarder {
        Visitor& _visitor;
        void accept_atomic_cell(column_id id, atomic_cell cell) const {
            _visitor.accept_static_cell(id, std::move(cell));
        }
        void accept_collection(column_id id, const collection_mutation& collection) const {
            _visitor.accept_static_cell(id, collection);
        }
    };

    // Adapts read_and_visit_row() callbacks to the row-cell callbacks.
    struct row_cell_forwarder {
        Visitor& _visitor;
        void accept_atomic_cell(column_id id, atomic_cell cell) const {
            _visitor.accept_row_cell(id, std::move(cell));
        }
        void accept_collection(column_id id, const collection_mutation& collection) const {
            _visitor.accept_row_cell(id, collection);
        }
    };

    // Deserialize from a copy of the stored input stream.
    auto in = _in;
    auto mpv = ser::deserialize(in, boost::type<ser::mutation_partition_view>());

    visitor.accept_partition_tombstone(mpv.tomb());

    read_and_visit_row(mpv.static_row(), cm, column_kind::static_column, static_cell_forwarder{visitor});

    for (auto&& range_tomb : mpv.range_tombstones()) {
        visitor.accept_row_tombstone(range_tomb);
    }

    for (auto&& clustering : mpv.rows()) {
        auto tomb = row_tombstone(clustering.deleted_at(), shadowable_tombstone(clustering.shadowable_deleted_at()));
        visitor.accept_row(position_in_partition_view::for_key(clustering.key()), tomb,
                           read_row_marker(clustering.marker()), is_dummy::no, is_continuous::yes);
        read_and_visit_row(clustering.cells(), cm, column_kind::regular_column, row_cell_forwarder{visitor});
    }
}
void mutation_partition_view::accept(const schema& s, partition_builder& visitor) const
{
do_accept(s.get_column_mapping(), visitor);
}
void mutation_partition_view::accept(const column_mapping& cm, converting_mutation_partition_applier& visitor) const
{
do_accept(cm, visitor);
}
std::optional<clustering_key> mutation_partition_view::first_row_key() const
{
auto in = _in;
auto mpv = ser::deserialize(in, boost::type<ser::mutation_partition_view>());
auto rows = mpv.rows();
if (rows.empty()) {
return { };
}
return rows.front().key();
}
std::optional<clustering_key> mutation_partition_view::last_row_key() const
{
auto in = _in;
auto mpv = ser::deserialize(in, boost::type<ser::mutation_partition_view>());
auto rows = mpv.rows();
if (rows.empty()) {
return { };
}
return rows.back().key();
}
// Builds a mutation_partition_view from the `v` member of an already
// deserialized ser::mutation_partition_view.
mutation_partition_view mutation_partition_view::from_view(ser::mutation_partition_view v)
{
return { v.v };
}
// Deserializes the frozen bytes back into a mutation_fragment.
//
// s: schema used for the column mapping, for resolving collection column
//    types, and for decorating the partition key of a partition start.
//
// Handles clustering rows, static rows, range tombstones, partition start and
// partition end. Throws std::runtime_error when the fragment is
// ser::unknown_variant_type; reading row cells can also throw
// std::runtime_error (see read_and_visit_row()).
mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s)
{
    // Accumulates the cells of a clustering row into a mutation_fragment.
    class clustering_row_builder {
    public:
        clustering_row_builder(const schema& sch, clustering_key key, row_tombstone tomb, row_marker marker)
            : _s(sch)
            , _mf(mutation_fragment::clustering_row_tag_t(), std::move(key), std::move(tomb), std::move(marker), row()) { }

        void accept_atomic_cell(column_id id, atomic_cell cell) {
            _mf.as_mutable_clustering_row().cells().append_cell(id, atomic_cell_or_collection(std::move(cell)));
        }

        void accept_collection(column_id id, const collection_mutation& collection) {
            auto& ctype = *static_pointer_cast<const collection_type_impl>(_s.regular_column_at(id).type);
            _mf.as_mutable_clustering_row().cells().append_cell(id, atomic_cell_or_collection(collection_mutation(ctype, collection)));
        }

        mutation_fragment get_mutation_fragment() && { return std::move(_mf); }

    private:
        const schema& _s;
        mutation_fragment _mf;
    };

    // Accumulates the cells of a static row into a mutation_fragment.
    class static_row_builder {
    public:
        explicit static_row_builder(const schema& sch) : _s(sch), _mf(static_row()) { }

        void accept_atomic_cell(column_id id, atomic_cell cell) {
            _mf.as_mutable_static_row().cells().append_cell(id, atomic_cell_or_collection(std::move(cell)));
        }

        void accept_collection(column_id id, const collection_mutation& collection) {
            auto& ctype = *static_pointer_cast<const collection_type_impl>(_s.static_column_at(id).type);
            _mf.as_mutable_static_row().cells().append_cell(id, atomic_cell_or_collection(collection_mutation(ctype, collection)));
        }

        mutation_fragment get_mutation_fragment() && { return std::move(_mf); }

    private:
        const schema& _s;
        mutation_fragment _mf;
    };

    auto in = ser::as_input_stream(_bytes);
    auto view = ser::deserialize(in, boost::type<ser::mutation_fragment_view>());

    return seastar::visit(view.fragment(),
        [&] (ser::clustering_row_view clustering) {
            auto serialized_row = clustering.row();
            auto tomb = row_tombstone(serialized_row.deleted_at(), shadowable_tombstone(serialized_row.shadowable_deleted_at()));
            clustering_row_builder builder(s, serialized_row.key(), std::move(tomb), read_row_marker(serialized_row.marker()));
            read_and_visit_row(serialized_row.cells(), s.get_column_mapping(), column_kind::regular_column, builder);
            return std::move(builder).get_mutation_fragment();
        },
        [&] (ser::static_row_view serialized_static) {
            static_row_builder builder(s);
            read_and_visit_row(serialized_static.cells(), s.get_column_mapping(), column_kind::static_column, builder);
            return std::move(builder).get_mutation_fragment();
        },
        [&] (ser::range_tombstone_view serialized_rt) {
            return mutation_fragment(range_tombstone(serialized_rt));
        },
        [&] (ser::partition_start_view start) {
            auto decorated = dht::global_partitioner().decorate_key(s, start.key());
            return mutation_fragment(partition_start(std::move(decorated), start.partition_tombstone()));
        },
        [] (partition_end) {
            return mutation_fragment(partition_end());
        },
        [] (ser::unknown_variant_type) -> mutation_fragment {
            throw std::runtime_error("Trying to deserialize unknown mutation fragment type");
        }
    );
}