-
Notifications
You must be signed in to change notification settings - Fork 0
/
Patronus.h
2133 lines (1969 loc) · 74.4 KB
/
Patronus.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#pragma once
#ifndef PATRONUS_H_
#define PATRONUS_H_
#include <mutex>
#include <set>
#include <unordered_set>
#include "Common.h"
#include "DSM.h"
#include "Result.h"
#include "patronus/Batch.h"
#include "patronus/Config.h"
#include "patronus/Coro.h"
#include "patronus/DDLManager.h"
#include "patronus/Lease.h"
#include "patronus/LeaseContext.h"
#include "patronus/LockManager.h"
#include "patronus/ProtectionRegion.h"
#include "patronus/TimeSyncer.h"
#include "patronus/Type.h"
#include "patronus/memory/allocator.h"
#include "patronus/memory/slab_allocator.h"
#include "util/Debug.h"
#include "util/PerformanceReporter.h"
#include "util/RetCode.h"
#include "util/Tracer.h"
#include "util/Util.h"
using namespace util::literals;
namespace patronus
{
using TraceView = util::TraceView;
class Patronus;
class ServerCoroBatchExecutionContext;
/**
 * @brief Start-up configuration for a Patronus instance.
 *
 * By default the lease and alloc buffers each take half of
 * (kDefaultDSMSize - 1_GB) and the reserved buffer takes 1_GB.
 */
struct PatronusConfig
{
// Alias of the project-wide default DSM size.
constexpr static size_t kDefaultDSMSize = ::config::kDefaultDSMSize;
size_t machine_nr{0};
size_t lease_buffer_size{(kDefaultDSMSize - 1_GB) / 2};
size_t alloc_buffer_size{(kDefaultDSMSize - 1_GB) / 2};
size_t reserved_buffer_size{1_GB};
// for sync time
size_t time_parent_node_id{0};
// for default allocator
// see also @alloc_buffer_size
std::vector<size_t> block_class{2_MB};
std::vector<double> block_ratio{1};
// Slab configuration for the client RDMA buffer.
// NOTE(review): presumably {block sizes}, {ratios} -- confirm against
// mem::SlabAllocatorConfig.
mem::SlabAllocatorConfig client_rdma_buffer{
{config::patronus::kClientRdmaBufferSize, 8_B}, {0.95, 0.05}};
// in simplified mode, the time syncer is disabled to avoid generating RDMA
// access.
bool simplify{false};
// Sum of the lease, alloc and reserved buffer sizes.
size_t total_buffer_size() const
{
return lease_buffer_size + alloc_buffer_size + reserved_buffer_size;
}
};
/**
 * @brief Streams a one-line summary of @p conf: machine count, the three
 * buffer sizes and their total.
 */
inline std::ostream &operator<<(std::ostream &os, const PatronusConfig &conf)
{
    os << "{PatronusConfig machine_nr: " << conf.machine_nr;
    os << ", lease_buffer_size: " << conf.lease_buffer_size;
    os << ", alloc_buffer_size: " << conf.alloc_buffer_size;
    os << ", reserved_buffer_size: " << conf.reserved_buffer_size;
    os << ". (Total: " << conf.total_buffer_size() << ")";
    os << "}";
    return os;
}
/**
 * @brief Bundles a thread-name id and a coroutine id with a RetrieveTimer.
 *
 * The scalar ids are zero-initialized so that a default-constructed
 * context never carries indeterminate values.
 */
struct TraceContext
{
    uint64_t thread_name_id{0};
    uint32_t coro_id{0};
    RetrieveTimer timer;
};
/**
 * @brief Per-request bookkeeping for an RPC.
 *
 * Every member has a default initializer, so a default-constructed context
 * starts with null pointers, ready == false and ret_code == RC::kOk.
 */
struct RpcContext
{
// NOTE(review): presumably where the resulting lease is written -- confirm.
Lease *ret_lease{nullptr};
BaseMessage *request{nullptr};
// Completion flag; starts out false.
std::atomic<bool> ready{false};
size_t dir_id{0};
RetCode ret_code{RC::kOk};
char *buffer_addr{nullptr}; // for rpc_{read|write|cas}
};
/**
 * @brief Per-request bookkeeping for a read/write style operation.
 *
 * All members carry default initializers (matching RpcContext) so that a
 * default-constructed context never exposes indeterminate ids.
 */
struct RWContext
{
    ibv_wc_status wc_status{IBV_WC_SUCCESS};
    // Completion flag; starts out false.
    std::atomic<bool> ready{false};
    coro_t coro_id{};
    size_t target_node{0};
    size_t dir_id{0};
    util::TraceView trace_view{util::nulltrace};
};
/**
 * @brief Bundle of per-thread resources: the DSM thread descriptor, the two
 * RDMA message buffer pools, and the client RDMA buffer with its slab
 * allocator.
 *
 * The raw buffer pointer and its size are initialized to null / 0 so that
 * a default-constructed descriptor is in a well-defined empty state.
 */
struct PatronusThreadResourceDesc
{
    ThreadResourceDesc dsm_desc;
    std::unique_ptr<ThreadUnsafeBufferPool<config::patronus::kMessageSize>>
        rdma_message_buffer_pool;
    std::unique_ptr<ThreadUnsafeBufferPool<config::patronus::kSmallMessageSize>>
        rdma_small_message_buffer_pool;
    char *client_rdma_buffer{nullptr};
    size_t client_rdma_buffer_size{0};
    std::unique_ptr<mem::SlabAllocator> rdma_client_buffer;
};
class pre_patronus_explain;
class Patronus
{
public:
constexpr static size_t V = ::config::verbose::kPatronus;
constexpr static size_t SV = ::config::verbose::kSystem;
using pointer = std::shared_ptr<Patronus>;
constexpr static size_t kMaxCoroNr = ::config::patronus::kMaxCoroNr;
constexpr static size_t kMessageSize = ::config::patronus::kMessageSize;
constexpr static size_t kSmallMessageSize =
::config::patronus::kSmallMessageSize;
// TODO(patronus): try to tune this parameter up.
constexpr static size_t kMwPoolSizePerThread = 50_K;
constexpr static uint64_t kDefaultHint = 0;
constexpr static size_t kMaxSyncKey = 128;
using msg_desc_t = UnreliableConnection<kMaxAppThread>::msg_desc_t;
/**
 * @brief Factory: constructs a Patronus from @p conf and returns it as a
 * shared pointer.
 */
static pointer ins(const PatronusConfig &conf)
{
    pointer instance = std::make_shared<Patronus>(conf);
    return instance;
}
Patronus &operator=(const Patronus &) = delete;
Patronus(const Patronus &) = delete;
Patronus(const PatronusConfig &conf);
~Patronus();
/**
* @brief Get the rlease object
*
* @param node_id the node_id
* @param dir_id the dir_id.
* @param bind_gaddr
* - w/ allocation semantics: should be nullgaddr
* - w/o allocation semantics: should be one valid global address
* @param alloc_hint
* - w/ allocation semantics: the hint to select allocators
* - w/o allocation semantics: should be 0.
* @param size the length of the object / address
* @param ns the length of term requesting for protection
* @param flag @see AcquireRequestFlag
* @param ctx sync call if ctx is nullptr. Otherwise coroutine context.
* @return Read Lease
*/
[[nodiscard]] inline Lease get_rlease(uint16_t node_id,
uint16_t dir_id,
GlobalAddress bind_gaddr,
uint64_t alloc_hint,
size_t size,
std::chrono::nanoseconds ns,
flag_t flag /* AcquireRequestFlag */,
CoroContext *ctx);
/**
* @brief Allocate a piece of remote memory, without any permission bound
*
* @param node_id the target node id
* @param dir_id the directory id
* @param size the requested size
* @param hint the hint to the allocator
* @param ctx
* @return @see GlobalAddress
*/
[[nodiscard]] inline GlobalAddress alloc(uint16_t node_id,
uint16_t dir_id,
size_t size,
uint64_t hint,
CoroContext *ctx);
/**
* @brief @see get_rlease
*/
[[nodiscard]] inline Lease get_wlease(uint16_t node_id,
uint16_t dir_id,
GlobalAddress bind_gaddr,
uint64_t alloc_hint,
size_t size,
std::chrono::nanoseconds ns,
flag_t flag /* AcquireRequestFlag */,
CoroContext *ctx = nullptr);
[[nodiscard]] inline Lease upgrade(Lease &lease,
flag_t flag /*LeaseModifyFlag */,
CoroContext *ctx = nullptr);
/**
* @brief extend the lifetime of the lease by another @p ns.
*/
[[nodiscard]] inline RetCode extend(Lease &lease,
std::chrono::nanoseconds ns,
flag_t flag /* LeaseModifyFlag */,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode rpc_extend(Lease &lease,
std::chrono::nanoseconds ns,
flag_t flag /* LeaseModifyFlag */,
CoroContext *ctx = nullptr);
/**
* when allocation is ON, hint is sent to the server
*/
inline void relinquish(Lease &lease,
uint64_t hint,
flag_t flag /* LeaseModifyFlag */,
CoroContext *ctx = nullptr);
/**
* @brief Deallocate a piece of remote memory, without any permission
* modification
*
* @param gaddr the address got from p->get_gaddr(lease)
* @param dir_id the directory id
* @param size the size of that piece of memory
* @param hint a hint to the allocator
* @param ctx
*/
inline void dealloc(GlobalAddress gaddr,
uint16_t dir_id,
size_t size,
uint64_t hint,
CoroContext *ctx = nullptr);
[[nodiscard]] inline RetCode read(Lease &lease,
char *obuf,
size_t size,
size_t offset,
flag_t flag /* RWFlag */,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode rpc_read(Lease &lease,
char *obuf,
size_t size,
size_t offset,
flag_t flag,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode write(Lease &lease,
const char *ibuf,
size_t size,
size_t offset,
flag_t flag /* RWFlag */,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode rpc_write(Lease &lease,
const char *ibuf,
size_t size,
size_t offset,
flag_t flag,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode cas(Lease &lease,
char *iobuf,
size_t offset,
uint64_t compare,
uint64_t swap,
flag_t flag /* RWFlag */,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode faa(Lease &lease,
char *iobuf,
size_t offset,
int64_t value,
flag_t flag /* RWFlag */,
CoroContext *ctx,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode rpc_cas(Lease &lease,
char *iobuf,
size_t offset,
uint64_t compare,
uint64_t swap,
CoroContext *ctx,
TraceView = util::nulltrace);
// below for batch API
[[nodiscard]] RetCode prepare_write(PatronusBatchContext &batch,
Lease &lease,
const char *ibuf,
size_t size,
size_t offset,
flag_t flag,
CoroContext *ctx = nullptr,
util::TraceView = util::nulltrace);
[[nodiscard]] RetCode prepare_read(PatronusBatchContext &batch,
Lease &lease,
char *obuf,
size_t size,
size_t offset,
flag_t flag,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
[[nodiscard]] RetCode prepare_cas(PatronusBatchContext &batch,
Lease &lease,
char *iobuf,
size_t offset,
uint64_t compare,
uint64_t swap,
flag_t flag,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
[[nodiscard]] RetCode prepare_faa(PatronusBatchContext &batch,
Lease &lease,
char *iobuf,
size_t offset,
int64_t value,
flag_t flag,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
[[nodiscard]] inline RetCode rpc_faa(Lease &lease,
char *iobuf,
size_t offset,
int64_t value,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
[[nodiscard]] RetCode remote_memcpy(
size_t times, size_t size, size_t nid, size_t dir_id, CoroContext *ctx);
[[nodiscard]] RetCode commit(PatronusBatchContext &batch,
CoroContext *ctx = nullptr,
TraceView = util::nulltrace);
/**
 * @brief Returns whatever the time syncer reports as the current Patronus
 * time.
 */
auto patronus_now() const
{
    auto now = time_syncer_->patronus_now();
    return now;
}
// handy cluster-wide put/get. Not very performant, but convenient
/**
 * @brief Stores the raw bytes of a trivially-copyable, non-array object
 * under @p key through the DSM.
 *
 * @param key the key to store under
 * @param v the object whose sizeof(V) bytes are stored
 * @param sleep_time forwarded unchanged to dsm_->put
 */
template <typename V,
          typename T,
          std::enable_if_t<std::is_trivially_copyable_v<V>, bool> = true,
          std::enable_if_t<!std::is_array_v<V>, bool> = true>
void put(const std::string &key, const V &v, const T &sleep_time)
{
    DVLOG(::config::verbose::kSystem)
        << "[keeper] PUT key: `" << key << "` value: `" << v;
    // Serialize the object as a byte string of exactly sizeof(V) bytes.
    std::string bytes((const char *) &v, sizeof(V));
    dsm_->put(key, bytes, sleep_time);
}
/**
 * @brief Stores the string @p value under @p key through the DSM, logging
 * the pair at system verbosity first.
 */
template <typename T>
void put(const std::string &key, const std::string &value, const T &sleep_time)
{
    DVLOG(::config::verbose::kSystem)
        << "[keeper] PUT key: `" << key << "` value: `" << value;
    dsm_->put(key, value, sleep_time);
}
/**
 * @brief Forwards to dsm_->try_get and returns its result as a string.
 */
template <typename T>
std::string try_get(const std::string &key, const T &sleep_time)
{
    std::string result = dsm_->try_get(key, sleep_time);
    return result;
}
/**
 * @brief Fetches the value stored under @p key via dsm_->get, logs it at
 * system verbosity, and returns it.
 */
template <typename T>
std::string get(const std::string &key, const T &sleep_time)
{
    auto fetched = dsm_->get(key, sleep_time);
    DVLOG(::config::verbose::kSystem)
        << "[keeper] GET key: `" << key << "` value: `" << fetched;
    return fetched;
}
/**
 * @brief Fetches the value stored under @p key and reinterprets its first
 * sizeof(V) bytes as a V.
 *
 * Aborts (CHECK) when the stored value is shorter than sizeof(V).
 */
template <typename V, typename T>
V get_object(const std::string &key, const T &sleep_time)
{
    auto v = dsm_->get(key, sleep_time);
    V obj;
    CHECK_GE(v.size(), sizeof(V));
    // Byte-wise copy out of the stored string into the result object.
    memcpy((char *) &obj, v.data(), sizeof(V));
    DVLOG(::config::verbose::kSystem)
        << "[keeper] GET_OBJ key: " << key << ", obj: " << obj;
    return obj;
}
/**
 * @brief Forwards to dsm_->keeper_barrier with the same key and sleep time.
 */
template <typename Duration>
void keeper_barrier(const std::string &key, Duration sleep_time)
{
    dsm_->keeper_barrier(key, sleep_time);
}
/**
 * @brief Partial barrier across the client nodes only.
 *
 * Aborts (CHECK) when called on a non-client. The expected participant
 * count is the number of client node ids, and the node whose id equals
 * the first client node id acts as master.
 */
template <typename Duration>
void client_barrier(const std::string &key, Duration sleep_time)
{
    CHECK(is_client_) << "** Only client can enter client_barrier";
    size_t participant_nr = ::config::get_client_nids().size();
    bool master = (::config::get_client_nids().front() == get_node_id());
    dsm_->keeper_partial_barrier(key, participant_nr, master, sleep_time);
}
/**
 * @brief Partial barrier across the server nodes only.
 *
 * Aborts (CHECK) when called on a non-server. The expected participant
 * count is the number of server node ids, and the node whose id equals
 * the first server node id acts as master.
 */
template <typename Duration>
void server_barrier(const std::string &key, Duration sleep_time)
{
    CHECK(is_server_) << "** Only server can enter server_barrier";
    size_t participant_nr = ::config::get_server_nids().size();
    bool master = (::config::get_server_nids().front() == get_node_id());
    dsm_->keeper_partial_barrier(key, participant_nr, master, sleep_time);
}
/**
* @brief After all the nodes have called this function, should_exit()
* will return true
*
* @param ctx
*/
void finished(uint64_t key);
/**
 * @brief Reads the exit flag stored at index @p key with relaxed ordering.
 */
bool should_exit(uint64_t key) const
{
    const auto &flag = should_exit_[key];
    return flag.load(std::memory_order_relaxed);
}
/**
* @brief give the server thread to Patronus.
*
*/
void server_serve(uint64_t key);
size_t try_get_client_continue_coros(coro_t *coro_buf, size_t limit);
PatronusThreadResourceDesc prepare_client_thread(
bool is_registering_thread);
bool apply_client_resource(PatronusThreadResourceDesc &&,
bool bind_core,
TraceView v = util::nulltrace);
bool has_registered() const;
/**
* @brief Any thread should call this function before calling any
* function of Patronus
*
*/
void registerServerThread();
/**
* @brief see @registerServerThread
*
*/
void registerClientThread();
/** @brief Node id as reported by the underlying DSM. */
size_t get_node_id() const
{
    size_t node_id = dsm_->get_node_id();
    return node_id;
}
/** @brief Thread id as reported by the underlying DSM. */
size_t get_thread_id() const
{
    size_t thread_id = dsm_->get_thread_id();
    return thread_id;
}
/** @brief Thread-name id as reported by the underlying DSM. */
size_t get_thread_name_id() const
{
    size_t name_id = dsm_->get_thread_name_id();
    return name_id;
}
/** @brief Returns the DSM's whole server buffer. */
Buffer get_server_internal_buffer()
{
    Buffer server_buf = dsm_->get_server_buffer();
    return server_buf;
}
void prepare_handle_request_messages(const msg_desc_t *msg_descs,
size_t msg_nr,
ServerCoroBatchExecutionContext &,
CoroContext *ctx);
void commit_handle_request_messages(ServerCoroBatchExecutionContext &,
CoroContext *ctx);
void post_handle_request_messages(const msg_desc_t *msg_descs,
size_t msg_nr,
ServerCoroBatchExecutionContext &ex_ctx,
CoroContext *ctx);
void post_handle_request_acquire(AcquireRequest *req,
HandleReqContext &req_ctx,
CoroContext *ctx);
void post_handle_request_lease_relinquish(LeaseModifyRequest *req,
HandleReqContext &req_ctx,
CoroContext *ctx);
void post_handle_request_memory_access(MemoryRequest *req,
CoroContext *ctx);
void post_handle_request_memcpy(MemcpyRequest *req, CoroContext *ctx);
void post_handle_request_lease_extend(LeaseModifyRequest *req,
CoroContext *ctx);
void post_handle_request_admin(AdminRequest *req, CoroContext *ctx);
/**
 * @brief Hands the entire client RDMA buffer to the caller and marks it as
 * self-managed.
 *
 * Aborts (CHECK) if the buffer is already self-managed.
 */
[[nodiscard]] Buffer get_self_managed_rdma_buffer()
{
    CHECK(!self_managing_client_rdma_buffer_);
    Buffer whole(client_rdma_buffer_, client_rdma_buffer_size_);
    self_managing_client_rdma_buffer_ = true;
    return whole;
}
/**
 * @brief Returns the self-managed client RDMA buffer.
 *
 * Aborts (CHECK) unless @p buffer is exactly the full client RDMA buffer
 * and the buffer is currently marked self-managed; then clears the mark.
 */
void put_self_managed_rdma_buffer(Buffer buffer)
{
    // The caller must hand back the very same span it was given.
    CHECK_EQ(buffer.buffer, client_rdma_buffer_);
    CHECK_EQ(buffer.size, client_rdma_buffer_size_);
    CHECK(self_managing_client_rdma_buffer_);

    self_managing_client_rdma_buffer_ = false;
}
/**
 * @brief Allocates @p size bytes from the client RDMA slab allocator.
 *
 * Aborts (CHECK) while the client RDMA buffer is self-managed.
 *
 * @return a Buffer of @p size bytes, or Buffer(nullptr, 0) when the
 * allocator returns null.
 */
[[nodiscard]] Buffer get_rdma_buffer(size_t size)
{
    CHECK(!self_managing_client_rdma_buffer_);
    auto *mem = (char *) rdma_client_buffer_->alloc(size);
    if (unlikely(mem == nullptr))
    {
        return Buffer(nullptr, 0);
    }
    return Buffer(mem, size);
}
/**
 * @brief Releases a buffer obtained from get_rdma_buffer back to the client
 * RDMA slab allocator. A null buffer address is ignored.
 */
void put_rdma_buffer(Buffer &&buffer)
{
    DCHECK_NE(buffer.size, 0);
    if (!buffer.buffer)
    {
        return;
    }
    rdma_client_buffer_->free(buffer.buffer);
}
void hack_trigger_rdma_protection_error(size_t node_id,
size_t dir_id,
CoroContext *ctx);
/** @brief Returns the DSM handle held by this instance. */
DSM::pointer get_dsm()
{
    DSM::pointer handle = dsm_;
    return handle;
}
/** @brief Read-only access to the time syncer owned by this instance. */
const time::TimeSyncer &time_syncer() const
{
    const time::TimeSyncer &syncer = *time_syncer_;
    return syncer;
}
using PatronusLockManager = LockManager<NR_DIRECTORY, 4096 * 8>;
/**
 * @brief Maps @p key to a (bucket, slot) position in the lock manager.
 *
 * With h = key_hash(key): slot = h % slot_nr and
 * bucket = (h / slot_nr) % bucket_nr.
 */
constexpr std::pair<PatronusLockManager::bucket_t,
                    PatronusLockManager::slot_t>
locate_key(id_t key) const
{
    auto h = key_hash(key);
    auto buckets = lock_manager_.bucket_nr();
    auto slots = lock_manager_.slot_nr();
    return {(h / slots) % buckets, h % slots};
}
/** @brief Directory id used for admin requests; always 0. */
constexpr size_t admin_dir_id() const
{
    constexpr size_t kAdminDir = 0;
    return kAdminDir;
}
size_t lease_buffer_size() const
{
return conf_.lease_buffer_size;
}
size_t alloc_buffer_size() const
{
return conf_.alloc_buffer_size;
}
size_t user_reserved_buffer_size() const
{
return conf_.reserved_buffer_size;
}
/**
 * @brief Returns the lease slice of the DSM server buffer: it starts at the
 * beginning of the server buffer and is lease_buffer_size() bytes long.
 */
[[nodiscard]] Buffer get_lease_buffer() const
{
    auto server_buf = dsm_->get_server_buffer();
    auto buf_size = lease_buffer_size();
    DCHECK_GE(server_buf.size, buf_size);
    return Buffer(server_buf.buffer, buf_size);
}
/**
 * @brief Returns the alloc slice of the DSM server buffer: it starts right
 * after the lease slice and is alloc_buffer_size() bytes long.
 */
[[nodiscard]] Buffer get_alloc_buffer() const
{
    auto server_buf = dsm_->get_server_buffer();
    DCHECK_GE(server_buf.size, lease_buffer_size() + alloc_buffer_size());
    auto *slice_begin = server_buf.buffer + lease_buffer_size();
    return Buffer(slice_begin, alloc_buffer_size());
}
/**
 * @brief Returns the user-reserved slice of the DSM server buffer.
 *
 * The slice starts right after the lease and alloc slices and is
 * user_reserved_buffer_size() bytes long.
 */
[[nodiscard]] Buffer get_user_reserved_buffer() const
{
    auto server_buffer = dsm_->get_server_buffer();
    auto *buf_addr =
        server_buffer.buffer + lease_buffer_size() + alloc_buffer_size();
    auto buf_size = user_reserved_buffer_size();
    // The three slices must fit inside the server buffer. The operands
    // were previously swapped, so the check could never detect a slice
    // running past the end of the buffer (unlike the sibling getters).
    DCHECK_GE(server_buffer.size,
              lease_buffer_size() + alloc_buffer_size() + buf_size);
    return Buffer(buf_addr, buf_size);
}
void thread_explain() const;
inline GlobalAddress get_gaddr(const Lease &lease) const;
// exposed buffer offset
/**
 * @brief Converts a local address into a GlobalAddress with node id 0 and
 * the buffer offset the DSM computes for @p addr.
 */
GlobalAddress to_exposed_gaddr(void *addr)
{
    auto offset = dsm_->addr_to_buffer_offset(addr);
    return GlobalAddress(0 /* node_id */, offset);
}
/**
 * @brief Converts the offset of @p gaddr back into a local address via the
 * DSM. Only the offset field of @p gaddr is used.
 */
void *from_exposed_gaddr(GlobalAddress gaddr)
{
    void *local = dsm_->buffer_offset_to_addr(gaddr.offset);
    return local;
}
inline void *patronus_alloc(size_t size, uint64_t hint);
inline void patronus_free(void *addr, size_t size, uint64_t hint);
void reg_allocator(uint64_t hint, mem::IAllocator::pointer allocator);
mem::IAllocator::pointer get_allocator(uint64_t hint);
friend std::ostream &operator<<(std::ostream &,
const pre_patronus_explain &);
void prepare_fast_backup_recovery(size_t prepare_nr);
inline RetCode signal_modify_qp_flag(size_t node_id,
size_t dir_id,
bool to_ro,
CoroContext *);
inline RetCode signal_reinit_qp(size_t node_id,
size_t dir_id,
CoroContext *);
friend class ServerCoroBatchExecutionContext;
void set_configure_reuse_mw_opt(bool val);
bool get_configure_reuse_mw_opt();
void set_configure_mw_locality_opt(bool val);
bool get_configure_mw_locality_opt();
private:
PatronusConfig conf_;
// the default_allocator_ is set on registering server thread.
// With the config from PatronusConfig.
static thread_local mem::SlabAllocator::pointer default_allocator_;
static thread_local std::unordered_map<uint64_t, mem::IAllocator::pointer>
reg_allocators_;
void *allocator_buf_addr_{nullptr};
size_t allocator_buf_size_{0};
// How many leases on average may a tenant hold?
// It determines how much resources we should reserve
constexpr static size_t kGuessActiveLeasePerCoro = 16;
constexpr static size_t kClientThreadPerServerThread =
::config::patronus::kClientThreadPerServerThread;
constexpr static size_t kClientRdmaBufferSize =
::config::patronus::kClientRdmaBufferSize;
constexpr static size_t kLeaseContextNr =
kMaxCoroNr * kGuessActiveLeasePerCoro * kClientThreadPerServerThread;
static_assert(
kLeaseContextNr <
std::numeric_limits<decltype(AcquireResponse::lease_id)>::max());
constexpr static size_t kServerCoroNr = kMaxCoroNr;
constexpr static size_t kProtectionRegionPerThreadNr =
NR_DIRECTORY * kMaxCoroNr * kGuessActiveLeasePerCoro *
kClientThreadPerServerThread;
constexpr static size_t kTotalProtectionRegionNr =
kProtectionRegionPerThreadNr * kMaxAppThread;
// kMaxCoroNr
thread_local static std::vector<ServerCoroBatchExecutionContext>
coro_batch_ex_ctx_;
/**
 * @brief Returns the batch execution context for coroutine @p coro_id.
 * Debug builds check that @p coro_id is below kMaxCoroNr.
 */
ServerCoroBatchExecutionContext &coro_ex_ctx(size_t coro_id)
{
    DCHECK_LT(coro_id, kMaxCoroNr);
    ServerCoroBatchExecutionContext &slot = coro_batch_ex_ctx_[coro_id];
    return slot;
}
void explain(const PatronusConfig &);
/**
 * @brief Hash used to place keys in the lock manager; currently the
 * identity function.
 */
constexpr static id_t key_hash(id_t key)
{
    // TODO(patronus): should use real hash to distribute the keys.
    const id_t hashed = key;
    return hashed;
}
inline RetCode admin_request_impl(size_t node_id,
size_t dir_id,
uint64_t data,
flag_t flag,
bool need_response,
CoroContext *ctx);
/**
 * @brief Takes one memory window from the pool of directory @p dirID.
 * Debug builds check that the pool is not empty and the result is not null.
 */
ibv_mw *get_mw(size_t dirID)
{
    DCHECK(!mw_pool_[dirID]->empty());
    auto &pool = mw_pool_[dirID];
    auto *ret = pool->alloc();
    return DCHECK_NOTNULL(ret);
}
/**
 * @brief Returns a memory window to the pool of directory @p dirID.
 * A null @p mw is ignored.
 */
void put_mw(size_t dirID, ibv_mw *mw)
{
    if (mw == nullptr)
    {
        return;
    }
    mw_pool_[dirID]->free(mw);
}
[[nodiscard]] inline Lease do_get_wlease(
uint16_t node_id,
uint16_t dir_id,
GlobalAddress bind_gaddr,
uint64_t alloc_hint,
size_t size,
std::chrono::nanoseconds ns,
flag_t flag /* AcquireRequestFlag */,
CoroContext *ctx = nullptr);
[[nodiscard]] inline Lease do_get_rlease(
uint16_t node_id,
uint16_t dir_id,
GlobalAddress bind_gaddr,
uint64_t alloc_hint,
size_t size,
std::chrono::nanoseconds ns,
flag_t flag /* AcquireRequestFlag */,
CoroContext *ctx);
public:
/**
 * @brief Gets a message buffer able to hold @p size bytes: a small one when
 * @p size is at most kSmallMessageSize, otherwise a large one.
 */
Buffer get_rdma_message_buffer(size_t size)
{
    if (size > kSmallMessageSize)
    {
        return get_large_rdma_message_buffer(size);
    }
    return get_small_rdma_message_buffer(size);
}
/**
 * @brief Takes one buffer from the small message pool. The returned Buffer
 * always reports kSmallMessageSize bytes, regardless of @p size.
 */
Buffer get_small_rdma_message_buffer(size_t size)
{
    DCHECK_GE(kSmallMessageSize, size);
    auto *raw = (char *) rdma_small_message_buffer_pool_->get();
    return Buffer(raw, kSmallMessageSize);
}
/**
 * @brief Takes one buffer from the large message pool. The returned Buffer
 * always reports kMessageSize bytes, regardless of @p size.
 */
Buffer get_large_rdma_message_buffer(size_t size)
{
    DCHECK_GE(kMessageSize, size);
    auto *raw = (char *) rdma_message_buffer_pool_->get();
    return Buffer(raw, kMessageSize);
}
/**
 * @brief Returns a message buffer to the pool matching its size: the small
 * pool when its size is at most kSmallMessageSize, else the large pool.
 */
void put_rdma_message_buffer(Buffer &&buffer)
{
    if (buffer.size > kSmallMessageSize)
    {
        put_large_rdma_message_buffer(std::move(buffer));
        return;
    }
    put_small_rdma_message_buffer(std::move(buffer));
}
/**
 * @brief Returns @p buffer to the large message pool. Debug builds check
 * that its size lies in (kSmallMessageSize, kMessageSize].
 */
void put_large_rdma_message_buffer(Buffer buffer)
{
    DCHECK_GT(buffer.size, kSmallMessageSize);
    DCHECK_LE(buffer.size, kMessageSize);
    auto *addr = buffer.buffer;
    rdma_message_buffer_pool_->put(addr);
}
/**
 * @brief Returns @p buffer to the small message pool. Debug builds check
 * that its size lies in (0, kSmallMessageSize].
 */
void put_small_rdma_message_buffer(Buffer buffer)
{
    DCHECK_GT(buffer.size, 0);
    DCHECK_LE(buffer.size, kSmallMessageSize);
    auto *addr = buffer.buffer;
    rdma_small_message_buffer_pool_->put(addr);
}
/**
 * @brief In debug builds, aborts unless @p buf lies inside
 * [client_rdma_buffer_, client_rdma_buffer_ + client_rdma_buffer_size_).
 * Does nothing when debug() is false.
 */
void debug_valid_rdma_buffer(const void *buf)
{
    if constexpr (!debug())
    {
        return;
    }
    else
    {
        CHECK_GE((uint64_t) buf, (uint64_t) client_rdma_buffer_);
        CHECK_LT((uint64_t) buf,
                 (uint64_t) client_rdma_buffer_ + client_rdma_buffer_size_);
    }
}
private:
/** @brief Takes one RpcContext from the rpc context pool. */
RpcContext *get_rpc_context()
{
    RpcContext *rpc_ctx = rpc_context_.get();
    return rpc_ctx;
}
/** @brief Returns @p ctx to the rpc context pool; null is ignored. */
void put_rpc_context(RpcContext *ctx)
{
    if (ctx == nullptr)
    {
        return;
    }
    rpc_context_.put(ctx);
}
/**
 * @brief Returns the pool id of @p ctx, narrowed to 16 bits. Debug builds
 * check that the id is below the uint16_t maximum.
 */
uint16_t get_rpc_context_id(RpcContext *ctx)
{
    auto ret = rpc_context_.obj_to_id(ctx);
    DCHECK_LT(ret, std::numeric_limits<uint16_t>::max());
    return static_cast<uint16_t>(ret);
}
/**
 * @brief Takes one RWContext from the rw context pool. Debug builds check
 * that the result is not null.
 */
RWContext *get_rw_context()
{
    RWContext *rw_ctx = DCHECK_NOTNULL(rw_context_.get());
    return rw_ctx;
}
/** @brief Returns @p ctx to the rw context pool; null is ignored. */
void put_rw_context(RWContext *ctx)
{
    if (ctx == nullptr)
    {
        return;
    }
    rw_context_.put(ctx);
}
/**
 * @brief Returns the pool id of @p ctx, narrowed to 16 bits. Debug builds
 * check that the id is below the uint16_t maximum.
 */
uint16_t get_rw_context_id(RWContext *ctx)
{
    auto ret = rw_context_.obj_to_id(ctx);
    DCHECK_LT(ret, std::numeric_limits<uint16_t>::max());
    return static_cast<uint16_t>(ret);
}
/**
 * @brief Takes a fresh LeaseContext from the pool.
 *
 * Debug builds check that the context is non-null and not marked valid.
 * Its protection_region_id is reset to the maximum value of its type,
 * which serves as the invalid marker.
 */
LeaseContext *get_lease_context()
{
    auto *ret = DCHECK_NOTNULL(lease_context_.get());
    DCHECK(!ret->valid)
        << "** A newly allocated context should not be valid.";
    using region_id_t = decltype(ret->protection_region_id);
    ret->protection_region_id = std::numeric_limits<region_id_t>::max();
    return ret;
}
/**
 * @brief Looks up the LeaseContext with pool id @p id.
 *
 * @return the context, or nullptr (after a verbose log line) when the
 * context is not marked valid.
 */
LeaseContext *get_lease_context(uint16_t id)
{
    auto *ret = DCHECK_NOTNULL(lease_context_.id_to_obj(id));
    if (likely(ret->valid))
    {
        return ret;
    }
    DVLOG(V) << "[Patronus] get_lease_context(id) related to invalid "
                "contexts";
    return nullptr;
}
/** @brief Returns lease_context_.size(). */
size_t remain_lease_nr() const
{
    size_t nr = lease_context_.size();
    return nr;
}
/**
 * @brief Invalidates @p ctx and returns it to the pool; null is ignored.
 *
 * The protection_region_id is reset to the maximum value of its type and
 * the valid flag is cleared before the context goes back to the pool.
 */
void put_lease_context(LeaseContext *ctx)
{
    if (ctx == nullptr)
    {
        return;
    }
    using region_id_t = decltype(ctx->protection_region_id);
    ctx->protection_region_id = std::numeric_limits<region_id_t>::max();
    ctx->valid = false;
    lease_context_.put(ctx);
}
/**
 * @brief Takes one ProtectionRegion from the pool. Debug builds check that
 * the pool returned a non-null buffer.
 */
ProtectionRegion *get_protection_region()
{
    return (ProtectionRegion *) DCHECK_NOTNULL(protection_region_pool_->get());
}
/**
 * @brief Invalidates a ProtectionRegion and returns it to the pool; null is
 * ignored.
 *
 * The u32_1 field of aba_unit_nr_to_ddl is bumped by one on every release
 * so a stale observer can detect reuse (ABA protection).
 */
void put_protection_region(ProtectionRegion *p)
{
    if (p == nullptr)
    {
        return;
    }
    // memory_order_acq_rel is not a valid ordering for a plain load or a
    // plain store (undefined behaviour); use acquire for the load and
    // release for the store.
    auto aba_unit_nr_to_ddl =
        p->aba_unit_nr_to_ddl.load(std::memory_order_acquire);
    // to avoid ABA problem, add the 32 bits by one each time.
    aba_unit_nr_to_ddl.u32_1++;
    p->aba_unit_nr_to_ddl.store(aba_unit_nr_to_ddl,
                                std::memory_order_release);
    p->valid = false;
    protection_region_pool_->put(p);
}
/** @brief Returns the pool buffer with id @p id as a ProtectionRegion. */
ProtectionRegion *get_protection_region(size_t id)
{
    return (ProtectionRegion *) protection_region_pool_->id_to_buf(id);
}
bool fast_switch_backup_qp(TraceView = util::nulltrace);
// clang-format off
/**
* The layout of dsm_->get_server_reserved_buffer():
* [required_time_sync_size()] [required_protection_region_size()]
*
* ^-- get_time_sync_buffer()
* ^-- get_protection_region_buffer()
*
* Total of them: reserved_buffer_size();
*/
// clang-format on
/** @brief Same value as required_dsm_reserve_size(). */
size_t reserved_buffer_size() const
{
    size_t bytes = required_dsm_reserve_size();
    return bytes;
}
/**
 * @brief Bytes needed in the DSM reserved buffer: the protection region
 * area plus the time sync area.
 */
size_t required_dsm_reserve_size() const
{
    size_t region_bytes = required_protection_region_size();
    size_t time_sync_bytes = required_time_sync_size();
    return region_bytes + time_sync_bytes;
}
/**
 * @brief Bytes needed for kTotalProtectionRegionNr ProtectionRegion
 * objects, passed through ROUND_UP(..., 4096).
 */
static size_t required_protection_region_size()
{
    size_t raw_bytes = sizeof(ProtectionRegion) * kTotalProtectionRegionNr;
    return ROUND_UP(raw_bytes, 4096);
}
/**
 * @brief Returns the protection-region slice of the DSM reserved buffer.
 *
 * The slice starts required_time_sync_size() bytes into the reserved
 * buffer and is required_protection_region_size() bytes long.
 */
Buffer get_protection_region_buffer()
{
    auto reserve_buffer = dsm_->get_server_reserved_buffer();
    // Same bounds check as get_time_sync_buffer(): both slices must fit
    // inside the reserved buffer.
    DCHECK_GE(reserve_buffer.size, required_dsm_reserve_size());
    auto *buf_addr = reserve_buffer.buffer + required_time_sync_size();
    auto buf_size = required_protection_region_size();
    return Buffer(buf_addr, buf_size);
}
/**
 * @brief Bytes needed for one time::ClockInfo, passed through
 * ROUND_UP(..., 4096).
 */
static size_t required_time_sync_size()
{
    size_t raw_bytes = sizeof(time::ClockInfo);
    return ROUND_UP(raw_bytes, 4096);
}
/**
 * @brief Returns the time-sync slice of the DSM reserved buffer: it starts
 * at the beginning of the reserved buffer and is required_time_sync_size()
 * bytes long.
 */
Buffer get_time_sync_buffer()
{
    auto reserve_buffer = dsm_->get_server_reserved_buffer();
    DCHECK_GE(reserve_buffer.size, required_dsm_reserve_size());
    return Buffer(reserve_buffer.buffer, required_time_sync_size());
}
// for clients
size_t handle_response_messages(msg_desc_t *msg_descs,
size_t msg_nr,
coro_t *o_coro_buf);
size_t handle_rdma_finishes(
ibv_wc *buffer,
size_t rdma_nr,
coro_t *o_coro_buf,
std::map<std::pair<size_t, size_t>, TraceView> &recov);
void signal_server_to_recover_qp(size_t node_id,
size_t dir_id,
TraceView = util::nulltrace);
void prepare_handle_request_acquire(AcquireRequest *,
HandleReqContext &req_ctx,
CoroContext *ctx);
void prepare_handle_request_lease_modify(LeaseModifyRequest *,
HandleReqContext &req_ctx,
CoroContext *ctx);
// for servers
void handle_response_acquire(AcquireResponse *);
void handle_response_lease_relinquish(LeaseModifyResponse *);
void handle_response_lease_extend(LeaseModifyResponse *resp);
inline void handle_response_admin_qp_modification(AdminResponse *resp,
CoroContext *ctx);
void handle_response_memory_access(MemoryResponse *resp, CoroContext *ctx);
void handle_response_memcpy(MemcpyResponse *resp, CoroContext *ctx);
void handle_admin_exit(AdminRequest *req, CoroContext *ctx);
void handle_admin_recover(AdminRequest *req, CoroContext *ctx);
void handle_admin_barrier(AdminRequest *req, CoroContext *ctx);
void handle_admin_qp_access_flag(AdminRequest *req, CoroContext *ctx);
void prepare_handle_request_lease_relinquish(LeaseModifyRequest *,
HandleReqContext &req_ctx,
CoroContext *ctx);
void prepare_handle_request_lease_extend(LeaseModifyRequest *,
CoroContext *ctx);
void handle_request_lease_extend(LeaseModifyRequest *, CoroContext *ctx);
void handle_request_lease_upgrade(LeaseModifyRequest *, CoroContext *ctx);
/**
* @param lease_id the id of LeaseContext
* @param cid the client id. Will use to detect GC-ed lease (when cid
* mismatch)
* @param expect_unit_nr the expected unit numbers for this lease
* period. If this field changed, it means the client extended the lease
* by one-sided CAS. Server should sustain the GC till the next period.
* @param flag LeaseModifyFlag
* @param ctx
*/
void task_gc_lease(uint64_t lease_id,
ClientID cid,
compound_uint64_t expect_unit_nr,
flag_t flag /* LeaseModifyFlag */,
CoroContext *ctx = nullptr);
void prepare_gc_lease(uint64_t lease_id,
HandleReqContext &req_ctx,
ClientID cid,
flag_t flag,
CoroContext *ctx = nullptr);
// server coroutines
void server_coro_master(CoroYield &yield, uint64_t key);
void server_coro_worker(coro_t coro_id, CoroYield &yield, uint64_t key);
// helpers, actual impls
Lease get_lease_impl(uint16_t node_id,
uint16_t dir_id,
id_t key,
size_t size,
time::ns_t ns,
RpcType type,
flag_t flag,
CoroContext *ctx = nullptr);
inline bool already_passed_ddl(time::PatronusTime time) const;
// flag should be RWFlag
inline RetCode handle_rwcas_flag(Lease &lease,
flag_t flag,
CoroContext *ctx);
inline RetCode handle_batch_op_flag(flag_t flag) const;
inline RetCode validate_lease(const Lease &lease);
RetCode protection_region_rw_impl(Lease &lease,
char *io_buf,
size_t size,
size_t offset,
bool is_read,
CoroContext *ctx = nullptr,
util::TraceView = util::nulltrace);
RetCode protection_region_cas_impl(Lease &lease,
char *iobuf,
size_t offset,
uint64_t compare,
uint64_t swap,