-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredisxslot.h
252 lines (237 loc) · 9.68 KB
/
redisxslot.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
/*
* Copyright (c) 2023, weedge <weege007 at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* 6.0 need open REDISMODULE_EXPERIMENTAL_API */
#define REDISMODULE_EXPERIMENTAL_API
#ifndef REDISXSLOT_H
#define REDISXSLOT_H
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/syscall.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>
#include "dep/dict.h"
#include "dep/list.h"
#include "dep/skiplist.h"
#include "dep/util.h"
#include "hiredis/hiredis.h"
#include "redismodule.h"
#include "threadpool/thpool.h"
// define error
#define REDISXSLOT_ERRORMSG_SYNTAX "ERR syntax error"
#define REDISXSLOT_ERRORMSG_MGRT "ERR migrate error"
#define REDISXSLOT_ERRORMSG_DEL "ERR del error"
#define REDISXSLOT_ERRORMSG_CLI_DISCONN "ERR client disconnected error"
// define const
#define DEFAULT_HASH_SLOTS_MASK 0x000003ff
#define DEFAULT_HASH_SLOTS_SIZE (DEFAULT_HASH_SLOTS_MASK + 1)
#define MAX_HASH_SLOTS_MASK 0x0000ffff
#define MAX_HASH_SLOTS_SIZE (MAX_HASH_SLOTS_MASK + 1)
#define MGRT_BATCH_KEY_TIMEOUT 30 // 30s
#define REDIS_LONGSTR_SIZE 42 // Bytes needed for long -> str
#define REDIS_MGRT_CMD_PARAMS_SIZE 1024 * 1024 // send redis cmd params size
#define SLOTS_MGRT_NOTHING 0
#define SLOTS_MGRT_ERR -1
#define MAX_NUM_THREADS 128
#define REDISXSLOT_APIVER_1 1
/* Hash table cron loop pre call db,slot num for resize rehash(hotkey) */
#define CRON_DBS_PER_CALL 16
#define CRON_DB_SLOTS_PER_CALL 1024
/* Hash table parameters for resize */
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */
/* sub generic cmd for evnet handle */
#define CMD_NONE 0
#define CMD_RENAME 1
#define CMD_MOVE 2
/* redis version */
#define REDIS_VERSION 60000 /*6.0.0*/
/* redisxslot version for linker */
#define REDISXSLOT_MAJOR 0
#define REDISXSLOT_MINOR 1
#define REDISXSLOT_PATCH 0
#define REDISXSLOT_SONAME 0.1.0
// define macro
#define UNUSED(V) ((void)V)
#define CREATE_CMD(name, tgt, attr, firstkey, lastkey, keystep) \
do { \
if (RedisModule_CreateCommand(ctx, name, tgt, attr, firstkey, lastkey, \
keystep) \
!= REDISMODULE_OK) { \
RedisModule_Log(ctx, "warning", "reg cmd error"); \
return REDISMODULE_ERR; \
} \
} while (0);
#define CREATE_ROMCMD(name, tgt, firstkey, lastkey, keystep) \
CREATE_CMD(name, tgt, "readonly", firstkey, lastkey, keystep);
#define CREATE_WRMCMD(name, tgt, firstkey, lastkey, keystep) \
CREATE_CMD(name, tgt, "write deny-oom", firstkey, lastkey, keystep);
/* Using the following macro you can run code inside serverCron() with the
* specified period, specified in milliseconds.
* The actual resolution depends on server.hz. */
#define run_with_period(_ms_, _hz_) \
if (((_ms_) <= 1000 / _hz_) \
|| !(g_slots_meta_info.cronloops % ((_ms_) / (1000 / _hz_))))
#define ASYNC_LOCK(ctx) \
do { \
if (g_slots_meta_info.async) { \
RedisModule_ThreadSafeContextLock(ctx); \
} \
} while (0);
#define ASYNC_UNLOCK(ctx) \
do { \
if (g_slots_meta_info.async) { \
RedisModule_ThreadSafeContextUnlock(ctx); \
} \
} while (0);
// define struct type
typedef struct _slots_meta_info {
uint32_t hash_slots_size;
// from config databases
int databases;
// from config activerehashing yes(1)/no(0)
int activerehashing;
// async block mgrt client
int async;
// setcpuaffinity for async block mgrt thread
const char* async_cpulist;
// cronloop event callback cn
int cronloops;
// thread pool size (dump mgrt restore)
int slots_dump_threads;
int slots_mgrt_threads;
int slots_restore_threads;
} slots_meta_info;
typedef struct _db_slot_info {
// current db
int db;
// rehash flag
int slotkey_table_rehashing;
// hash table entry: RedisModuleString* key,val(crc)
dict** slotkey_tables;
// slotkey_table db slot dict's rwlocks
pthread_rwlock_t* slotkey_table_rwlocks;
// member: RedisModuleString* key, score: uint32_t crc
m_zskiplist* tagged_key_list;
// tagged_key_list per db's rwlock
pthread_rwlock_t tagged_key_list_rwlock;
} db_slot_info;
typedef struct _slot_mgrt_connet_meta {
int db;
// todo: use `slotsmgrt.authset` cmd set host port pwd
int authorized;
sds host;
sds port;
struct timeval timeout;
} slot_mgrt_connet_meta;
typedef struct _db_slot_mgrt_connet {
// pointer only one meta info per conn
slot_mgrt_connet_meta* meta;
time_t last_time;
redisContext* conn_ctx;
} db_slot_mgrt_connect;
// declare struct and define diff type
struct _rdb_obj {
RedisModuleString* key;
RedisModuleString* val;
time_t ttlms;
};
typedef struct _rdb_obj rdb_dump_obj;
typedef struct _rdb_obj rdb_parse_obj;
typedef struct _slots_restore_one_task_params {
rdb_dump_obj* obj;
int result_code;
} slots_restore_one_task_params;
typedef struct _slots_split_restore_params {
slot_mgrt_connet_meta* meta;
char** argv;
size_t* argvlen;
int start_pos;
int end_pos;
int result_code;
} slots_split_restore_params;
typedef struct _dump_obj_params {
RedisModuleString* key;
// return a new obj
rdb_dump_obj** obj;
int result_code;
} dump_obj_params;
typedef struct _bg_call_params {
RedisModuleBlockedClient* bc;
RedisModuleString** argv;
int argc;
} bg_call_params;
// declare defined extern var to out use
extern slots_meta_info g_slots_meta_info;
extern db_slot_info* db_slot_infos;
// declare api function
void crc32_init();
uint32_t crc32_checksum(const char* buf, int len);
int slots_num(const char* s, uint32_t* pcrc, int* phastag);
RedisModuleString* takeAndRef(RedisModuleCtx* ctx, RedisModuleString* str);
void Slots_Init(RedisModuleCtx* ctx, uint32_t hash_slots_size, int databases,
int num_threads, int activerehashing, int async,
const char* async_cpulist);
void Slots_Free(RedisModuleCtx* ctx);
int SlotsMGRT_OneKey(RedisModuleCtx* ctx, const char* host, const char* port,
time_t timeout, RedisModuleString* key,
const char* mgrtType);
int SlotsMGRT_SlotOneKey(RedisModuleCtx* ctx, const char* host,
const char* port, time_t timeout, int slot,
const char* mgrtType, int* left);
int SlotsMGRT_TagKeys(RedisModuleCtx* ctx, const char* host, const char* port,
time_t timeout, RedisModuleString* key,
const char* mgrtType, int* left);
int SlotsMGRT_TagSlotKeys(RedisModuleCtx* ctx, const char* host,
const char* port, time_t timeout, int slot,
const char* mgrtType, int* left);
int SlotsMGRT_Restore(RedisModuleCtx* ctx, rdb_dump_obj* objs[], int n);
unsigned long SlotsMGRT_Scan(RedisModuleCtx* ctx, int slot, unsigned long count,
unsigned long cursor, list* l);
int SlotsMGRT_DelSlotKeys(RedisModuleCtx* ctx, int db, int slots[], int n);
void SlotsMGRT_CloseTimedoutConns(RedisModuleCtx* ctx);
void Slots_Add(RedisModuleCtx* ctx, int db, RedisModuleString* key);
void Slots_Del(RedisModuleCtx* ctx, int db, RedisModuleString* key);
void FreeDumpObjs(RedisModuleCtx* ctx, rdb_dump_obj** objs, int n);
/* Check if we can use setcpuaffinity(). */
#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ \
|| defined __OpenBSD__)
#define USE_SETCPUAFFINITY
void setcpuaffinity(const char* cpulist);
#endif
void SlotsMGRT_SetCpuAffinity(const char* cpulist);
#endif /* REDISXSLOT_H */